Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/contributing.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,5 @@ By contributing, you agree that your contributions will be licensed under the sa
We sincerely thank the following contributors for their valuable contributions:
- [Ahmad Tarraf](https://github.com/a-tarraf)
- [Jean-Baptiste Bensard](https://github.com/besnardjb): Metric proxy integration
- [Anton Holderied](https://github.com/AntonBeasis): bachelor thesis: new periodicity score
- [Anton Holderied](https://github.com/AntonBeasis): bachelor thesis: new periodicity score
- [Tim Dieringer](https://github.com/Tim-Dieringer): bachelor thesis: Additional integration for Metric Proxy
128 changes: 128 additions & 0 deletions ftio/api/metric_proxy/proxy_zmq.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this actually used, or do you use the prediction server already implemented in FTIO?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ftio/api/metric_proxy/proxy_zmq.py is a zmq server to specifically support what metric proxy needs, so it is used. It's probably possible to change proxy_zmq.py to rely on some of the already implemented functions instead of using a complete custom method but that would require refactoring.

Copy link
Member

@A-Tarraf A-Tarraf Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is fine. I was just wondering because we introduced a new class (prediction) and wanted to avoid dictionaries. But if there is no way to use the class, we can keep it this way. I would just suggest that you document these points alongside some instructions (e.g., call tree?) in a separate file for FTIO as described here: https://github.com/tuda-parallel/FTIO/blob/feature/pattern_change_detection/docs/students_contribute.md#-module-documentation-and-licensing. Also, maybe you can add some test cases so that we do not break these functionalities in the future with newer commits (https://github.com/tuda-parallel/FTIO/blob/feature/pattern_change_detection/docs/students_contribute.md#-module-documentation-and-licensing)

Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import math
import time
import numpy as np
import zmq
import msgpack
from rich.console import Console

from multiprocessing import Pool, cpu_count
from ftio.api.metric_proxy.parallel_proxy import execute, execute_parallel
from ftio.prediction.tasks import ftio_metric_task, ftio_metric_task_save

from ftio.api.metric_proxy.parse_proxy import filter_metrics
from ftio.freq.helper import MyConsole
import signal

CONSOLE = MyConsole()
CONSOLE.set(True)

CURRENT_ADDRESS = None
IDLE_TIMEOUT = 100
last_request = time.time()

def sanitize(obj):
if isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, dict):
return {k: sanitize(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [sanitize(v) for v in obj]
return obj


def handle_request(msg: bytes) -> bytes:
"""Handle one FTIO request via ZMQ."""
global CURRENT_ADDRESS

if msg == b"ping":
return b"pong"

if msg.startswith(b"New Address: "):
new_address = msg[len(b"New Address: "):].decode()
CURRENT_ADDRESS = new_address
return b"Address updated"

try:
req = msgpack.unpackb(msg, raw=False)
argv = req.get("argv", [])
raw_metrics = req.get("metrics", [])

metrics = filter_metrics(raw_metrics, filter_deriv=False)
print(f"Processing {len(metrics)} metrics")

print(f"With Arguments: {argv}")
argv.extend(["-e", "no"])

disable_parallel = req.get("disable_parallel", False)

ranks = 32


except Exception as e:
return msgpack.packb({"error": f"Invalid request: {e}"}, use_bin_type=True)

try:
t = time.process_time()
if disable_parallel:
data = execute(metrics, argv, ranks, False)
else:
data = execute_parallel(metrics, argv, ranks)
elapsed_time = time.process_time() - t
CONSOLE.info(f"[blue]Calculation time: {elapsed_time} s[/]")

native_data = sanitize(list(data))

return msgpack.packb(native_data, use_bin_type=True)

except Exception as e:
print(f"Error during processing: {e}")
return msgpack.packb({"error": str(e)}, use_bin_type=True)


def main(address: str = "tcp://*:0"):
"""FTIO ZMQ Server entrypoint for Metric Proxy."""
global CURRENT_ADDRESS, last_request, POOL
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind(address)
CURRENT_ADDRESS = address

signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(signal.SIGINT, shutdown_handler)

endpoint = socket.getsockopt(zmq.LAST_ENDPOINT).decode()
print(endpoint, flush=True)

console = Console()
console.print(f"[green]FTIO ZMQ Server listening on {endpoint}[/]")

try:
while True:
if socket.poll(timeout=1000):
msg = socket.recv()
console.print(f"[cyan]Received request ({len(msg)} bytes)[/]")
last_request = time.time()
reply = handle_request(msg)
socket.send(reply)

if reply == b"Address updated":
console.print(f"[yellow]Updated address to {CURRENT_ADDRESS}[/]")
socket.close()
socket = context.socket(zmq.REP)
socket.bind(CURRENT_ADDRESS)
else:
if time.time() - last_request > IDLE_TIMEOUT:
console.print("Idle timeout reached, shutting down server")
break
finally:
socket.close(linger=0)
context.term()


def shutdown_handler(signum, frame):
raise SystemExit



if __name__ == "__main__":
main()
34 changes: 31 additions & 3 deletions ftio/prediction/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# from ftio.prediction.helper import get_dominant
# from ftio.plot.freq_plot import convert_and_plot
from ftio.freq.helper import MyConsole
from ftio.freq.prediction import Prediction
from ftio.parse.args import parse_args
from ftio.plot.freq_plot import convert_and_plot
from ftio.processing.print_output import display_prediction
Expand Down Expand Up @@ -71,7 +72,33 @@ def ftio_metric_task_save(
) -> None:
prediction = ftio_metric_task(metric, arrays, argv, ranks, show)
# freq = get_dominant(prediction) #just get a single dominant value
if prediction:
names = []
if prediction.top_freqs:
freqs = prediction.top_freqs["freq"]
amps = prediction.top_freqs["amp"]
phis = prediction.top_freqs["phi"]

for f, a, p in zip(freqs, amps, phis):
names.append(prediction.get_wave_name(f, a, p))

data.append(
{
"metric": f"{metric}",
"dominant_freq": prediction.dominant_freq,
"conf": prediction.conf,
"amp": prediction.amp,
"phi": prediction.phi,
"t_start": prediction.t_start,
"t_end": prediction.t_end,
"total_bytes": prediction.total_bytes,
"ranks": prediction.ranks,
"freq": float(prediction.freq),
"top_freq": prediction.top_freqs,
"n_samples": prediction.n_samples,
"wave_names": names,
}
)
#if prediction:
# data.append(
# {
# "metric": f"{metric}",
Expand All @@ -87,7 +114,8 @@ def ftio_metric_task_save(
# "top_freq": prediction.top_freqs,
# }
# )
prediction.metric = metric
data.append(prediction)
# caused issues with msgpack serialization
#prediction.metric = metric
#data.append(prediction)
else:
CONSOLE.info(f"\n[yellow underline]Warning: {metric} returned {prediction}[/]")
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ convert_trace = "ftio.util.convert_old_trace:main"
trace_ftio = "ftio.api.trace_analysis.trace_ftio_v2:main"
trace_analysis = "ftio.api.trace_analysis.trace_analysis:main"
admire_proxy_invoke_ftio = "ftio.api.metric_proxy.proxy_invoke_ftio:main"
admire_proxy_zmq = "ftio.api.metric_proxy.proxy_zmq:main"
jit_plot = "ftio.api.gekkoFs.jit.jit_plot:main"
server_ftio = "ftio.util.server_ftio:main_cli"

Expand Down