"""This file contains utility functions relating to tests and simulations of the decoders.""" import numpy as np import typing from tqdm import tqdm import signal import pickle import os.path from pathlib import Path import spdlog as spd from utility import noise def count_bit_errors(d: np.array, d_hat: np.array) -> int: """Count the number of wrong bits in a decoded codeword. :param d: Originally sent data :param d_hat: Received data :return: Number of bit errors """ return np.sum(d != d_hat) # TODO: Write unit tests class Simulator: """Class allowing for saving of simulations state. Given a list of decoders, this class allows for simulating the Bit-Error-Rates of each decoder for various SNRs. The functionality implemented by this class could be achieved by a bunch of loops and a function. However, storing the state of the simulation as member variables allows for pausing and resuming the simulation at a later state. """ def __init__(self, n: int, k: int, decoders: typing.Sequence[typing.Any], SNRs: typing.Sequence[float], target_frame_errors: int): """Construct and object of type simulator. :param n: Number of bits in a codeword :param k: Number of bits in a dataword :param decoders: Sequence of decoders to test :param SNRs: Sequence of SNRs for which the BERs should be calculated :param target_frame_errors: Number of frame errors after which to stop the simulation """ # Simulation parameters self._n = n self._k = k self._decoders = decoders self._SNRs = SNRs self._target_frame_errors = target_frame_errors self._x = np.zeros(self._n) self._x_bpsk = 1 - 2 * self._x # Map x from [0, 1]^n to [-1, 1]^n # Simulation state self._current_decoder_index = 0 self._current_SNRs_index = 0 self._curr_num_frame_errors = 0 self._curr_num_bit_errors = 0 self._curr_num_iterations = 0 # Results & Miscellaneous self._BERs = [[]] self._create_pbars() self._sim_running = False self._sim_done = False def _create_pbars(self): self._overall_pbar = tqdm(total=len(self._decoders), desc="Calculating the answer to life, the universe and everything", leave=False, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") decoder = self._decoders[self._current_decoder_index] self._decoder_pbar = tqdm(total=len(self._SNRs), desc=f"Calculating BERs for {decoder.__class__.__name__}", leave=False, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}") self._snr_pbar = tqdm(total=self._target_frame_errors, desc=f"Simulating for SNR = {self._SNRs[0]} dB", leave=False, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]") def __getstate__(self) -> typing.Dict: """Custom serialization function called by the 'pickle' module when saving the state of a currently running simulation """ state = self.__dict__.copy() del state['_overall_pbar'] del state['_decoder_pbar'] del state['_snr_pbar'] return state def __setstate__(self, state) -> None: """Custom deserialization function called by the 'pickle' module when loading a previously saved simulation :param state: Dictionary storing the serialized version of an object of this class """ self.__dict__.update(state) self._create_pbars() self._overall_pbar.update(self._current_decoder_index) self._decoder_pbar.update(self._current_SNRs_index) self._snr_pbar.update(self._curr_num_frame_errors) self._overall_pbar.refresh() self._decoder_pbar.refresh() self._snr_pbar.refresh() def _simulate_transmission(self) -> int: """Simulate the transmission of a single codeword. :return: Number of bit errors that occurred """ SNR = self._SNRs[self._current_SNRs_index] decoder = self._decoders[self._current_decoder_index] y = noise.add_awgn(self._x_bpsk, SNR, self._n, self._k) x_hat = decoder.decode(y) return count_bit_errors(self._x, x_hat) def _update_statistics(self, bit_errors: int) -> None: """Update the statistics of the simulator. :param bit_errors: Number of bit errors that occurred during the last transmission """ self._curr_num_iterations += 1 if bit_errors > 0: self._curr_num_frame_errors += 1 self._curr_num_bit_errors += bit_errors self._snr_pbar.update(1) def _advance_state(self) -> None: """Advance the state of the simulator. This function also appends a new BER value to the self._BERs array if the number of target frame errors has been reached """ if self._curr_num_frame_errors >= self._target_frame_errors: self._BERs[self._current_decoder_index] \ .append(self._curr_num_bit_errors / (self._curr_num_iterations * self._n)) self._curr_num_frame_errors = 0 self._curr_num_bit_errors = 0 self._curr_num_iterations = 0 if self._current_SNRs_index < len(self._SNRs) - 1: self._current_SNRs_index += 1 self._snr_pbar.reset() self._snr_pbar.set_description(f"Simulating for SNR = {self._SNRs[self._current_SNRs_index]} dB") self._decoder_pbar.update(1) else: if self._current_decoder_index < len(self._decoders) - 1: self._current_decoder_index += 1 self._current_SNRs_index = 0 self._BERs.append([]) self._decoder_pbar.reset() decoder = self._decoders[self._current_decoder_index] self._decoder_pbar.set_description(f"Calculating BERs for {decoder.__class__.__name__}") self._overall_pbar.update(1) else: self._sim_running = False self._sim_done = True self._snr_pbar.close() self._decoder_pbar.close() self._overall_pbar.close() def start(self) -> None: """Start the simulation. This is a blocking call. A call to the stop() function from another thread will stop this function. """ if not self._sim_done: self._sim_running = True while self._sim_running: bit_errors = self._simulate_transmission() self._update_statistics(bit_errors) self._advance_state() def stop(self) -> None: """Stop the simulation.""" self._sim_running = False @property def simulation_done(self) -> bool: """Check whether the simulation is still ongoing or completed. :return: True if the simulation is completed """ return self._sim_done @property def SNRs_and_BERs(self) -> typing.Tuple[np.array, np.array]: """Get the current results. If the simulation has not yet completed, the BERs which have not yet been calculated are set to 0. :return: Tuple of numpy arrays of the form (SNRs, BERs), where BERs is a list of the form [BER_decoder_1, BER_decoder_2, ...] """ SNRs = np.array(self._SNRs) # If the BERs of a decoder have not been calculated for all SNRs, # fill the rest up with zeros to match the length of the 'SNRs' array BERs = [] for decoder_BER_list in self._BERs: padded = np.pad(decoder_BER_list, (0, len(self._SNRs) - len(decoder_BER_list))) BERs.append(padded) # If the BERs have not been calculated for all decoders, fill up the BERs list # with zero-vectors to match the length of the 'decoders' list for i in range(len(self._decoders) - len(BERs)): BERs.append(np.zeros(len(self._SNRs))) return SNRs, BERs # TODO: Write currently calculated BERs to file when simulation is stopped class SimulationManager: """This class only contains functions relating to stopping and restarting of simulations (and storing of the simulation state in a file, to be resumed at a later date). All actual work is outsourced to a provided simulator class. """ def __init__(self, save_dir: str, results_dir: str): """Construct a SimulationManager object. :param save_dir: Directory in which the simulation state of a paused simulation should be stored :param results_dir: Directory in which the results of the simulation should be stored """ self._save_dir = save_dir self._sim_state_filepath = f"{self._save_dir}/sim_state.pickle" self._logs_filepath = f"{self._save_dir}/logs.txt" self._results_dir = results_dir self._simulator = None Path(self._save_dir).mkdir(parents=True, exist_ok=True) self._logger = spd.FileLogger("SimulationManager", self._logs_filepath) self._logger.set_level(spd.LogLevel.DEBUG) signal.signal(signal.SIGINT, self._exit_gracefully) signal.signal(signal.SIGTERM, self._exit_gracefully) signal.signal(signal.SIGHUP, self._exit_gracefully) # # Functions relating to the pausing and restarting of simulations # def unfinished_simulation_present(self) -> bool: """Check whether the savefile of a previously unfinished simulation is present.""" return os.path.isfile(self._sim_state_filepath) def load_unfinished(self): """Load the state of an unfinished simulation its savefile. Warning: This function deletes the savefile after loading. """ assert self.unfinished_simulation_present() self._logger.info("Loading saved simulation state") with open(self._sim_state_filepath, "rb") as file: self._simulator = pickle.load(file) os.remove(self._sim_state_filepath) def _save_state(self) -> None: """Write the state of the currently configured simulation to a savefile.""" if self._simulator is not None: with open(self._sim_state_filepath, "wb") as file: pickle.dump(self._simulator, file) self._logger.info("Saved simulation state") else: self._logger.info("No simulation state to save: simulator object is 'None'") def _exit_gracefully(self, *args) -> None: """Handler called when the program is interrupted. Pauses and saves the currently running simulation """ self._logger.debug("Intercepted signal SIGINT/SIGTERM") if self._simulator is not None: self._simulator.stop() self._save_state() exit() # # Functions responsible for the actual simulation # def set_simulator(self, simulator: typing.Any) -> None: """Select a simulator to do the actual work.""" self._simulator = simulator def start(self): """Start the simulation. This is a blocking call. A call to the stop() function from another thread will stop this function. """ assert self._simulator is not None self._logger.info("Starting simulation") self._simulator.start() @property def simulation_done(self) -> None: """Check whether the configured simulation has been completed.""" return self._simulator.simulation_done def get_current_results(self) -> typing.Any: """Get the current results of the configured simulation.""" return self._simulator.SNRs_and_BERs