"""This file contains utility functions relating to tests and simulations of the decoders.""" import json import pandas as pd import numpy as np import typing from tqdm import tqdm import signal import pickle import os from pathlib import Path import platform from datetime import datetime import timeit from utility import noise, misc def count_bit_errors(d: np.array, d_hat: np.array) -> int: """Count the number of wrong bits in a decoded codeword. :param d: Originally sent data :param d_hat: Received data :return: Number of bit errors """ return np.sum(d != d_hat) # TODO: Write unit tests class Simulator: """Class allowing for saving of simulations state. Given a list of decoders, this class allows for simulating the Bit-Error-Rates of each decoder for various SNRs. The functionality implemented by this class could be achieved by a bunch of loops and a function. However, storing the state of the simulation as member variables allows for pausing and resuming the simulation at a later time. """ def __init__(self, n: int, k: int, decoders: typing.Sequence[typing.Any], SNRs: typing.Sequence[float], target_frame_errors: int, max_num_iterations: int): """Construct and object of type simulator. :param n: Number of bits in a codeword :param k: Number of bits in a dataword :param decoders: Sequence of decoders to test :param SNRs: Sequence of SNRs for which the BERs should be calculated :param target_frame_errors: Number of frame errors after which to stop the simulation """ # Simulation parameters self._n = n self._k = k self._decoders = decoders self._SNRs = SNRs self._target_frame_errors = target_frame_errors self._max_num_iterations = max_num_iterations self._x = np.zeros(self._n) self._x_bpsk = 1 - 2 * self._x # Map x from [0, 1]^n to [-1, 1]^n # Simulation state self._curr_decoder_index = 0 self._curr_SNRs_index = 0 self._curr_num_frame_errors = 0 self._curr_num_bit_errors = 0 self._curr_num_iterations = 0 self._curr_num_dec_fails = 0 # Results & Miscellaneous self._BERs = [np.zeros(len(SNRs)) for i in range(len(decoders))] self._dec_fails = [np.zeros(len(SNRs)) for i in range(len(decoders))] self._create_pbars() self._sim_running = False def _create_pbars(self): self._overall_pbar = tqdm(total=len(self._decoders), desc="Calculating the answer to life, " "the universe and everything", leave=False, bar_format="{l_bar}{bar}| {n_fmt}/{" "total_fmt} [{elapsed}]") decoder = self._decoders[self._curr_decoder_index] self._decoder_pbar = tqdm(total=len(self._SNRs), desc=f"Calculating" f"g BERs" f" for {decoder.__class__.__name__}", leave=False, bar_format="{l_bar}{bar}| {n_fmt}/{" "total_fmt}") self._snr_pbar = tqdm(total=self._max_num_iterations, desc=f"Simulating for SNR = {self._SNRs[0]} dB", leave=False, ) def __getstate__(self) -> typing.Dict: """Custom serialization function called by the 'pickle' module when saving the state of a currently running simulation """ state = self.__dict__.copy() del state['_overall_pbar'] del state['_decoder_pbar'] del state['_snr_pbar'] return state def __setstate__(self, state) -> None: """Custom deserialization function called by the 'pickle' module when loading a previously saved simulation :param state: Dictionary storing the serialized version of an object of this class """ self.__dict__.update(state) self._create_pbars() self._overall_pbar.update(self._curr_decoder_index) self._decoder_pbar.update(self._curr_SNRs_index) self._snr_pbar.update(self._curr_num_frame_errors) self._overall_pbar.refresh() self._decoder_pbar.refresh() self._snr_pbar.refresh() def _simulate_transmission(self) -> int: """Simulate the transmission of a single codeword. :return: Number of bit errors that occurred """ SNR = self._SNRs[self._curr_SNRs_index] decoder = self._decoders[self._curr_decoder_index] y = noise.add_awgn(self._x_bpsk, SNR, self._n, self._k) x_hat = decoder.decode(y) # Handle decoding failure if x_hat is not None: return count_bit_errors(self._x, x_hat) else: self._curr_num_dec_fails += 1 return 0 def _update_statistics(self, bit_errors: int) -> None: """Update the statistics of the simulator. :param bit_errors: Number of bit errors that occurred during the last transmission """ self._curr_num_iterations += 1 self._snr_pbar.update(1) if bit_errors > 0: self._curr_num_frame_errors += 1 self._curr_num_bit_errors += bit_errors def _advance_state(self) -> None: """Advance the state of the simulator. This function also handles setting the result arrays and progress bars. """ if (self._curr_num_frame_errors >= self._target_frame_errors) or ( self._curr_num_iterations > self._max_num_iterations): # Adjust the number of iterations to ignore decoding failures adj_num_iterations = self._curr_num_iterations - \ self._curr_num_dec_fails if adj_num_iterations == 0: self._BERs[self._curr_decoder_index][self._curr_SNRs_index] = 1 else: self._BERs[self._curr_decoder_index][self._curr_SNRs_index] \ = self._curr_num_bit_errors / ( adj_num_iterations * self._n) self._dec_fails[self._curr_decoder_index][self._curr_SNRs_index] \ = self._curr_num_dec_fails self._curr_num_frame_errors = 0 self._curr_num_bit_errors = 0 self._curr_num_iterations = 0 self._curr_num_dec_fails = 0 if self._curr_SNRs_index < len(self._SNRs) - 1: self._curr_SNRs_index += 1 self._snr_pbar.reset() self._overall_pbar.refresh() self._snr_pbar.set_description( f"Simulating for SNR = " f"{self._SNRs[self._curr_SNRs_index]} dB") self._decoder_pbar.update(1) else: if self._curr_decoder_index < len(self._decoders) - 1: self._curr_decoder_index += 1 self._curr_SNRs_index = 0 self._decoder_pbar.reset() decoder = self._decoders[self._curr_decoder_index] self._decoder_pbar.set_description( f"Calculating BERs for {decoder.__class__.__name__}") self._overall_pbar.update(1) else: self._sim_running = False self._snr_pbar.close() self._decoder_pbar.close() self._overall_pbar.close() def start(self) -> None: """Start the simulation. This is a blocking call. A call to the stop() function from another thread will stop this function. """ self._sim_running = True while self._sim_running: bit_errors = self._simulate_transmission() self._update_statistics(bit_errors) self._advance_state() def stop(self) -> None: """Stop the simulation.""" self._sim_running = False def get_current_results(self) -> pd.DataFrame: """Get the current results. If the simulation has not yet completed, the BERs which have not yet been calculated are set to 0. :return: pandas Dataframe with the columns ["SNR", "BER_1", "BER_2", ..., "DecFails_1", "DecFails_2", ...] """ data = {"SNR": np.array(self._SNRs)} for i, decoder_BERs in enumerate(self._BERs): data[f"BER_{i}"] = decoder_BERs for i, decoder_dec_fails in enumerate(self._dec_fails): data[f"DecFails_{i}"] = decoder_dec_fails return pd.DataFrame(data) class SimulationDeSerializer: """Class responsible for file management, de- and serialization of Simulator objects.""" def __init__(self, save_dir: str, results_dir: str): self._saves_dir = save_dir self._results_dir = results_dir Path(self._saves_dir).mkdir(parents=True, exist_ok=True) Path(self._results_dir).mkdir(parents=True, exist_ok=True) def _get_savefile_path(self, sim_name): return f"{self._saves_dir}/{misc.slugify(sim_name)}_state.pickle" def _get_metadata_path(self, sim_name): return f"{self._results_dir}/{misc.slugify(sim_name)}_metadata.json" def _get_results_path(self, sim_name): return f"{self._results_dir}/{misc.slugify(sim_name)}.csv" def _read_metadata(self, sim_name) -> typing.Dict: with open(self._get_metadata_path(sim_name), 'r', encoding='utf-8') as f: return json.load(f) def _save_metadata(self, sim_name, metadata) -> None: with open(self._get_metadata_path(sim_name), 'w+', encoding='utf-8') as f: json.dump(metadata, f, ensure_ascii=False, indent=4) def unfinished_sim_present(self, sim_name: str): """Check if the savefile of a previously paused simulation is present. :param sim_name: Name :return: True if a paused simulation with the given name is found """ return os.path.isfile( self._get_savefile_path(sim_name)) and os.path.isfile( self._get_metadata_path(sim_name)) # TODO: Make the directories configurable in the init function def get_unfinished_sims(self) -> typing.List[str]: """Get a list unfinished simulations.""" save_files = [f for f in os.listdir(self._saves_dir) if os.path.isfile(os.path.join(self._saves_dir, f))] state_files = [f for f in save_files if f.endswith("_state.pickle")] sim_slugs = [f.removesuffix("_state.pickle") for f in state_files] sim_names = [self._read_metadata(slug)["name"] for slug in sim_slugs] return sim_names def remove_unfinished_sim(self, sim_name: str): """Remove the savefile of a previously paused simulation. :param sim_name: Name of the simulation """ os.remove(self._get_savefile_path(sim_name)) # os.remove(self._get_metadata_path(sim_name)) def save_state(self, simulator: typing.Any, sim_name: str, metadata: typing.Dict) -> None: """Save the state of a currently running simulation. :param simulator: Simulator object :param sim_name: Name of the simulation :param metadata: Metadata to be saved besides the actual state """ # Save metadata self._save_metadata(sim_name, metadata) # Save simulation state with open(self._get_savefile_path(sim_name), "wb") as file: pickle.dump(simulator, file) def read_state(self, sim_name: str) -> typing.Tuple[ typing.Any, typing.Dict]: """Read the saved state of a paused simulation. :param sim_name: Name of the simulation :return: Tuple of the form (simulator, metadata) """ # Read metadata metadata = self._read_metadata(sim_name) # Read simulation state simulator = None with open(self._get_savefile_path(sim_name), "rb") as file: simulator = pickle.load(file) return simulator, metadata # TODO: Is the simulator object actually necessary here? def save_results(self, simulator: typing.Any, sim_name: str, metadata: typing.Dict) -> None: """Save simulation results to file. :param simulator: Simulator object. Used to obtain the data :param sim_name: Name of the simulation. Determines the filename :param metadata: Metadata to be saved besides the actual simulation results """ # Save metadata self._save_metadata(sim_name, metadata) # Save results df = simulator.get_current_results() df.to_csv(self._get_results_path(sim_name)) def read_results(self, sim_name: str) -> typing.Tuple[ pd.DataFrame, typing.Dict]: """Read simulation results from file. :param sim_name: Name of the simulation. :return: Tuple of the form (data, metadata), where data is a pandas dataframe and metadata is a dict """ # Read metadata metadata = self._read_metadata(sim_name) # Read results results = pd.read_csv(self._get_results_path(sim_name)) return results, metadata # TODO: Autosave simulation every so often # TODO: Comment explaining what a Simulator class is class SimulationManager: """This class only contains functions relating to stopping and restarting of simulations (and storing of the simulation state in a file, to be resumed at a later date). All actual work is outsourced to a provided simulator class. """ def __init__(self, saves_dir: str, results_dir: str): """Construct a SimulationManager object. :param saves_dir: Directory in which the simulation state of a paused simulation should be stored :param results_dir: Directory in which the results of the simulation should be stored """ self._de_serializer = SimulationDeSerializer(saves_dir, results_dir) self._simulator = None self._sim_name = None self._metadata = {"duration": 0} self._sim_start_time = None signal.signal(signal.SIGINT, self._exit_gracefully) signal.signal(signal.SIGTERM, self._exit_gracefully) signal.signal(signal.SIGHUP, self._exit_gracefully) def _sim_configured(self) -> bool: """Check whether 'configure_simulation()' has been called.""" return (self._simulator is not None) and ( self._sim_name is not None) and ( self._metadata is not None) def configure_simulation(self, simulator: typing.Any, name: str, column_labels: typing.Sequence[str]) -> None: """Configure a new simulation.""" self._simulator = simulator self._sim_name = name self._metadata["name"] = name self._metadata["labels"] = column_labels self._metadata["platform"] = platform.platform() def get_unfinished(self) -> typing.List[str]: """Get a list of names of all present unfinished simulations.""" return self._de_serializer.get_unfinished_sims() def load_unfinished(self, sim_name: str) -> None: """Load the state of an unfinished simulation form its savefile. Warning: This function deletes the savefile after loading. """ assert self._de_serializer.unfinished_sim_present(sim_name) self._sim_name = sim_name self._simulator, self._metadata = self._de_serializer.read_state( sim_name) self._de_serializer.remove_unfinished_sim(sim_name) # TODO: Metadata is being written twice here. Should save_results() also # save the metadata? def _exit_gracefully(self, *args) -> None: """Handler called when the program is interrupted. Pauses and saves the currently running simulation.""" if self._sim_configured(): self._simulator.stop() self._metadata["end_time"] = f"{datetime.now(tz=None)}" self._metadata["duration"] \ += timeit.default_timer() - self._sim_start_time self._de_serializer.save_state(self._simulator, self._sim_name, self._metadata) self._de_serializer.save_results(self._simulator, self._sim_name, self._metadata) exit() def simulate(self) -> None: """Start the simulation. This is a blocking call.""" assert self._sim_configured() self._sim_start_time = timeit.default_timer() self._simulator.start() self._metadata["end_time"] = f"{datetime.now(tz=None)}" self._metadata["duration"] \ += timeit.default_timer() - self._sim_start_time self._de_serializer.save_results(self._simulator, self._sim_name, self._metadata)