diff --git a/README.md b/README.md
index f190095..a668ea9 100644
--- a/README.md
+++ b/README.md
@@ -271,7 +271,8 @@ Several flags can be specified. The most relevant settings are:
| -d, --dtw | performs dynamic time warping on the top 3 frequencies (highest contribution) calculated using the DFT if set (default=False) |
| -re, --reconstruction | plots reconstruction of top 10 signals on figure |
| -np, --no-psd | if set, replace the power density spectrum (a*a/N) with the amplitude spectrum (a) |
-| -c, --autocorrelation | if set, autocorrelation is calculated in addition to DFT. The results are merged to a single prediction at the end |
+| -au, --autocorrelation | if set, autocorrelation is calculated in addition to DFT. The results are merged to a single prediction at the end |
+| -p, --periodicity_detection | activates calculation of the new periodicity score after outlier detection. Options are recurrence period density entropy (rpde), spectral flatness (sf), correlation (corr) and individual period correlation (ind) |
| -w, --window_adaptation | online time window adaptation. If set to true, the time window is shifted on X hits to X times the previous phases from the current instance. X corresponds to frequency_hits |
| -fh FREQUENCY_HITS, --frequency_hits FREQUENCY_HITS | specifies the number of hits needed to adapt the time window. A hit occurs once a dominant frequency is found |
| -v, --verbose | sets verbose on or off (default=False) |
@@ -391,4 +392,4 @@ the [EuroHPC ADMIRE project](https://admire-eurohpc.eu/).
[parallel.bedge]: https://img.shields.io/badge/Parallel_Programming:-Ahmad_Tarraf-blue
-
\ No newline at end of file
+
diff --git a/docs/contributing.md b/docs/contributing.md
index 93c885c..89ce639 100644
--- a/docs/contributing.md
+++ b/docs/contributing.md
@@ -90,4 +90,5 @@ By contributing, you agree that your contributions will be licensed under the sa
We sincerely thank the following contributors for their valuable contributions:
- 
[Ahmad Tarraf](https://github.com/a-tarraf) -- [Jean-Baptiste Bensard](https://github.com/besnardjb): Metric proxy integration \ No newline at end of file +- [Jean-Baptiste Bensard](https://github.com/besnardjb): Metric proxy integration +- [Anton Holderied](https://github.com/AntonBeasis): bachelor thesis: new periodicity score \ No newline at end of file diff --git a/examples/jupyter_notebooks/quick_example.ipynb b/examples/jupyter_notebooks/quick_example.ipynb index b3320b5..06957e9 100644 --- a/examples/jupyter_notebooks/quick_example.ipynb +++ b/examples/jupyter_notebooks/quick_example.ipynb @@ -18,10 +18,55 @@ "\n", "# Set up data\n", "## 1) overlap for rank level metrics\n", - "b_rank = [0.0,0.0,1000.0,1000.0,0.0,0.0,1000.0,1000.0,0.0,0.0,1000.0,1000.0,0.0,0.0]\n", - "t_rank_s = [0.5,0.0,10.5,10.0,20.5,20.0,30.5,30.0,40.5,40.0,50.5,50.0,60.5,60]\n", - "t_rank_e = [5.0,4.5,15.0,14.5,25.0,24.5,35.0,34.5,45.0,44.5,55.0,54.5,65.0,64.5]\n", - "b,t = overlap(b_rank,t_rank_s, t_rank_e)\n", + "b_rank = [\n", + " 0.0,\n", + " 0.0,\n", + " 1000.0,\n", + " 1000.0,\n", + " 0.0,\n", + " 0.0,\n", + " 1000.0,\n", + " 1000.0,\n", + " 0.0,\n", + " 0.0,\n", + " 1000.0,\n", + " 1000.0,\n", + " 0.0,\n", + " 0.0,\n", + "]\n", + "t_rank_s = [\n", + " 0.5,\n", + " 0.0,\n", + " 10.5,\n", + " 10.0,\n", + " 20.5,\n", + " 20.0,\n", + " 30.5,\n", + " 30.0,\n", + " 40.5,\n", + " 40.0,\n", + " 50.5,\n", + " 50.0,\n", + " 60.5,\n", + " 60,\n", + "]\n", + "t_rank_e = [\n", + " 5.0,\n", + " 4.5,\n", + " 15.0,\n", + " 14.5,\n", + " 25.0,\n", + " 24.5,\n", + " 35.0,\n", + " 34.5,\n", + " 45.0,\n", + " 44.5,\n", + " 55.0,\n", + " 54.5,\n", + " 65.0,\n", + " 64.5,\n", + "]\n", + "b, t = overlap(b_rank, t_rank_s, t_rank_e)\n", "\n", "# ## 2) or directly specify the app level metrics\n", "# t = [10.0, 20.1, 30.0, 40.2, 50.3, 60, 70, 80.0,]\n", @@ -33,19 +78,19 @@ "\n", "# set up data\n", "data = {\n", - " \"time\": np.array(t),\n", - " \"bandwidth\": np.array(b),\n", - " \"total_bytes\": total_bytes,\n", - " \"ranks\": ranks \n", - " }\n", + " \"time\": np.array(t),\n", + " \"bandwidth\": np.array(b),\n", + " \"total_bytes\": total_bytes,\n", + " \"ranks\": ranks,\n", + "}\n", "\n", - "#parse args\n", - "args = parse_args(argv,\"ftio\")\n", + "# parse args\n", + "args = parse_args(argv, \"ftio\")\n", "\n", "# perform prediction\n", "prediction, analysis_figures = core(data, args)\n", "\n", - "#display prediction\n", + "# display prediction\n", "display_prediction(\"ftio\", prediction)" ] }, @@ -76,29 +121,74 @@ "\n", "# Set up data\n", "## 1) overlap for rank level metrics\n", - "b_rank = [0.0,0.0,1000.0,1000.0,0.0,0.0,1000.0,1000.0,0.0,0.0,1000.0,1000.0,0.0,0.0]\n", - "t_rank_s = [0.5,0.0,10.5,10.0,20.5,20.0,30.5,30.0,40.5,40.0,50.5,50.0,60.5,60]\n", - "t_rank_e = [5.0,4.5,15.0,14.5,25.0,24.5,35.0,34.5,45.0,44.5,55.0,54.5,65.0,64.5]\n", - "b,t = overlap(b_rank,t_rank_s, t_rank_e)\n", + "b_rank = [\n", + " 0.0,\n", + " 0.0,\n", + " 1000.0,\n", + " 1000.0,\n", + " 0.0,\n", + " 0.0,\n", + " 1000.0,\n", + " 1000.0,\n", + " 0.0,\n", + " 0.0,\n", + " 1000.0,\n", + " 1000.0,\n", + " 0.0,\n", + " 0.0,\n", + "]\n", + "t_rank_s = [\n", + " 0.5,\n", + " 0.0,\n", + " 10.5,\n", + " 10.0,\n", + " 20.5,\n", + " 20.0,\n", + " 30.5,\n", + " 30.0,\n", + " 40.5,\n", + " 40.0,\n", + " 50.5,\n", + " 50.0,\n", + " 60.5,\n", + " 60,\n", + "]\n", + "t_rank_e = [\n", + " 5.0,\n", + " 4.5,\n", + " 15.0,\n", + " 14.5,\n", + " 25.0,\n", + " 24.5,\n", + " 35.0,\n", + " 34.5,\n", + " 45.0,\n", + " 44.5,\n", + " 55.0,\n", + " 54.5,\n", + " 65.0,\n", 
+ " 64.5,\n", + "]\n", + "b, t = overlap(b_rank, t_rank_s, t_rank_e)\n", "\n", "# command line arguments\n", "argv = [\"-e\", \"plotly\"]\n", "\n", "# set up data\n", "data = {\n", - " \"time\": np.array(t),\n", - " \"bandwidth\": np.array(b),\n", - " \"total_bytes\": total_bytes,\n", - " \"ranks\": ranks \n", - " }\n", + " \"time\": np.array(t),\n", + " \"bandwidth\": np.array(b),\n", + " \"total_bytes\": total_bytes,\n", + " \"ranks\": ranks,\n", + "}\n", "\n", - "#parse args\n", - "args = parse_args(argv,\"ftio\")\n", + "# parse args\n", + "args = parse_args(argv, \"ftio\")\n", "\n", "# perform prediction\n", "prediction, analysis_figures = core(data, args)\n", "\n", - "#display prediction\n", + "# display prediction\n", "display_prediction(\"ftio\", prediction)\n", "analysis_figures.show()" ] diff --git a/ftio/analysis/anomaly_detection.py b/ftio/analysis/anomaly_detection.py index 2a9a790..bbb0ba2 100644 --- a/ftio/analysis/anomaly_detection.py +++ b/ftio/analysis/anomaly_detection.py @@ -30,6 +30,7 @@ from sklearn.neighbors import LocalOutlierFactor, NearestNeighbors from ftio.plot.anomaly_plot import plot_decision_boundaries, plot_outliers +from ftio.plot.cepstrum_plot import plot_cepstrum def outlier_detection( @@ -164,6 +165,8 @@ def z_score( if len(dominant_index) != 0: i[np.array(dominant_index) - 1] = -1 plot_outliers(args, freq_arr, amp, indices, conf, i) + if args.cepstrum: + plot_cepstrum(args, freq_arr, amp, indices) return dominant_index, conf, text diff --git a/ftio/analysis/periodicity_analysis.py b/ftio/analysis/periodicity_analysis.py new file mode 100644 index 0000000..2c45ec9 --- /dev/null +++ b/ftio/analysis/periodicity_analysis.py @@ -0,0 +1,422 @@ +""" +This file provides functions for periodicity analysis in frequency data using various methods, +including Recurrence Period Density Entropy, Spectral Flatness, Correlation, Correlation of Individual Periods, and Peak Sharpness. + +Author: Anton Holderied +Copyright (c) 2025 TU Darmstadt, Germany +Date: Jul 2025 + +Licensed under the BSD 3-Clause License. +For more information, see the LICENSE file in the project root: +https://github.com/tuda-parallel/FTIO/blob/main/LICENSE +""" + +from __future__ import annotations + +from concurrent.futures import ThreadPoolExecutor, as_completed + +import matplotlib.pyplot as plt + +# all +import numpy as np +from kneed import KneeLocator +from rich.panel import Panel + +# find_peaks +from scipy.signal import find_peaks +from scipy.stats import kurtosis +from sklearn.cluster import DBSCAN + +# Isolation forest +from sklearn.ensemble import IsolationForest + +# Lof +from sklearn.neighbors import KDTree, LocalOutlierFactor, NearestNeighbors + +from ftio.analysis._correlation import correlation +from ftio.plot.anomaly_plot import plot_decision_boundaries, plot_outliers +from ftio.plot.cepstrum_plot import plot_cepstrum + + +def new_periodicity_scores( + amp: np.ndarray, + signal: np.ndarray, + prediction, + args, + peak_window_half_width: int = 3, +) -> str: + + def compute_peak_sharpness( + spectrum_amp: np.ndarray, + peak_indices_in_spectrum: np.ndarray, + window_half_width: int, + ) -> float: + """ + Computes the average peak sharpness (excess kurtosis) for specified peak regions. + Higher values indicate more peaked regions. 
+ """ + sharpness_scores = [] + for peak_idx in peak_indices_in_spectrum: + start_idx = max(0, peak_idx - window_half_width) + end_idx = min(len(spectrum_amp), peak_idx + window_half_width + 1) + peak_window_spectrum = spectrum_amp[start_idx:end_idx] + if peak_window_spectrum.size < 4: + sharpness_scores.append(-2.0) + continue + if np.std(peak_window_spectrum) < 1e-9: + sharpness_scores.append(-2.0) + continue + k = kurtosis(peak_window_spectrum, fisher=True, bias=False) + sharpness_scores.append(k) + if not sharpness_scores: + return -2.0 + + return float(np.mean(sharpness_scores)) + + def compute_rpde(freq_arr: np.ndarray) -> float: + safe_array = np.where(freq_arr <= 0, 1e-12, freq_arr) + sum = np.sum(safe_array * np.log(safe_array)) + max = np.log(safe_array.size) + rpde_score = 1 - (np.divide(sum, -max)) + return rpde_score + + def compute_spectral_flatness(freq_arr: np.ndarray) -> float: + safe_spectrum = np.where(freq_arr <= 0, 1e-12, freq_arr) + geometric_mean = np.exp(np.mean(np.log(safe_spectrum))) + arithmetic_mean = np.mean(safe_spectrum) + spectral_flatness_score = 1 - float((geometric_mean / arithmetic_mean)) + return spectral_flatness_score + + def signal_correlation( + freq: float, + sampling_rate: float, + signal: np.ndarray, + phi: float, + start_time: float, + text: str = "", + ) -> float: + dt = 1 / sampling_rate + t = start_time + dt * np.arange(len(signal)) + waveform = np.cos(2 * np.pi * freq * t + phi) + + correlation_result = correlation(waveform, signal) + return correlation_result + + def ind_period_correlation( + freq: float, + sampling_rate: float, + signal: np.ndarray, + phi: float, + start_time: float, + text: str = "", + ) -> tuple[float, str]: + """ + Calculates a length-weighted correlation for a signal. + + The function computes a weighted average of correlations from each period. + The weighting is based solely on how complete the period is, giving + less weight to partial periods at the beginning or end of the signal. + """ + dt = 1 / sampling_rate + t = start_time + dt * np.arange(len(signal)) + waveform = np.cos(2 * np.pi * freq * t + phi) + + period = 1 / freq + phase_offset = phi / (2 * np.pi * freq) + + correlations = [] + segment_lengths = [] + + i = 0 + while True: + center_time = (i * period) - start_time - phase_offset + begin = round((center_time - 0.5 * period) * sampling_rate) + end = round((center_time + 0.5 * period) * sampling_rate) + + if end < 1: + i += 1 + continue + + begin_clamped = max(begin, 0) + end_clamped = min(end, len(signal)) + + if begin_clamped < end_clamped: + correlation_result = correlation( + waveform[begin_clamped:end_clamped], signal[begin_clamped:end_clamped] + ) + correlations.append(correlation_result) + segment_lengths.append(end_clamped - begin_clamped) + + text += ( + f"[green]Period {i:3d} (indices {begin_clamped:5d}:{end_clamped:5d}): " + f"[black]Correlation = {correlation_result:.4f}\n" + ) + + if end >= len(signal) - 1: + break + i += 1 + + if not correlations: + text += "[red]No full periods were found in the signal.\n" + return 0.0, text + + full_period_samples = round(period * sampling_rate) + + # The weights are now just the ratio of the segment length to a full period length. + length_weights = [sl / full_period_samples for sl in segment_lengths] + + for i, weight in enumerate(length_weights): + text += ( + f"[blue]Weight for period {i:3d} (completeness): [black]{weight:.4f}\n" + ) + + # Calculate the weighted average using only the length weights. 
+ weighted_correlation = np.average(correlations, weights=length_weights) + + text += f"\n[bold magenta]Final Length-Weighted Correlation: {weighted_correlation:.4f}[/bold magenta]\n" + + return weighted_correlation, text + + def weighted_ind_period_correlation( + freq: float, + sampling_rate: float, + signal: np.ndarray, + phi: float, + start_time: float, + text: str = "", + ) -> tuple[float, str]: + """ + The function iterates through the signal period by period, calculating the + correlation for each. It then computes a final weighted average of these + correlations. + + The weighting for each period is determined by two factors: + 1. **Length Weight**: The proportion of the period that is actually + present in the signal (especially relevant for start/end periods). + 2. **Recency Weight**: A linear ramp from 0.3 for the first period to + 1.0 for the last period, giving more importance to recent data. + """ + if freq == 0.0: + return 0.0, "" + + dt = 1 / sampling_rate + t = start_time + dt * np.arange(len(signal)) + waveform = np.cos(2 * np.pi * freq * t + phi) + + period = 1 / freq + phase_offset = phi / (2 * np.pi * freq) + + # --- Store individual results to be weighted later --- + correlations = [] + segment_lengths = [] + + i = 0 + while True: + center_time = (i * period) - start_time - phase_offset + begin = round((center_time - 0.5 * period) * sampling_rate) + end = round((center_time + 0.5 * period) * sampling_rate) + + if end < 1: + i += 1 + continue + + # Clamp indices to be within the signal bounds + begin_clamped = max(begin, 0) + end_clamped = min(end, len(signal)) # Use len(signal) for slicing + + # Calculate correlation for the valid segment + if begin_clamped < end_clamped: + correlation_result = correlation( + waveform[begin_clamped:end_clamped], signal[begin_clamped:end_clamped] + ) + correlations.append(correlation_result) + segment_lengths.append(end_clamped - begin_clamped) + + text += ( + f"[green]Period {i:3d} (indices {begin_clamped:5d}:{end_clamped:5d}): " + f"[black]Correlation = {correlation_result:.4f}\n" + ) + + if end >= len(signal) - 1: + break + i += 1 + + # --- Perform the weighting if any periods were found --- + if not correlations: + text += "[red]No full periods were found in the signal.\n" + return 0.0, text + + num_periods = len(correlations) + full_period_samples = round(period * sampling_rate) + + # 1. Recency weights: linear ramp from 0.3 to 1.0 + recency_weights = np.linspace(0.3, 1.0, num=num_periods) + + # 2. 
Combine with length weights and calculate the final weighted average + final_weights = [] + for i in range(num_periods): + # Length weight: actual segment length / full period length + length_weight = segment_lengths[i] / full_period_samples + # Final weight is the product of both factors + combined_weight = recency_weights[i] * length_weight + final_weights.append(combined_weight) + + text += ( + f"[blue]Weight for period {i:3d}: " + f"[black]Recency={recency_weights[i]:.2f}, Length={length_weight:.2f} -> " + f"Combined={combined_weight:.4f}\n" + ) + + # np.average handles the sum(value * weight) / sum(weight) calculation + weighted_correlation = np.average(correlations, weights=final_weights) + + text += f"\n[magenta]Final Weighted Correlation: {weighted_correlation:.4f}[/]\n" + + return weighted_correlation, text + + def parallel_period_correlation( + freq: float, + sampling_rate: float, + signal: np.ndarray, + phi: float, + start_time: float, + workers: int, + text: str = "", + ) -> str: + dt = 1 / sampling_rate + t = start_time + dt * np.arange(len(signal)) + waveform = np.cos(2 * np.pi * freq * t + phi) + + period = 1 / freq + phase_offset = phi / (2 * np.pi * freq) + + # Precompute all (begin, end, i) intervals + intervals = [] + i = 0 + while True: + center_time = (i * period) - start_time - phase_offset + begin = round((center_time - 0.5 * period) * sampling_rate) + end = round((center_time + 0.5 * period) * sampling_rate) + if end < 1: + i += 1 + continue + begin = max(begin, 0) + end = min(end, len(signal) - 1) + intervals.append((begin, end, i)) + if end >= len(signal) - 1: + break + i += 1 + + def compute_correlation(args): + b, e, idx = args + corr_res = correlation(waveform[b:e], signal[b:e]) + return idx, b, e, corr_res + + results = [] + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = [ + executor.submit(compute_correlation, interval) for interval in intervals + ] + for future in as_completed(futures): + results.append(future.result()) + + # Sort results by period index to keep order + results.sort(key=lambda x: x[0]) + + for idx, b, e, corr_res in results: + text += ( + f"[green]Correlation for period {idx:3d} with indices {b:5d},{e:5d}: " + f"[black]{corr_res:.4f} \n" + ) + return text + + text = "" + if args.periodicity_detection: + if args.psd: + amp = amp * amp / len(amp) + # text = "[green]Spectrum[/]: Power spectrum\n" + indices = np.arange(1, int(len(amp) / 2) + 1) + amp_tmp = np.array(2 * amp[indices]) + # norm the data + amp_tmp = amp_tmp / amp_tmp.sum() if amp_tmp.sum() > 0 else amp_tmp + + if "rpde" in args.periodicity_detection.lower(): + rpde_score = compute_rpde(amp_tmp) + text += f"\n[green]RPDE score: [black]{rpde_score:.4f}[/]\n" + prediction.periodicity = rpde_score + if args.n_freq > 0: + prediction.top_freqs["periodicity"] = np.full( + len(prediction.top_freqs["freq"]), rpde_score + ) + if "sf" in args.periodicity_detection.lower(): + sf_score = compute_spectral_flatness(amp_tmp) + text += f"\n[green]Spectral flatness score: [black]{sf_score:.4f}[/]\n" + prediction.periodicity = sf_score + if args.n_freq > 0: + prediction.top_freqs["periodicity"] = np.full( + len(prediction.top_freqs["freq"]), sf_score + ) + if ( + "corr" in args.periodicity_detection.lower() + and len(prediction.dominant_freq) != 0 + ): + sampling_freq = prediction.freq + start_time = prediction.t_start + if args.n_freq > 0: + for i, (dominant_freq, phi) in enumerate( + zip(prediction.top_freqs["freq"], prediction.top_freqs["phi"]) + ): + score = 
signal_correlation(
+                        dominant_freq, sampling_freq, signal, phi, start_time
+                    )
+                    prediction.top_freqs["periodicity"][i] = score
+                    text += (
+                        f"[green]Overall correlation score for frequency {dominant_freq:.4f}Hz: "
+                        f"[black]{score:.4f}[/]\n"
+                    )
+                prediction.periodicity = prediction.top_freqs["periodicity"][0]
+            else:
+                freq, phi = prediction.dominant_freq[0], prediction.phi[0]
+                score = signal_correlation(freq, sampling_freq, signal, phi, start_time)
+                prediction.periodicity = score
+                text += (
+                    f"[green]Overall correlation score for frequency {freq:.4f}Hz: "
+                    f"[black]{score:.4f}[/]\n"
+                )
+        if (
+            "ind" in args.periodicity_detection.lower()
+            and len(prediction.dominant_freq) != 0
+        ):
+            sampling_freq = prediction.freq
+            start_time = prediction.t_start
+            if args.n_freq > 0:
+                for i, (dominant_freq, phi) in enumerate(
+                    zip(prediction.top_freqs["freq"], prediction.top_freqs["phi"])
+                ):
+                    score, new_text = weighted_ind_period_correlation(
+                        dominant_freq, sampling_freq, signal, phi, start_time
+                    )
+                    prediction.top_freqs["periodicity"][i] = score
+                    text += f"[green]Individual correlation score for frequency {dominant_freq:.4f}Hz:[/]\n"
+                    text += new_text
+                prediction.periodicity = prediction.top_freqs["periodicity"][0]
+            else:
+                freq, phi = prediction.dominant_freq[0], prediction.phi[0]
+                score, new_text = weighted_ind_period_correlation(
+                    freq, sampling_freq, signal, phi, start_time
+                )
+                prediction.periodicity = score
+                text += f"[green]Individual correlation score for frequency {freq:.4f}Hz:[/]\n"
+                text += new_text
+
+    title = "Periodicity Analysis"
+
+    text = Panel.fit(
+        text[:-1],
+        style="white",
+        border_style="green",
+        title=title,
+        title_align="left",
+    )
+    return text
diff --git a/ftio/api/gekkoFs/jit/jitsettings.py b/ftio/api/gekkoFs/jit/jitsettings.py
index 0a0ca8e..1249c84 100644
--- a/ftio/api/gekkoFs/jit/jitsettings.py
+++ b/ftio/api/gekkoFs/jit/jitsettings.py
@@ -590,12 +590,15 @@ def set_variables(self) -> None:
             pass
         if "dlio" in self.app:
-            self.stage_in_path = "/d/github/dlio_benchmark/data" # generate data with
+            # self.stage_in_path = "/d/github/dlio_benchmark/data"
+
             workload = " workload=resnet50_small_a100_pytorch "
             if self.exclude_daemon:
                 self.app_flags = (
                     f"{workload} "
-                    f"++workload.workflow.generate_data=False ++workload.workflow.train=True ++workload.workflow.checkpoint=True " # ++workload.workflow.evaluation=True "
+                    f"++workload.workflow.generate_data=False "
+                    # f"++workload.workflow.generate_data=True "
+                    f"++workload.workflow.train=True ++workload.workflow.checkpoint=True "
                     f"++workload.dataset.data_folder={self.run_dir}/data/jit ++workload.checkpoint.checkpoint_folder={self.run_dir}/checkpoints/jit "
                     f"++workload.output.output_folder={self.run_dir}/hydra_log/jit "
                 )
@@ -610,20 +613,20 @@
             else:
                 self.app_flags = (
                     f"{workload} "
-                    f"++workload.workflow.generate_data=False ++workload.workflow.train=True ++workload.workflow.checkpoint=False " # ++workload.workflow.evaluation=True "
+                    f"++workload.workflow.generate_data=True ++workload.workflow.train=True ++workload.workflow.checkpoint=True " # ++workload.workflow.evaluation=True "
                     f"++workload.dataset.data_folder={self.gkfs_mntdir}/data/jit ++workload.checkpoint.checkpoint_folder={self.gkfs_mntdir}/checkpoints/jit "
                     f"++workload.output.output_folder={self.gkfs_mntdir}/hydra_log/jit "
                 )
-            self.pre_app_call = [
-                # f"cd {self.gkfs_mntdir}",
-                (
-                    f"mpirun -np 8 dlio_benchmark "
-                    f"{workload} "
-                    f"++workload.workflow.generate_data=True 
++workload.workflow.train=False ++workload.workflow.checkpoint=True " # ++workload.workflow.evaluation=True "
-                    f"++workload.dataset.data_folder={self.gkfs_mntdir}/data/jit ++workload.checkpoint.checkpoint_folder={self.gkfs_mntdir}/checkpoints/jit "
-                    f"++workload.output.output_folder={self.gkfs_mntdir}/hydra_log/jit "
-                )
-            ]
+            # self.pre_app_call = [
+            #     # f"cd {self.gkfs_mntdir}",
+            #     (
+            #         f"mpirun -np 8 dlio_benchmark "
+            #         f"{workload} "
+            #         f"++workload.workflow.generate_data=True ++workload.workflow.train=False ++workload.workflow.checkpoint=True " # ++workload.workflow.evaluation=True "
+            #         f"++workload.dataset.data_folder={self.gkfs_mntdir}/data/jit ++workload.checkpoint.checkpoint_folder={self.gkfs_mntdir}/checkpoints/jit "
+            #         f"++workload.output.output_folder={self.gkfs_mntdir}/hydra_log/jit "
+            #     )
+            # ]
             self.post_app_call = ""
         # ├─ Nek5000
         elif "nek" in self.app:
diff --git a/ftio/api/gekkoFs/jit/setup_core.py b/ftio/api/gekkoFs/jit/setup_core.py
index 8963a71..886a957 100644
--- a/ftio/api/gekkoFs/jit/setup_core.py
+++ b/ftio/api/gekkoFs/jit/setup_core.py
@@ -643,7 +643,7 @@ def start_application(settings: JitSettings, runtime: JitTime):
         additional_arguments += get_env(settings, "mpi")
     call = (
-        f" time mpiexec -np {settings.procs_app} --oversubscribe "
+        f" time mpiexec -np {int(settings.procs_app)} --oversubscribe "
         f"{additional_arguments} {settings.app_call} {settings.app_flags}"
     )
diff --git a/ftio/freq/_dft_workflow.py b/ftio/freq/_dft_workflow.py
index 570254d..e9a5545 100644
--- a/ftio/freq/_dft_workflow.py
+++ b/ftio/freq/_dft_workflow.py
@@ -8,6 +8,7 @@ from rich.panel import Panel
 from ftio.analysis.anomaly_detection import outlier_detection
+from ftio.analysis.periodicity_analysis import new_periodicity_scores
 from ftio.freq._analysis_figures import AnalysisFigures
 from ftio.freq._dft import dft
 from ftio.freq._filter import filter_signal
@@ -63,7 +64,14 @@ def ftio_dft(
     #! Perform DFT
     tik = time.time()
-    console.print(f"[cyan]Executing:[/] {args.transformation.upper()} + {args.outlier}\n")
+    if args.periodicity_detection:
+        console.print(
+            f"[cyan]Executing:[/] {args.transformation.upper()} + {args.outlier} + {args.periodicity_detection}\n"
+        )
+    else:
+        console.print(
+            f"[cyan]Executing:[/] {args.transformation.upper()} + {args.outlier}\n"
+        )
     n = len(b_sampled)
     frequencies = args.freq * np.arange(0, n) / n
     X = dft(b_sampled)
@@ -90,6 +98,8 @@ def ftio_dft(
     #! Assign data
     prediction.dominant_freq = frequencies[dominant_index]
     prediction.conf = conf[dominant_index]
+    if args.periodicity_detection is not None:
+        prediction.periodicity = conf[dominant_index]
     prediction.amp = amp[dominant_index]
     prediction.phi = phi[dominant_index]
     prediction.t_start = time_stamps[0]
@@ -107,10 +117,13 @@ def ftio_dft(
         prediction.top_freqs = {
             "freq": frequencies[top_candidates[0:n_freq]],
             "conf": conf[top_candidates[0:n_freq]],
+            "periodicity": conf[top_candidates[0:n_freq]],
             "amp": amp[top_candidates[0:n_freq]],
             "phi": phi[top_candidates[0:n_freq]],
         }
+    periodicity_score = new_periodicity_scores(amp, b_sampled, prediction, args)
+
     t_sampled = time_stamps[0] + np.arange(0, n) * 1 / args.freq
     #! 
Fourier fit if set if args.fourier_fit: @@ -140,7 +153,7 @@ def ftio_dft( precision_text = "" # precision_text = precision_dft(amp, phi, dominant_index, b_sampled, t_sampled, frequencies, args.engine) - text = Group(text, outlier_text, precision_text[:-1]) + text = Group(text, outlier_text, periodicity_score, precision_text[:-1]) console.print( Panel.fit( diff --git a/ftio/freq/prediction.py b/ftio/freq/prediction.py index 7e8d31b..c8968ef 100644 --- a/ftio/freq/prediction.py +++ b/ftio/freq/prediction.py @@ -9,6 +9,7 @@ class Prediction: source (str): The name or type of transformation used to generate predictions. dominant_freq (np.ndarray): Array of dominant frequencies identified in the data. conf (np.ndarray): Confidence values associated with each dominant frequency. + periodicity (np.ndarray): Periodicity score results for each frequency. amp (np.ndarray): Amplitudes corresponding to each dominant frequency. phi (np.ndarray): Phase angles for each dominant frequency. t_start (float): Start time index or timestamp for the prediction interval. @@ -35,6 +36,7 @@ def __init__( self._source = transformation self._dominant_freq = np.array([]) self._conf = np.array([]) + self._periodicity = np.array([]) self._amp = np.array([]) self._phi = np.array([]) self._t_start = t_start @@ -89,6 +91,23 @@ def conf(self, value): ) self._conf = value + @property + def periodicity(self): + return self._periodicity + + @periodicity.setter + def periodicity(self, value): + # same logic as dominant_freq + if np.isscalar(value): + value = np.array([value]) + elif isinstance(value, list): + value = np.array(value) + if not isinstance(value, np.ndarray): + raise TypeError( + "periodicity must be a numpy ndarray, list convertible to ndarray, or a numeric scalar" + ) + self._periodicity = value + @property def amp(self): return self._amp @@ -271,6 +290,14 @@ def get_dominant_freq_and_conf(self) -> tuple[float, float]: out_conf = self._conf[dominant_index] return out_freq, out_conf + def get_periodicity(self) -> float: + out_periodicity = np.nan + if len(self._periodicity) > 0: + dominant_index = self.get_dominant_index() + if dominant_index is not None: + out_periodicity = self._periodicity[dominant_index] + return out_periodicity + def get_dominant_freq(self) -> float: """ Return the dominant frequency @@ -409,6 +436,12 @@ def disp_dominant_freq_and_conf(self) -> str: f"[cyan]Confidence:[/] {color_pred(c_d)}" f"{np.round(c_d * 100, 2)}[/] %\n" ) + periodicity = self.get_periodicity() + if not np.isnan(periodicity): + text += ( + f"[cyan]Periodicity:[/] {color_pred(periodicity)}" + f"{np.round(periodicity * 100, 2)}[/] %\n" + ) else: text = ( "[cyan underline]Prediction results:[/]\n" @@ -439,6 +472,7 @@ def to_dict(self): "source": self._source, "dominant_freq": self._dominant_freq, "conf": self._conf, + "periodicity": self._periodicity, "amp": self._amp, "phi": self._phi, "t_start": self.t_start, diff --git a/ftio/parse/args.py b/ftio/parse/args.py index ed915a2..adf86af 100644 --- a/ftio/parse/args.py +++ b/ftio/parse/args.py @@ -159,6 +159,20 @@ def parse_args(argv: list, name="") -> argparse.Namespace: help="outlier detection method: Z-score (default), DB-Scan, Isolation_forest, LOF, find_peaks (from sci-pi)", ) parser.set_defaults(outlier="Z-score") + parser.add_argument( + "-p", + "--periodicity_detection", + choices=["rpde", "sf", "corr", "ind"], + type=str, + help="periodicity detection method after outlier detection: RPDE, Spectral flatness, Correlation, Correlation for individual periods. 
Default: none", + ) + parser.set_defaults(periodicity_detection=None) + parser.add_argument( + "-ce", + "--cepstrum", + action="store_true", + help="enable Cepstrum plotting for the DFT", + ) parser.add_argument( "-le", "--level", @@ -213,7 +227,7 @@ def parse_args(argv: list, name="") -> argparse.Namespace: ) parser.set_defaults(fourier_fit=False) parser.add_argument( - "-c", + "-au", "--autocorrelation", dest="autocorrelation", action="store_true", diff --git a/ftio/plot/cepstrum_plot.py b/ftio/plot/cepstrum_plot.py new file mode 100644 index 0000000..91a5fc2 --- /dev/null +++ b/ftio/plot/cepstrum_plot.py @@ -0,0 +1,97 @@ +"""Outlier plot function""" + +from __future__ import annotations + +import matplotlib.pyplot as plt +import numpy as np +import plotly.express as px +import plotly.graph_objects as go +from sklearn.inspection import DecisionBoundaryDisplay + +from ftio.freq.freq_html import create_html +from ftio.plot.helper import format_plot +from ftio.plot.spectrum import plot_both_spectrums + + +# ?################################# +# ? Plot outliers +# ?################################# +def plot_cepstrum( + args, + freq_arr: np.ndarray, + amp: np.ndarray, + indecies: np.ndarray, +) -> None: + """Plots outliers for DB scan + + Args: + freq_arr (np.ndarray): _description_ + amp (np.ndarray): aplitude or power + indecies (np.ndarray): _description_ + conf (np.ndarray): _description_ + dominant_index (np.ndarray): _description_ + d (np.ndarray, optional): _description_. Defaults to np.array([]). + """ + colorscale = [(0, "rgb(0,50,150)"), (0.5, "rgb(150,50,150)"), (1, "rgb(255,50,0)")] + # prepare figures + figs = [] + for i in np.arange(0, 4): + f = go.Figure() + f = format_plot(f) + figs.append(f) + + all_colors = [ + px.colors.qualitative.Alphabet[0], + px.colors.qualitative.Plotly[0], + ] + px.colors.qualitative.Plotly[2:] + all_colors = np.array( + all_colors + + px.colors.qualitative.Alphabet + + px.colors.qualitative.Dark24 + + [px.colors.qualitative.Plotly[1]] + ) + + counter = -1 + spec_figs, plt_names = plot_both_spectrums(args, freq_arr, amp, full=False) + for trace in list(spec_figs.select_traces()): + counter += 1 + trace.update(marker={"coloraxis": "coloraxis"}) + figs[counter].add_trace(trace) + if figs[counter].data and "xaxis" in figs[counter].data[0]: + figs[counter].data[0]["xaxis"] = "x1" + figs[counter].data[0]["yaxis"] = "y1" + + for i in np.arange(0, 2): + figs[i].update_layout(coloraxis={"colorscale": colorscale}) + + power_spectrum = amp * amp + powerlog = np.log(power_spectrum + 1e-10) + freq_arr_qf = freq_arr * ((len(freq_arr) / 20) / 5) + cepstrum = np.abs(np.fft.fft(powerlog).real) + + spec_figs, plt_names = plot_both_spectrums(args, freq_arr_qf, cepstrum, full=False) + for trace in list(spec_figs.select_traces()): + counter += 1 + trace.update(marker={"coloraxis": "coloraxis"}) + figs[counter].add_trace(trace) + if figs[counter].data and "xaxis" in figs[counter].data[0]: + figs[counter].data[0]["xaxis"] = "x1" + figs[counter].data[0]["yaxis"] = "y1" + + for i in np.arange(2, 4): + figs[i].update_layout(coloraxis={"colorscale": colorscale}) + + y_title = "Power" if args.psd else "Amplitude" + figs[0].update_xaxes(title_text="Frequency (Hz)") + figs[0].update_yaxes(title_text=plt_names[0]) + figs[1].update_xaxes(title_text="Frequency (Hz)") + figs[1].update_yaxes(title_text=plt_names[1]) + figs[2].update_xaxes(title_text="Quefrequency (s)") + figs[2].update_yaxes(title_text=plt_names[0]) + figs[3].update_xaxes(title_text="Quefrequency (s)") + 
figs[3].update_yaxes(title_text=plt_names[1]) + + for fig in figs: + fig.update_layout(width=1300, height=400) + configuration = {"toImageButtonOptions": {"format": "png", "scale": 4}} + create_html(figs, args.render, configuration, "cepstrum") diff --git a/ftio/processing/print_output.py b/ftio/processing/print_output.py index 4f5a257..217c77e 100644 --- a/ftio/processing/print_output.py +++ b/ftio/processing/print_output.py @@ -50,27 +50,62 @@ def display_prediction( * 100, 2, ) + if prediction.periodicity is not None and prediction.periodicity.size > 0: + periodicity_array = np.round( + np.where( + np.isinf(prediction.top_freqs["periodicity"]), + 1, + prediction.top_freqs["periodicity"], + ) + * 100, + 2, + ) + else: + periodicity_array = np.array([]) amp_array = prediction.top_freqs["amp"] phi_array = prediction.top_freqs["phi"] table = Table(show_header=True, header_style="bold cyan") - table.add_column("Freq (Hz)", justify="right", style="white", no_wrap=True) - table.add_column("Conf. (%)", justify="right", style="white", no_wrap=True) - table.add_column("Amplitude", justify="right", style="white", no_wrap=True) - table.add_column("Phi", justify="right", style="white", no_wrap=True) - table.add_column("Cosine Wave", justify="right", style="white", no_wrap=True) + + columns = [ + ("Freq (Hz)", "right"), + ("Conf. (%)", "right"), + ] + + if len(periodicity_array) > 0: + columns.append(("Periodicity (%)", "right")) + + columns.extend( + [ + ("Amplitude", "right"), + ("Phi", "right"), + ("Cosine Wave", "right"), + ] + ) + + for col_name, justify in columns: + table.add_column(col_name, justify=justify, style="white", no_wrap=True) description = "" # Add frequency data for i, freq in enumerate(freq_array): wave_name = prediction.get_wave_name(freq, amp_array[i], phi_array[i]) - table.add_row( + row = [ f"{freq:.3e}", f"{conf_array[i]:.2f}", - f"{amp_array[i]:.3e}", - f"{phi_array[i]:.3e}", - wave_name, + ] + + if len(periodicity_array) > 0: + row.append(f"{periodicity_array[i]:.2f}") + + row.extend( + [ + f"{amp_array[i]:.3e}", + f"{phi_array[i]:.3e}", + wave_name, + ] ) + table.add_row(*row) if i == 0: description = wave_name else: diff --git a/test/test_ftio.py b/test/test_ftio.py index 3bff4e7..8f66687 100644 --- a/test/test_ftio.py +++ b/test/test_ftio.py @@ -18,7 +18,7 @@ def test_ftio_core_no_input(): def test_ftio_core_no_input_autocorrelation(): """Test the core functionality of ftio with no input and autocorrelation.""" - args = parse_args(["-e", "no", "-c"], "ftio") + args = parse_args(["-e", "no", "-au"], "ftio") _ = core({}, args) assert True @@ -39,7 +39,7 @@ def test_ftio_core(): def test_ftio_core_autocorrelation(): """Test the core functionality of ftio with autocorrelation.""" file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") - args = ["ftio", file, "-e", "no", "-c"] + args = ["ftio", file, "-e", "no", "-au"] _ = core({}, args) assert True diff --git a/test/test_wavelet.py b/test/test_wavelet.py index 6e301e6..dac21c7 100644 --- a/test/test_wavelet.py +++ b/test/test_wavelet.py @@ -14,28 +14,28 @@ def test_wavelet_cont_args(): """Test continuous wavelet transformation with default decomposition level.""" - args = parse_args(["-e", "no", "-c", "-tr", "wave_cont"], "ftio") + args = parse_args(["-e", "no", "-tr", "wave_cont"], "ftio") _ = core([], args) assert True def test_wavelet_disc_args(): """Test discrete wavelet transformation with default decomposition level.""" - args = parse_args(["-e", "no", "-c", "-tr", "wave_disc"], "ftio") + args 
= parse_args(["-e", "no", "-tr", "wave_disc"], "ftio") _ = core([], args) assert True def test_wavelet_cont_lvl_args(): """Test continuous wavelet transformation with a specified decomposition level.""" - args = parse_args(["-e", "no", "-c", "-tr", "wave_cont", "-le", "5"], "ftio") + args = parse_args(["-e", "no", "-tr", "wave_cont", "-le", "5"], "ftio") _ = core([], args) assert True def test_wavelet_disc_lvl_args(): """Test discrete wavelet transformation with a specified decomposition level.""" - args = parse_args(["-e", "no", "-c", "-tr", "wave_disc", "-le", "5"], "ftio") + args = parse_args(["-e", "no", "-tr", "wave_disc", "-le", "5"], "ftio") _ = core([], args) assert True @@ -51,6 +51,6 @@ def test_wavelet_cont(): os.path.dirname(__file__), "../examples/tmio/ior/collective/1536_new.json", ) - args = ["ftio", file, "-e", "no", "-c", "-tr", "wave_cont"] + args = ["ftio", file, "-e", "no", "-tr", "wave_cont"] _, args = main(args) assert True
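For reference, the two closed-form scores that ftio/analysis/periodicity_analysis.py introduces can be reproduced standalone. The sketch below mirrors compute_rpde and compute_spectral_flatness on an already one-sided spectrum; the function names are illustrative and not part of the FTIO API, and the normalization that compute_rpde leaves to its caller is folded in here.

import numpy as np

def rpde_like_score(spectrum: np.ndarray) -> float:
    # Entropy-based score: close to 1 for a single spectral spike, close to 0 for a flat spectrum.
    p = np.where(spectrum <= 0, 1e-12, spectrum)
    p = p / p.sum()  # normalize to a probability distribution
    shannon_entropy = -np.sum(p * np.log(p))
    return float(1.0 - shannon_entropy / np.log(p.size))

def spectral_flatness_score(spectrum: np.ndarray) -> float:
    # One minus the geometric-to-arithmetic mean ratio: tonal spectra score high, noisy ones low.
    s = np.where(spectrum <= 0, 1e-12, spectrum)
    return float(1.0 - np.exp(np.mean(np.log(s))) / np.mean(s))

# A spectrum with one dominant bin scores high on both measures.
spike = np.array([0.01, 0.01, 1.0, 0.01, 0.01])
print(rpde_like_score(spike), spectral_flatness_score(spike))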