Implemented Simulator
This commit is contained in:
parent
8f4b7e1fa1
commit
c24a36db07
@ -130,13 +130,11 @@ class Simulator:
|
|||||||
target_frame_errors: int):
|
target_frame_errors: int):
|
||||||
"""Construct and object of type simulator.
|
"""Construct and object of type simulator.
|
||||||
|
|
||||||
TODO: ...
|
:param n: Number of bits in a codeword
|
||||||
|
:param k: Number of bits in a dataword
|
||||||
:param n:
|
:param decoders: Sequence of decoders to test
|
||||||
:param k:
|
:param SNRs: Sequence of SNRs for which the BERs should be calculated
|
||||||
:param decoders:
|
:param target_frame_errors: Number of frame errors after which to stop the simulation
|
||||||
:param SNRs:
|
|
||||||
:param target_frame_errors:
|
|
||||||
"""
|
"""
|
||||||
# Simulation parameters
|
# Simulation parameters
|
||||||
|
|
||||||
@ -156,110 +154,93 @@ class Simulator:
|
|||||||
|
|
||||||
self._curr_num_frame_errors = 0
|
self._curr_num_frame_errors = 0
|
||||||
self._curr_num_bit_errors = 0
|
self._curr_num_bit_errors = 0
|
||||||
self._curr_num_total_bits = 0
|
self._curr_num_iterations = 0
|
||||||
|
|
||||||
# Results
|
# Results & Miscellaneous
|
||||||
|
|
||||||
self._BERs = []
|
self._sim_running = False
|
||||||
|
self._BERs = [[]]
|
||||||
|
|
||||||
def _update_sim_state(self, bit_errors: int):
|
def _simulate_transmission(self) -> int:
|
||||||
pass
|
"""Simulate the transmission of a single codeword.
|
||||||
|
|
||||||
|
:return: Number of bit errors that occurred
|
||||||
|
"""
|
||||||
|
SNR = self._SNRs[self._current_SNRs_index]
|
||||||
|
decoder = self._decoders[self._current_decoder_index]
|
||||||
|
|
||||||
def _simulate_transmission(self, decoder: typing.Any, SNR: float) -> bool:
|
|
||||||
# Simulate channel
|
|
||||||
y = noise.add_awgn(self._x_bpsk, SNR, self._n, self._k)
|
y = noise.add_awgn(self._x_bpsk, SNR, self._n, self._k)
|
||||||
|
|
||||||
# Decode received frame
|
|
||||||
x_hat = decoder.decode(y)
|
x_hat = decoder.decode(y)
|
||||||
|
|
||||||
# Update statistics
|
return count_bit_errors(self._x, x_hat)
|
||||||
bit_errors = count_bit_errors(self._x, x_hat)
|
|
||||||
self._curr_num_total_bits += self._x.size
|
def _update_statistics(self, bit_errors: int) -> None:
|
||||||
|
"""Update the statistics of the simulator.
|
||||||
|
|
||||||
|
:param bit_errors: Number of bit errors that occurred during the last transmission
|
||||||
|
"""
|
||||||
|
self._curr_num_iterations += 1
|
||||||
|
|
||||||
if bit_errors > 0:
|
if bit_errors > 0:
|
||||||
self._curr_num_frame_errors += 1
|
self._curr_num_frame_errors += 1
|
||||||
self._curr_num_total_bits += bit_errors
|
self._curr_num_bit_errors += bit_errors
|
||||||
|
|
||||||
return bit_errors > 0
|
def _advance_state(self) -> None:
|
||||||
|
"""Advance the state of the simulator.
|
||||||
|
|
||||||
def _simulate_SNR(self, SNR):
|
This function also appends a new BER value to the self._BERs array
|
||||||
pbar = tqdm(total=self._target_frame_errors,
|
if the number of target frame errors has been reached
|
||||||
desc=f"Simulating for SNR = {SNR} dB", position=2, leave=False,
|
"""
|
||||||
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]")
|
if self._curr_num_frame_errors >= self._target_frame_errors:
|
||||||
|
# TODO: Properly handle the multiple decoders
|
||||||
|
self._BERs[self._current_decoder_index]\
|
||||||
|
.append(self._curr_num_bit_errors / (self._curr_num_iterations * self._n))
|
||||||
|
|
||||||
while self._curr_num_frame_errors < self._target_frame_errors:
|
self._curr_num_frame_errors = 0
|
||||||
error_occurred = self._simulate_transmission()
|
self._curr_num_bit_errors = 0
|
||||||
if error_occurred:
|
self._curr_num_iterations = 0
|
||||||
pbar.update(1)
|
|
||||||
pbar.close()
|
|
||||||
|
|
||||||
self._BERs.append(self._curr_num_bit_errors / self._curr_num_total_bits)
|
if self._current_SNRs_index < len(self._SNRs)-1:
|
||||||
|
self._current_SNRs_index += 1
|
||||||
|
else:
|
||||||
|
if self._current_decoder_index < len(self._decoders)-1:
|
||||||
|
self._current_decoder_index += 1
|
||||||
|
self._current_SNRs_index = 0
|
||||||
|
self._BERs.append([])
|
||||||
|
else:
|
||||||
|
self._sim_running = False
|
||||||
|
|
||||||
def
|
def start(self) -> None:
|
||||||
|
"""Start the simulation.
|
||||||
|
|
||||||
def test_decoder(self) -> typing.Tuple[np.array, np.array]:
|
This is a blocking call. A call to the stop() function
|
||||||
"""Calculate the Bit Error Rate (BER) for a given decoder for a number of SNRs.
|
from another thread will stop this function
|
||||||
|
"""
|
||||||
|
self._sim_running = True
|
||||||
|
|
||||||
This function assumes the all-zeros assumption holds. Progress is printed to stdout.
|
while self._sim_running:
|
||||||
|
bit_errors = self._simulate_transmission()
|
||||||
|
self._update_statistics(bit_errors)
|
||||||
|
self._advance_state()
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
"""Stop the simulation."""
|
||||||
|
self._sim_running = False
|
||||||
|
|
||||||
|
@property
|
||||||
|
def SNRs_and_BERs(self) -> typing.Tuple[np.array, np.array]:
|
||||||
|
"""Get the current results.
|
||||||
|
|
||||||
|
If the simulation has not yet completed, the BERs which have not yet been calculated are set to 0.
|
||||||
|
|
||||||
:return: Tuple of numpy arrays of the form (SNRs, BERs)
|
:return: Tuple of numpy arrays of the form (SNRs, BERs)
|
||||||
"""
|
"""
|
||||||
|
SNRs = np.array(self._SNRs)
|
||||||
|
|
||||||
decoder = self._decoders[self._current_decoder_index]
|
# TODO: Make sure the length of each BER_array is the same as the number of SNRs
|
||||||
|
BERs = [np.array(BER_array) for BER_array in self._BERs]
|
||||||
for SNR in tqdm(self._SNRs[self._current_SNRs_index:],
|
|
||||||
desc=f"Calculating BERs for {decoder.__class__.__name__}",
|
|
||||||
position=1, leave=False, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"):
|
|
||||||
|
|
||||||
pbar = tqdm(total=self._target_frame_errors,
|
|
||||||
desc=f"Simulating for SNR = {SNR} dB", position=2, leave=False,
|
|
||||||
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]")
|
|
||||||
|
|
||||||
while self._curr_num_frame_errors < self._target_frame_errors:
|
|
||||||
error_occurred = self._simulate_transmission()
|
|
||||||
if error_occurred:
|
|
||||||
pbar.update(1)
|
|
||||||
pbar.close()
|
|
||||||
|
|
||||||
self._BERs.append(self._curr_num_bit_errors / self._curr_num_total_bits)
|
|
||||||
|
|
||||||
return np.array(self._SNRs), np.array(self._BERs)
|
|
||||||
|
|
||||||
|
|
||||||
def test_decoders(n: int,
|
|
||||||
k: int,
|
|
||||||
decoders: typing.List,
|
|
||||||
SNRs: typing.Sequence[float] = np.linspace(1, 7, 7),
|
|
||||||
target_frame_errors: int = 100) \
|
|
||||||
-> typing.Tuple[np.array, np.array]:
|
|
||||||
"""Calculate the Bit Error Rate (BER) for a number of given decoders for a number of SNRs.
|
|
||||||
|
|
||||||
This function assumes the all-zeros assumption holds. Progress is printed to stdout.
|
|
||||||
|
|
||||||
:param n: Length of a codeword of the used code
|
|
||||||
:param k: Length of a dataword of the used code
|
|
||||||
:param decoders: List of decoder objects to be tested
|
|
||||||
:param SNRs: List of SNRs for which the BER should be calculated
|
|
||||||
:param target_frame_errors: Number of frame errors after which to stop the simulation
|
|
||||||
:return: Tuple of the form (SNRs, [BERs_1, BERs_2, ...]) where SNR and BERs_x are numpy arrays
|
|
||||||
"""
|
|
||||||
result_BERs = []
|
|
||||||
|
|
||||||
start_time = default_timer()
|
|
||||||
|
|
||||||
for decoder in tqdm(decoders,
|
|
||||||
desc="Calculating the answer to life, the universe and everything",
|
|
||||||
position=0,
|
|
||||||
leave=False,
|
|
||||||
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"):
|
|
||||||
_, BERs = test_decoder(n, k, decoder, SNRs, target_frame_errors)
|
|
||||||
result_BERs.append(BERs)
|
|
||||||
|
|
||||||
end_time = default_timer()
|
|
||||||
print(f"Elapsed time: {end_time - start_time:.2f}s")
|
|
||||||
|
|
||||||
return SNRs, result_BERs
|
|
||||||
|
|
||||||
|
return SNRs, BERs
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class SimulationParameters:
|
class SimulationParameters:
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user