Finished initial (non-working) implementation of proximal decoder

This commit is contained in:
2022-11-04 21:07:35 +01:00
parent accc318a77
commit 6444914296
8 changed files with 301 additions and 0 deletions

2
sw/decoders/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
"""This package contains a number of different decoder implementations for LDPC codes
"""

86
sw/decoders/proximal.py Normal file
View File

@@ -0,0 +1,86 @@
import numpy as np
from tqdm import tqdm
class ProximalDecoder:
"""Class implementing the Proximal Decoding algorithm. See "Proximal Decoding for LDPC Codes" by Tadashi
Wadayama, and Satoshi Takabe.
"""
def __init__(self, H: np.array, K: int = 10, step_size: float = 0.1, gamma: float = 0.05):
"""Construct a new ProximalDecoder Object.
:param H: Parity Check Matrix
:param K: Max number of iterations to perform when decoding
:param step_size: Step size for the gradient descent process
:param gamma: Positive constant. Arises in the approximation of the prior PDF
"""
self._H = H
self._K = K
self._step_size = step_size
self._gamma = gamma
@staticmethod
def _L_awgn(s: np.array, y: np.array) -> np.array:
"""Variation of the negative log-likelihood for the special case of AWGN noise. See 4.1, p. 4."""
return s - y
def _grad_h(self, x: np.array) -> np.array:
"""Gradient of the code-constraint polynomial. See 2.3, p. 2."""
# Calculate first term
result = 4 * (x**2 - 1) * x
# Calculate second term
for k, x_k in enumerate(x):
# TODO: Perform this operation for each row simultaneously
B_k = np.argwhere(self._H[:, k] == 1)
B_k = B_k[:, 0] # Get rid of one layer of arrays
# TODO: Perform the summation with np.sum()
sum_result = 0
for i in B_k:
# TODO: Perform this operation for each column simultaneously
A_i = np.argwhere(self._H[i] == 1)
A_i = A_i[:, 0] # Get rid of one layer of arrays
prod = 1
for j in A_i:
prod *= x[j]
sum_result += prod**2 - prod
term_2 = 2 / x_k * sum_result
result[k] += term_2
return np.array(result)
def _check_parity(self, y_hat: np.array) -> bool:
"""Perform a parity check for a given codeword.
:param y_hat: codeword to be checked
:return: True if the parity check passes, i.e. the codeword is valid. False otherwise
"""
syndrome = np.dot(self._H, y_hat) % 2
return not np.any(syndrome)
def decode(self, y: np.array) -> np.array:
"""Decode a received signal. The algorithm is detailed in 3.2, p.3.
This function assumes an AWGN channel.
:param y: Vector of received values
:return: Most probably sent symbol
"""
s = 0
x_hat = 0
for k in range(self._K):
r = s - self._step_size * self._L_awgn(s, y)
s = r - self._gamma * self._grad_h(r)
x_hat = (np.sign(s) == 1) * 1
if self._check_parity(x_hat):
break
return x_hat

80
sw/decoders/utility.py Normal file
View File

@@ -0,0 +1,80 @@
"""This file contains various utility functions that can be used in combination with the decoders.
"""
import numpy as np
import typing
from tqdm import tqdm
def _get_noise_amp_from_SNR(SNR: float, signal_amp: float = 1) -> float:
"""Calculate the amplitude of the noise from an SNR and the signal amplitude
:param SNR: Signal-to-Noise-Ratio in dB
:param signal_amp: Signal Amplitude (linear)
:return: Noise Amplitude (linear)
"""
SNR_linear = 10 ** (SNR / 10)
noise_amp = (1 / np.sqrt(SNR_linear)) * signal_amp
return noise_amp
def add_awgn(c: np.array, SNR: float) -> np.array:
"""Add Additive White Gaussian Noise to a data vector. As this function adds random noise to the input,
the output changes, even if it is called multiple times with the same input.
:param c: Binary vector representing the data to be transmitted
:param SNR: Signal-to-Noise-Ratio in dB
:return: Data vector with added noise
"""
noise_amp = _get_noise_amp_from_SNR(SNR, signal_amp=1)
y = c + np.random.normal(scale=noise_amp, size=c.size)
return y
def count_bit_errors(d: np.array, d_hat: np.array) -> int:
"""Count the number of wrong bits in a decoded codeword.
:param d: Originally sent data
:param d_hat: Received data
:return: Number of bit errors
"""
return np.sum(d != d_hat)
def test_decoder(decoder: typing.Any,
c: np.array,
SNRs: typing.Sequence[float] = np.linspace(1, 4, 7),
N=10000) \
-> typing.Tuple[np.array, np.array]:
"""Calculate the Bit Error Rate (BER) for a given decoder for a number of SNRs.
This function prints it's progress to stdout
:param decoder: Instance of the decoder to be tested
:param c: Codeword whose transmission is to be simulated
:param SNRs: List of SNRs for which the BER should be calculated
:param N: Number of iterations to perform for each SNR
:return: Tuple of numpy arrays of the form (SNRs, BERs)
"""
BERs = []
for SNR in tqdm(SNRs, desc="Calculating Bit-Error-Rates",
position=0,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"):
total_bit_errors = 0
for n in tqdm(range(N), desc=f"Simulating for SNR = {SNR} dB",
position=1,
leave=False,
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"):
y = add_awgn(c, SNR)
y_hat = decoder.decode(y)
total_bit_errors += count_bit_errors(c, y_hat)
total_bits = c.size * N
BERs.append(total_bit_errors / total_bits)
return np.array(SNRs), np.array(BERs)