diff --git a/.gitignore b/.gitignore index 127a77b..fa145d8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ latex/build/ latex/tmp/ +.idea +__pycache__ diff --git a/sw/decoders/__init__.py b/sw/decoders/__init__.py new file mode 100644 index 0000000..b3cda83 --- /dev/null +++ b/sw/decoders/__init__.py @@ -0,0 +1,2 @@ +"""This package contains a number of different decoder implementations for LDPC codes +""" \ No newline at end of file diff --git a/sw/decoders/proximal.py b/sw/decoders/proximal.py new file mode 100644 index 0000000..f943cd6 --- /dev/null +++ b/sw/decoders/proximal.py @@ -0,0 +1,86 @@ +import numpy as np +from tqdm import tqdm + + +class ProximalDecoder: + """Class implementing the Proximal Decoding algorithm. See "Proximal Decoding for LDPC Codes" by Tadashi + Wadayama, and Satoshi Takabe. + """ + def __init__(self, H: np.array, K: int = 10, step_size: float = 0.1, gamma: float = 0.05): + """Construct a new ProximalDecoder Object. + + :param H: Parity Check Matrix + :param K: Max number of iterations to perform when decoding + :param step_size: Step size for the gradient descent process + :param gamma: Positive constant. Arises in the approximation of the prior PDF + """ + self._H = H + self._K = K + self._step_size = step_size + self._gamma = gamma + + @staticmethod + def _L_awgn(s: np.array, y: np.array) -> np.array: + """Variation of the negative log-likelihood for the special case of AWGN noise. See 4.1, p. 4.""" + return s - y + + def _grad_h(self, x: np.array) -> np.array: + """Gradient of the code-constraint polynomial. See 2.3, p. 2.""" + # Calculate first term + + result = 4 * (x**2 - 1) * x + + # Calculate second term + + for k, x_k in enumerate(x): + # TODO: Perform this operation for each row simultaneously + B_k = np.argwhere(self._H[:, k] == 1) + B_k = B_k[:, 0] # Get rid of one layer of arrays + + # TODO: Perform the summation with np.sum() + sum_result = 0 + for i in B_k: + # TODO: Perform this operation for each column simultaneously + A_i = np.argwhere(self._H[i] == 1) + A_i = A_i[:, 0] # Get rid of one layer of arrays + + prod = 1 + for j in A_i: + prod *= x[j] + + sum_result += prod**2 - prod + + term_2 = 2 / x_k * sum_result + + result[k] += term_2 + + return np.array(result) + + def _check_parity(self, y_hat: np.array) -> bool: + """Perform a parity check for a given codeword. + + :param y_hat: codeword to be checked + :return: True if the parity check passes, i.e. the codeword is valid. False otherwise + """ + syndrome = np.dot(self._H, y_hat) % 2 + return not np.any(syndrome) + + def decode(self, y: np.array) -> np.array: + """Decode a received signal. The algorithm is detailed in 3.2, p.3. + + This function assumes an AWGN channel. + + :param y: Vector of received values + :return: Most probably sent symbol + """ + s = 0 + x_hat = 0 + for k in range(self._K): + r = s - self._step_size * self._L_awgn(s, y) + s = r - self._gamma * self._grad_h(r) + x_hat = (np.sign(s) == 1) * 1 + + if self._check_parity(x_hat): + break + + return x_hat diff --git a/sw/decoders/utility.py b/sw/decoders/utility.py new file mode 100644 index 0000000..0fe573e --- /dev/null +++ b/sw/decoders/utility.py @@ -0,0 +1,80 @@ +"""This file contains various utility functions that can be used in combination with the decoders. +""" + +import numpy as np +import typing +from tqdm import tqdm + + +def _get_noise_amp_from_SNR(SNR: float, signal_amp: float = 1) -> float: + """Calculate the amplitude of the noise from an SNR and the signal amplitude + + :param SNR: Signal-to-Noise-Ratio in dB + :param signal_amp: Signal Amplitude (linear) + :return: Noise Amplitude (linear) + """ + SNR_linear = 10 ** (SNR / 10) + noise_amp = (1 / np.sqrt(SNR_linear)) * signal_amp + + return noise_amp + + +def add_awgn(c: np.array, SNR: float) -> np.array: + """Add Additive White Gaussian Noise to a data vector. As this function adds random noise to the input, + the output changes, even if it is called multiple times with the same input. + + :param c: Binary vector representing the data to be transmitted + :param SNR: Signal-to-Noise-Ratio in dB + :return: Data vector with added noise + """ + noise_amp = _get_noise_amp_from_SNR(SNR, signal_amp=1) + y = c + np.random.normal(scale=noise_amp, size=c.size) + return y + + +def count_bit_errors(d: np.array, d_hat: np.array) -> int: + """Count the number of wrong bits in a decoded codeword. + + :param d: Originally sent data + :param d_hat: Received data + :return: Number of bit errors + """ + return np.sum(d != d_hat) + + +def test_decoder(decoder: typing.Any, + c: np.array, + SNRs: typing.Sequence[float] = np.linspace(1, 4, 7), + N=10000) \ + -> typing.Tuple[np.array, np.array]: + """Calculate the Bit Error Rate (BER) for a given decoder for a number of SNRs. + + This function prints it's progress to stdout + + :param decoder: Instance of the decoder to be tested + :param c: Codeword whose transmission is to be simulated + :param SNRs: List of SNRs for which the BER should be calculated + :param N: Number of iterations to perform for each SNR + :return: Tuple of numpy arrays of the form (SNRs, BERs) + """ + BERs = [] + for SNR in tqdm(SNRs, desc="Calculating Bit-Error-Rates", + position=0, + bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"): + + total_bit_errors = 0 + + for n in tqdm(range(N), desc=f"Simulating for SNR = {SNR} dB", + position=1, + leave=False, + bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}"): + + y = add_awgn(c, SNR) + y_hat = decoder.decode(y) + + total_bit_errors += count_bit_errors(c, y_hat) + + total_bits = c.size * N + BERs.append(total_bit_errors / total_bits) + + return np.array(SNRs), np.array(BERs) diff --git a/sw/main.py b/sw/main.py new file mode 100644 index 0000000..5c1019a --- /dev/null +++ b/sw/main.py @@ -0,0 +1,36 @@ +import numpy as np +import matplotlib.pyplot as plt +import seaborn as sns + +from decoders import proximal +from decoders import utility + + +def main(): + # Hamming(7,4) code + + G = np.array([[1, 1, 1, 0, 0, 0, 0], + [1, 0, 0, 1, 1, 0, 0], + [0, 1, 0, 1, 0, 1, 0], + [1, 1, 0, 1, 0, 0, 1]]) + + H = np.array([[1, 0, 1, 0, 1, 0, 1], + [0, 1, 1, 0, 0, 1, 1], + [0, 0, 0, 1, 1, 1, 1]]) + + # Test decoder + + d = np.array([0, 1, 0, 1]) + c = np.dot(G.transpose(), d) % 2 + + print(f"Simulating with c = {c}") + + decoder = proximal.ProximalDecoder(H, K=1000) + SNRs, BERs = utility.test_decoder(decoder, c, N=100) + + plt.stem(SNRs, BERs) + plt.show() + + +if __name__ == "__main__": + main() diff --git a/sw/test/__init__.py b/sw/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sw/test/test_proximal.py b/sw/test/test_proximal.py new file mode 100644 index 0000000..ff49684 --- /dev/null +++ b/sw/test/test_proximal.py @@ -0,0 +1,55 @@ +import unittest +import numpy as np +from decoders import proximal + + +class CheckParityTestCase(unittest.TestCase): + """Test case for the check_parity function.""" + def test_check_parity(self): + # Hamming(7,4) code + G = np.array([[1, 1, 1, 0, 0, 0, 0], + [1, 0, 0, 1, 1, 0, 0], + [0, 1, 0, 1, 0, 1, 0], + [1, 1, 0, 1, 0, 0, 1]]) + H = np.array([[1, 0, 1, 0, 1, 0, 1], + [0, 1, 1, 0, 0, 1, 1], + [0, 0, 0, 1, 1, 1, 1]]) + + decoder = proximal.ProximalDecoder(H) + + d1 = np.array([0, 1, 0, 1]) + c1 = np.dot(np.transpose(G), d1) % 2 + + d2 = np.array([0, 0, 0, 0]) + c2 = np.dot(np.transpose(G), d2) % 2 + + d3 = np.array([1, 1, 1, 1]) + c3 = np.dot(np.transpose(G), d3) % 2 + + invalid_codeword = np.array([0, 1, 1, 0, 1, 1, 1]) + + self.assertEqual(decoder._check_parity(c1), True) + self.assertEqual(decoder._check_parity(c2), True) + self.assertEqual(decoder._check_parity(c3), True) + self.assertEqual(decoder._check_parity(invalid_codeword), False) + + +class GradientTestCase(unittest.TestCase): + """Test case for the calculation of the gradient of the code-constraint-polynomial""" + def test_grad_h(self): + H = np.array([[1, 0, 1], + [0, 1, 0]]) + x = np.array([2, 3, 4]) + + decoder = proximal.ProximalDecoder(H) + grad = decoder._grad_h(x) + + expected = 4 * (x**2 - 1)*x + 2 / x * np.array([0, 2, 0]) + + print(f"expected: {expected}") + + self.assertEqual(np.array_equal(grad, expected), True) + + +if __name__ == "__main__": + unittest.main() diff --git a/sw/test/test_utility.py b/sw/test/test_utility.py new file mode 100644 index 0000000..1a2dc35 --- /dev/null +++ b/sw/test/test_utility.py @@ -0,0 +1,40 @@ +import unittest +import numpy as np +from decoders import utility + + +class CountBitErrorsTestCase(unittest.TestCase): + """Test case for bit error counting.""" + def test_count_bit_errors(self): + d1 = np.array([0, 0, 0, 0]) + y_hat1 = np.array([0, 1, 0, 1]) + + d2 = np.array([0, 0, 0, 0]) + y_hat2 = np.array([0, 0, 0, 0]) + + d3 = np.array([0, 0, 0, 0]) + y_hat3 = np.array([1, 1, 1, 1]) + + self.assertEqual(utility.count_bit_errors(d1, y_hat1), 2) + self.assertEqual(utility.count_bit_errors(d2, y_hat2), 0) + self.assertEqual(utility.count_bit_errors(d3, y_hat3), 4) + + +class NoiseAmpFromSNRTestCase(unittest.TestCase): + """Test case for noise amplitude calculation""" + def test_get_noise_amp_from_SNR(self): + SNR1 = 0 + SNR2 = 6 + SNR3 = 20 + SNR4 = -20 + SNR5 = 60 + + self.assertEqual(utility._get_noise_amp_from_SNR(SNR1, signal_amp=1), 1) + self.assertAlmostEqual(utility._get_noise_amp_from_SNR(SNR2, signal_amp=1), 0.5, places=2) + self.assertEqual(utility._get_noise_amp_from_SNR(SNR3, signal_amp=1), 0.1) + self.assertEqual(utility._get_noise_amp_from_SNR(SNR4, signal_amp=1), 10) + self.assertEqual(utility._get_noise_amp_from_SNR(SNR5, signal_amp=2), 0.002) + + +if __name__ == '__main__': + unittest.main()