Moved Python files from sw to sw/python; moved scripts into sw/python/scripts

This commit is contained in:
2022-12-08 14:31:23 +01:00
parent 7c01f0a7e3
commit 3938c4aa31
37 changed files with 136 additions and 421 deletions

View File

@@ -0,0 +1,77 @@
"""Simulation package.
This package provides a way to easily define simulations in such a way that
they can be paused and resumed.
General Structure
=================
The package consists of 3 main components:
- The 'SimulationDeSerializer': Responsible for file IO
- The 'Simulator': Responsible for the actual simulating
- The 'SimulationManager': Delegates work to the DeSerializer and the
Simulator
The Simulator Class
===================
For each new simulating task, a new 'Simulator' must be defined. The
requirements for this class are the following:
- Must define the 'start_or_continue()' and 'stop()' functions and expose a
  'current_results' attribute or property (this is what the
  'SimulationManager' and the 'SimulationDeSerializer' read)
- Must be picklable in order to store the simulation state
An example simulator could look as follows:
----------------------------------------------------------------
class SomeSimulator:
    def __init__(self, num_iterations):
        self._num_iterations = num_iterations
        self._current_iter = 0
        self._simulation_running = False
        self._results = pd.DataFrame()

    def _perform_iteration(self):
        # Perform iteration and append results
        ...

    def start_or_continue(self) -> None:
        self._simulation_running = True
        while self._simulation_running and (
                self._current_iter < self._num_iterations):
            self._perform_iteration()

    def stop(self) -> None:
        self._simulation_running = False

    @property
    def current_results(self) -> pd.DataFrame:
        return self._results
----------------------------------------------------------------
Usage
=====
To start a new simulation:
----------------------------------------------------------------
sim_mgr = SimulationManager(results_dir="results", saves_dir="saves")
sim = SomeSimulator(num_iterations=100)
sim_mgr.configure_simulation(
    simulator=sim, name='Some Simulation',
    additional_metadata={'labels': ['label1', 'label2']})
sim_mgr.simulate()
----------------------------------------------------------------
To check for a previously interrupted simulation and continue:
----------------------------------------------------------------
sim_mgr = SimulationManager(results_dir="results", saves_dir="saves")
unfinished_sims = sim_mgr.get_unfinished()
if len(unfinished_sims) > 0:
sim_mgr.load_unfinished(unfinished_sims[0])
sim_mgr.simulate()
----------------------------------------------------------------
"""
from utility.simulation.management import SimulationManager, \
SimulationDeSerializer

View File

@@ -0,0 +1,239 @@
import json
import pandas as pd
import typing
import signal
import pickle
import os
from pathlib import Path
import platform
from datetime import datetime
import timeit
import collections.abc
from utility import misc
class SimulationDeSerializer:
    """Class responsible for file management, de- and serialization of
    Simulator objects.

    Up to three files exist per simulation, all named after the slugified
    simulation name:
        - '<slug>_state.pickle' in the saves directory (pickled simulator)
        - '<slug>_metadata.json' in the results directory (metadata dict)
        - '<slug>.csv' in the results directory (simulation results)
    """

    def __init__(self, save_dir: str, results_dir: str):
        """Construct a SimulationDeSerializer object.

        Both directories are created (including parents) if they do not
        exist yet.

        :param save_dir: Directory in which simulation states are stored
        :param results_dir: Directory in which results and metadata are
            stored
        """
        self._saves_dir = save_dir
        self._results_dir = results_dir

        Path(self._saves_dir).mkdir(parents=True, exist_ok=True)
        Path(self._results_dir).mkdir(parents=True, exist_ok=True)

    def _get_savefile_path(self, sim_name):
        """Return the path of the pickled state of the given simulation."""
        return f"{self._saves_dir}/{misc.slugify(sim_name)}_state.pickle"

    def _get_metadata_path(self, sim_name):
        """Return the path of the metadata file of the given simulation."""
        return f"{self._results_dir}/{misc.slugify(sim_name)}_metadata.json"

    def _get_results_path(self, sim_name):
        """Return the path of the results file of the given simulation."""
        return f"{self._results_dir}/{misc.slugify(sim_name)}.csv"

    def _read_metadata(self, sim_name) -> typing.Dict:
        """Load the metadata dict of the given simulation from its JSON
        file."""
        with open(self._get_metadata_path(sim_name), 'r',
                  encoding='utf-8') as f:
            return json.load(f)

    def _save_metadata(self, sim_name, metadata) -> None:
        """Write the metadata dict of the given simulation to its JSON
        file, replacing any previous content."""
        with open(self._get_metadata_path(sim_name), 'w+',
                  encoding='utf-8') as f:
            json.dump(metadata, f, ensure_ascii=False, indent=4)

    def unfinished_sim_present(self, sim_name: str):
        """Check if the savefile of a previously paused simulation is
        present.

        Both the pickled state and the metadata file have to exist.

        :param sim_name: Name of the simulation
        :return: True if a paused simulation with the given name is found
        """
        return os.path.isfile(
            self._get_savefile_path(sim_name)) and os.path.isfile(
            self._get_metadata_path(sim_name))

    def get_unfinished_sims(self) -> typing.List[str]:
        """Get a list of the names of all unfinished simulations.

        A simulation counts as unfinished if a state file is present in the
        saves directory. State files without a matching metadata file are
        skipped, since 'unfinished_sim_present()' would reject them as well
        and their name cannot be recovered.

        :return: Simulation names as stored in the respective metadata
        """
        suffix = "_state.pickle"
        sim_names = []

        for filename in os.listdir(self._saves_dir):
            if not filename.endswith(suffix):
                continue
            if not os.path.isfile(os.path.join(self._saves_dir, filename)):
                continue

            slug = filename.removesuffix(suffix)
            if not os.path.isfile(self._get_metadata_path(slug)):
                # Orphaned state file: nothing to read the name from
                continue

            sim_names.append(self._read_metadata(slug)["name"])

        return sim_names

    def remove_unfinished_sim(self, sim_name: str):
        """Remove the savefile of a previously paused simulation.

        The metadata file is kept, as it also belongs to the results.

        :param sim_name: Name of the simulation
        """
        os.remove(self._get_savefile_path(sim_name))

    def save_state(self, simulator: typing.Any, sim_name: str,
                   metadata: typing.Dict) -> None:
        """Save the state of a currently running simulation.

        :param simulator: Simulator object
        :param sim_name: Name of the simulation
        :param metadata: Metadata to be saved besides the actual state
        """
        # Save metadata
        self._save_metadata(sim_name, metadata)

        # Save simulation state
        with open(self._get_savefile_path(sim_name), "wb") as file:
            pickle.dump(simulator, file)

    def read_state(self, sim_name: str) -> typing.Tuple[
            typing.Any, typing.Dict]:
        """Read the saved state of a paused simulation.

        :param sim_name: Name of the simulation
        :return: Tuple of the form (simulator, metadata)
        """
        # Read metadata
        metadata = self._read_metadata(sim_name)

        # Read simulation state
        # NOTE: Unpickling executes arbitrary code. Only load savefiles that
        # were written by this program and come from a trusted location.
        with open(self._get_savefile_path(sim_name), "rb") as file:
            simulator = pickle.load(file)

        return simulator, metadata

    # TODO: Is the simulator object actually necessary here?
    def save_results(self, simulator: typing.Any, sim_name: str,
                     metadata: typing.Dict) -> None:
        """Save simulation results to file.

        :param simulator: Simulator object. Its 'current_results' are
            written to a CSV file
        :param sim_name: Name of the simulation. Determines the filename
        :param metadata: Metadata to be saved besides the actual simulation
            results
        """
        # Save metadata
        self._save_metadata(sim_name, metadata)

        # Save current results
        simulator.current_results.to_csv(self._get_results_path(sim_name),
                                         index=False)

    def read_results(self, sim_name: str) -> typing.Tuple[
            pd.DataFrame, typing.Dict]:
        """Read simulation results from file.

        :param sim_name: Name of the simulation.
        :return: Tuple of the form (data, metadata), where data is a pandas
            dataframe and metadata is a dict
        """
        # Read metadata
        metadata = self._read_metadata(sim_name)

        # Read results
        results = pd.read_csv(self._get_results_path(sim_name))

        return results, metadata
# TODO: Autosave simulation every so often
# TODO: Comment explaining what a Simulator class is
class SimulationManager:
    """This class only contains functions relating to stopping and
    restarting of simulations (and storing of the simulation state in a
    file, to be resumed at a later date).

    All actual work is outsourced to a provided simulator class. The
    simulator has to offer 'start_or_continue()', 'stop()' and
    'current_results'.
    """

    def __init__(self, saves_dir: str, results_dir: str):
        """Construct a SimulationManager object.

        :param saves_dir: Directory in which the simulation state of a paused
            simulation should be stored
        :param results_dir: Directory in which the results of the simulation
            should be stored
        """
        self._de_serializer = SimulationDeSerializer(saves_dir, results_dir)

        self._simulator = None
        self._sim_name = None
        self._metadata = {"duration": 0}
        # Timer value at the start of the current run; None while no run is
        # being timed
        self._sim_start_time = None

    def _sim_configured(self) -> bool:
        """Check whether 'configure_simulation()' has been called."""
        return (self._simulator is not None) and (
                self._sim_name is not None) and (
                       self._metadata is not None)

    def _record_run_end(self) -> None:
        """Store the end time and add the elapsed time of the current run
        to the accumulated duration."""
        self._metadata["end_time"] = f"{datetime.now(tz=None)}"

        if self._sim_start_time is not None:
            elapsed = timeit.default_timer() - self._sim_start_time
            self._metadata["duration"] \
                = self._metadata.get("duration", 0) + elapsed
            # Reset so the same run is never counted twice, e.g. when the
            # interrupt handler fires after the run was already recorded
            self._sim_start_time = None

    def configure_simulation(self, simulator: typing.Any, name: str,
                             additional_metadata: typing.Optional[
                                 dict] = None) -> None:
        """Configure a new simulation.

        :param simulator: Simulator object performing the actual work
        :param name: Name of the simulation. Determines the filenames
        :param additional_metadata: Further entries to store in the
            metadata. Applied last, so they take precedence over the
            automatically generated entries
        """
        self._simulator = simulator
        self._sim_name = name
        self._metadata["name"] = name
        self._metadata["platform"] = platform.platform()
        if additional_metadata is not None:
            self._metadata.update(additional_metadata)

    def get_unfinished(self) -> typing.List[str]:
        """Get a list of names of all present unfinished simulations."""
        return self._de_serializer.get_unfinished_sims()

    def load_unfinished(self, sim_name: str) -> None:
        """Load the state of an unfinished simulation from its savefile.

        Warning: This function deletes the savefile after loading.

        :param sim_name: Name of the simulation
        """
        assert self._de_serializer.unfinished_sim_present(sim_name), \
            f"No unfinished simulation named '{sim_name}' found"

        self._sim_name = sim_name
        self._simulator, self._metadata = self._de_serializer.read_state(
            sim_name)

        self._de_serializer.remove_unfinished_sim(sim_name)

    # TODO: Metadata is being written twice here. Should save_results() also
    #  save the metadata?
    def _exit_gracefully(self, *args) -> None:
        """Handler called when the program is interrupted. Pauses and saves
        the currently running simulation, then terminates the program.

        :param args: Ignored. Allows usage as a signal handler
        :raises SystemExit: Always
        """
        if self._sim_configured():
            self._simulator.stop()

            self._record_run_end()

            self._de_serializer.save_state(self._simulator, self._sim_name,
                                           self._metadata)
            self._de_serializer.save_results(self._simulator, self._sim_name,
                                             self._metadata)

        # 'exit()' is a helper injected by the 'site' module for interactive
        # use; raising SystemExit directly does not depend on it
        raise SystemExit

    def simulate(self) -> None:
        """Start the simulation. This is a blocking call.

        On completion the results are saved. On a KeyboardInterrupt the
        simulation state and the results obtained so far are saved and the
        program exits.
        """
        assert self._sim_configured(), \
            "The simulation has to be configured before it can be started"

        try:
            self._sim_start_time = timeit.default_timer()

            self._simulator.start_or_continue()

            self._record_run_end()

            self._de_serializer.save_results(self._simulator, self._sim_name,
                                             self._metadata)
        except KeyboardInterrupt:
            self._exit_gracefully()

    def get_current_results(self) -> pd.DataFrame:
        """Return the results the simulator has produced so far."""
        return self._simulator.current_results

View File

@@ -0,0 +1,121 @@
import pandas as pd
import numpy as np
import typing
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, process, wait
from functools import partial
from multiprocessing import Lock
from utility import noise
# TODO: Fix ProximalDecoder_Dynamic
# from cpp_modules.cpp_decoders import ProximalDecoder_Dynamic as
# ProximalDecoder
def count_bit_errors(d: np.ndarray, d_hat: np.ndarray) -> int:
    """Count the number of positions in which two bit sequences differ.

    Both arguments are converted with 'np.asarray', so plain sequences
    (lists, tuples) are compared elementwise as well.

    :param d: Originally sent data
    :param d_hat: Data to compare against 'd'. Expected to have the same
        shape as 'd'
    :return: Number of bit errors as a built-in int
    """
    mismatches = np.asarray(d) != np.asarray(d_hat)
    return int(np.count_nonzero(mismatches))
class HashableDict:
    """Read-only, dict-like wrapper that can serve as a dict key.

    The entries of the wrapped dict are copied into the instance namespace
    and are accessible via subscript notation. Neither '__eq__' nor
    '__hash__' is overridden, so instances are hashed and compared by
    identity, not by content.
    """

    def __init__(self, data_dict):
        assert isinstance(data_dict, dict)
        vars(self).update(data_dict)

    def __getitem__(self, item):
        entries = vars(self)
        return entries[item]

    def __str__(self):
        return f"{vars(self)}"
class GenericMultithreadedSimulator:
    """Simulator that runs one task per parameter set in a pool of worker
    processes.

    Despite its name, the work is distributed with a 'ProcessPoolExecutor'.
    Before starting, three things have to be set:
        - 'task_params': Iterable of dicts, one per task
        - 'task_func': Function called with one such dict per task
        - 'format_func': Function turning the collected results (a dict
          mapping each task's key to its return value) into the value
          exposed as 'current_results'

    Finished tasks are removed from the pending parameters, so a stopped
    (and possibly pickled) simulator only runs the remaining tasks when it
    is started again.
    """

    def __init__(self, max_workers=8):
        """Construct a GenericMultithreadedSimulator object.

        :param max_workers: Maximum number of worker processes
        """
        self._format_func = None
        self._task_func = None
        self._task_params = None
        self._max_workers = max_workers

        self._results = {}
        self._executor = None

    @property
    def task_params(self):
        """Pending tasks as a dict mapping a key object to its parameter
        dict."""
        return self._task_params

    @task_params.setter
    def task_params(self, sim_params):
        self._task_params = {HashableDict(iteration_params): iteration_params
                             for iteration_params in sim_params}

    @property
    def task_func(self):
        """Function executed once per parameter dict."""
        return self._task_func

    @task_func.setter
    def task_func(self, func):
        self._task_func = func

    @property
    def format_func(self):
        """Function converting the raw results into 'current_results'."""
        return self._format_func

    @format_func.setter
    def format_func(self, func):
        self._format_func = func

    def start_or_continue(self):
        """Run all pending tasks. Blocks until the pool has shut down."""
        assert self._task_func is not None, "No task function set"
        assert self._task_params is not None, "No task parameters set"
        assert self._format_func is not None, "No format function set"

        # An executor cannot be reused after shutdown, so every run gets a
        # new one
        self._executor = ProcessPoolExecutor(max_workers=self._max_workers)

        with tqdm(total=(len(self._task_params)), leave=False) as pbar:

            def done_callback(key, future):
                if future.cancelled():
                    # Cancelled by stop(): the task stays pending and is
                    # run again on the next start
                    return

                try:
                    result = future.result()
                except process.BrokenProcessPool:
                    # This exception is thrown when the program is
                    # prematurely stopped with a KeyboardInterrupt
                    return

                self._results[key] = result
                del self._task_params[key]
                # Only count tasks that actually produced a result
                pbar.update(1)

            # Iterate over a copy, since the callbacks remove finished
            # tasks from the dict
            for key, params in list(self._task_params.items()):
                future = self._executor.submit(self._task_func, params)
                future.add_done_callback(partial(done_callback, key))

            self._executor.shutdown(wait=True, cancel_futures=False)

    def stop(self):
        """Cancel all tasks that have not started yet and wait for the
        running ones to finish."""
        assert self._executor is not None, "The simulation has to be started" \
                                           " before it can be stopped"

        self._executor.shutdown(wait=True, cancel_futures=True)

    @property
    def current_results(self):
        """Results collected so far, converted by the format function."""
        return self._format_func(self._results)

    def __getstate__(self):
        """Return the instance state without the executor, which cannot be
        pickled."""
        state = self.__dict__.copy()
        state["_executor"] = None
        return state

    def __setstate__(self, state):
        """Restore the instance state and create a new executor with the
        configured number of workers."""
        self.__dict__.update(state)
        self._executor = ProcessPoolExecutor(max_workers=self._max_workers)