Compare commits

...

4 Commits

3 changed files with 126 additions and 116 deletions

92
docs/implementation.md Normal file
View File

@ -0,0 +1,92 @@
# Homotopy Continuation Decoder Implementation
## Path Tracing
As explained in the introduction, we use a predictor-corrector scheme, with an
Euler predictor and a Newton corrector.
We perform one prediction step, followed by corrector steps until some
convergence criterion is satisfied (or we fail). If convergence fails, we
adjust the predictor step size and try again, as described in [1, p.130]. The
implementation of this process we chose can be described in pseudo code as
follows:
```
func perform_prediction_step(y) {...}
func perform_correction_step(y, step_size) {...}
func perform_step(y0) {
for i in range(max_retries) {
step_size = step_size / 2;
// Euler predictor
y = perform_prediction_step(y0);
// Newton corrector
for k in range(max_corrector_iterations) {
y = perform_correction_step(y, step_size);
}
if (corrector converged) break;
}
return y;
}
```
## Channel decoding
### Homotopy definition
We previously introduced the following set of polynomial equations:
$$
F(\bm{x}) = \left[\begin{array}{c}1 - x_1^2 \\ \vdots\\ 1 - x_n^2 \\ 1 - \prod_{i \in A(1)}x_i \\ \vdots\\ 1 - \prod_{i \in A(m)}x_i\end{array}\right] \overset{!}{=} \bm{0}.
$$
A naive approach to now define a homotopy from this is to define
$$
G(\bm{x}) = \left[\begin{array}{c}x_1\\ \vdots\\ x_n \\ x_1\\ \vdots\\ x_m\end{array}\right],
$$
whose zeros are trivial to find, and then use
$H(\bm{x}, t) = (1-t)G(\bm{x}) + tF(\bm{x})$. However, with this approach the
system is overdefined, which leads to problems. Intuitively, what we are doing
with the Euler predictor is locally linearize the solution curve of
$H(\bm{x}, t) = \bm{0}$. If the system is overdefined, there may not be a
solution.
We instead need to find a new set of fewer polynomials with the same set of
zeros. We can find a Gröbner basis of our polynomial system. If the number of
polynomials this yields is the same as the number of variable nodes, we can
then use this new polynomial system $\tilde{F}(\bm{x})$ to define our homotopy
as
$$
\tilde{G}(\bm{x}) = \left[\begin{array}{c}x_1\\ \vdots\\ x_n \end{array}\right], \hspace{3mm} H(\bm{x}, t) = (1-t)\tilde{G}(\bm{x}) + t\tilde{F}(\bm{x})
$$
### Decoding algorithm
```
func decode(y) {
for i in range(max_iterations) {
y = perform_step(y);
x_hat = hard_decision(y);
if (H @ x_hat == 0) return x_hat;
}
return x_hat; # If we don't converge, we still return the last
# estimate in the hopes that it will limit the BER
}
```
______________________________________________________________________
## References
\[1\]: T. Chen and T.-Y. Li, “Homotopy continuation method for solving systems
of nonlinear and polynomial equations,” Communications in Information and
Systems, vol. 15, no. 2, pp. 119307, 2015, doi: 10.4310/CIS.2015.v15.n2.a1.

View File

@ -1,6 +1,7 @@
import argparse
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# autopep8: off
import sys
@ -12,108 +13,41 @@ from hccd import path_tracker, homotopy_generator
# autopep8: on
# class RepetitionCodeHomotopy:
# """Helper type implementing necessary functions for PathTracker.
#
# Repetiton code homotopy:
# G = [[x1],
# [x2],
# [x1]]
#
# F = [[1 - x1**2],
# [1 - x2**2],
# [1 - x1*x2]]
#
# H = (1-t)*G + t*F
#
# Note that
# y := [[x1],
# [x2],
# [t]]
# """
# @staticmethod
# def evaluate_H(y: np.ndarray) -> np.ndarray:
# """Evaluate H at y."""
# x1 = y[0]
# x2 = y[1]
# t = y[2]
#
# print(y)
#
# result = np.zeros(shape=3)
# result[0] = -t*x1**2 + x1*(1-t) + t
# result[1] = -t*x2**2 + x2*(1-t) + t
# result[2] = -t*x1*x2 + x1*(1-t) + t
#
# return result
#
# @staticmethod
# def evaluate_DH(y: np.ndarray) -> np.ndarray:
# """Evaluate Jacobian of H at y."""
# x1 = y[0]
# x2 = y[1]
# t = y[2]
#
# result = np.zeros(shape=(3, 3))
# result[0, 0] = -2*t*x1 + (1-t)
# result[0, 1] = 0
# result[0, 2] = -x1**2 - x1 + 1
# result[1, 0] = 0
# result[1, 1] = -2*t*x2 + (1-t)
# result[1, 2] = -x2**2 - x2 + 1
# result[1, 0] = -t*x2 + (1-t)
# result[1, 1] = -t*x1
# result[1, 2] = -x1*x2 - x1 + 1
#
# return result
def track_path(args):
H = np.array([[1, 1, 1]])
H = np.array([[1, 1, 0, 0],
[0, 1, 1, 0],
[0, 0, 1, 1]])
# TODO: Make this work
homotopy = homotopy_generator.HomotopyGenerator(H)
print(f"G: {homotopy.G}")
print(f"F: {homotopy.F}")
print(f"H: {homotopy.H}")
print(f"DH: {homotopy.DH}")
tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
ys_start, ys_prime, ys_hat_e, ys = [], [], [], []
try:
y = np.zeros(3)
for i in range(args.num_iterations):
y_start, y_prime, y_hat_e, y = tracker.transparent_step(y)
ys_start.append(y_start)
ys_prime.append(y_prime)
ys_hat_e.append(y_hat_e)
ys.append(y)
print(f"Iteration {i}: {y}")
except Exception as e:
print(f"Error: {e}")
y = np.zeros(5) + np.array([0.5, 0.48, -0.1, 0.2, 0])
for i in range(args.num_iterations):
y_start, y_prime, y_hat_e, y = tracker.transparent_step(y)
ys_start.append(y_start)
ys_prime.append(y_prime)
ys_hat_e.append(y_hat_e)
ys.append(y)
print(f"Iteration {i}: {y}")
ys_start = np.array(ys_start)
ys_prime = np.array(ys_prime)
ys_hat_e = np.array(ys_hat_e)
ys = np.array(ys)
df = pd.DataFrame({"x1b": ys_start[:, 0],
"x2b": ys_start[:, 1],
"tb": ys_start[:, 2],
"x1p": ys_prime[:, 0],
"x2p": ys_prime[:, 1],
"tp": ys_prime[:, 2],
"x1e": ys_hat_e[:, 0],
"x2e": ys_hat_e[:, 1],
"te": ys_hat_e[:, 2],
"x1n": ys[:, 0],
"x2n": ys[:, 1],
"tn": ys[:, 2]
})
for y in np.transpose(ys):
plt.plot(y)
if args.output:
df.to_csv(args.output, index=False)
else:
print(df)
plt.show()
def main():
@ -131,7 +65,6 @@ def main():
default=0.01, help="Convergence threshold for Newton corrector")
parser.add_argument("-s", "--sigma", type=int, default=1,
help="Direction in which the path is traced")
parser.add_argument("-o", "--output", type=str, help="Output csv file")
parser.add_argument("-n", "--num-iterations", type=int, default=20,
help="Number of iterations of the example program to run")

View File

@ -17,78 +17,63 @@ class HomotopyGenerator:
self.H_matrix = parity_check_matrix
self.num_checks, self.num_vars = parity_check_matrix.shape
# Create symbolic variables
self.x_vars = [sp.symbols(f'x{i+1}') for i in range(self.num_vars)]
self.t = sp.symbols('t')
# Generate G, F, and H
self.G = self._create_G()
self.F = self._create_F()
self.H = self._create_H()
self.DH = self._create_DH(self.H)
# Convert to callable functions
self._H_lambda = self._create_H_lambda()
self._DH_lambda = self._create_DH_lambda()
def _create_G(self) -> List[sp.Expr]:
"""Create G polynomial system (the starting system)."""
# For each variable xi, add the polynomial [xi]
G = []
for var in self.x_vars:
G.append(var)
return G
def _create_F(self) -> List[sp.Expr]:
"""Create F polynomial system (the target system)."""
def _create_F(self) -> sp.MutableMatrix:
F = []
# Add 1 - xi^2 for each variable
for var in self.x_vars:
F.append(1 - var**2)
# Add parity check polynomials: 1 - x1*x2*...*xk for each parity check
for row in self.H_matrix:
# Create product of variables that participate in this check
term = 1
for i, bit in enumerate(row):
if bit == 1:
term *= self.x_vars[i]
if term != 1: # Only add if there are variables in this check
F.append(1 - term)
F.append(1 - term)
return F
groebner_basis = sp.groebner(F)
return sp.MutableMatrix(groebner_basis)
def _create_H(self) -> List[sp.Expr]:
"""Create the homotopy H = (1-t)*G + t*F."""
H = []
# Make sure G and F have the same length
# Repeat variables from G if needed to match F's length
G_extended = self.G.copy()
while len(G_extended) < len(self.F):
# Cycle through variables to repeat
for i in range(min(self.num_vars, len(self.F) - len(G_extended))):
G_extended.append(self.x_vars[i % self.num_vars])
# Create the homotopy
for g, f in zip(G_extended, self.F):
for g, f in zip(self.G, self.F):
H.append((1 - self.t) * g + self.t * f)
return H
def _create_DH(self, H: List[sp.Expr]) -> sp.MutableMatrix:
all_vars = self.x_vars + [self.t]
DH = sp.Matrix([[sp.diff(expr, var)
for var in all_vars] for expr in self.H])
return DH
def _create_H_lambda(self) -> Callable:
"""Create a lambda function to evaluate H."""
all_vars = self.x_vars + [self.t]
return sp.lambdify(all_vars, self.H, 'numpy')
def _create_DH_lambda(self) -> Callable:
"""Create a lambda function to evaluate the Jacobian of H."""
all_vars = self.x_vars + [self.t]
jacobian = sp.Matrix([[sp.diff(expr, var)
for var in all_vars] for expr in self.H])
return sp.lambdify(all_vars, jacobian, 'numpy')
return sp.lambdify(all_vars, self.DH, 'numpy')
def evaluate_H(self, y: np.ndarray) -> np.ndarray:
"""