173 lines
5.4 KiB
Python
173 lines
5.4 KiB
Python
import typing
|
|
import numpy as np
|
|
import galois
|
|
import argparse
|
|
from dataclasses import dataclass
|
|
from tqdm import tqdm
|
|
import pandas as pd
|
|
|
|
# autopep8: off
|
|
import sys
|
|
import os
|
|
sys.path.append(f"{os.path.dirname(os.path.abspath(__file__))}/../")
|
|
|
|
from hccd import utility, homotopy_generator, path_tracker
|
|
# autopep8: on
|
|
|
|
|
|
@dataclass
class SimulationArgs:
    """Bundles all tunable parameters for one decoding simulation run."""

    # Path-tracker (predictor/corrector) settings.
    euler_step_size: float  # step size for the Euler predictor
    euler_max_tries: int  # maximum number of tries for the Euler predictor
    newton_max_iter: int  # maximum number of iterations for the Newton corrector
    newton_convergence_threshold: float  # convergence threshold for the Newton corrector
    sigma: int  # direction in which the homotopy path is traced
    homotopy_iter: int  # homotopy continuation iterations per decoding attempt

    # Simulation stopping criteria.
    max_frames: int  # hard cap on the number of frames simulated per SNR point
    target_frame_errors: int  # stop early once this many frame errors are observed
|
|
|
|
|
|
def decode(tracker, y, H, args: "SimulationArgs") -> np.ndarray:
    """Decode a received vector via homotopy continuation path tracking.

    Args:
        tracker: Path tracker exposing ``step(s)``; may raise on failure.
        y: Received real-valued channel output vector of length n.
        H: Binary parity check matrix (shape m x n).
        args: Simulation parameters; only ``homotopy_iter`` is used here.

    Returns:
        A binary vector of length n: a valid codeword if one was reached
        within ``args.homotopy_iter`` tracking steps, otherwise the best
        hard-decision estimate found so far.
    """
    # Hard decision on the channel output: non-negative -> 0, negative -> 1.
    x_hat = np.where(y >= 0, 0, 1)

    # Homotopy state: received values plus the continuation parameter t = 0.
    s = np.concatenate([y, np.array([0])])
    for _ in range(args.homotopy_iter):
        x_hat = np.where(s[:-1] >= 0, 0, 1)
        # Stop as soon as the hard decision is a codeword (H @ x == 0 mod 2).
        if not np.any(np.mod(H @ x_hat, 2)):
            return x_hat

        try:
            s = tracker.step(s)
        except Exception:
            # Was a bare `except:`, which also swallowed SystemExit and
            # KeyboardInterrupt. On tracking failure, fall back to the
            # most recent hard-decision estimate.
            return x_hat

    return x_hat
|
|
|
|
|
|
def simulate_error_rates_for_SNR(H, Eb_N0, args: SimulationArgs) -> typing.Tuple[float, float, float, int]:
    """Monte-Carlo simulate decoding error rates at one Eb/N0 point.

    Args:
        H: Binary parity check matrix of the code.
        Eb_N0: Signal-to-noise ratio Eb/N0 in dB.
        args: Simulation parameters (see SimulationArgs).

    Returns:
        Tuple ``(FER, BER, DFR, frame_errors)``: frame error rate, bit
        error rate, decoding failure rate, and the absolute number of
        frame errors counted.
    """
    # Derive a generator matrix over GF(2) as the null space of H.
    GF = galois.GF(2)
    H_GF = GF(H)
    G = H_GF.null_space()
    k, n = G.shape  # k information bits, n coded bits

    num_frames = 0

    bit_errors = 0
    frame_errors = 0
    decoding_failures = 0

    homotopy = homotopy_generator.HomotopyGenerator(H)
    tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
                                       args.newton_max_iter, args.newton_convergence_threshold, args.sigma)

    for _ in tqdm(range(args.max_frames)):
        # Noise variance for the requested Eb/N0 at code rate k/n.
        # NOTE(review): loop-invariant; kept inside the loop to preserve behavior.
        Eb_N0_lin = 10**(Eb_N0 / 10)
        N0 = 1 / (2 * k / n * Eb_N0_lin)

        # Random information word -> codeword -> BPSK symbols (0 -> +1, 1 -> -1).
        u = np.random.randint(2, size=k)
        # u = np.zeros(shape=k).astype(np.int32)
        c = np.array(GF(u) @ G)
        x = 1 - 2*c

        # AWGN channel output.
        y = x + np.sqrt(N0) * np.random.normal(size=n)

        homotopy.update_received(y)

        c_hat = decode(tracker, y, H, args)

        # If the first attempt did not reach a codeword, retry tracing the
        # path in the opposite direction, then restore the original sigma.
        if np.any(np.mod(H @ c_hat, 2)):
            tracker.set_sigma(-1 * args.sigma)
            c_hat = decode(tracker, y, H, args)
            tracker.set_sigma(args.sigma)

        # Both attempts failed to produce a valid codeword.
        if np.any(np.mod(H @ c_hat, 2)):
            decoding_failures += 1

        bit_errors += np.sum(c_hat != c)
        frame_errors += np.any(c_hat != c)
        num_frames += 1

        # Early stop once enough frame errors have been collected.
        if frame_errors >= args.target_frame_errors:
            break

    BER = bit_errors / (num_frames * n)
    FER = frame_errors / num_frames
    DFR = decoding_failures / num_frames

    return FER, BER, DFR, frame_errors
|
|
|
|
|
|
def simulate_error_rates(H, Eb_N0_list, args: SimulationArgs) -> pd.DataFrame:
    """Sweep over the given Eb/N0 points and collect error-rate statistics.

    Runs the per-SNR simulation for every value in ``Eb_N0_list`` and
    prints a cumulative progress table after each point.

    Returns:
        DataFrame with columns SNR, FER, BER, DFR and frame_errors.
    """
    fer_values, ber_values, dfr_values, error_counts = [], [], [], []

    for snr in Eb_N0_list:
        fer, ber, dfr, n_errors = simulate_error_rates_for_SNR(H, snr, args)
        fer_values.append(fer)
        ber_values.append(ber)
        dfr_values.append(dfr)
        error_counts.append(n_errors)
        # Intermediate report covering all SNR points finished so far.
        print(pd.DataFrame({"SNR": Eb_N0_list[:len(fer_values)], "FER": fer_values, "BER": ber_values,
                            "DFR": dfr_values, "frame_errors": error_counts}))

    return pd.DataFrame({"SNR": Eb_N0_list, "FER": fer_values, "BER": ber_values,
                         "DFR": dfr_values, "frame_errors": error_counts})
|
|
|
|
|
|
def main():
    """Entry point: parse command line options and run the simulation."""
    cli = _build_arg_parser().parse_args()

    # Load the parity check matrix from the given alist file.
    H = utility.read_alist_file(cli.code)

    sim_args = SimulationArgs(
        euler_step_size=cli.euler_step_size,
        euler_max_tries=cli.euler_max_tries,
        newton_max_iter=cli.newton_max_iter,
        newton_convergence_threshold=cli.newton_convergence_threshold,
        sigma=cli.sigma,
        homotopy_iter=cli.homotopy_iter,
        max_frames=cli.max_frames,
        target_frame_errors=cli.target_frame_errors)

    snr_start, snr_stop, snr_step = cli.snr
    snr_points = np.arange(snr_start, snr_stop, snr_step)
    results = simulate_error_rates(H, snr_points, sim_args)

    print(results)


def _build_arg_parser() -> argparse.ArgumentParser:
    """Construct the command line interface for the simulation script."""
    parser = argparse.ArgumentParser()

    parser.add_argument("-c", "--code", type=str, required=True,
                        help="Path to the alist file containing the parity check matrix of the code")
    parser.add_argument("--snr", type=float, required=True, nargs=3,
                        metavar=('start', 'stop', 'step'),
                        help="SNRs to simulate (Eb/N0) in dB")

    parser.add_argument("--max-frames", type=int, default=int(1e6),
                        help="Maximum number of frames to simulate")
    parser.add_argument("--target-frame-errors", type=int, default=200,
                        help="Number of frame errors after which to stop the simulation")

    parser.add_argument("--euler-step-size", type=float,
                        default=0.05, help="Step size for Euler predictor")
    parser.add_argument("--euler-max-tries", type=int, default=5,
                        help="Maximum number of tries for Euler predictor")
    parser.add_argument("--newton-max-iter", type=int, default=5,
                        help="Maximum number of iterations for Newton corrector")
    parser.add_argument("--newton-convergence-threshold", type=float,
                        default=0.01, help="Convergence threshold for Newton corrector")
    parser.add_argument("-s", "--sigma", type=int, default=1,
                        help="Direction in which the path is traced")
    parser.add_argument("-n", "--homotopy-iter", type=int, default=20,
                        help="Number of iterations of the homotopy continuation method to perform for each decoding")

    return parser
|
|
|
|
|
|
# Run the simulation only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|