import numpy as np


class ProximalDecoder:
    """Class implementing the Proximal Decoding algorithm.

    See "Proximal Decoding for LDPC Codes" by Tadashi Wadayama and Satoshi Takabe.
    """

    # TODO: Is 'R' actually called 'decoding matrix'?
    # TODO: How large should eta be?
    # TODO: How large should step_size be?
    def __init__(self, H: np.ndarray, R: np.ndarray, K: int = 100,
                 step_size: float = 0.1, gamma: float = 0.05, eta: float = 1.5):
        """Construct a new ProximalDecoder object.

        :param H: Parity check matrix
        :param R: Decoding matrix
        :param K: Maximum number of iterations to perform when decoding
        :param step_size: Step size for the gradient descent process
        :param gamma: Positive constant. Arises in the approximation of the prior PDF
        :param eta: Positive constant slightly larger than one. See 3.2, p. 3
        """
        self._H = H
        self._R = R
        self._K = K
        self._step_size = step_size
        self._gamma = gamma
        self._eta = eta

        # A[i]: indices of the variables participating in check i
        # B[k]: indices of the checks in which variable k participates
        self._A = []
        self._B = []
        for row in self._H:
            A_k = np.argwhere(row == 1)
            self._A.append(A_k[:, 0])
        for column in self._H.T:
            B_k = np.argwhere(column == 1)
            self._B.append(B_k[:, 0])

    @staticmethod
    def _L_awgn(s: np.ndarray, y: np.ndarray) -> np.ndarray:
        """Gradient of the negative log-likelihood for the special case of AWGN
        noise, up to the constant 1/sigma^2 factor. See 4.1, p. 4.
        """
        return s - y

    def _grad_h(self, x: np.ndarray) -> np.ndarray:
        """Gradient of the code-constraint polynomial. See 2.3, p. 2."""
        # Calculate first term (derivative of the bipolar constraint)
        result = 4 * (x**2 - 1) * x

        # Calculate second term (derivative of the parity constraint, summed
        # over all checks in which variable k participates)
        for k, x_k in enumerate(x):
            sum_result = 0
            for i in self._B[k]:
                prod = np.prod(x[self._A[i]])
                sum_result += prod**2 - prod
            term_2 = 2 / x_k * sum_result
            result[k] += term_2

        return result

    def _projection(self, x):
        """Project a vector onto [-eta, eta]^n in order to avoid numerical
        instability. Detailed in 3.2, p. 3 (Equation (15)).

        The Euclidean projection onto the box [-eta, eta]^n is component-wise
        clipping.

        :param x:
        :return: x clipped to [-eta, eta]^n
        """
        return np.clip(x, -self._eta, self._eta)

    def _check_parity(self, y_hat: np.ndarray) -> bool:
        """Perform a parity check for a given codeword.

        :param y_hat: codeword to be checked (element of {0, 1}^n)
        :return: True if the parity check passes, i.e. the codeword is valid.
                 False otherwise
        """
        syndrome = np.dot(self._H, y_hat) % 2
        return not np.any(syndrome)

    def decode(self, y: np.ndarray) -> np.ndarray:
        """Decode a received signal.

        The algorithm is detailed in 3.2, p. 3. This function assumes a
        BPSK-like modulated signal ({-1, 1}^n instead of {0, 1}^n) and an
        AWGN channel.

        :param y: Vector of received values (y = x + w, where 'x' is an
                  element of {-1, 1}^n and 'w' is noise)
        :return: Most likely transmitted dataword (element of {0, 1}^k)
        """
        s = np.zeros(y.size)
        x_hat = np.zeros(y.size)
        for k in range(self._K):
            r = s - self._step_size * self._L_awgn(s, y)
            s = r - self._gamma * self._grad_h(r)
            s = self._projection(s)  # Equation (15)

            x_hat = np.sign(s)
            x_hat = (x_hat == 1) * 1  # Map the codeword from {-1, 1}^n to {0, 1}^n
            if self._check_parity(x_hat):
                break

        return np.dot(self._R, x_hat)
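
# Minimal usage sketch (not part of the original module): decode a lightly
# noised word of the systematic (7, 4) Hamming code. The matrices H and R, the
# noise level, and the bit-to-symbol mapping (bit b -> 2*b - 1, matching the
# demapping used in decode()) are illustrative assumptions, not taken from the
# paper.
if __name__ == "__main__":
    rng = np.random.default_rng(0)

    # Parity check matrix H = [P^T | I_3] and decoding matrix R = [I_4 | 0],
    # which simply extracts the four systematic data bits from a codeword.
    P = np.array([[1, 1, 0],
                  [1, 0, 1],
                  [0, 1, 1],
                  [1, 1, 1]])
    H = np.hstack([P.T, np.eye(3, dtype=int)])
    R = np.hstack([np.eye(4, dtype=int), np.zeros((4, 3), dtype=int)])

    # Encode a random dataword, modulate it, and add mild AWGN.
    data = rng.integers(0, 2, size=4)
    codeword = np.concatenate([data, (data @ P) % 2])
    y = (2 * codeword - 1) + 0.1 * rng.standard_normal(7)

    decoder = ProximalDecoder(H, R)
    print("sent   :", data)
    print("decoded:", decoder.decode(y))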