Modified the SimulationManager to work with the newly implemented Simulator; Added doc comments

This commit is contained in:
Andreas Tsouchlos 2022-11-15 17:26:14 +01:00
parent c24a36db07
commit e9cd215457

View File

@ -25,105 +25,16 @@ def count_bit_errors(d: np.array, d_hat: np.array) -> int:
return np.sum(d != d_hat) return np.sum(d != d_hat)
# def test_decoder(n: int, # TODO: Write unit tests
# k: int,
# decoder: typing.Any,
# SNRs: typing.Sequence[float] = np.linspace(1, 7, 7),
# target_frame_errors: int = 100) \
# -> typing.Tuple[np.array, np.array]:
# """Calculate the Bit Error Rate (BER) for a given decoder for a number of SNRs.
#
# This function assumes the all-zeros assumption holds. Progress is printed to stdout.
#
# :param n: Length of a codeword of the used code
# :param k: Length of a dataword of the used code
# :param decoder: Instance of the decoder to be tested
# :param SNRs: List of SNRs for which the BER should be calculated
# :param target_frame_errors: Number of frame errors after which to stop the simulation
# :param N_max: Maximum number of iterations to perform for each SNR
# :return: Tuple of numpy arrays of the form (SNRs, BERs)
# """
#
# x = np.zeros(n)
# x_bpsk = 1 - 2 * x # Map x from [0, 1]^n to [-1, 1]^n
#
# BERs = []
# for SNR in tqdm(SNRs,
# desc=f"Calculating BERs for {decoder.__class__.__name__}",
# position=1,
# leave=False,
# bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"):
#
# total_bit_errors = 0
# total_bits = 0
# total_frame_errors = 0
#
# pbar = tqdm(total=target_frame_errors,
# desc=f"Simulating for SNR = {SNR} dB",
# position=2,
# leave=False,
# bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]")
#
# while total_frame_errors < target_frame_errors:
# # Simulate channel
# y = noise.add_awgn(x_bpsk, SNR, n, k)
#
# # Decode received frame
# x_hat = decoder.decode(y)
#
# # Calculate statistics
# bit_errors = count_bit_errors(x, x_hat)
# total_bits += x.size
#
# if bit_errors > 0:
# total_frame_errors += 1
# total_bit_errors += bit_errors
# pbar.update(1)
#
# pbar.close()
#
# BERs.append(total_bit_errors / total_bits)
#
# return np.array(SNRs), np.array(BERs)
#
#
# def test_decoders(n: int,
# k: int,
# decoders: typing.List,
# SNRs: typing.Sequence[float] = np.linspace(1, 7, 7),
# target_frame_errors: int = 100) \
# -> typing.Tuple[np.array, np.array]:
# """Calculate the Bit Error Rate (BER) for a number of given decoders for a number of SNRs.
#
# This function assumes the all-zeros assumption holds. Progress is printed to stdout.
#
# :param n: Length of a codeword of the used code
# :param k: Length of a dataword of the used code
# :param decoders: List of decoder objects to be tested
# :param SNRs: List of SNRs for which the BER should be calculated
# :param target_frame_errors: Number of frame errors after which to stop the simulation
# :return: Tuple of the form (SNRs, [BERs_1, BERs_2, ...]) where SNR and BERs_x are numpy arrays
# """
# result_BERs = []
#
# start_time = default_timer()
#
# for decoder in tqdm(decoders,
# desc="Calculating the answer to life, the universe and everything",
# position=0,
# leave=False,
# bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"):
# _, BERs = test_decoder(n, k, decoder, SNRs, target_frame_errors)
# result_BERs.append(BERs)
#
# end_time = default_timer()
# print(f"Elapsed time: {end_time - start_time:.2f}s")
#
# return SNRs, result_BERs
#
class Simulator: class Simulator:
"""Class allowing for saving of simulations state.
Given a list of decoders, this class allows for simulating the Bit-Error-Rates of each decoder for various SNRs.
The functionality implemented by this class could be achieved by a bunch of loops and a function.
However, storing the state of the simulation as member variables allows for pausing and resuming the simulation
at a later state.
"""
def __init__(self, n: int, k: int, def __init__(self, n: int, k: int,
decoders: typing.Sequence[typing.Any], decoders: typing.Sequence[typing.Any],
SNRs: typing.Sequence[float], SNRs: typing.Sequence[float],
@ -159,6 +70,7 @@ class Simulator:
# Results & Miscellaneous # Results & Miscellaneous
self._sim_running = False self._sim_running = False
self._sim_done = False
self._BERs = [[]] self._BERs = [[]]
def _simulate_transmission(self) -> int: def _simulate_transmission(self) -> int:
@ -209,75 +121,70 @@ class Simulator:
self._BERs.append([]) self._BERs.append([])
else: else:
self._sim_running = False self._sim_running = False
self._sim_done = True
def start(self) -> None: def start(self) -> None:
"""Start the simulation. """Start the simulation.
This is a blocking call. A call to the stop() function This is a blocking call. A call to the stop() function
from another thread will stop this function from another thread will stop this function.
""" """
self._sim_running = True if not self._sim_done:
self._sim_running = True
while self._sim_running: while self._sim_running:
bit_errors = self._simulate_transmission() bit_errors = self._simulate_transmission()
self._update_statistics(bit_errors) self._update_statistics(bit_errors)
self._advance_state() self._advance_state()
def stop(self) -> None: def stop(self) -> None:
"""Stop the simulation.""" """Stop the simulation."""
self._sim_running = False self._sim_running = False
@property
def simulation_done(self) -> bool:
"""Check whether the simulation is still ongoing or completed.
:return: True if the simulation is completed
"""
return self._sim_done
# TODO: Make sure the length of each BER_array is the same as the number of SNRs
@property @property
def SNRs_and_BERs(self) -> typing.Tuple[np.array, np.array]: def SNRs_and_BERs(self) -> typing.Tuple[np.array, np.array]:
"""Get the current results. """Get the current results.
If the simulation has not yet completed, the BERs which have not yet been calculated are set to 0. If the simulation has not yet completed, the BERs which have not yet been calculated are set to 0.
:return: Tuple of numpy arrays of the form (SNRs, BERs) :return: Tuple of numpy arrays of the form (SNRs, BERs), where BERs is a list of the form
[BER_decoder_1, BER_decoder_2, ...]
""" """
SNRs = np.array(self._SNRs) SNRs = np.array(self._SNRs)
# TODO: Make sure the length of each BER_array is the same as the number of SNRs
BERs = [np.array(BER_array) for BER_array in self._BERs] BERs = [np.array(BER_array) for BER_array in self._BERs]
return SNRs, BERs return SNRs, BERs
@dataclass
class SimulationParameters:
n: int
k: int
decoders: typing.Sequence[typing.Any]
SNRs: typing.Sequence[float]
target_frame_errors: int
@dataclass
class SimulationState:
"""Data structure storing the state of the simulation."""
num_frame_errors: int = 0
num_bit_errors: int = 0
num_total_bits: int = 0
# simulation_time: float = 0
current_decoder_index = 0
current_SNRs_index: int = 0
# TODO: Make more generic
# TODO: Remove save data after successful execution # TODO: Remove save data after successful execution
# TODO: Write currently calculated BERs to file when simulation is stopped
class SimulationManager: class SimulationManager:
"""This class only contains functions relating to stopping and restarting of simulations
(and storing of the simulation state in a file, to be resumed at a later date).
All actual work is outsourced to a provided simulator class.
"""
def __init__(self, save_dir: str, results_dir: str): def __init__(self, save_dir: str, results_dir: str):
"""Construct a SimulationManager object.
:param save_dir: Directory in which the simulation state of a paused simulation should be stored
:param results_dir: Directory in which the results of the simulation should be stored
"""
self._save_dir = save_dir self._save_dir = save_dir
self._sim_parameters_filepath = f"{self._save_dir}/sim_parameters.pickle"
self._sim_state_filepath = f"{self._save_dir}/sim_state.pickle" self._sim_state_filepath = f"{self._save_dir}/sim_state.pickle"
self._logs_filepath = f"{self._save_dir}/logs.txt" self._logs_filepath = f"{self._save_dir}/logs.txt"
self._results_dir = results_dir self._results_dir = results_dir
# TODO: Should the be none or SimulationParameters() and SimulationState() respectively? self._simulator = None
self._sim_params = None
self._sim_state = None
self._sim_running = False
Path(self._save_dir).mkdir(parents=True, exist_ok=True) Path(self._save_dir).mkdir(parents=True, exist_ok=True)
@ -292,142 +199,64 @@ class SimulationManager:
# #
def unfinished_simulation_present(self) -> bool: def unfinished_simulation_present(self) -> bool:
return os.path.isfile(self._sim_parameters_filepath) \ """Check whether the savefile of a previously unfinished simulation is present."""
and os.path.isfile(self._sim_state_filepath) return os.path.isfile(self._sim_state_filepath)
def continue_unfinished(self): def load_unfinished(self):
"""Load the state of an unfinished simulation its savefile."""
assert self.unfinished_simulation_present() assert self.unfinished_simulation_present()
with open(self._sim_parameters_filepath, "rb") as file: self._logger.info("Loading saved simulation state")
self._sim_params = pickle.load(file)
with open(self._sim_state_filepath, "rb") as file: with open(self._sim_state_filepath, "rb") as file:
self._sim_state = pickle.load(file) self._simulator = pickle.load(file)
self._logger.info("Loaded saved simulation state")
self.start()
# TODO: Make sure old state is overwritten # TODO: Make sure old state is overwritten
def _save_state(self): def _save_state(self) -> None:
with open(self._sim_parameters_filepath, "wb") as file: """Write the state of the currently configured simulation to a savefile."""
pickle.dump(self._sim_params, file) if self._simulator is not None:
with open(self._sim_state_filepath, "wb") as file: with open(self._sim_state_filepath, "wb") as file:
pickle.dump(self._sim_state, file) pickle.dump(self._simulator, file)
self._logger.info("Saved simulation state") self._logger.info("Saved simulation state")
else:
self._logger.info("No simulation state to save: simulator object is 'None'")
def _exit_gracefully(self, *args): def _exit_gracefully(self, *args) -> None:
"""Handler called when the program is interrupted.
Pauses and saves the currently running simulation
"""
self._logger.debug("Intercepted signal SIGINT/SIGTERM") self._logger.debug("Intercepted signal SIGINT/SIGTERM")
self._sim_running = False if self._simulator is not None:
self._simulator.stop()
if (self._sim_params is not None) and (self._sim_state is not None):
self._save_state() self._save_state()
# #
# Functions responsible for the actual simulation # Functions responsible for the actual simulation
# #
# def test_decoders(self, def set_simulator(self, simulator: typing.Any) -> None:
# n: int, """Select a simulator to do the actual work."""
# k: int, self._simulator = simulator
# decoders: typing.Sequence[typing.Any],
# SNRs: typing.Sequence[float] = np.linspace(1, 7, 7),
# target_frame_errors: int = 100):
# """Calculate the Bit Error Rate (BER) for a number of given decoders for a number of SNRs.
#
# This function assumes the all-zeros assumption holds. Progress is printed to stdout.
#
# :param n: Length of a codeword of the used code
# :param k: Length of a dataword of the used code
# :param decoders: List of decoder objects to be tested
# :param SNRs: List of SNRs for which the BER should be calculated
# :param target_frame_errors: Number of frame errors after which to stop the simulation
# :return: Tuple of the form (SNRs, [BERs_1, BERs_2, ...]) where SNR and BERs_x are numpy arrays
# """
# # TODO
#
# # Save simulation
# self._sim_parameters = SimulationMetaData(n, k, decoders, SNRs, target_frame_errors)
# self._sim_state = SimulationState()
#
# self._logger.info("Initialized new simulation state")
#
# # Simulation
#
# result_BERs = []
#
# start_time = default_timer()
#
# for decoder in tqdm(decoders,
# desc="Calculating the answer to life, the universe and everything",
# position=0,
# leave=False,
# bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"):
# _, BERs = self.test_decoder(n, k, decoder, SNRs, target_frame_errors)
# result_BERs.append(BERs)
#
# end_time = default_timer()
# print(f"Elapsed time: {end_time - start_time:.2f}s")
#
# return SNRs, result_BERs
# def test_decoder(self,
# n: int,
# k: int,
# decoder: typing.Any,
# SNRs: typing.Sequence[float] = np.linspace(1, 7, 7),
# target_frame_errors: int = 100) \
# -> typing.Tuple[np.array, np.array]:
def start(self): def start(self):
self._sim_running = True # TODO: Move this somewhere else """Start the simulation.
decoder = self._sim_params.decoders[self._sim_state.current_decoder_index] This is a blocking call. A call to the stop() function
from another thread will stop this function.
"""
assert self._simulator is not None
x = np.zeros(self._sim_params.n) self._logger.info("Starting simulation")
x_bpsk = 1 - 2 * x # Map x from [0, 1]^n to [-1, 1]^n self._simulator.start()
BERs = [] @property
for SNR in tqdm(self._sim_params.SNRs[self._sim_state.current_SNRs_index:], def simulation_done(self):
desc=f"Calculating BERs for {decoder.__class__.__name__}", """Check whether the configured simulation has been completed."""
position=1, return self._simulator.simulation_done
leave=False,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"):
pbar = tqdm(total=self._sim_params.target_frame_errors, def get_current_results(self) -> typing.Any:
desc=f"Simulating for SNR = {SNR} dB", """Get the current results of the configured simulation."""
position=2, return self._simulator.SNRs_and_BERs
leave=False,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]")
pbar.update(self._sim_state.num_frame_errors)
while self._sim_state.num_frame_errors < self._sim_params.target_frame_errors:
if not self._sim_running:
return
# Simulate channel
y = noise.add_awgn(x_bpsk, SNR, self._sim_params.n, self._sim_params.k)
# Decode received frame
x_hat = decoder.decode(y)
# Calculate statistics
bit_errors = count_bit_errors(x, x_hat)
self._sim_state.num_total_bits += x.size
if bit_errors > 0:
self._sim_state.num_frame_errors += 1
self._sim_state.num_bit_errors += bit_errors
pbar.update(1)
# TODO: Load BERs from file as well
BERs.append(self._sim_state.num_bit_errors / self._sim_state.num_total_bits)
pbar.close()
self._sim_state.current_SNRs_index += 1
self._sim_state.num_frame_errors = 0
self._sim_state.num_bit_errors = 0
self._sim_state.num_total_bits = 0
# return np.array(self._sim_params.SNRs), np.array(BERs)