ba-thesis/sw/simulate_2d_BER.py

146 lines
3.7 KiB
Python

import typing
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import signal
from timeit import default_timer
from tqdm import tqdm
from dataclasses import dataclass
from types import MappingProxyType
from utility import codes, noise, misc
from utility.simulation.simulators import GenericMultithreadedSimulator
from cpp_modules.cpp_decoders import ProximalDecoder_204_102 as ProximalDecoder
def count_bit_errors(d: np.array, d_hat: np.array) -> int:
return np.sum(d != d_hat)
def task_func(params):
"""Function called by the GenericMultithreadedSimulator instance.
Calculate the BER, FER, and DFR for a given SNR and gamma.
"""
signal.signal(signal.SIGINT, signal.SIG_IGN)
decoder = params["decoder"]
max_iterations = params["max_iterations"]
SNR = params["SNR"]
n = params["n"]
k = params["k"]
c = np.zeros(n)
x_bpsk = c + 1
total_bit_errors = 0
total_frame_errors = 0
dec_fails = 0
num_iterations = 0
for i in range(max_iterations):
x = noise.add_awgn(x_bpsk, SNR, n, k)
x_hat, k_max = decoder.decode(x)
bit_errors = count_bit_errors(x_hat, c)
if bit_errors > 0:
total_bit_errors += bit_errors
total_frame_errors += 1
num_iterations += 1
if k_max == -1:
dec_fails += 1
if total_frame_errors > 100:
break
BER = total_bit_errors / (num_iterations * n)
FER = total_frame_errors / num_iterations
DFR = dec_fails / (num_iterations + dec_fails)
return {"BER": BER, "FER": FER, "DFR": DFR,
"num_iterations": num_iterations}
def simulate(H_file, SNRs, max_iterations, omega, K, gammas):
sim = GenericMultithreadedSimulator()
# Define fixed simulation params
H = codes.read_alist_file(f"res/{H_file}")
n_min_k, n = H.shape
k = n - n_min_k
# Define params different for each task
task_params = []
for i, SNR in enumerate(SNRs):
for j, gamma in enumerate(gammas):
decoder = ProximalDecoder(H=H.astype('int32'), K=K, omega=omega,
gamma=gamma)
task_params.append(
{"decoder": decoder, "max_iterations": max_iterations,
"SNR": SNR, "gamma": gamma, "n": n, "k": k})
# Set up simulation
sim.task_params = task_params
sim.task_func = task_func
sim.start_or_continue()
return sim.current_results
def main():
# Set up simulation params
sim_name = "2d_BER_FER_DFR"
# H_file = "BCH_7_4.alist"
# H_file = "BCH_31_11.alist"
# H_file = "BCH_31_26.alist"
# H_file = "96.3.965.alist"
H_file = "204.33.486.alist"
# H_file = "204.33.484.alist"
# H_file = "204.55.187.alist"
# H_file = "408.33.844.alist"
SNRs = np.arange(1, 6, 0.5)
max_iterations = 20000
omega = 0.05
K = 100
gammas = np.arange(0.0, 0.17, 0.01)
# Run simulation
start_time = default_timer()
results = simulate(H_file, SNRs, max_iterations, omega, K, gammas)
end_time = default_timer()
print(f"duration: {end_time - start_time}")
df = misc.pgf_reformat_data_3d(results=results, x_param_name="SNR",
y_param_name="gamma",
z_param_names=["BER", "FER", "DFR",
"num_iterations"])
# df.sort_values(by=["gamma", "SNR"]).to_csv(
# f"sim_results/{sim_name}_{misc.slugify(H_file)}.csv", index=False)
sns.set_theme()
ax = sns.lineplot(data=df, x="SNR", y="BER", hue="gamma")
ax.set_yscale('log')
ax.set_ylim((5e-5, 2e-0))
plt.show()
if __name__ == "__main__":
main()