"""This file contains utility functions relating to tests and simulations of
the decoders."""

import json
import os
import pickle
import signal
import sys
import typing
from pathlib import Path

import numpy as np
import pandas as pd
from tqdm import tqdm

from utility import noise, misc


def count_bit_errors(d: np.ndarray, d_hat: np.ndarray) -> int:
    """Count the number of wrong bits in a decoded codeword.

    :param d: Originally sent data
    :param d_hat: Received data
    :return: Number of positions in which 'd' and 'd_hat' differ
    """
    # Convert to a builtin int so the return value matches the annotation
    # (numpy would otherwise hand back a numpy integer scalar)
    return int(np.count_nonzero(d != d_hat))


# TODO: Write unit tests
# TODO: Create generic Simulator Interface which should be implemented for
#  specific applications
class Simulator:
    """Class allowing for saving of simulations state.

    Given a list of decoders, this class allows for simulating the
    Bit-Error-Rates of each decoder for various SNRs.

    The functionality implemented by this class could be achieved by a bunch
    of loops and a function. However, storing the state of the simulation as
    member variables allows for pausing and resuming the simulation at a
    later time.
    """

    def __init__(self, n: int, k: int,
                 decoders: typing.Sequence[typing.Any],
                 SNRs: typing.Sequence[float],
                 target_frame_errors: int):
        """Construct an object of type simulator.

        :param n: Number of bits in a codeword
        :param k: Number of bits in a dataword
        :param decoders: Sequence of decoders to test
        :param SNRs: Sequence of SNRs for which the BERs should be calculated
        :param target_frame_errors: Number of frame errors after which to
            stop the simulation
        """
        # Simulation parameters
        self._n = n
        self._k = k
        self._decoders = decoders
        self._SNRs = SNRs
        self._target_frame_errors = target_frame_errors

        # The all-zeros word is the one that is transmitted in every iteration
        self._x = np.zeros(self._n)
        self._x_bpsk = 1 - 2 * self._x  # Map x from [0, 1]^n to [-1, 1]^n

        # Simulation state
        self._current_decoder_index = 0
        self._current_SNRs_index = 0

        self._curr_num_frame_errors = 0
        self._curr_num_bit_errors = 0
        self._curr_num_iterations = 0

        # Results & Miscellaneous
        # One list of BERs per decoder that has been started so far
        self._BERs = [[]]

        self._create_pbars()

        self._sim_running = False

    def _create_pbars(self):
        """Create the three nested progress bars (overall, per decoder,
        per SNR).

        The descriptions are derived from the current simulation state, so
        that the bars are labeled correctly both for a fresh simulation and
        for one that has been restored from a savefile.
        """
        count_format = "{l_bar}{bar}| {n_fmt}/{total_fmt}"

        self._overall_pbar = tqdm(total=len(self._decoders),
                                  desc="Calculating the answer to life, "
                                       "the universe and everything",
                                  leave=False,
                                  bar_format=count_format)

        decoder = self._decoders[self._current_decoder_index]
        self._decoder_pbar = tqdm(total=len(self._SNRs),
                                  desc=f"Calculating BERs for "
                                       f"{decoder.__class__.__name__}",
                                  leave=False,
                                  bar_format=count_format)

        # Use the current SNR (not always the first one), as this function
        # is also called when resuming a paused simulation
        current_snr = self._SNRs[self._current_SNRs_index]
        self._snr_pbar = tqdm(total=self._target_frame_errors,
                              desc=f"Simulating for SNR = {current_snr} dB",
                              leave=False,
                              bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} "
                                         "[{elapsed}<{remaining}]")

    def _reset_snr_pbar(self) -> None:
        """Restart the per-SNR progress bar and label it with the current
        SNR."""
        self._snr_pbar.reset()
        self._snr_pbar.set_description(
            f"Simulating for SNR = "
            f"{self._SNRs[self._current_SNRs_index]} dB")

    def __getstate__(self) -> typing.Dict:
        """Custom serialization function called by the 'pickle' module
        when saving the state of a currently running simulation.

        The progress bars are excluded from the state; they are recreated
        in __setstate__().

        :return: Copy of the instance dictionary without the progress bars
        """
        state = self.__dict__.copy()
        for pbar_name in ('_overall_pbar', '_decoder_pbar', '_snr_pbar'):
            del state[pbar_name]
        return state

    def __setstate__(self, state) -> None:
        """Custom deserialization function called by the 'pickle' module
        when loading a previously saved simulation.

        :param state: Dictionary storing the serialized version of an object
            of this class
        """
        self.__dict__.update(state)

        # The progress bars are not part of the saved state: Recreate them
        # and fast-forward them to the saved progress
        self._create_pbars()

        self._overall_pbar.update(self._current_decoder_index)
        self._decoder_pbar.update(self._current_SNRs_index)
        self._snr_pbar.update(self._curr_num_frame_errors)

        self._overall_pbar.refresh()
        self._decoder_pbar.refresh()
        self._snr_pbar.refresh()

    def _simulate_transmission(self) -> int:
        """Simulate the transmission of a single codeword.

        The current decoder and SNR are selected through the simulation
        state.

        :return: Number of bit errors that occurred
        """
        SNR = self._SNRs[self._current_SNRs_index]
        decoder = self._decoders[self._current_decoder_index]

        y = noise.add_awgn(self._x_bpsk, SNR, self._n, self._k)
        x_hat = decoder.decode(y)

        return count_bit_errors(self._x, x_hat)

    def _update_statistics(self, bit_errors: int) -> None:
        """Update the statistics of the simulator.

        :param bit_errors: Number of bit errors that occurred during the
            last transmission
        """
        self._curr_num_iterations += 1

        # Only transmissions containing at least one bit error count as
        # frame errors
        if bit_errors > 0:
            self._curr_num_frame_errors += 1
            self._curr_num_bit_errors += bit_errors

            self._snr_pbar.update(1)

    def _advance_state(self) -> None:
        """Advance the state of the simulator.

        This function also appends a new BER value to the self._BERs array
        if the number of target frame errors has been reached. Afterwards,
        the simulation moves on to the next SNR, the next decoder, or is
        stopped if all decoders have been simulated for all SNRs.
        """
        if self._curr_num_frame_errors < self._target_frame_errors:
            return

        # BER = wrong bits / total number of transmitted bits
        ber = self._curr_num_bit_errors / (
                self._curr_num_iterations * self._n)
        self._BERs[self._current_decoder_index].append(ber)

        self._curr_num_frame_errors = 0
        self._curr_num_bit_errors = 0
        self._curr_num_iterations = 0

        if self._current_SNRs_index < len(self._SNRs) - 1:
            # Next SNR for the same decoder
            self._current_SNRs_index += 1

            self._reset_snr_pbar()
            self._decoder_pbar.update(1)
        elif self._current_decoder_index < len(self._decoders) - 1:
            # Next decoder, starting over at the first SNR
            self._current_decoder_index += 1
            self._current_SNRs_index = 0

            self._BERs.append([])

            # The per-SNR bar has to be restarted here as well, otherwise
            # it keeps counting past its total with the label of the
            # previous SNR
            self._reset_snr_pbar()

            self._decoder_pbar.reset()
            decoder = self._decoders[self._current_decoder_index]
            self._decoder_pbar.set_description(
                f"Calculating BERs for {decoder.__class__.__name__}")
            self._overall_pbar.update(1)
        else:
            # All decoders have been simulated for all SNRs
            self._sim_running = False

            self._snr_pbar.close()
            self._decoder_pbar.close()
            self._overall_pbar.close()

    def start(self) -> None:
        """Start the simulation.

        This is a blocking call. A call to the stop() function
        from another thread will stop this function.
        """
        self._sim_running = True

        while self._sim_running:
            bit_errors = self._simulate_transmission()
            self._update_statistics(bit_errors)
            self._advance_state()

    def stop(self) -> None:
        """Stop the simulation."""
        self._sim_running = False

    def get_current_results(self) -> pd.DataFrame:
        """Get the current results.

        If the simulation has not yet completed, the BERs which have not yet
        been calculated are set to 0.

        :return: pandas Dataframe with the columns
            ["SNR", "BER_0", "BER_1", ...], one BER column per decoder
        """
        num_snrs = len(self._SNRs)
        data = {"SNR": np.array(self._SNRs)}

        # If the BERs of a decoder have not been calculated for all SNRs,
        # fill the rest up with zeros to match the length of the 'SNRs' array
        for i, decoder_BER_list in enumerate(self._BERs):
            data[f"BER_{i}"] = np.pad(
                decoder_BER_list, (0, num_snrs - len(decoder_BER_list)))

        # If the BERs have not been calculated for all decoders, fill up the
        # BERs list with zero-vectors to match the length of the 'decoders'
        # list
        for i in range(len(self._BERs), len(self._decoders)):
            data[f"BER_{i}"] = np.zeros(num_snrs)

        return pd.DataFrame(data)


# TODO: Fix typing.Any or Simulator
class SimulationDeSerializer:
    """Class responsible for file management, de- and serialization of
    Simulator objects."""

    def __init__(self, save_dir: str, results_dir: str):
        """Construct a SimulationDeSerializer object.

        Both directories are created if they do not exist yet.

        :param save_dir: Directory in which the states of paused simulations
            are stored
        :param results_dir: Directory in which the results and the metadata
            of simulations are stored
        """
        self._saves_dir = save_dir
        self._results_dir = results_dir

        Path(self._saves_dir).mkdir(parents=True, exist_ok=True)
        Path(self._results_dir).mkdir(parents=True, exist_ok=True)

    def _get_savefile_path(self, sim_name):
        """Get the path of the file storing the pickled simulator state."""
        return f"{self._saves_dir}/{misc.slugify(sim_name)}_state.pickle"

    def _get_metadata_path(self, sim_name):
        """Get the path of the file storing the metadata of a simulation."""
        return f"{self._results_dir}/{misc.slugify(sim_name)}_metadata.json"

    def _get_results_path(self, sim_name):
        """Get the path of the file storing the results of a simulation."""
        return f"{self._results_dir}/{misc.slugify(sim_name)}.csv"

    def _read_metadata(self, sim_name) -> typing.Dict:
        """Read the metadata of a simulation from its JSON file."""
        with open(self._get_metadata_path(sim_name), 'r',
                  encoding='utf-8') as f:
            return json.load(f)

    def _save_metadata(self, sim_name, metadata) -> None:
        """Write the metadata of a simulation to its JSON file."""
        with open(self._get_metadata_path(sim_name), 'w+',
                  encoding='utf-8') as f:
            json.dump(metadata, f, ensure_ascii=False, indent=4)

    def unfinished_sim_present(self, sim_name: str) -> bool:
        """Check if the savefile of a previously paused simulation is
        present.

        :param sim_name: Name of the simulation
        :return: True if both the savefile and the metadata file of a paused
            simulation with the given name are found
        """
        return os.path.isfile(
            self._get_savefile_path(sim_name)) and os.path.isfile(
            self._get_metadata_path(sim_name))

    # TODO: Make the directories configurable in the init function
    def get_unfinished_sims(self) -> typing.List[str]:
        """Get a list of unfinished simulations.

        :return: Names (as stored in the metadata) of all simulations for
            which a savefile is present
        """
        state_suffix = "_state.pickle"

        sim_names = []
        for entry in os.listdir(self._saves_dir):
            if not os.path.isfile(os.path.join(self._saves_dir, entry)):
                continue
            if not entry.endswith(state_suffix):
                continue

            # The filename only contains the slugified name; the actual name
            # is stored in the metadata
            slug = entry.removesuffix(state_suffix)
            sim_names.append(self._read_metadata(slug)["name"])

        return sim_names

    def remove_unfinished_sim(self, sim_name):
        """Remove the savefile of a previously paused simulation.

        The metadata file is kept, as it is also used for the results.

        :param sim_name: Name of the simulation
        """
        os.remove(self._get_savefile_path(sim_name))
        # os.remove(self._get_metadata_path(sim_name))

    def save_state(self, simulator: typing.Any, sim_name: str,
                   metadata: typing.Dict) -> None:
        """Save the state of a currently running simulation.

        :param simulator: Simulator object
        :param sim_name: Name of the simulation
        :param metadata: Metadata to be saved besides the actual state
        """
        # Save metadata
        self._save_metadata(sim_name, metadata)

        # Save simulation state
        with open(self._get_savefile_path(sim_name), "wb") as file:
            pickle.dump(simulator, file)

    def read_state(self, sim_name: str) -> typing.Tuple[
            typing.Any, typing.Dict]:
        """Read the saved state of a paused simulation.

        :param sim_name: Name of the simulation
        :return: Tuple of the form (simulator, metadata)
        """
        # Read metadata
        metadata = self._read_metadata(sim_name)

        # Read simulation state
        # NOTE: Unpickling can execute arbitrary code. Only load savefiles
        # that come from a trusted source
        with open(self._get_savefile_path(sim_name), "rb") as file:
            simulator = pickle.load(file)

        return simulator, metadata

    # TODO: Is the simulator object actually necessary here?
    def save_results(self, simulator: typing.Any, sim_name: str,
                     metadata: typing.Dict) -> None:
        """Save simulation results to file.

        :param simulator: Simulator object. Used to obtain the data
        :param sim_name: Name of the simulation. Determines the filename
        :param metadata: Metadata to be saved besides the actual simulation
            results
        """
        # Save metadata
        self._save_metadata(sim_name, metadata)

        # Save results
        # NOTE(review): The index of the dataframe is written as an
        # additional, unnamed first column, which read_results() reads back
        # as a regular column. Kept as is in order not to change the file
        # format
        df = simulator.get_current_results()
        df.to_csv(self._get_results_path(sim_name))

    def read_results(self, sim_name: str) -> typing.Tuple[
            pd.DataFrame, typing.Dict]:
        """Read simulation results from file.

        :param sim_name: Name of the simulation.
        :return: Tuple of the form (data, metadata), where data is a pandas
            dataframe and metadata is a dict
        """
        # Read metadata
        metadata = self._read_metadata(sim_name)

        # Read results
        results = pd.read_csv(self._get_results_path(sim_name))

        return results, metadata


# TODO: Fix typing.Any or Simulator
# TODO: Autosave simulation every so often
class SimulationManager:
    """This class only contains functions relating to stopping and
    restarting of simulations (and storing of the simulation state in a file,
    to be resumed at a later date).

    All actual work is outsourced to a provided simulator class.
    """

    def __init__(self, saves_dir: str, results_dir: str):
        """Construct a SimulationManager object.

        This registers handlers for the termination signals of the process,
        which pause and save the configured simulation.

        :param saves_dir: Directory in which the simulation state of a paused
            simulation should be stored
        :param results_dir: Directory in which the results of the simulation
            should be stored
        """
        self._de_serializer = SimulationDeSerializer(saves_dir, results_dir)

        self._simulator = None
        self._sim_name = None
        self._metadata = {}

        # Not every platform defines every signal (e.g. there is no SIGHUP
        # on Windows): Only register the ones that are available
        for signal_name in ("SIGINT", "SIGTERM", "SIGHUP"):
            signal_number = getattr(signal, signal_name, None)
            if signal_number is not None:
                signal.signal(signal_number, self._exit_gracefully)

    def _sim_configured(self) -> bool:
        """Check whether 'configure_simulation()' has been called."""
        return (self._simulator is not None) \
            and (self._sim_name is not None) \
            and (self._metadata is not None)

    def configure_simulation(self, simulator: typing.Any, name: str,
                             column_labels: typing.Sequence[str]) -> None:
        """Configure a new simulation.

        :param simulator: Simulator object performing the actual work
        :param name: Name of the simulation. Determines the filenames
        :param column_labels: Labels stored in the metadata under the key
            "labels"
        """
        self._simulator = simulator
        self._sim_name = name
        self._metadata["name"] = name
        self._metadata["labels"] = column_labels

    def get_unfinished(self) -> typing.List[str]:
        """Get a list of names of all present unfinished simulations."""
        return self._de_serializer.get_unfinished_sims()

    def load_unfinished(self, sim_name: str) -> None:
        """Load the state of an unfinished simulation from its savefile.

        Warning: This function deletes the savefile after loading.

        :param sim_name: Name of the simulation
        """
        assert self._de_serializer.unfinished_sim_present(sim_name)

        self._sim_name = sim_name
        self._simulator, self._metadata = self._de_serializer.read_state(
            sim_name)

        self._de_serializer.remove_unfinished_sim(sim_name)

    def _exit_gracefully(self, *args) -> None:
        """Handler called when the program is interrupted.

        Pauses and saves the currently running simulation, then terminates
        the program.

        :param args: Arguments passed by the 'signal' module. Unused
        """
        if self._sim_configured():
            self._simulator.stop()

            self._de_serializer.save_state(self._simulator, self._sim_name,
                                           self._metadata)
            self._de_serializer.save_results(self._simulator, self._sim_name,
                                             self._metadata)

        # 'exit()' is a helper of the 'site' module meant for the interactive
        # shell and is not guaranteed to exist: Use 'sys.exit()' instead
        sys.exit()

    def simulate(self) -> None:
        """Start the simulation.

        This is a blocking call. Once the simulation has finished, the
        results are saved.
        """
        assert self._sim_configured()

        self._simulator.start()

        self._de_serializer.save_results(self._simulator, self._sim_name,
                                         self._metadata)