Moved de-serialization functions from SimulationManager to SimulationDeSerializer

This commit is contained in:
Andreas Tsouchlos 2022-11-18 13:06:31 +01:00
parent a128ed72ae
commit 3b022b33ad
2 changed files with 131 additions and 77 deletions

19
sw/utility/misc.py Normal file
View File

@ -0,0 +1,19 @@
import unicodedata
import re
def slugify(value, allow_unicode=False):
"""
Taken from https://github.com/django/django/blob/master/django/utils/text.py
Convert to ASCII if 'allow_unicode' is False. Convert spaces or repeated
dashes to single dashes. Remove characters that aren't alphanumerics,
underscores, or hyphens. Convert to lowercase. Also strip leading and
trailing whitespace, dashes, and underscores.
"""
value = str(value)
if allow_unicode:
value = unicodedata.normalize('NFKC', value)
else:
value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii')
value = re.sub(r'[^\w\s-]', '', value.lower())
return re.sub(r'[-\s]+', '-', value).strip('-_')

View File

@ -1,6 +1,6 @@
"""This file contains utility functions relating to tests and simulations of the decoders."""
import json
import pandas as pd
import numpy as np
import typing
from tqdm import tqdm
@ -8,9 +8,8 @@ import signal
import pickle
import os.path
from pathlib import Path
import spdlog as spd
from utility import noise
from utility import noise, misc
def count_bit_errors(d: np.array, d_hat: np.array) -> int:
@ -27,11 +26,12 @@ def count_bit_errors(d: np.array, d_hat: np.array) -> int:
class Simulator:
"""Class allowing for saving of simulations state.
Given a list of decoders, this class allows for simulating the Bit-Error-Rates of each decoder for various SNRs.
Given a list of decoders, this class allows for simulating the Bit-Error-Rates of each decoder
for various SNRs.
The functionality implemented by this class could be achieved by a bunch of loops and a function.
However, storing the state of the simulation as member variables allows for pausing and resuming the simulation
at a later state.
However, storing the state of the simulation as member variables allows for pausing and resuming
the simulation at a later time.
"""
def __init__(self, n: int, k: int,
@ -73,7 +73,6 @@ class Simulator:
self._create_pbars()
self._sim_running = False
self._sim_done = False
def _create_pbars(self):
self._overall_pbar = tqdm(total=len(self._decoders),
@ -178,7 +177,6 @@ class Simulator:
self._overall_pbar.update(1)
else:
self._sim_running = False
self._sim_done = True
self._snr_pbar.close()
self._decoder_pbar.close()
@ -190,26 +188,17 @@ class Simulator:
This is a blocking call. A call to the stop() function
from another thread will stop this function.
"""
if not self._sim_done:
self._sim_running = True
self._sim_running = True
while self._sim_running:
bit_errors = self._simulate_transmission()
self._update_statistics(bit_errors)
self._advance_state()
while self._sim_running:
bit_errors = self._simulate_transmission()
self._update_statistics(bit_errors)
self._advance_state()
def stop(self) -> None:
"""Stop the simulation."""
self._sim_running = False
@property
def simulation_done(self) -> bool:
"""Check whether the simulation is still ongoing or completed.
:return: True if the simulation is completed
"""
return self._sim_done
@property
def SNRs_and_BERs(self) -> typing.Tuple[np.array, np.array]:
"""Get the current results.
@ -236,7 +225,77 @@ class Simulator:
return SNRs, BERs
# TODO: Write currently calculated BERs to file when simulation is stopped
# TODO: Fix typing.Any or Simulator
class SimulationDeSerializer:
def __init__(self, save_dir: str, results_dir: str):
self._save_dir = save_dir
self._results_dir = results_dir
def _get_savefile_path(self, sim_name):
return f"{self._save_dir}/{misc.slugify(sim_name)}.pickle"
def _get_metadata_path(self, sim_name):
return f"{self._save_dir}/{misc.slugify(sim_name)}.json"
def _get_results_path(self, sim_name):
return f"{self._results_dir}/{misc.slugify(sim_name)}.csv"
def unfinished_sim_present(self, sim_name: str):
return os.path.isfile(self._get_savefile_path(sim_name)) \
and os.path.isfile(self._get_metadata_path(sim_name))
def remove_unfinished_sim(self, sim_name):
os.remove(self._get_savefile_path(sim_name))
os.remove(self._get_metadata_path(sim_name))
def save_state(self, simulator: typing.Any, sim_name: str, metadata: typing.Dict) -> None:
Path(self._save_dir).mkdir(parents=True, exist_ok=True)
# Save metadata
with open(self._get_metadata_path(sim_name), 'w+', encoding='utf-8') as f:
json.dump(metadata, f, ensure_ascii=False, indent=4)
# Save simulation state
with open(self._get_savefile_path(sim_name), "wb") as file:
pickle.dump(simulator, file)
# Save results
self.save_results(simulator, sim_name, metadata)
def read_state(self, sim_name: str) -> typing.Tuple[typing.Any, typing.Dict]:
metadata = None
simulator = None
# Read metadata
with open(self._get_metadata_path(sim_name), 'r', encoding='utf-8') as f:
metadata = json.load(f)
# Read simulation state
with open(self._get_savefile_path(sim_name), "rb") as file:
simulator = pickle.load(file)
return simulator, metadata
def save_results(self, simulator: typing.Any, sim_name: str, metadata: typing.Dict) -> None:
Path(self._save_dir).mkdir(parents=True, exist_ok=True)
SNRs, BERs = simulator.SNRs_and_BERs
data_dict = {"SNR": SNRs}
for i, BER in enumerate(BERs):
data_dict[f"BER_{i}"] = BER
df = pd.DataFrame(data_dict)
df.to_csv(self._get_results_path(sim_name))
# TODO: Read metadata
def read_results(self, sim_name: str) -> pd.DataFrame:
data = pd.read_csv(self._get_results_path(sim_name))
return data
# TODO: Fix typing.Any or Simulator
class SimulationManager:
"""This class only contains functions relating to stopping and restarting of simulations
(and storing of the simulation state in a file, to be resumed at a later date).
@ -250,91 +309,67 @@ class SimulationManager:
:param save_dir: Directory in which the simulation state of a paused simulation should be stored
:param results_dir: Directory in which the results of the simulation should be stored
"""
self._save_dir = save_dir
self._sim_state_filepath = f"{self._save_dir}/sim_state.pickle"
self._logs_filepath = f"{self._save_dir}/logs.txt"
self._results_dir = results_dir
self._de_serializer = SimulationDeSerializer(save_dir, results_dir)
self._simulator = None
Path(self._save_dir).mkdir(parents=True, exist_ok=True)
self._logger = spd.FileLogger("SimulationManager", self._logs_filepath)
self._logger.set_level(spd.LogLevel.DEBUG)
self._sim_name = None
self._metadata = {}
signal.signal(signal.SIGINT, self._exit_gracefully)
signal.signal(signal.SIGTERM, self._exit_gracefully)
signal.signal(signal.SIGHUP, self._exit_gracefully)
#
# Functions relating to the pausing and restarting of simulations
#
def _sim_configured(self) -> bool:
return (self._simulator is not None)\
and (self._sim_name is not None)\
and (self._metadata is not None)
def unfinished_simulation_present(self) -> bool:
def configure_simulation(self, simulator: typing.Any, name: str, column_names: typing.Sequence[str]) -> None:
"""Configure a new simulation."""
self._simulator = simulator
self._sim_name = name
self._metadata["column_names"] = column_names
def unfinished_simulation_present(self, sim_name: str) -> bool:
"""Check whether the savefile of a previously unfinished simulation is present."""
return os.path.isfile(self._sim_state_filepath)
return self._de_serializer.unfinished_sim_present(sim_name)
def load_unfinished(self):
# TODO: Where should the files be removed?
def load_unfinished(self, sim_name: str) -> None:
"""Load the state of an unfinished simulation its savefile.
Warning: This function deletes the savefile after loading.
"""
assert self.unfinished_simulation_present()
assert self.unfinished_simulation_present(sim_name)
self._logger.info("Loading saved simulation state")
self._sim_name = sim_name
self._simulator, self._metadata = self._de_serializer.read_state(sim_name)
with open(self._sim_state_filepath, "rb") as file:
self._simulator = pickle.load(file)
os.remove(self._sim_state_filepath)
def _save_state(self) -> None:
"""Write the state of the currently configured simulation to a savefile."""
if self._simulator is not None:
with open(self._sim_state_filepath, "wb") as file:
pickle.dump(self._simulator, file)
self._logger.info("Saved simulation state")
else:
self._logger.info("No simulation state to save: simulator object is 'None'")
self._de_serializer.remove_unfinished_sim(sim_name)
def _exit_gracefully(self, *args) -> None:
"""Handler called when the program is interrupted.
Pauses and saves the currently running simulation
"""
self._logger.debug("Intercepted signal SIGINT/SIGTERM")
if self._simulator is not None:
if self._sim_configured():
self._simulator.stop()
self._save_state()
self._de_serializer.save_state(self._simulator, self._sim_name, self._metadata)
self._de_serializer.save_results(self._simulator, self._sim_name, self._metadata)
exit()
#
# Functions responsible for the actual simulation
#
def set_simulator(self, simulator: typing.Any) -> None:
"""Select a simulator to do the actual work."""
self._simulator = simulator
def start(self):
def start(self) -> None:
"""Start the simulation.
This is a blocking call. A call to the stop() function
from another thread will stop this function.
"""
assert self._simulator is not None
assert self._sim_configured()
self._logger.info("Starting simulation")
self._simulator.start()
@property
def simulation_done(self) -> None:
"""Check whether the configured simulation has been completed."""
return self._simulator.simulation_done
def save_results(self) -> None:
assert self._sim_configured()
def get_current_results(self) -> typing.Any:
"""Get the current results of the configured simulation."""
return self._simulator.SNRs_and_BERs
self._de_serializer.save_results(self._simulator, self._sim_name, self._metadata)