From ffabda0e4d234fd6940616bf26d7fac8d17f7782 Mon Sep 17 00:00:00 2001 From: Andreas Tsouchlos Date: Tue, 13 May 2025 14:59:36 +0200 Subject: [PATCH] Add sweep_parameter_space.py --- python/examples/sweep_parameter_space.py | 171 +++++++++++++++++++++++ 1 file changed, 171 insertions(+) create mode 100644 python/examples/sweep_parameter_space.py diff --git a/python/examples/sweep_parameter_space.py b/python/examples/sweep_parameter_space.py new file mode 100644 index 0000000..cd85134 --- /dev/null +++ b/python/examples/sweep_parameter_space.py @@ -0,0 +1,171 @@ +from pathlib import Path +from ray import tune +import ray +import typing +import numpy as np +import galois +import argparse +from dataclasses import dataclass +import pandas as pd + +# autopep8: off +import sys +import os +sys.path.append(f"{os.path.dirname(os.path.abspath(__file__))}/../") + +from hccd import utility, homotopy_generator, path_tracker +# autopep8: on + + +@dataclass +class SimulationArgs: + euler_step_size: float + euler_max_tries: int + newton_max_iter: int + newton_convergence_threshold: float + sigma: int + homotopy_iter: int + + max_frames: int + target_frame_errors: int + + +def decode(tracker, y, H, args: SimulationArgs) -> np.ndarray: + x_hat = np.where(y >= 0, 0, 1) + + s = np.concatenate([y, np.array([0])]) + for i in range(args.homotopy_iter): + x_hat = np.where(s[:-1] >= 0, 0, 1) + if not np.any(np.mod(H @ x_hat, 2)): + return x_hat + + try: + s = tracker.step(s) + except: + return x_hat + + return x_hat + + +def simulate_error_rates_for_SNR(H, Eb_N0, args: SimulationArgs) -> typing.Tuple[float, float, float, int]: + GF = galois.GF(2) + H_GF = GF(H) + G = H_GF.null_space() + k, n = G.shape + + num_frames = 0 + + bit_errors = 0 + frame_errors = 0 + decoding_failures = 0 + + homotopy = homotopy_generator.HomotopyGenerator(H) + tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries, + args.newton_max_iter, args.newton_convergence_threshold, args.sigma) + + for _ in range(args.max_frames): + Eb_N0_lin = 10**(Eb_N0 / 10) + N0 = 1 / (2 * k / n * Eb_N0_lin) + + u = np.random.randint(2, size=k) + # u = np.zeros(shape=k).astype(np.int32) + c = np.array(GF(u) @ G) + x = 1 - 2*c + + y = x + np.sqrt(N0) * np.random.normal(size=n) + + homotopy.update_received(y) + + c_hat = decode(tracker, y, H, args) + + if np.any(np.mod(H @ c_hat, 2)): + tracker.set_sigma(-1 * args.sigma) + c_hat = decode(tracker, y, H, args) + tracker.set_sigma(args.sigma) + + if np.any(np.mod(H @ c_hat, 2)): + decoding_failures += 1 + + bit_errors += np.sum(c_hat != c) + frame_errors += np.any(c_hat != c) + num_frames += 1 + + if frame_errors >= args.target_frame_errors: + break + + BER = bit_errors / (num_frames * n) + FER = frame_errors / num_frames + DFR = decoding_failures / num_frames + + return FER, BER, DFR, frame_errors + + +def simulation(config): + simulation_args = SimulationArgs( + euler_step_size=config["euler_step_size"], + euler_max_tries=20, + newton_max_iter=20, + newton_convergence_threshold=config["newton_convergence_threshold"], + sigma=config["sigma"], + homotopy_iter=config["homotopy_iter"], + max_frames=100000, + target_frame_errors=200) + + FER, BER, DFR, frame_errors = simulate_error_rates_for_SNR( + config["H"], config["Eb_N0"], simulation_args) + + return {"FER": FER, "BER": BER, "DFR": DFR, "frame_errors": frame_errors} + + +def main(): + # Parse command line arguments + + parser = argparse.ArgumentParser() + + parser.add_argument("-c", "--code", type=str, required=True, + help="Path to the alist file containing the parity check matrix of the code") + + args = parser.parse_args() + + H = utility.read_alist_file(args.code) + + # Run parameter sweep + + ray.init() + + search_space = { + "Eb_N0": tune.grid_search([6]), + "newton_convergence_threshold": tune.grid_search(np.array([0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1])), + "euler_step_size": tune.grid_search(np.array([0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1])), + "homotopy_iter": tune.grid_search(np.array([500])), + "sigma": tune.grid_search(np.array([1, -1])), + "H": tune.grid_search([H]) + } + + tuner = tune.Tuner( + simulation, + param_space=search_space, + tune_config=tune.TuneConfig( + max_concurrent_trials=12, + ), + run_config=tune.RunConfig( + name="param_sweep", + storage_path=Path.cwd() / "sim_results" + ) + ) + + results = tuner.fit() + + df = results.get_dataframe() + + keep_columns = ["FER", "BER", "DFR", "frame_errors"] + config_columns = [col for col in df.columns if col.startswith("config/")] + columns_to_keep = keep_columns + config_columns + df = df[columns_to_keep].drop(columns=["config/H"]) + + df.to_csv("sweep_results.csv", index=False) + print(df.head()) + + +if __name__ == "__main__": + main()