"""This file contains utility functions relating to tests and simulations of the decoders.""" import time import numpy as np import typing from tqdm import tqdm from timeit import default_timer import signal from dataclasses import dataclass import pickle import os.path from pathlib import Path import spdlog as spd from utility import noise def count_bit_errors(d: np.array, d_hat: np.array) -> int: """Count the number of wrong bits in a decoded codeword. :param d: Originally sent data :param d_hat: Received data :return: Number of bit errors """ return np.sum(d != d_hat) # TODO: Write unit tests class Simulator: """Class allowing for saving of simulations state. Given a list of decoders, this class allows for simulating the Bit-Error-Rates of each decoder for various SNRs. The functionality implemented by this class could be achieved by a bunch of loops and a function. However, storing the state of the simulation as member variables allows for pausing and resuming the simulation at a later state. """ def __init__(self, n: int, k: int, decoders: typing.Sequence[typing.Any], SNRs: typing.Sequence[float], target_frame_errors: int): """Construct and object of type simulator. :param n: Number of bits in a codeword :param k: Number of bits in a dataword :param decoders: Sequence of decoders to test :param SNRs: Sequence of SNRs for which the BERs should be calculated :param target_frame_errors: Number of frame errors after which to stop the simulation """ # Simulation parameters self._n = n self._k = k self._decoders = decoders self._SNRs = SNRs self._target_frame_errors = target_frame_errors self._x = np.zeros(self._n) self._x_bpsk = 1 - 2 * self._x # Map x from [0, 1]^n to [-1, 1]^n # Simulation state self._current_decoder_index = 0 self._current_SNRs_index = 0 self._curr_num_frame_errors = 0 self._curr_num_bit_errors = 0 self._curr_num_iterations = 0 # Results & Miscellaneous self._BERs = [[]] self._create_pbars() self._sim_running = False self._sim_done = False def _create_pbars(self): self._overall_pbar = tqdm(total=len(self._decoders), desc="Calculating the answer to life, the universe and everything", leave=False, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") decoder = self._decoders[self._current_decoder_index] self._decoder_pbar = tqdm(total=len(self._SNRs), desc=f"Calculating BERs for {decoder.__class__.__name__}", leave=False, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") self._snr_pbar = tqdm(total=self._target_frame_errors, desc=f"Simulating for SNR = {self._SNRs[0]} dB", leave=False, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]") def __getstate__(self): """Custom serialization function called by the 'pickle' module when saving the state of a currently running simulation """ state = self.__dict__.copy() del state['_overall_pbar'] del state['_decoder_pbar'] del state['_snr_pbar'] return state def __setstate__(self, state): """Custom deserialization function called by the 'pickle' module when loading a previously saved simulation """ self.__dict__.update(state) self._create_pbars() self._overall_pbar.update(self._current_decoder_index) self._decoder_pbar.update(self._current_SNRs_index) self._snr_pbar.update(self._curr_num_frame_errors) self._overall_pbar.refresh() self._decoder_pbar.refresh() self._snr_pbar.refresh() def _simulate_transmission(self) -> int: """Simulate the transmission of a single codeword. :return: Number of bit errors that occurred """ SNR = self._SNRs[self._current_SNRs_index] decoder = self._decoders[self._current_decoder_index] y = noise.add_awgn(self._x_bpsk, SNR, self._n, self._k) x_hat = decoder.decode(y) return count_bit_errors(self._x, x_hat) def _update_statistics(self, bit_errors: int) -> None: """Update the statistics of the simulator. :param bit_errors: Number of bit errors that occurred during the last transmission """ self._curr_num_iterations += 1 if bit_errors > 0: self._curr_num_frame_errors += 1 self._curr_num_bit_errors += bit_errors self._snr_pbar.update(1) def _advance_state(self) -> None: """Advance the state of the simulator. This function also appends a new BER value to the self._BERs array if the number of target frame errors has been reached """ if self._curr_num_frame_errors >= self._target_frame_errors: self._BERs[self._current_decoder_index] \ .append(self._curr_num_bit_errors / (self._curr_num_iterations * self._n)) self._curr_num_frame_errors = 0 self._curr_num_bit_errors = 0 self._curr_num_iterations = 0 if self._current_SNRs_index < len(self._SNRs) - 1: self._current_SNRs_index += 1 self._snr_pbar.reset() self._snr_pbar.set_description(f"Simulating for SNR = {self._SNRs[self._current_SNRs_index]} dB") self._decoder_pbar.update(1) else: if self._current_decoder_index < len(self._decoders) - 1: self._current_decoder_index += 1 self._current_SNRs_index = 0 self._BERs.append([]) self._decoder_pbar.reset() decoder = self._decoders[self._current_decoder_index] self._decoder_pbar.set_description(f"Calculating BERs for {decoder.__class__.__name__}") self._overall_pbar.update(1) else: self._sim_running = False self._sim_done = True self._snr_pbar.close() self._decoder_pbar.close() self._overall_pbar.close() def start(self) -> None: """Start the simulation. This is a blocking call. A call to the stop() function from another thread will stop this function. """ if not self._sim_done: self._sim_running = True while self._sim_running: bit_errors = self._simulate_transmission() self._update_statistics(bit_errors) self._advance_state() def stop(self) -> None: """Stop the simulation.""" self._sim_running = False @property def simulation_done(self) -> bool: """Check whether the simulation is still ongoing or completed. :return: True if the simulation is completed """ return self._sim_done @property def SNRs_and_BERs(self) -> typing.Tuple[np.array, np.array]: """Get the current results. If the simulation has not yet completed, the BERs which have not yet been calculated are set to 0. :return: Tuple of numpy arrays of the form (SNRs, BERs), where BERs is a list of the form [BER_decoder_1, BER_decoder_2, ...] """ SNRs = np.array(self._SNRs) # If the BERs of a decoder have not been calculated for all SNRs, # fill the rest up with zeros to match the length of the 'SNRs' array BERs = [] for decoder_BER_list in self._BERs: padded = np.pad(decoder_BER_list, (0, len(self._SNRs) - len(decoder_BER_list))) BERs.append(padded) # If the BERs have not been calculated for all decoders, fill up the BERs list # with zero-vectors to match the length of the 'decoders' list for i in range(len(self._decoders) - len(BERs)): BERs.append(np.zeros(len(self._SNRs))) return SNRs, BERs # TODO: Write currently calculated BERs to file when simulation is stopped class SimulationManager: """This class only contains functions relating to stopping and restarting of simulations (and storing of the simulation state in a file, to be resumed at a later date). All actual work is outsourced to a provided simulator class. """ def __init__(self, save_dir: str, results_dir: str): """Construct a SimulationManager object. :param save_dir: Directory in which the simulation state of a paused simulation should be stored :param results_dir: Directory in which the results of the simulation should be stored """ self._save_dir = save_dir self._sim_state_filepath = f"{self._save_dir}/sim_state.pickle" self._logs_filepath = f"{self._save_dir}/logs.txt" self._results_dir = results_dir self._simulator = None Path(self._save_dir).mkdir(parents=True, exist_ok=True) self._logger = spd.FileLogger("SimulationManager", self._logs_filepath) self._logger.set_level(spd.LogLevel.DEBUG) signal.signal(signal.SIGINT, self._exit_gracefully) signal.signal(signal.SIGTERM, self._exit_gracefully) signal.signal(signal.SIGHUP, self._exit_gracefully) # # Functions relating to the pausing and restarting of simulations # def unfinished_simulation_present(self) -> bool: """Check whether the savefile of a previously unfinished simulation is present.""" return os.path.isfile(self._sim_state_filepath) def load_unfinished(self): """Load the state of an unfinished simulation its savefile. Warning: This function deletes the savefile after loading. """ assert self.unfinished_simulation_present() self._logger.info("Loading saved simulation state") with open(self._sim_state_filepath, "rb") as file: self._simulator = pickle.load(file) os.remove(self._sim_state_filepath) def _save_state(self) -> None: """Write the state of the currently configured simulation to a savefile.""" if self._simulator is not None: with open(self._sim_state_filepath, "wb") as file: pickle.dump(self._simulator, file) self._logger.info("Saved simulation state") else: self._logger.info("No simulation state to save: simulator object is 'None'") def _exit_gracefully(self, *args) -> None: """Handler called when the program is interrupted. Pauses and saves the currently running simulation """ self._logger.debug("Intercepted signal SIGINT/SIGTERM") if self._simulator is not None: self._simulator.stop() self._save_state() exit() # # Functions responsible for the actual simulation # def set_simulator(self, simulator: typing.Any) -> None: """Select a simulator to do the actual work.""" self._simulator = simulator def start(self): """Start the simulation. This is a blocking call. A call to the stop() function from another thread will stop this function. """ assert self._simulator is not None self._logger.info("Starting simulation") self._simulator.start() @property def simulation_done(self): """Check whether the configured simulation has been completed.""" return self._simulator.simulation_done def get_current_results(self) -> typing.Any: """Get the current results of the configured simulation.""" return self._simulator.SNRs_and_BERs