diff --git a/python/examples/simulate_error_rate.py b/python/examples/simulate_error_rate.py new file mode 100644 index 0000000..52ed0e1 --- /dev/null +++ b/python/examples/simulate_error_rate.py @@ -0,0 +1,150 @@ +import typing +import numpy as np +import galois +import argparse +from dataclasses import dataclass +from tqdm import tqdm + +# autopep8: off +import sys +import os +sys.path.append(f"{os.path.dirname(os.path.abspath(__file__))}/../") + +from hccd import utility, homotopy_generator, path_tracker +# autopep8: on + + +@dataclass +class SimulationArgs: + euler_step_size: float + euler_max_tries: int + newton_max_iter: int + newton_convergence_threshold: float + sigma: int + homotopy_iter: int + + max_frames: int + target_frame_errors: int + + +def decode(tracker, y, H, args: SimulationArgs) -> np.ndarray: + x_hat = np.mod(np.round(y), 2).astype('int32') + + s = np.concatenate([y, np.array([0])]) + for i in range(args.homotopy_iter): + x_hat = np.mod(np.round(s[:-1]), 2).astype('int32') + if not np.any(np.mod(H @ x_hat, 2)): + return x_hat + + # if s[-1] > 1.5: + # return x_hat + + try: + s = tracker.step(s) + except: + return x_hat + + return x_hat + + +def simulate_error_rates_for_SNR(H, Eb_N0, args: SimulationArgs) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray]: + GF = galois.GF(2) + H_GF = GF(H) + G = H_GF.null_space() + k, n = G.shape + + homotopy = homotopy_generator.HomotopyGenerator(H) + + # print(f"G: {homotopy.G}") + # print(f"F: {homotopy.F}") + # print(f"H: {homotopy.H}") + # print(f"DH: {homotopy.DH}") + + tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries, + args.newton_max_iter, args.newton_convergence_threshold, args.sigma) + + num_frames = 0 + + bit_errors = 0 + frame_errors = 0 + decoding_failures = 0 + + for _ in tqdm(range(args.max_frames)): + Eb_N0_lin = 10**(Eb_N0 / 10) + N0 = 1 / (2 * k / n * Eb_N0_lin) + y = np.zeros(n) + np.sqrt(N0) * np.random.normal(size=n) + + x_hat = decode(tracker, y, H, args) + + bit_errors += np.sum(x_hat != np.zeros(n)) + frame_errors += np.any(x_hat != np.zeros(n)) + + if np.any(np.mod(H @ x_hat, 2)): + decoding_failures += 1 + + num_frames += 1 + + if frame_errors > args.target_frame_errors: + break + + BER = bit_errors / (num_frames * n) + FER = frame_errors / num_frames + DFR = decoding_failures / num_frames + + return FER, BER, DFR + + +def main(): + # Parse command line arguments + + parser = argparse.ArgumentParser() + + parser.add_argument("-c", "--code", type=str, required=True, + help="Path to the alist file containing the parity check matrix of the code") + # TODO: Extend this script to multiple SRNs + parser.add_argument("--snr", type=float, required=True, + help="Eb/N0 to use for this simulation") + parser.add_argument("--max-frames", type=int, default=int(1e6), + help="Maximum number of frames to simulate") + parser.add_argument("--target-frame-errors", type=int, default=200, + help="Number of frame errors after which to stop the simulation") + + parser.add_argument("--euler-step-size", type=float, + default=0.05, help="Step size for Euler predictor") + parser.add_argument("--euler-max-tries", type=int, default=5, + help="Maximum number of tries for Euler predictor") + parser.add_argument("--newton-max-iter", type=int, default=5, + help="Maximum number of iterations for Newton corrector") + parser.add_argument("--newton-convergence-threshold", type=float, + default=0.01, help="Convergence threshold for Newton corrector") + parser.add_argument("-s", "--sigma", type=int, default=1, + help="Direction in which the path is traced") + parser.add_argument("-n", "--homotopy-iter", type=int, default=20, + help="Number of iterations of the homotopy continuation method to perform for each decoding") + + args = parser.parse_args() + + # TODO: Name this section properly + # Do stuff + + H = utility.read_alist_file(args.code) + + simulation_args = SimulationArgs( + euler_step_size=args.euler_step_size, + euler_max_tries=args.euler_max_tries, + newton_max_iter=args.newton_max_iter, + newton_convergence_threshold=args.newton_convergence_threshold, + sigma=args.sigma, + homotopy_iter=args.homotopy_iter, + max_frames=args.max_frames, + target_frame_errors=args.target_frame_errors) + + FER, BER, DFR = simulate_error_rates_for_SNR(H, args.snr, simulation_args) + + print(f"FER: {FER}") + print(f"DFR: {DFR}") + print(f"BER: {BER}") + + +if __name__ == "__main__": + main() diff --git a/python/hccd/utility.py b/python/hccd/utility.py new file mode 100644 index 0000000..df8c796 --- /dev/null +++ b/python/hccd/utility.py @@ -0,0 +1,29 @@ +import numpy as np + + +def _parse_alist_header(header): + size = header.split() + return int(size[0]), int(size[1]) + + +def read_alist_file(filename): + """ + This function reads in an alist file and creates the + corresponding parity check matrix H. The format of alist + files is described at: + http://www.inference.phy.cam.ac.uk/mackay/codes/alist.html + """ + + with open(filename, 'r') as myfile: + data = myfile.readlines() + numCols, numRows = _parse_alist_header(data[0]) + + H = np.zeros((numRows, numCols)) + + # The locations of 1s starts in the 5th line of the file + for lineNumber in np.arange(4, 4 + numCols): + indices = data[lineNumber].split() + for index in indices: + H[int(index) - 1, lineNumber - 4] = 1 + + return H.astype(np.int32)