From b33a0735f00f0c86884c394fe5118e6fac895e20 Mon Sep 17 00:00:00 2001 From: Andreas Tsouchlos Date: Fri, 25 Nov 2022 19:34:19 +0100 Subject: [PATCH] Put simulation code into separate module --- sw/main.py | 14 +- sw/plot_results.py | 5 +- sw/test/test_utility.py | 20 +- sw/utility/simulation/__init__.py | 2 + sw/utility/simulation/management.py | 236 +++++++++++++++++ .../simulators.py} | 238 +----------------- 6 files changed, 252 insertions(+), 263 deletions(-) create mode 100644 sw/utility/simulation/__init__.py create mode 100644 sw/utility/simulation/management.py rename sw/utility/{simulation.py => simulation/simulators.py} (51%) diff --git a/sw/main.py b/sw/main.py index b2583ff..1ba512f 100644 --- a/sw/main.py +++ b/sw/main.py @@ -2,10 +2,12 @@ import numpy as np from decoders import proximal, maximum_likelihood from cpp_modules import cpp_decoders -from utility import simulation, codes +from utility import codes +from utility.simulation import SimulationManager +from utility.simulation.simulators import ProximalDecoderSimulator -def start_new_simulation(sim_mgr: simulation.SimulationManager): +def start_new_simulation(sim_mgr: SimulationManager): sim_name = "test" # H = codes.Gs["Hamming_7_4"] @@ -24,7 +26,7 @@ def start_new_simulation(sim_mgr: simulation.SimulationManager): cpp_decoders.ProximalDecoder(H.astype('int32'), gamma=0.01, K=1000, omega=1e-4, eta=1.5), cpp_decoders.ProximalDecoder(H.astype('int32'), gamma=0.05, K=1000, - omega=5*1e-5, eta=1.5), + omega=5 * 1e-5, eta=1.5), cpp_decoders.ProximalDecoder(H.astype('int32'), gamma=0.15, K=1000, omega=1e-4, eta=1.5) ] @@ -35,7 +37,7 @@ def start_new_simulation(sim_mgr: simulation.SimulationManager): "proximal $\\gamma = 0.15$" ] - sim = simulation.ProximalDecoderSimulator(n=n, k=k, decoders=decoders, + sim = ProximalDecoderSimulator(n=n, k=k, decoders=decoders, target_frame_errors=100, SNRs=np.arange(1, 6, 0.5), max_num_iterations=3000) @@ -51,8 +53,8 @@ def main(): results_dir = "sim_results" saves_dir = "sim_saves" - sim_mgr = simulation.SimulationManager(results_dir=results_dir, - saves_dir=saves_dir) + sim_mgr = SimulationManager(results_dir=results_dir, + saves_dir=saves_dir) # Calculate BERs diff --git a/sw/plot_results.py b/sw/plot_results.py index a894aef..b43f28c 100644 --- a/sw/plot_results.py +++ b/sw/plot_results.py @@ -3,7 +3,8 @@ import typing import matplotlib.pyplot as plt import seaborn as sns import os -from utility import visualization, simulation +from utility import visualization +from utility.simulation import SimulationDeSerializer # TODO: This should be the responsibility of the DeSerializer @@ -26,7 +27,7 @@ def plot_results() -> None: slugs = get_sim_slugs(results_dir) - deserializer = simulation.SimulationDeSerializer(save_dir=saves_dir, + deserializer = SimulationDeSerializer(save_dir=saves_dir, results_dir=results_dir) # Read data diff --git a/sw/test/test_utility.py b/sw/test/test_utility.py index 0cd4cd1..2a589ed 100644 --- a/sw/test/test_utility.py +++ b/sw/test/test_utility.py @@ -1,25 +1,7 @@ import unittest import numpy as np -from utility import simulation, noise, codes - - -class CountBitErrorsTestCase(unittest.TestCase): - """Test case for bit error counting.""" - - def test_count_bit_errors(self): - d1 = np.array([0, 0, 0, 0]) - y_hat1 = np.array([0, 1, 0, 1]) - - d2 = np.array([0, 0, 0, 0]) - y_hat2 = np.array([0, 0, 0, 0]) - - d3 = np.array([0, 0, 0, 0]) - y_hat3 = np.array([1, 1, 1, 1]) - - self.assertEqual(simulation.count_bit_errors(d1, y_hat1), 2) - self.assertEqual(simulation.count_bit_errors(d2, y_hat2), 0) - self.assertEqual(simulation.count_bit_errors(d3, y_hat3), 4) +from utility import noise, codes # TODO: Rewrite tests for new SNR calculation diff --git a/sw/utility/simulation/__init__.py b/sw/utility/simulation/__init__.py new file mode 100644 index 0000000..f3d224e --- /dev/null +++ b/sw/utility/simulation/__init__.py @@ -0,0 +1,2 @@ +from utility.simulation.management import SimulationManager, \ + SimulationDeSerializer diff --git a/sw/utility/simulation/management.py b/sw/utility/simulation/management.py new file mode 100644 index 0000000..15b9008 --- /dev/null +++ b/sw/utility/simulation/management.py @@ -0,0 +1,236 @@ +import json +import pandas as pd +import typing +import signal +import pickle +import os +from pathlib import Path +import platform +from datetime import datetime +import timeit + +from utility import misc + + +class SimulationDeSerializer: + """Class responsible for file management, de- and serialization of + Simulator objects.""" + + def __init__(self, save_dir: str, results_dir: str): + self._saves_dir = save_dir + self._results_dir = results_dir + + Path(self._saves_dir).mkdir(parents=True, exist_ok=True) + Path(self._results_dir).mkdir(parents=True, exist_ok=True) + + def _get_savefile_path(self, sim_name): + return f"{self._saves_dir}/{misc.slugify(sim_name)}_state.pickle" + + def _get_metadata_path(self, sim_name): + return f"{self._results_dir}/{misc.slugify(sim_name)}_metadata.json" + + def _get_results_path(self, sim_name): + return f"{self._results_dir}/{misc.slugify(sim_name)}.csv" + + def _read_metadata(self, sim_name) -> typing.Dict: + with open(self._get_metadata_path(sim_name), 'r', + encoding='utf-8') as f: + return json.load(f) + + def _save_metadata(self, sim_name, metadata) -> None: + with open(self._get_metadata_path(sim_name), 'w+', + encoding='utf-8') as f: + json.dump(metadata, f, ensure_ascii=False, indent=4) + + def unfinished_sim_present(self, sim_name: str): + """Check if the savefile of a previously paused simulation is + present. + + :param sim_name: Name + :return: True if a paused simulation with the given name is found + """ + return os.path.isfile( + self._get_savefile_path(sim_name)) and os.path.isfile( + self._get_metadata_path(sim_name)) + + # TODO: Make the directories configurable in the init function + def get_unfinished_sims(self) -> typing.List[str]: + """Get a list unfinished simulations.""" + save_files = [f for f in os.listdir(self._saves_dir) if + os.path.isfile(os.path.join(self._saves_dir, f))] + + state_files = [f for f in save_files if f.endswith("_state.pickle")] + sim_slugs = [f.removesuffix("_state.pickle") for f in state_files] + + sim_names = [self._read_metadata(slug)["name"] for slug in sim_slugs] + + return sim_names + + def remove_unfinished_sim(self, sim_name: str): + """Remove the savefile of a previously paused simulation. + + :param sim_name: Name of the simulation + """ + os.remove(self._get_savefile_path(sim_name)) + # os.remove(self._get_metadata_path(sim_name)) + + def save_state(self, simulator: typing.Any, sim_name: str, + metadata: typing.Dict) -> None: + """Save the state of a currently running simulation. + + :param simulator: Simulator object + :param sim_name: Name of the simulation + :param metadata: Metadata to be saved besides the actual state + """ + # Save metadata + self._save_metadata(sim_name, metadata) + + # Save simulation state + with open(self._get_savefile_path(sim_name), "wb") as file: + pickle.dump(simulator, file) + + def read_state(self, sim_name: str) -> typing.Tuple[ + typing.Any, typing.Dict]: + """Read the saved state of a paused simulation. + + :param sim_name: Name of the simulation + :return: Tuple of the form (simulator, metadata) + """ + # Read metadata + metadata = self._read_metadata(sim_name) + + # Read simulation state + simulator = None + with open(self._get_savefile_path(sim_name), "rb") as file: + simulator = pickle.load(file) + + return simulator, metadata + + # TODO: Is the simulator object actually necessary here? + def save_results(self, simulator: typing.Any, sim_name: str, + metadata: typing.Dict) -> None: + """Save simulation results to file. + + :param simulator: Simulator object. Used to obtain the data + :param sim_name: Name of the simulation. Determines the filename + :param metadata: Metadata to be saved besides the actual simulation + results + """ + # Save metadata + self._save_metadata(sim_name, metadata) + + # Save results + df = simulator.get_current_results() + df.to_csv(self._get_results_path(sim_name)) + + def read_results(self, sim_name: str) -> typing.Tuple[ + pd.DataFrame, typing.Dict]: + """Read simulation results from file. + + :param sim_name: Name of the simulation. + :return: Tuple of the form (data, metadata), where data is a pandas + dataframe and metadata is a dict + """ + # Read metadata + metadata = self._read_metadata(sim_name) + + # Read results + results = pd.read_csv(self._get_results_path(sim_name)) + + return results, metadata + + +# TODO: Autosave simulation every so often +# TODO: Comment explaining what a Simulator class is +class SimulationManager: + """This class only contains functions relating to stopping and + restarting of simulations (and storing of the simulation state in a + file, to be resumed at a later date). + + All actual work is outsourced to a provided simulator class. + """ + + def __init__(self, saves_dir: str, results_dir: str): + """Construct a SimulationManager object. + + :param saves_dir: Directory in which the simulation state of a paused + simulation should be stored + :param results_dir: Directory in which the results of the simulation + should be stored + """ + self._de_serializer = SimulationDeSerializer(saves_dir, results_dir) + + self._simulator = None + self._sim_name = None + self._metadata = {"duration": 0} + self._sim_start_time = None + + signal.signal(signal.SIGINT, self._exit_gracefully) + signal.signal(signal.SIGTERM, self._exit_gracefully) + signal.signal(signal.SIGHUP, self._exit_gracefully) + + def _sim_configured(self) -> bool: + """Check whether 'configure_simulation()' has been called.""" + return (self._simulator is not None) and ( + self._sim_name is not None) and ( + self._metadata is not None) + + def configure_simulation(self, simulator: typing.Any, name: str, + column_labels: typing.Sequence[str]) -> None: + """Configure a new simulation.""" + self._simulator = simulator + self._sim_name = name + self._metadata["name"] = name + self._metadata["labels"] = column_labels + self._metadata["platform"] = platform.platform() + + def get_unfinished(self) -> typing.List[str]: + """Get a list of names of all present unfinished simulations.""" + return self._de_serializer.get_unfinished_sims() + + def load_unfinished(self, sim_name: str) -> None: + """Load the state of an unfinished simulation form its savefile. + + Warning: This function deletes the savefile after loading. + """ + assert self._de_serializer.unfinished_sim_present(sim_name) + + self._sim_name = sim_name + self._simulator, self._metadata = self._de_serializer.read_state( + sim_name) + + self._de_serializer.remove_unfinished_sim(sim_name) + + # TODO: Metadata is being written twice here. Should save_results() also + # save the metadata? + def _exit_gracefully(self, *args) -> None: + """Handler called when the program is interrupted. Pauses and saves + the currently running simulation.""" + if self._sim_configured(): + self._simulator.stop() + + self._metadata["end_time"] = f"{datetime.now(tz=None)}" + self._metadata["duration"] \ + += timeit.default_timer() - self._sim_start_time + + self._de_serializer.save_state(self._simulator, self._sim_name, + self._metadata) + self._de_serializer.save_results(self._simulator, self._sim_name, + self._metadata) + + exit() + + def simulate(self) -> None: + """Start the simulation. This is a blocking call.""" + assert self._sim_configured() + + self._sim_start_time = timeit.default_timer() + + self._simulator.start() + + self._metadata["end_time"] = f"{datetime.now(tz=None)}" + self._metadata["duration"] \ + += timeit.default_timer() - self._sim_start_time + + self._de_serializer.save_results(self._simulator, self._sim_name, + self._metadata) diff --git a/sw/utility/simulation.py b/sw/utility/simulation/simulators.py similarity index 51% rename from sw/utility/simulation.py rename to sw/utility/simulation/simulators.py index ddb6ab1..a8f69a6 100644 --- a/sw/utility/simulation.py +++ b/sw/utility/simulation/simulators.py @@ -1,19 +1,9 @@ -"""This file contains utility functions relating to tests and simulations of -the decoders.""" -import json import pandas as pd import numpy as np import typing from tqdm import tqdm -import signal -import pickle -import os -from pathlib import Path -import platform -from datetime import datetime -import timeit -from utility import noise, misc +from utility import noise def count_bit_errors(d: np.array, d_hat: np.array) -> int: @@ -262,228 +252,4 @@ class ProximalDecoderSimulator: for i, avg_K in enumerate(self._avg_K): data[f"AvgK_{i}"] = avg_K - return pd.DataFrame(data) - - -class SimulationDeSerializer: - """Class responsible for file management, de- and serialization of - Simulator objects.""" - - def __init__(self, save_dir: str, results_dir: str): - self._saves_dir = save_dir - self._results_dir = results_dir - - Path(self._saves_dir).mkdir(parents=True, exist_ok=True) - Path(self._results_dir).mkdir(parents=True, exist_ok=True) - - def _get_savefile_path(self, sim_name): - return f"{self._saves_dir}/{misc.slugify(sim_name)}_state.pickle" - - def _get_metadata_path(self, sim_name): - return f"{self._results_dir}/{misc.slugify(sim_name)}_metadata.json" - - def _get_results_path(self, sim_name): - return f"{self._results_dir}/{misc.slugify(sim_name)}.csv" - - def _read_metadata(self, sim_name) -> typing.Dict: - with open(self._get_metadata_path(sim_name), 'r', - encoding='utf-8') as f: - return json.load(f) - - def _save_metadata(self, sim_name, metadata) -> None: - with open(self._get_metadata_path(sim_name), 'w+', - encoding='utf-8') as f: - json.dump(metadata, f, ensure_ascii=False, indent=4) - - def unfinished_sim_present(self, sim_name: str): - """Check if the savefile of a previously paused simulation is - present. - - :param sim_name: Name - :return: True if a paused simulation with the given name is found - """ - return os.path.isfile( - self._get_savefile_path(sim_name)) and os.path.isfile( - self._get_metadata_path(sim_name)) - - # TODO: Make the directories configurable in the init function - def get_unfinished_sims(self) -> typing.List[str]: - """Get a list unfinished simulations.""" - save_files = [f for f in os.listdir(self._saves_dir) if - os.path.isfile(os.path.join(self._saves_dir, f))] - - state_files = [f for f in save_files if f.endswith("_state.pickle")] - sim_slugs = [f.removesuffix("_state.pickle") for f in state_files] - - sim_names = [self._read_metadata(slug)["name"] for slug in sim_slugs] - - return sim_names - - def remove_unfinished_sim(self, sim_name: str): - """Remove the savefile of a previously paused simulation. - - :param sim_name: Name of the simulation - """ - os.remove(self._get_savefile_path(sim_name)) - # os.remove(self._get_metadata_path(sim_name)) - - def save_state(self, simulator: typing.Any, sim_name: str, - metadata: typing.Dict) -> None: - """Save the state of a currently running simulation. - - :param simulator: Simulator object - :param sim_name: Name of the simulation - :param metadata: Metadata to be saved besides the actual state - """ - # Save metadata - self._save_metadata(sim_name, metadata) - - # Save simulation state - with open(self._get_savefile_path(sim_name), "wb") as file: - pickle.dump(simulator, file) - - def read_state(self, sim_name: str) -> typing.Tuple[ - typing.Any, typing.Dict]: - """Read the saved state of a paused simulation. - - :param sim_name: Name of the simulation - :return: Tuple of the form (simulator, metadata) - """ - # Read metadata - metadata = self._read_metadata(sim_name) - - # Read simulation state - simulator = None - with open(self._get_savefile_path(sim_name), "rb") as file: - simulator = pickle.load(file) - - return simulator, metadata - - # TODO: Is the simulator object actually necessary here? - def save_results(self, simulator: typing.Any, sim_name: str, - metadata: typing.Dict) -> None: - """Save simulation results to file. - - :param simulator: Simulator object. Used to obtain the data - :param sim_name: Name of the simulation. Determines the filename - :param metadata: Metadata to be saved besides the actual simulation - results - """ - # Save metadata - self._save_metadata(sim_name, metadata) - - # Save results - df = simulator.get_current_results() - df.to_csv(self._get_results_path(sim_name)) - - def read_results(self, sim_name: str) -> typing.Tuple[ - pd.DataFrame, typing.Dict]: - """Read simulation results from file. - - :param sim_name: Name of the simulation. - :return: Tuple of the form (data, metadata), where data is a pandas - dataframe and metadata is a dict - """ - # Read metadata - metadata = self._read_metadata(sim_name) - - # Read results - results = pd.read_csv(self._get_results_path(sim_name)) - - return results, metadata - - -# TODO: Autosave simulation every so often -# TODO: Comment explaining what a Simulator class is -class SimulationManager: - """This class only contains functions relating to stopping and - restarting of simulations (and storing of the simulation state in a - file, to be resumed at a later date). - - All actual work is outsourced to a provided simulator class. - """ - - def __init__(self, saves_dir: str, results_dir: str): - """Construct a SimulationManager object. - - :param saves_dir: Directory in which the simulation state of a paused - simulation should be stored - :param results_dir: Directory in which the results of the simulation - should be stored - """ - self._de_serializer = SimulationDeSerializer(saves_dir, results_dir) - - self._simulator = None - self._sim_name = None - self._metadata = {"duration": 0} - self._sim_start_time = None - - signal.signal(signal.SIGINT, self._exit_gracefully) - signal.signal(signal.SIGTERM, self._exit_gracefully) - signal.signal(signal.SIGHUP, self._exit_gracefully) - - def _sim_configured(self) -> bool: - """Check whether 'configure_simulation()' has been called.""" - return (self._simulator is not None) and ( - self._sim_name is not None) and ( - self._metadata is not None) - - def configure_simulation(self, simulator: typing.Any, name: str, - column_labels: typing.Sequence[str]) -> None: - """Configure a new simulation.""" - self._simulator = simulator - self._sim_name = name - self._metadata["name"] = name - self._metadata["labels"] = column_labels - self._metadata["platform"] = platform.platform() - - def get_unfinished(self) -> typing.List[str]: - """Get a list of names of all present unfinished simulations.""" - return self._de_serializer.get_unfinished_sims() - - def load_unfinished(self, sim_name: str) -> None: - """Load the state of an unfinished simulation form its savefile. - - Warning: This function deletes the savefile after loading. - """ - assert self._de_serializer.unfinished_sim_present(sim_name) - - self._sim_name = sim_name - self._simulator, self._metadata = self._de_serializer.read_state( - sim_name) - - self._de_serializer.remove_unfinished_sim(sim_name) - - # TODO: Metadata is being written twice here. Should save_results() also - # save the metadata? - def _exit_gracefully(self, *args) -> None: - """Handler called when the program is interrupted. Pauses and saves - the currently running simulation.""" - if self._sim_configured(): - self._simulator.stop() - - self._metadata["end_time"] = f"{datetime.now(tz=None)}" - self._metadata["duration"] \ - += timeit.default_timer() - self._sim_start_time - - self._de_serializer.save_state(self._simulator, self._sim_name, - self._metadata) - self._de_serializer.save_results(self._simulator, self._sim_name, - self._metadata) - - exit() - - def simulate(self) -> None: - """Start the simulation. This is a blocking call.""" - assert self._sim_configured() - - self._sim_start_time = timeit.default_timer() - - self._simulator.start() - - self._metadata["end_time"] = f"{datetime.now(tz=None)}" - self._metadata["duration"] \ - += timeit.default_timer() - self._sim_start_time - - self._de_serializer.save_results(self._simulator, self._sim_name, - self._metadata) + return pd.DataFrame(data) \ No newline at end of file