diff --git a/sw/utility/simulation.py b/sw/utility/simulation.py index 3847a11..fc47606 100644 --- a/sw/utility/simulation.py +++ b/sw/utility/simulation.py @@ -25,105 +25,16 @@ def count_bit_errors(d: np.array, d_hat: np.array) -> int: return np.sum(d != d_hat) -# def test_decoder(n: int, -# k: int, -# decoder: typing.Any, -# SNRs: typing.Sequence[float] = np.linspace(1, 7, 7), -# target_frame_errors: int = 100) \ -# -> typing.Tuple[np.array, np.array]: -# """Calculate the Bit Error Rate (BER) for a given decoder for a number of SNRs. -# -# This function assumes the all-zeros assumption holds. Progress is printed to stdout. -# -# :param n: Length of a codeword of the used code -# :param k: Length of a dataword of the used code -# :param decoder: Instance of the decoder to be tested -# :param SNRs: List of SNRs for which the BER should be calculated -# :param target_frame_errors: Number of frame errors after which to stop the simulation -# :param N_max: Maximum number of iterations to perform for each SNR -# :return: Tuple of numpy arrays of the form (SNRs, BERs) -# """ -# -# x = np.zeros(n) -# x_bpsk = 1 - 2 * x # Map x from [0, 1]^n to [-1, 1]^n -# -# BERs = [] -# for SNR in tqdm(SNRs, -# desc=f"Calculating BERs for {decoder.__class__.__name__}", -# position=1, -# leave=False, -# bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"): -# -# total_bit_errors = 0 -# total_bits = 0 -# total_frame_errors = 0 -# -# pbar = tqdm(total=target_frame_errors, -# desc=f"Simulating for SNR = {SNR} dB", -# position=2, -# leave=False, -# bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]") -# -# while total_frame_errors < target_frame_errors: -# # Simulate channel -# y = noise.add_awgn(x_bpsk, SNR, n, k) -# -# # Decode received frame -# x_hat = decoder.decode(y) -# -# # Calculate statistics -# bit_errors = count_bit_errors(x, x_hat) -# total_bits += x.size -# -# if bit_errors > 0: -# total_frame_errors += 1 -# total_bit_errors += bit_errors -# pbar.update(1) -# -# pbar.close() -# -# BERs.append(total_bit_errors / total_bits) -# -# return np.array(SNRs), np.array(BERs) -# -# -# def test_decoders(n: int, -# k: int, -# decoders: typing.List, -# SNRs: typing.Sequence[float] = np.linspace(1, 7, 7), -# target_frame_errors: int = 100) \ -# -> typing.Tuple[np.array, np.array]: -# """Calculate the Bit Error Rate (BER) for a number of given decoders for a number of SNRs. -# -# This function assumes the all-zeros assumption holds. Progress is printed to stdout. -# -# :param n: Length of a codeword of the used code -# :param k: Length of a dataword of the used code -# :param decoders: List of decoder objects to be tested -# :param SNRs: List of SNRs for which the BER should be calculated -# :param target_frame_errors: Number of frame errors after which to stop the simulation -# :return: Tuple of the form (SNRs, [BERs_1, BERs_2, ...]) where SNR and BERs_x are numpy arrays -# """ -# result_BERs = [] -# -# start_time = default_timer() -# -# for decoder in tqdm(decoders, -# desc="Calculating the answer to life, the universe and everything", -# position=0, -# leave=False, -# bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"): -# _, BERs = test_decoder(n, k, decoder, SNRs, target_frame_errors) -# result_BERs.append(BERs) -# -# end_time = default_timer() -# print(f"Elapsed time: {end_time - start_time:.2f}s") -# -# return SNRs, result_BERs -# - - +# TODO: Write unit tests class Simulator: + """Class allowing for saving of simulations state. + + Given a list of decoders, this class allows for simulating the Bit-Error-Rates of each decoder for various SNRs. + + The functionality implemented by this class could be achieved by a bunch of loops and a function. + However, storing the state of the simulation as member variables allows for pausing and resuming the simulation + at a later state. + """ def __init__(self, n: int, k: int, decoders: typing.Sequence[typing.Any], SNRs: typing.Sequence[float], @@ -159,6 +70,7 @@ class Simulator: # Results & Miscellaneous self._sim_running = False + self._sim_done = False self._BERs = [[]] def _simulate_transmission(self) -> int: @@ -209,75 +121,70 @@ class Simulator: self._BERs.append([]) else: self._sim_running = False + self._sim_done = True def start(self) -> None: """Start the simulation. This is a blocking call. A call to the stop() function - from another thread will stop this function + from another thread will stop this function. """ - self._sim_running = True + if not self._sim_done: + self._sim_running = True - while self._sim_running: - bit_errors = self._simulate_transmission() - self._update_statistics(bit_errors) - self._advance_state() + while self._sim_running: + bit_errors = self._simulate_transmission() + self._update_statistics(bit_errors) + self._advance_state() def stop(self) -> None: """Stop the simulation.""" self._sim_running = False + @property + def simulation_done(self) -> bool: + """Check whether the simulation is still ongoing or completed. + + :return: True if the simulation is completed + """ + return self._sim_done + + # TODO: Make sure the length of each BER_array is the same as the number of SNRs @property def SNRs_and_BERs(self) -> typing.Tuple[np.array, np.array]: """Get the current results. If the simulation has not yet completed, the BERs which have not yet been calculated are set to 0. - :return: Tuple of numpy arrays of the form (SNRs, BERs) + :return: Tuple of numpy arrays of the form (SNRs, BERs), where BERs is a list of the form + [BER_decoder_1, BER_decoder_2, ...] """ SNRs = np.array(self._SNRs) - - # TODO: Make sure the length of each BER_array is the same as the number of SNRs BERs = [np.array(BER_array) for BER_array in self._BERs] return SNRs, BERs -@dataclass -class SimulationParameters: - n: int - k: int - decoders: typing.Sequence[typing.Any] - SNRs: typing.Sequence[float] - target_frame_errors: int - -@dataclass -class SimulationState: - """Data structure storing the state of the simulation.""" - num_frame_errors: int = 0 - num_bit_errors: int = 0 - num_total_bits: int = 0 - # simulation_time: float = 0 - - current_decoder_index = 0 - current_SNRs_index: int = 0 - - -# TODO: Make more generic # TODO: Remove save data after successful execution +# TODO: Write currently calculated BERs to file when simulation is stopped class SimulationManager: + """This class only contains functions relating to stopping and restarting of simulations + (and storing of the simulation state in a file, to be resumed at a later date). + + All actual work is outsourced to a provided simulator class. + """ def __init__(self, save_dir: str, results_dir: str): + """Construct a SimulationManager object. + + :param save_dir: Directory in which the simulation state of a paused simulation should be stored + :param results_dir: Directory in which the results of the simulation should be stored + """ self._save_dir = save_dir - self._sim_parameters_filepath = f"{self._save_dir}/sim_parameters.pickle" self._sim_state_filepath = f"{self._save_dir}/sim_state.pickle" self._logs_filepath = f"{self._save_dir}/logs.txt" self._results_dir = results_dir - # TODO: Should the be none or SimulationParameters() and SimulationState() respectively? - self._sim_params = None - self._sim_state = None - - self._sim_running = False + self._simulator = None Path(self._save_dir).mkdir(parents=True, exist_ok=True) @@ -292,142 +199,64 @@ class SimulationManager: # def unfinished_simulation_present(self) -> bool: - return os.path.isfile(self._sim_parameters_filepath) \ - and os.path.isfile(self._sim_state_filepath) + """Check whether the savefile of a previously unfinished simulation is present.""" + return os.path.isfile(self._sim_state_filepath) - def continue_unfinished(self): + def load_unfinished(self): + """Load the state of an unfinished simulation its savefile.""" assert self.unfinished_simulation_present() - with open(self._sim_parameters_filepath, "rb") as file: - self._sim_params = pickle.load(file) + self._logger.info("Loading saved simulation state") + with open(self._sim_state_filepath, "rb") as file: - self._sim_state = pickle.load(file) - - self._logger.info("Loaded saved simulation state") - - self.start() + self._simulator = pickle.load(file) # TODO: Make sure old state is overwritten - def _save_state(self): - with open(self._sim_parameters_filepath, "wb") as file: - pickle.dump(self._sim_params, file) - with open(self._sim_state_filepath, "wb") as file: - pickle.dump(self._sim_state, file) + def _save_state(self) -> None: + """Write the state of the currently configured simulation to a savefile.""" + if self._simulator is not None: + with open(self._sim_state_filepath, "wb") as file: + pickle.dump(self._simulator, file) - self._logger.info("Saved simulation state") + self._logger.info("Saved simulation state") + else: + self._logger.info("No simulation state to save: simulator object is 'None'") - def _exit_gracefully(self, *args): + def _exit_gracefully(self, *args) -> None: + """Handler called when the program is interrupted. + + Pauses and saves the currently running simulation + """ self._logger.debug("Intercepted signal SIGINT/SIGTERM") - self._sim_running = False - - if (self._sim_params is not None) and (self._sim_state is not None): + if self._simulator is not None: + self._simulator.stop() self._save_state() # # Functions responsible for the actual simulation # - # def test_decoders(self, - # n: int, - # k: int, - # decoders: typing.Sequence[typing.Any], - # SNRs: typing.Sequence[float] = np.linspace(1, 7, 7), - # target_frame_errors: int = 100): - # """Calculate the Bit Error Rate (BER) for a number of given decoders for a number of SNRs. - # - # This function assumes the all-zeros assumption holds. Progress is printed to stdout. - # - # :param n: Length of a codeword of the used code - # :param k: Length of a dataword of the used code - # :param decoders: List of decoder objects to be tested - # :param SNRs: List of SNRs for which the BER should be calculated - # :param target_frame_errors: Number of frame errors after which to stop the simulation - # :return: Tuple of the form (SNRs, [BERs_1, BERs_2, ...]) where SNR and BERs_x are numpy arrays - # """ - # # TODO - # - # # Save simulation - # self._sim_parameters = SimulationMetaData(n, k, decoders, SNRs, target_frame_errors) - # self._sim_state = SimulationState() - # - # self._logger.info("Initialized new simulation state") - # - # # Simulation - # - # result_BERs = [] - # - # start_time = default_timer() - # - # for decoder in tqdm(decoders, - # desc="Calculating the answer to life, the universe and everything", - # position=0, - # leave=False, - # bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"): - # _, BERs = self.test_decoder(n, k, decoder, SNRs, target_frame_errors) - # result_BERs.append(BERs) - # - # end_time = default_timer() - # print(f"Elapsed time: {end_time - start_time:.2f}s") - # - # return SNRs, result_BERs + def set_simulator(self, simulator: typing.Any) -> None: + """Select a simulator to do the actual work.""" + self._simulator = simulator - # def test_decoder(self, - # n: int, - # k: int, - # decoder: typing.Any, - # SNRs: typing.Sequence[float] = np.linspace(1, 7, 7), - # target_frame_errors: int = 100) \ - # -> typing.Tuple[np.array, np.array]: def start(self): - self._sim_running = True # TODO: Move this somewhere else + """Start the simulation. - decoder = self._sim_params.decoders[self._sim_state.current_decoder_index] + This is a blocking call. A call to the stop() function + from another thread will stop this function. + """ + assert self._simulator is not None - x = np.zeros(self._sim_params.n) - x_bpsk = 1 - 2 * x # Map x from [0, 1]^n to [-1, 1]^n + self._logger.info("Starting simulation") + self._simulator.start() - BERs = [] - for SNR in tqdm(self._sim_params.SNRs[self._sim_state.current_SNRs_index:], - desc=f"Calculating BERs for {decoder.__class__.__name__}", - position=1, - leave=False, - bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"): + @property + def simulation_done(self): + """Check whether the configured simulation has been completed.""" + return self._simulator.simulation_done - pbar = tqdm(total=self._sim_params.target_frame_errors, - desc=f"Simulating for SNR = {SNR} dB", - position=2, - leave=False, - bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]") - - pbar.update(self._sim_state.num_frame_errors) - - while self._sim_state.num_frame_errors < self._sim_params.target_frame_errors: - if not self._sim_running: - return - - # Simulate channel - y = noise.add_awgn(x_bpsk, SNR, self._sim_params.n, self._sim_params.k) - - # Decode received frame - x_hat = decoder.decode(y) - - # Calculate statistics - bit_errors = count_bit_errors(x, x_hat) - self._sim_state.num_total_bits += x.size - - if bit_errors > 0: - self._sim_state.num_frame_errors += 1 - self._sim_state.num_bit_errors += bit_errors - pbar.update(1) - - # TODO: Load BERs from file as well - BERs.append(self._sim_state.num_bit_errors / self._sim_state.num_total_bits) - - pbar.close() - self._sim_state.current_SNRs_index += 1 - self._sim_state.num_frame_errors = 0 - self._sim_state.num_bit_errors = 0 - self._sim_state.num_total_bits = 0 - -# return np.array(self._sim_params.SNRs), np.array(BERs) + def get_current_results(self) -> typing.Any: + """Get the current results of the configured simulation.""" + return self._simulator.SNRs_and_BERs