ba-thesis/sw/decoders/proximal.py

107 lines
3.6 KiB
Python

import numpy as np
from tqdm import tqdm
class ProximalDecoder:
"""Class implementing the Proximal Decoding algorithm. See "Proximal Decoding for LDPC Codes" by Tadashi
Wadayama, and Satoshi Takabe.
"""
# TODO: How large should K be?
# TODO: How large should eta be?
# TODO: How large should step_size be?
def __init__(self, H: np.array, K: int = 100, step_size: float = 0.5, gamma: float = 0.05, eta: float = 1.1):
"""Construct a new ProximalDecoder Object.
:param H: Parity Check Matrix
:param K: Max number of iterations to perform when decoding
:param step_size: Step size for the gradient descent process
:param gamma: Positive constant. Arises in the approximation of the prior PDF
:param eta: Positive constant slightly larger than one. See 3.2, p. 3
"""
self._H = H
self._K = K
self._step_size = step_size
self._gamma = gamma
self._eta = eta
@staticmethod
def _L_awgn(s: np.array, y: np.array) -> np.array:
"""Variation of the negative log-likelihood for the special case of AWGN noise. See 4.1, p. 4."""
return s - y
# TODO: Is this correct?
def _grad_h(self, x: np.array) -> np.array:
"""Gradient of the code-constraint polynomial. See 2.3, p. 2."""
# Calculate first term
result = 4 * (x**2 - 1) * x
# Calculate second term
for k, x_k in enumerate(x):
# TODO: Perform this operation for each row simultaneously
B_k = np.argwhere(self._H[:, k] == 1)
B_k = B_k[:, 0] # Get rid of one layer of arrays
# TODO: Perform the summation with np.sum()
sum_result = 0
for i in B_k:
# TODO: Perform this operation for each column simultaneously
A_i = np.argwhere(self._H[i] == 1)
A_i = A_i[:, 0] # Get rid of one layer of arrays
prod = 1
for j in A_i:
prod *= x[j]
sum_result += prod**2 - prod
term_2 = 2 / x_k * sum_result
result[k] += term_2
return np.array(result)
# TODO: Is this correct?
def _projection(self, x):
"""Project a vector onto [-eta, eta]^n in order to avoid numerical instability.
Detailed in 3.2, p. 3 (Equation (15)).
:param x:
:return: x clipped to [-eta, eta]^n
"""
return np.clip(x, -self._eta, self._eta)
def _check_parity(self, y_hat: np.array) -> bool:
"""Perform a parity check for a given codeword.
:param y_hat: codeword to be checked (element of [-1, 1]^n)
:return: True if the parity check passes, i.e. the codeword is valid. False otherwise
"""
y_hat_binary = (y_hat == 1) * 1 # Map the codeword from [-1, 1]^n to [0, 1]^n
syndrome = np.dot(self._H, y_hat) % 2
return not np.any(syndrome)
def decode(self, y: np.array) -> np.array:
"""Decode a received signal. The algorithm is detailed in 3.2, p.3.
This function assumes a BPSK-like modulated signal ([-1, 1]^n instead of [0, 1]^n) and an AWGN channel.
:param y: Vector of received values. (y = x + n, where 'x' is element of [-1, 1]^m and 'n' is noise)
:return: Most probably sent symbol
"""
s = 0
x_hat = 0
for k in range(self._K):
r = s - self._step_size * self._L_awgn(s, y)
s = r - self._gamma * self._grad_h(r)
s = self._projection(s) # Equation (15)
x_hat = (np.sign(s) == 1) * 1
if self._check_parity(x_hat):
break
return x_hat