ba-thesis/sw/utility/simulation.py

473 lines
17 KiB
Python

"""This file contains utility functions relating to tests and simulations of
the decoders."""
import json
import pandas as pd
import numpy as np
import typing
from tqdm import tqdm
import signal
import pickle
import os
from pathlib import Path
import platform
from datetime import datetime
import timeit
from utility import noise, misc
def count_bit_errors(d: np.array, d_hat: np.array) -> int:
"""Count the number of wrong bits in a decoded codeword.
:param d: Originally sent data
:param d_hat: Received data
:return: Number of bit errors
"""
return np.sum(d != d_hat)
# TODO: Write unit tests
class Simulator:
"""Class allowing for saving of simulations state.
Given a list of decoders, this class allows for simulating the
Bit-Error-Rates of each decoder for various SNRs.
The functionality implemented by this class could be achieved by a bunch
of loops and a function. However, storing the state of the simulation as
member variables allows for pausing and resuming the simulation at a
later time.
"""
def __init__(self, n: int, k: int,
decoders: typing.Sequence[typing.Any],
SNRs: typing.Sequence[float],
target_frame_errors: int,
max_num_iterations: int):
"""Construct and object of type simulator.
:param n: Number of bits in a codeword
:param k: Number of bits in a dataword
:param decoders: Sequence of decoders to test
:param SNRs: Sequence of SNRs for which the BERs should be calculated
:param target_frame_errors: Number of frame errors after which to
stop the simulation
"""
# Simulation parameters
self._n = n
self._k = k
self._decoders = decoders
self._SNRs = SNRs
self._target_frame_errors = target_frame_errors
self._max_num_iterations = max_num_iterations
self._x = np.zeros(self._n)
self._x_bpsk = 1 - 2 * self._x # Map x from [0, 1]^n to [-1, 1]^n
# Simulation state
self._current_decoder_index = 0
self._current_SNRs_index = 0
self._curr_num_frame_errors = 0
self._curr_num_bit_errors = 0
self._curr_num_iterations = 0
# Results & Miscellaneous
self._BERs = [[]]
self._create_pbars()
self._sim_running = False
def _create_pbars(self):
self._overall_pbar = tqdm(total=len(self._decoders),
desc="Calculating the answer to life, "
"the universe and everything",
leave=False,
bar_format="{l_bar}{bar}| {n_fmt}/{"
"total_fmt}")
decoder = self._decoders[self._current_decoder_index]
self._decoder_pbar = tqdm(total=len(self._SNRs),
desc=f"Calculating"
f"g BERs"
f" for {decoder.__class__.__name__}",
leave=False,
bar_format="{l_bar}{bar}| {n_fmt}/{"
"total_fmt}")
self._snr_pbar = tqdm(total=self._target_frame_errors,
desc=f"Simulating for SNR = {self._SNRs[0]} dB",
leave=False,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} "
"[{elapsed}<{remaining}]")
def __getstate__(self) -> typing.Dict:
"""Custom serialization function called by the 'pickle' module
when saving the state of a currently running simulation
"""
state = self.__dict__.copy()
del state['_overall_pbar']
del state['_decoder_pbar']
del state['_snr_pbar']
return state
def __setstate__(self, state) -> None:
"""Custom deserialization function called by the 'pickle' module
when loading a previously saved simulation
:param state: Dictionary storing the serialized version of an object
of this class
"""
self.__dict__.update(state)
self._create_pbars()
self._overall_pbar.update(self._current_decoder_index)
self._decoder_pbar.update(self._current_SNRs_index)
self._snr_pbar.update(self._curr_num_frame_errors)
self._overall_pbar.refresh()
self._decoder_pbar.refresh()
self._snr_pbar.refresh()
def _simulate_transmission(self) -> int:
"""Simulate the transmission of a single codeword.
:return: Number of bit errors that occurred
"""
SNR = self._SNRs[self._current_SNRs_index]
decoder = self._decoders[self._current_decoder_index]
y = noise.add_awgn(self._x_bpsk, SNR, self._n, self._k)
x_hat = decoder.decode(y)
return count_bit_errors(self._x, x_hat)
def _update_statistics(self, bit_errors: int) -> None:
"""Update the statistics of the simulator.
:param bit_errors: Number of bit errors that occurred during the
last transmission
"""
self._curr_num_iterations += 1
if bit_errors > 0:
self._curr_num_frame_errors += 1
self._curr_num_bit_errors += bit_errors
self._snr_pbar.update(1)
def _advance_state(self) -> None:
"""Advance the state of the simulator.
This function also appends a new BER value to the self._BERs array
if the number of target frame errors has been reached
"""
if (self._curr_num_frame_errors >= self._target_frame_errors) or (
self._curr_num_iterations > self._max_num_iterations):
self._BERs[self._current_decoder_index] \
.append(self._curr_num_bit_errors / (
self._curr_num_iterations * self._n))
self._curr_num_frame_errors = 0
self._curr_num_bit_errors = 0
self._curr_num_iterations = 0
if self._current_SNRs_index < len(self._SNRs) - 1:
self._current_SNRs_index += 1
self._snr_pbar.reset()
self._snr_pbar.set_description(
f"Simulating for SNR = "
f"{self._SNRs[self._current_SNRs_index]} dB")
self._decoder_pbar.update(1)
else:
if self._current_decoder_index < len(self._decoders) - 1:
self._current_decoder_index += 1
self._current_SNRs_index = 0
self._BERs.append([])
self._decoder_pbar.reset()
decoder = self._decoders[self._current_decoder_index]
self._decoder_pbar.set_description(
f"Calculating BERs for {decoder.__class__.__name__}")
self._overall_pbar.update(1)
else:
self._sim_running = False
self._snr_pbar.close()
self._decoder_pbar.close()
self._overall_pbar.close()
def start(self) -> None:
"""Start the simulation.
This is a blocking call. A call to the stop() function
from another thread will stop this function.
"""
self._sim_running = True
while self._sim_running:
bit_errors = self._simulate_transmission()
self._update_statistics(bit_errors)
self._advance_state()
def stop(self) -> None:
"""Stop the simulation."""
self._sim_running = False
def get_current_results(self) -> pd.DataFrame:
"""Get the current results.
If the simulation has not yet completed, the BERs which have not yet
been calculated are set to 0.
:return: pandas Dataframe with the columns ["SNR", "BER_1", "BER_2",
...]
"""
data = {"SNR": np.array(self._SNRs)}
# If the BERs of a decoder have not been calculated for all SNRs,
# fill the rest up with zeros to match the length of the 'SNRs' array
for i, decoder_BER_list in enumerate(self._BERs):
padded = np.pad(decoder_BER_list,
(0, len(self._SNRs) - len(decoder_BER_list)))
data[f"BER_{i}"] = padded
# If the BERs have not been calculated for all decoders, fill up the
# BERs list
# with zero-vectors to match the length of the 'decoders' list
for i in range(len(self._decoders), len(self._BERs)):
data[f"BER_{i}"] = np.zeros(len(self._SNRs))
return pd.DataFrame(data)
class SimulationDeSerializer:
"""Class responsible for file management, de- and serialization of
Simulator objects."""
def __init__(self, save_dir: str, results_dir: str):
self._saves_dir = save_dir
self._results_dir = results_dir
Path(self._saves_dir).mkdir(parents=True, exist_ok=True)
Path(self._results_dir).mkdir(parents=True, exist_ok=True)
def _get_savefile_path(self, sim_name):
return f"{self._saves_dir}/{misc.slugify(sim_name)}_state.pickle"
def _get_metadata_path(self, sim_name):
return f"{self._results_dir}/{misc.slugify(sim_name)}_metadata.json"
def _get_results_path(self, sim_name):
return f"{self._results_dir}/{misc.slugify(sim_name)}.csv"
def _read_metadata(self, sim_name) -> typing.Dict:
with open(self._get_metadata_path(sim_name), 'r',
encoding='utf-8') as f:
return json.load(f)
def _save_metadata(self, sim_name, metadata) -> None:
with open(self._get_metadata_path(sim_name), 'w+',
encoding='utf-8') as f:
json.dump(metadata, f, ensure_ascii=False, indent=4)
def unfinished_sim_present(self, sim_name: str):
"""Check if the savefile of a previously paused simulation is
present.
:param sim_name: Name
:return: True if a paused simulation with the given name is found
"""
return os.path.isfile(
self._get_savefile_path(sim_name)) and os.path.isfile(
self._get_metadata_path(sim_name))
# TODO: Make the directories configurable in the init function
def get_unfinished_sims(self) -> typing.List[str]:
"""Get a list unfinished simulations."""
save_files = [f for f in os.listdir(self._saves_dir) if
os.path.isfile(os.path.join(self._saves_dir, f))]
state_files = [f for f in save_files if f.endswith("_state.pickle")]
sim_slugs = [f.removesuffix("_state.pickle") for f in state_files]
sim_names = [self._read_metadata(slug)["name"] for slug in sim_slugs]
return sim_names
def remove_unfinished_sim(self, sim_name: str):
"""Remove the savefile of a previously paused simulation.
:param sim_name: Name of the simulation
"""
os.remove(self._get_savefile_path(sim_name))
# os.remove(self._get_metadata_path(sim_name))
def save_state(self, simulator: typing.Any, sim_name: str,
metadata: typing.Dict) -> None:
"""Save the state of a currently running simulation.
:param simulator: Simulator object
:param sim_name: Name of the simulation
:param metadata: Metadata to be saved besides the actual state
"""
# Save metadata
self._save_metadata(sim_name, metadata)
# Save simulation state
with open(self._get_savefile_path(sim_name), "wb") as file:
pickle.dump(simulator, file)
def read_state(self, sim_name: str) -> typing.Tuple[
typing.Any, typing.Dict]:
"""Read the saved state of a paused simulation.
:param sim_name: Name of the simulation
:return: Tuple of the form (simulator, metadata)
"""
# Read metadata
metadata = self._read_metadata(sim_name)
# Read simulation state
simulator = None
with open(self._get_savefile_path(sim_name), "rb") as file:
simulator = pickle.load(file)
return simulator, metadata
# TODO: Is the simulator object actually necessary here?
def save_results(self, simulator: typing.Any, sim_name: str,
metadata: typing.Dict) -> None:
"""Save simulation results to file.
:param simulator: Simulator object. Used to obtain the data
:param sim_name: Name of the simulation. Determines the filename
:param metadata: Metadata to be saved besides the actual simulation
results
"""
# Save metadata
self._save_metadata(sim_name, metadata)
# Save results
df = simulator.get_current_results()
df.to_csv(self._get_results_path(sim_name))
def read_results(self, sim_name: str) -> typing.Tuple[
pd.DataFrame, typing.Dict]:
"""Read simulation results from file.
:param sim_name: Name of the simulation.
:return: Tuple of the form (data, metadata), where data is a pandas
dataframe and metadata is a dict
"""
# Read metadata
metadata = self._read_metadata(sim_name)
# Read results
results = pd.read_csv(self._get_results_path(sim_name))
return results, metadata
# TODO: Autosave simulation every so often
# TODO: Comment explaining what a Simulator class is
class SimulationManager:
"""This class only contains functions relating to stopping and
restarting of simulations (and storing of the simulation state in a
file, to be resumed at a later date).
All actual work is outsourced to a provided simulator class.
"""
def __init__(self, saves_dir: str, results_dir: str):
"""Construct a SimulationManager object.
:param saves_dir: Directory in which the simulation state of a paused
simulation should be stored
:param results_dir: Directory in which the results of the simulation
should be stored
"""
self._de_serializer = SimulationDeSerializer(saves_dir, results_dir)
self._simulator = None
self._sim_name = None
self._metadata = {"duration": 0}
self._sim_start_time = None
signal.signal(signal.SIGINT, self._exit_gracefully)
signal.signal(signal.SIGTERM, self._exit_gracefully)
signal.signal(signal.SIGHUP, self._exit_gracefully)
def _sim_configured(self) -> bool:
"""Check whether 'configure_simulation()' has been called."""
return (self._simulator is not None) and (
self._sim_name is not None) and (
self._metadata is not None)
def configure_simulation(self, simulator: typing.Any, name: str,
column_labels: typing.Sequence[str]) -> None:
"""Configure a new simulation."""
self._simulator = simulator
self._sim_name = name
self._metadata["name"] = name
self._metadata["labels"] = column_labels
self._metadata["platform"] = platform.platform()
def get_unfinished(self) -> typing.List[str]:
"""Get a list of names of all present unfinished simulations."""
return self._de_serializer.get_unfinished_sims()
def load_unfinished(self, sim_name: str) -> None:
"""Load the state of an unfinished simulation form its savefile.
Warning: This function deletes the savefile after loading.
"""
assert self._de_serializer.unfinished_sim_present(sim_name)
self._sim_name = sim_name
self._simulator, self._metadata = self._de_serializer.read_state(
sim_name)
self._de_serializer.remove_unfinished_sim(sim_name)
# TODO: Metadata is being written twice here. Should save_results() also
# save the metadata?
def _exit_gracefully(self, *args) -> None:
"""Handler called when the program is interrupted. Pauses and saves
the currently running simulation."""
if self._sim_configured():
self._simulator.stop()
self._metadata["end_time"] = f"{datetime.now(tz=None)}"
self._metadata["duration"] \
+= timeit.default_timer() - self._sim_start_time
self._de_serializer.save_state(self._simulator, self._sim_name,
self._metadata)
self._de_serializer.save_results(self._simulator, self._sim_name,
self._metadata)
exit()
def simulate(self) -> None:
"""Start the simulation. This is a blocking call."""
assert self._sim_configured()
self._sim_start_time = timeit.default_timer()
self._simulator.start()
self._metadata["end_time"] = f"{datetime.now(tz=None)}"
self._metadata["duration"] \
+= timeit.default_timer() - self._sim_start_time
self._de_serializer.save_results(self._simulator, self._sim_name,
self._metadata)