ba-thesis/sw/utility/simulation.py

326 lines
11 KiB
Python

"""This file contains utility functions relating to tests and simulations of the decoders."""
import time
import numpy as np
import typing
from tqdm import tqdm
from timeit import default_timer
import signal
from dataclasses import dataclass
import pickle
import os.path
from pathlib import Path
import spdlog as spd
from utility import noise
def count_bit_errors(d: np.array, d_hat: np.array) -> int:
"""Count the number of wrong bits in a decoded codeword.
:param d: Originally sent data
:param d_hat: Received data
:return: Number of bit errors
"""
return np.sum(d != d_hat)
# TODO: Write unit tests
class Simulator:
"""Class allowing for saving of simulations state.
Given a list of decoders, this class allows for simulating the Bit-Error-Rates of each decoder for various SNRs.
The functionality implemented by this class could be achieved by a bunch of loops and a function.
However, storing the state of the simulation as member variables allows for pausing and resuming the simulation
at a later state.
"""
def __init__(self, n: int, k: int,
decoders: typing.Sequence[typing.Any],
SNRs: typing.Sequence[float],
target_frame_errors: int):
"""Construct and object of type simulator.
:param n: Number of bits in a codeword
:param k: Number of bits in a dataword
:param decoders: Sequence of decoders to test
:param SNRs: Sequence of SNRs for which the BERs should be calculated
:param target_frame_errors: Number of frame errors after which to stop the simulation
"""
# Simulation parameters
self._n = n
self._k = k
self._decoders = decoders
self._SNRs = SNRs
self._target_frame_errors = target_frame_errors
self._x = np.zeros(self._n)
self._x_bpsk = 1 - 2 * self._x # Map x from [0, 1]^n to [-1, 1]^n
# Simulation state
self._current_decoder_index = 0
self._current_SNRs_index = 0
self._curr_num_frame_errors = 0
self._curr_num_bit_errors = 0
self._curr_num_iterations = 0
# Results & Miscellaneous
self._BERs = [[]]
self._create_pbars()
self._sim_running = False
self._sim_done = False
def _create_pbars(self):
self._overall_pbar = tqdm(total=len(self._decoders),
desc="Calculating the answer to life, the universe and everything",
leave=False,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}")
decoder = self._decoders[self._current_decoder_index]
self._decoder_pbar = tqdm(total=len(self._SNRs),
desc=f"Calculating BERs for {decoder.__class__.__name__}",
leave=False,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}")
self._snr_pbar = tqdm(total=self._target_frame_errors,
desc=f"Simulating for SNR = {self._SNRs[0]} dB",
leave=False,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]")
def __getstate__(self):
"""Custom serialization function called by the 'pickle' module
when saving the state of a currently running simulation
"""
state = self.__dict__.copy()
del state['_overall_pbar']
del state['_decoder_pbar']
del state['_snr_pbar']
return state
def __setstate__(self, state):
"""Custom deserialization function called by the 'pickle' module
when loading a previously saved simulation
"""
self.__dict__.update(state)
self._create_pbars()
self._overall_pbar.update(self._current_decoder_index)
self._decoder_pbar.update(self._current_SNRs_index)
self._snr_pbar.update(self._curr_num_frame_errors)
self._overall_pbar.refresh()
self._decoder_pbar.refresh()
self._snr_pbar.refresh()
def _simulate_transmission(self) -> int:
"""Simulate the transmission of a single codeword.
:return: Number of bit errors that occurred
"""
SNR = self._SNRs[self._current_SNRs_index]
decoder = self._decoders[self._current_decoder_index]
y = noise.add_awgn(self._x_bpsk, SNR, self._n, self._k)
x_hat = decoder.decode(y)
return count_bit_errors(self._x, x_hat)
def _update_statistics(self, bit_errors: int) -> None:
"""Update the statistics of the simulator.
:param bit_errors: Number of bit errors that occurred during the last transmission
"""
self._curr_num_iterations += 1
if bit_errors > 0:
self._curr_num_frame_errors += 1
self._curr_num_bit_errors += bit_errors
self._snr_pbar.update(1)
def _advance_state(self) -> None:
"""Advance the state of the simulator.
This function also appends a new BER value to the self._BERs array
if the number of target frame errors has been reached
"""
if self._curr_num_frame_errors >= self._target_frame_errors:
# TODO: Properly handle the multiple decoders
self._BERs[self._current_decoder_index] \
.append(self._curr_num_bit_errors / (self._curr_num_iterations * self._n))
self._curr_num_frame_errors = 0
self._curr_num_bit_errors = 0
self._curr_num_iterations = 0
if self._current_SNRs_index < len(self._SNRs) - 1:
self._current_SNRs_index += 1
self._snr_pbar.reset()
self._snr_pbar.set_description(f"Simulating for SNR = {self._SNRs[self._current_SNRs_index]} dB")
self._decoder_pbar.update(1)
else:
if self._current_decoder_index < len(self._decoders) - 1:
self._current_decoder_index += 1
self._current_SNRs_index = 0
self._BERs.append([])
self._decoder_pbar.reset()
decoder = self._decoders[self._current_decoder_index]
self._decoder_pbar.set_description(f"Calculating BERs for {decoder.__class__.__name__}")
self._overall_pbar.update(1)
else:
self._sim_running = False
self._sim_done = True
self._snr_pbar.close()
self._decoder_pbar.close()
self._overall_pbar.close()
def start(self) -> None:
"""Start the simulation.
This is a blocking call. A call to the stop() function
from another thread will stop this function.
"""
if not self._sim_done:
self._sim_running = True
while self._sim_running:
bit_errors = self._simulate_transmission()
self._update_statistics(bit_errors)
self._advance_state()
def stop(self) -> None:
"""Stop the simulation."""
self._sim_running = False
@property
def simulation_done(self) -> bool:
"""Check whether the simulation is still ongoing or completed.
:return: True if the simulation is completed
"""
return self._sim_done
# TODO: Make sure the length of each BER_array is the same as the number of SNRs
@property
def SNRs_and_BERs(self) -> typing.Tuple[np.array, np.array]:
"""Get the current results.
If the simulation has not yet completed, the BERs which have not yet been calculated are set to 0.
:return: Tuple of numpy arrays of the form (SNRs, BERs), where BERs is a list of the form
[BER_decoder_1, BER_decoder_2, ...]
"""
SNRs = np.array(self._SNRs)
BERs = [np.array(BER_array) for BER_array in self._BERs]
return SNRs, BERs
# TODO: Remove save data after successful execution
# TODO: Write currently calculated BERs to file when simulation is stopped
class SimulationManager:
"""This class only contains functions relating to stopping and restarting of simulations
(and storing of the simulation state in a file, to be resumed at a later date).
All actual work is outsourced to a provided simulator class.
"""
def __init__(self, save_dir: str, results_dir: str):
"""Construct a SimulationManager object.
:param save_dir: Directory in which the simulation state of a paused simulation should be stored
:param results_dir: Directory in which the results of the simulation should be stored
"""
self._save_dir = save_dir
self._sim_state_filepath = f"{self._save_dir}/sim_state.pickle"
self._logs_filepath = f"{self._save_dir}/logs.txt"
self._results_dir = results_dir
self._simulator = None
Path(self._save_dir).mkdir(parents=True, exist_ok=True)
self._logger = spd.FileLogger("SimulationManager", self._logs_filepath)
self._logger.set_level(spd.LogLevel.DEBUG)
signal.signal(signal.SIGINT, self._exit_gracefully)
signal.signal(signal.SIGTERM, self._exit_gracefully)
#
# Functions relating to the pausing and restarting of simulations
#
def unfinished_simulation_present(self) -> bool:
"""Check whether the savefile of a previously unfinished simulation is present."""
return os.path.isfile(self._sim_state_filepath)
def load_unfinished(self):
"""Load the state of an unfinished simulation its savefile."""
assert self.unfinished_simulation_present()
self._logger.info("Loading saved simulation state")
with open(self._sim_state_filepath, "rb") as file:
self._simulator = pickle.load(file)
# TODO: Make sure old state is overwritten
def _save_state(self) -> None:
"""Write the state of the currently configured simulation to a savefile."""
if self._simulator is not None:
with open(self._sim_state_filepath, "wb") as file:
pickle.dump(self._simulator, file)
self._logger.info("Saved simulation state")
else:
self._logger.info("No simulation state to save: simulator object is 'None'")
def _exit_gracefully(self, *args) -> None:
"""Handler called when the program is interrupted.
Pauses and saves the currently running simulation
"""
self._logger.debug("Intercepted signal SIGINT/SIGTERM")
if self._simulator is not None:
self._simulator.stop()
self._save_state()
#
# Functions responsible for the actual simulation
#
def set_simulator(self, simulator: typing.Any) -> None:
"""Select a simulator to do the actual work."""
self._simulator = simulator
def start(self):
"""Start the simulation.
This is a blocking call. A call to the stop() function
from another thread will stop this function.
"""
assert self._simulator is not None
self._logger.info("Starting simulation")
self._simulator.start()
@property
def simulation_done(self):
"""Check whether the configured simulation has been completed."""
return self._simulator.simulation_done
def get_current_results(self) -> typing.Any:
"""Get the current results of the configured simulation."""
return self._simulator.SNRs_and_BERs