Skip to content

monitor.tracker

get_memory_information(dir_prefix='/dev/shm')

The get_memory_information function is a wrapper around the go tool pprof command. It takes in an optional argument, dir_prefix, which defaults to /dev/shm. The function then runs the go tool pprof command with arguments -tags and {dir_prefix}/memory.prof. The output of this command is captured and returned as a string.

Parameters:

Name Type Description Default
dir_prefix str

str: Specify the directory prefix for

'/dev/shm'

Returns:

Type Description
str

A string that contains the memory profile

Source code in src/fjformer/monitor/tracker.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def get_memory_information(dir_prefix: str = "/dev/shm") -> str:
    """
    The get_memory_information function is a wrapper around the go tool pprof command.
    It takes in an optional argument, dir_prefix, which defaults to /dev/shm.
    The function then runs the go tool pprof command with arguments -tags and {dir_prefix}/memory.prof.
    The output of this command is captured and returned as a string.

    :param dir_prefix: str: Specify the directory prefix for
    :return: A string that contains the memory profile

    """
    return subprocess.run(
        args=["go", "tool", "pprof", "-tags", f"{dir_prefix}/memory.prof"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    ).stdout.decode("utf-8")

initialise_tracking(interval=1.0, dir_prefix='/dev/shm')

The initialise_tracking function starts a daemon thread that periodically saves the memory profile to disk. The outer function starts the daemon thread and returns a context manager that stops it when the context exits. The inner function uses posix.rename() to atomically replace an existing file, so we can be sure that any given memory profile was taken at some point during the lifetime of its context.

Parameters:

Name Type Description Default
interval float

float: Set the time interval between each memory profile

1.0
dir_prefix str

str: Specify the directory where the memory profile will be saved

'/dev/shm'

Returns:

Type Description
None

A thread object

Source code in src/fjformer/monitor/tracker.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def initialise_tracking(interval: float = 1., dir_prefix: str = "/dev/shm") -> None:
    """
    The initialise_tracking function starts a daemon thread that periodically saves the memory profile to disk.
    The outer function starts the daemon thread and returns a context manager that stops it when
    the context exits.  The inner function uses posix.rename() to atomically replace an existing file,
    so we can be sure that any given memory profile was taken at some point during the lifetime of its
    context.

    :param interval: float: Set the time interval between each memory profile
    :param dir_prefix: str: Specify the directory where the memory profile will be saved
    :return: A thread object

    """

    def inner():
        """
        The inner function is a daemon thread that periodically saves the memory profile to disk.
        The outer function starts the daemon thread and returns a context manager that stops it when
        the context exits.  The inner function uses posix.rename() to atomically replace an existing file,
        so we can be sure that any given memory profile was taken at some point during the lifetime of its
        context.

        :return: A thread object

        """
        while True:
            jax.profiler.save_device_memory_profile(f"{dir_prefix}/memory.prof.new")
            if posix is not None:
                os.rename(f"{dir_prefix}/memory.prof.new", f"{dir_prefix}/memory.prof")
            else:
                posix.rename(f"{dir_prefix}/memory.prof.new", f"{dir_prefix}/memory.prof")
            time.sleep(interval)

    thread = threading.Thread(target=inner, daemon=True)
    thread.start()

is_notebook()

Returns True if the code is being run in a notebook, False otherwise.

Source code in src/fjformer/monitor/tracker.py
18
19
20
def is_notebook():
    """Returns True if the code is being run in a notebook, False otherwise."""
    return os.environ.get("IPYTHON") is not None

run(note_book=None, interval=1, dir_prefix='/dev/shm', dpr=True)

The run function is a simple wrapper around the go tool pprof command. It runs the command every interval seconds and prints out its output to stdout. If you are running this in a Jupyter notebook, it will print to an IPython display object instead of stdout.

Parameters:

Name Type Description Default
note_book

Determine whether the program is running in a notebook or not

None
interval float

float: Specify the interval between each refresh

1
dir_prefix str

str: Specify the directory where the memory

'/dev/shm'
dpr

Determine whether to display the output in a notebook or not

True

Returns:

Type Description

The output of the pprof command

Source code in src/fjformer/monitor/tracker.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def run(note_book=None, interval: float = 1, dir_prefix: str = "/dev/shm", dpr=True):
    """
    The run function is a simple wrapper around the go tool pprof command.
    It runs the command every interval seconds and prints out its output to stdout.
    If you are running this in a Jupyter notebook, it will print to an IPython display object instead of stdout.


    :param note_book: Determine whether the program is running in a notebook or not
    :param interval: float: Specify the interval between each refresh
    :param dir_prefix: str: Specify the directory where the memory
    :param dpr: Determine whether to display the output in a notebook or not
    :return: The output of the pprof command

    """
    if note_book is None:
        note_book = is_notebook()
    std = curses.initscr() if not note_book else None
    try:
        while True:
            if not note_book and dpr:
                std.clear()
            output = subprocess.run(
                args=["go", "tool", "pprof", "-tags", f"{dir_prefix}/memory.prof"],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
            ).stdout.decode("utf-8")
            if not note_book and dpr:
                std.addstr(output)
                std.refresh()
            if note_book and dpr:
                IPython.display.clear_output(True)
                print(output)

            with open(f"{dir_prefix}/memory.json", "w") as fin:
                json.dump({
                    "log": output
                }, fin)
            time.sleep(interval)
    except KeyboardInterrupt:
        curses.endwin()

threaded_log(interval=1.0, dir_prefix='/dev/shm', save_mem_json=False)

The threaded_log function is a wrapper around the get_memory_information function. It allows you to monitor your memory usage in real time, and optionally save it to a JSON file. The threaded_log function returns a threading.Thread object that can be started with .start() and stopped with .join().

Parameters:

Name Type Description Default
interval float

float: Set the time interval between each memory log

1.0
dir_prefix str

str: Specify the directory to save the memory

'/dev/shm'
save_mem_json bool

bool: Save the memory information to a json file

False

Returns:

Type Description
Thread

A threading

Source code in src/fjformer/monitor/tracker.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def threaded_log(interval: float = 1., dir_prefix: str = "/dev/shm", save_mem_json: bool = False) -> threading.Thread:
    """
    The threaded_log function is a wrapper around the get_memory_information function.
    It allows you to monitor your memory usage in real time, and optionally save it to a JSON file.
    The threaded_log function returns a threading.Thread object that can be started with .start() and stopped with .join().


    :param interval: float: Set the time interval between each memory log
    :param dir_prefix: str: Specify the directory to save the memory
    :param save_mem_json: bool: Save the memory information to a json file
    :return: A threading

    """
    note_book = is_notebook()

    def show_():

        std = curses.initscr() if not note_book else None
        try:
            while True:
                mem_info = get_memory_information()
                if not note_book:
                    std.clear()
                    std.addstr(mem_info)
                    std.refresh()
                if note_book:
                    IPython.display.clear_output(True)
                    print(mem_info)
                if save_mem_json:
                    with open(f"{dir_prefix}/memory.json", "w") as fin:
                        json.dump({
                            "log": mem_info
                        }, fin)
                time.sleep(interval)
        except KeyboardInterrupt:
            curses.endwin()

    thread = threading.Thread(
        target=show_
    )
    return thread