import typing

import numpy as np
import pandas as pd
from tqdm import tqdm

from utility import noise


def count_bit_errors(d: np.ndarray, d_hat: np.ndarray) -> int:
    """Count the number of wrong bits in a decoded codeword.

    :param d: Originally sent data
    :param d_hat: Received data
    :return: Number of positions in which d and d_hat differ
    """
    # Convert to a built-in int so the declared return type holds and the
    # counters that accumulate this value stay plain Python integers.
    return int(np.count_nonzero(d != d_hat))


# TODO: Write unit tests

class ProximalDecoderSimulator:
    """Class allowing for saving of simulations state.

    Given a list of decoders, this class allows for simulating the
    Bit-Error-Rates of each decoder for various SNRs.

    The functionality implemented by this class could be achieved by a bunch
    of loops and a function. However, storing the state of the simulation as
    member variables allows for pausing and resuming the simulation at a
    later time.
    """

    def __init__(self, n: int, k: int,
                 decoders: typing.Sequence[typing.Any],
                 SNRs: typing.Sequence[float],
                 target_frame_errors: int,
                 max_num_iterations: int):
        """Construct an object of type simulator.

        :param n: Number of bits in a codeword
        :param k: Number of bits in a dataword
        :param decoders: Sequence of decoders to test. Each decoder must
            provide a ``decode(y)`` method returning a tuple ``(x_hat, K)``,
            where ``x_hat`` is ``None`` on decoding failure
        :param SNRs: Sequence of SNRs for which the BERs should be calculated
        :param target_frame_errors: Number of frame errors after which to
            stop the simulation of a given SNR
        :param max_num_iterations: Maximum number of transmissions simulated
            for a given SNR, even if the target number of frame errors has
            not been reached
        """
        # Simulation parameters

        self._n = n
        self._k = k
        self._decoders = decoders
        self._SNRs = SNRs
        self._target_frame_errors = target_frame_errors
        self._max_num_iterations = max_num_iterations

        # The all-zeros word is the one that is always transmitted
        self._x = np.zeros(self._n)
        # BPSK mapping: bit 0 -> +1, bit 1 -> -1
        self._x_bpsk = 1 - 2 * self._x

        # Simulation state

        self._curr_decoder_index = 0
        self._curr_SNRs_index = 0

        self._curr_num_frame_errors = 0
        self._curr_num_bit_errors = 0
        self._curr_num_iterations = 0
        self._curr_num_dec_fails = 0

        # Results & Miscellaneous

        # One result array (indexed by SNR) per decoder
        self._BERs = [np.zeros(len(SNRs)) for _ in decoders]
        self._dec_fails = [np.zeros(len(SNRs)) for _ in decoders]
        self._avg_K = [np.zeros(len(SNRs)) for _ in decoders]

        self._create_pbars()

        self._sim_running = False

    def _decoder_description(self) -> str:
        """Return the progress bar label for the current decoder."""
        decoder = self._decoders[self._curr_decoder_index]
        return f"Calculating BERs for {decoder.__class__.__name__}"

    def _snr_description(self) -> str:
        """Return the progress bar label for the current SNR."""
        return f"Simulating for SNR = {self._SNRs[self._curr_SNRs_index]} dB"

    def _create_pbars(self) -> None:
        """Create the three nested progress bars.

        The bars track (from outermost to innermost) the decoders, the SNRs
        of the current decoder and the iterations of the current SNR. The
        labels are derived from the current simulation state, so this can
        be used both for a fresh and for a restored simulation.
        """
        self._overall_pbar = tqdm(
            total=len(self._decoders),
            desc="Calculating the answer to life, "
                 "the universe and everything",
            leave=False,
            bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]")

        self._decoder_pbar = tqdm(
            total=len(self._SNRs),
            desc=self._decoder_description(),
            leave=False,
            bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}")

        self._snr_pbar = tqdm(
            total=self._max_num_iterations,
            desc=self._snr_description(),
            leave=False)

    def __getstate__(self) -> typing.Dict:
        """Custom serialization function called by the 'pickle' module
        when saving the state of a currently running simulation.

        The progress bars are left out of the returned state; they are
        recreated by __setstate__.

        :return: Copy of the instance dictionary without the progress bars
        """
        state = self.__dict__.copy()
        for pbar_name in ('_overall_pbar', '_decoder_pbar', '_snr_pbar'):
            state.pop(pbar_name, None)
        return state

    def __setstate__(self, state) -> None:
        """Custom deserialization function called by the 'pickle' module
        when loading a previously saved simulation.

        :param state: Dictionary storing the serialized version of an object
            of this class
        """
        self.__dict__.update(state)

        # The bars are not part of the serialized state: recreate them and
        # fast-forward each one to the position it had when saved.
        self._create_pbars()

        self._overall_pbar.update(self._curr_decoder_index)
        self._decoder_pbar.update(self._curr_SNRs_index)
        # The innermost bar counts iterations (its total is the maximum
        # number of iterations), not frame errors.
        self._snr_pbar.update(self._curr_num_iterations)

        self._overall_pbar.refresh()
        self._decoder_pbar.refresh()
        self._snr_pbar.refresh()

    def _simulate_transmission(self) -> int:
        """Simulate the transmission of a single codeword.

        Noise is added to the BPSK-mapped codeword and the result is passed
        to the current decoder. If decoding fails (the decoder returns None
        as the estimate), the failure counter is incremented instead of
        counting bit errors.

        :return: Number of bit errors that occurred (0 on decoding failure)
        """
        SNR = self._SNRs[self._curr_SNRs_index]
        decoder = self._decoders[self._curr_decoder_index]

        y = noise.add_awgn(self._x_bpsk, SNR, self._n, self._k)
        # NOTE(review): K is presumably the number of iterations the decoder
        # needed - confirm against the decoder implementations
        x_hat, K = decoder.decode(y)

        # Handle decoding failure
        if x_hat is None:
            self._curr_num_dec_fails += 1
            return 0

        # Accumulate K; it is turned into an average in _advance_state()
        self._avg_K[self._curr_decoder_index][self._curr_SNRs_index] += K
        return count_bit_errors(self._x, x_hat)

    def _update_statistics(self, bit_errors: int) -> None:
        """Update the statistics of the simulator.

        :param bit_errors: Number of bit errors that occurred during the
            last transmission
        """
        self._curr_num_iterations += 1
        self._snr_pbar.update(1)

        if bit_errors > 0:
            self._curr_num_frame_errors += 1
            self._curr_num_bit_errors += bit_errors

    def _advance_state(self) -> None:
        """Advance the state of the simulator.

        Once the current SNR point is finished (target number of frame
        errors reached or maximum number of iterations performed), the
        results for that point are stored, the counters are reset and the
        simulation moves on to the next SNR, the next decoder, or stops.

        This function also handles setting the result arrays and progress
        bars.
        """
        target_reached = \
            self._curr_num_frame_errors >= self._target_frame_errors
        # '>=' so that at most max_num_iterations transmissions are
        # simulated per SNR point
        budget_exhausted = \
            self._curr_num_iterations >= self._max_num_iterations

        if not (target_reached or budget_exhausted):
            return

        dec = self._curr_decoder_index
        snr = self._curr_SNRs_index

        # Adjust the number of iterations to ignore decoding failures
        adj_num_iterations = \
            self._curr_num_iterations - self._curr_num_dec_fails

        if adj_num_iterations == 0:
            # Every transmission ended in a decoding failure
            self._BERs[dec][snr] = 1
        else:
            self._BERs[dec][snr] = \
                self._curr_num_bit_errors / (adj_num_iterations * self._n)
            # Turn the accumulated sum of K into an average
            self._avg_K[dec][snr] = \
                self._avg_K[dec][snr] / adj_num_iterations

        self._dec_fails[dec][snr] = self._curr_num_dec_fails

        self._curr_num_frame_errors = 0
        self._curr_num_bit_errors = 0
        self._curr_num_iterations = 0
        self._curr_num_dec_fails = 0

        if snr < len(self._SNRs) - 1:
            # Next SNR for the same decoder
            self._curr_SNRs_index += 1

            self._snr_pbar.reset()
            self._overall_pbar.refresh()
            self._snr_pbar.set_description(self._snr_description())
            self._decoder_pbar.update(1)
        elif dec < len(self._decoders) - 1:
            # Next decoder, starting again at the first SNR
            self._curr_decoder_index += 1
            self._curr_SNRs_index = 0

            # The per-SNR bar has to start over for the new decoder as well
            self._snr_pbar.reset()
            self._snr_pbar.set_description(self._snr_description())

            self._decoder_pbar.reset()
            self._decoder_pbar.set_description(self._decoder_description())
            self._overall_pbar.update(1)
        else:
            # Last SNR of the last decoder: the simulation is complete
            self._sim_running = False

            self._snr_pbar.close()
            self._decoder_pbar.close()
            self._overall_pbar.close()

    def start(self) -> None:
        """Start the simulation.

        This is a blocking call. A call to the stop() function
        from another thread will stop this function.
        """
        self._sim_running = True

        while self._sim_running:
            bit_errors = self._simulate_transmission()
            self._update_statistics(bit_errors)
            self._advance_state()

    def stop(self) -> None:
        """Stop the simulation."""
        self._sim_running = False

    def get_current_results(self) -> pd.DataFrame:
        """Get the current results.

        If the simulation has not yet completed, the BERs which have not yet
        been calculated are set to 0. For the SNR point that is currently
        being simulated, the AvgK column holds the sum accumulated so far
        rather than an average.

        :return: pandas Dataframe with the columns ["SNR", "BER_0", "BER_1",
            ..., "DecFails_0", "DecFails_1", ..., "AvgK_0", "AvgK_1", ...],
            where the suffix is the index of the decoder in the sequence
            passed to the constructor
        """
        data = {"SNR": np.array(self._SNRs)}

        for i, decoder_BERs in enumerate(self._BERs):
            data[f"BER_{i}"] = decoder_BERs

        for i, decoder_dec_fails in enumerate(self._dec_fails):
            data[f"DecFails_{i}"] = decoder_dec_fails

        for i, avg_K in enumerate(self._avg_K):
            data[f"AvgK_{i}"] = avg_K

        return pd.DataFrame(data)