from pathlib import Path from ray import tune import ray import typing import numpy as np import galois import argparse from dataclasses import dataclass import pandas as pd # autopep8: off import sys import os sys.path.append(f"{os.path.dirname(os.path.abspath(__file__))}/../") from hccd import utility, homotopy_generator, path_tracker # autopep8: on @dataclass class SimulationArgs: euler_step_size: float euler_max_tries: int newton_max_iter: int newton_convergence_threshold: float sigma: int homotopy_iter: int max_frames: int target_frame_errors: int def decode(tracker, y, H, args: SimulationArgs) -> np.ndarray: x_hat = np.where(y >= 0, 0, 1) s = np.concatenate([y, np.array([0])]) for i in range(args.homotopy_iter): x_hat = np.where(s[:-1] >= 0, 0, 1) if not np.any(np.mod(H @ x_hat, 2)): return x_hat try: s = tracker.step(s) except: return x_hat return x_hat def simulate_error_rates_for_SNR(H, Eb_N0, args: SimulationArgs) -> typing.Tuple[float, float, float, int]: GF = galois.GF(2) H_GF = GF(H) G = H_GF.null_space() k, n = G.shape num_frames = 0 bit_errors = 0 frame_errors = 0 decoding_failures = 0 homotopy = homotopy_generator.HomotopyGenerator(H) tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries, args.newton_max_iter, args.newton_convergence_threshold, args.sigma) for _ in range(args.max_frames): Eb_N0_lin = 10**(Eb_N0 / 10) N0 = 1 / (2 * k / n * Eb_N0_lin) u = np.random.randint(2, size=k) # u = np.zeros(shape=k).astype(np.int32) c = np.array(GF(u) @ G) x = 1 - 2*c y = x + np.sqrt(N0) * np.random.normal(size=n) homotopy.update_received(y) c_hat = decode(tracker, y, H, args) if np.any(np.mod(H @ c_hat, 2)): tracker.set_sigma(-1 * args.sigma) c_hat = decode(tracker, y, H, args) tracker.set_sigma(args.sigma) if np.any(np.mod(H @ c_hat, 2)): decoding_failures += 1 bit_errors += np.sum(c_hat != c) frame_errors += np.any(c_hat != c) num_frames += 1 if frame_errors >= args.target_frame_errors: break BER = bit_errors / (num_frames * n) FER = frame_errors / num_frames DFR = decoding_failures / num_frames return FER, BER, DFR, frame_errors def simulation(config): simulation_args = SimulationArgs( euler_step_size=config["euler_step_size"], euler_max_tries=20, newton_max_iter=20, newton_convergence_threshold=config["newton_convergence_threshold"], sigma=config["sigma"], homotopy_iter=config["homotopy_iter"], max_frames=100000, target_frame_errors=200) FER, BER, DFR, frame_errors = simulate_error_rates_for_SNR( config["H"], config["Eb_N0"], simulation_args) return {"FER": FER, "BER": BER, "DFR": DFR, "frame_errors": frame_errors} def main(): # Parse command line arguments parser = argparse.ArgumentParser() parser.add_argument("-c", "--code", type=str, required=True, help="Path to the alist file containing the parity check matrix of the code") args = parser.parse_args() H = utility.read_alist_file(args.code) # Run parameter sweep ray.init() search_space = { "Eb_N0": tune.grid_search([6]), "newton_convergence_threshold": tune.grid_search(np.array([0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1])), "euler_step_size": tune.grid_search(np.array([0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1])), "homotopy_iter": tune.grid_search(np.array([500])), "sigma": tune.grid_search(np.array([1, -1])), "H": tune.grid_search([H]) } tuner = tune.Tuner( simulation, param_space=search_space, tune_config=tune.TuneConfig( max_concurrent_trials=12, ), run_config=tune.RunConfig( name="param_sweep", storage_path=Path.cwd() / "sim_results" ) ) results = tuner.fit() df = results.get_dataframe() keep_columns = ["FER", "BER", "DFR", "frame_errors"] config_columns = [col for col in df.columns if col.startswith("config/")] columns_to_keep = keep_columns + config_columns df = df[columns_to_keep].drop(columns=["config/H"]) df.to_csv("sweep_results.csv", index=False) print(df.head()) if __name__ == "__main__": main()