Compare commits
4 Commits
38c28caf87
...
d5cf6dacf7
| Author | SHA1 | Date | |
|---|---|---|---|
| d5cf6dacf7 | |||
| 97fc0695bd | |||
| 2143f29885 | |||
| c2ab678c17 |
92
docs/implementation.md
Normal file
92
docs/implementation.md
Normal file
@ -0,0 +1,92 @@
|
||||
# Homotopy Continuation Decoder Implementation
|
||||
|
||||
## Path Tracing
|
||||
|
||||
As explained in the introduction, we use a predictor-corrector scheme, with an
|
||||
Euler predictor and a Newton corrector.
|
||||
|
||||
We perform one prediction step, followed by corrector steps until some
|
||||
convergence criterion is satisfied (or we fail). If convergence fails, we
|
||||
adjust the predictor step size and try again, as described in [1, p.130]. The
|
||||
implementation of this process we chose can be described in pseudo code as
|
||||
follows:
|
||||
|
||||
```
|
||||
func perform_prediction_step(y) {...}
|
||||
func perform_correction_step(y, step_size) {...}
|
||||
|
||||
func perform_step(y0) {
|
||||
for i in range(max_retries) {
|
||||
step_size = step_size / 2;
|
||||
|
||||
// Euler predictor
|
||||
|
||||
y = perform_prediction_step(y0);
|
||||
|
||||
// Newton corrector
|
||||
|
||||
for k in range(max_corrector_iterations) {
|
||||
y = perform_correction_step(y, step_size);
|
||||
}
|
||||
if (corrector converged) break;
|
||||
}
|
||||
|
||||
return y;
|
||||
}
|
||||
```
|
||||
|
||||
## Channel decoding
|
||||
|
||||
### Homotopy definition
|
||||
|
||||
We previously introduced the following set of polynomial equations:
|
||||
|
||||
$$
|
||||
F(\bm{x}) = \left[\begin{array}{c}1 - x_1^2 \\ \vdots\\ 1 - x_n^2 \\ 1 - \prod_{i \in A(1)}x_i \\ \vdots\\ 1 - \prod_{i \in A(m)}x_i\end{array}\right] \overset{!}{=} \bm{0}.
|
||||
$$
|
||||
|
||||
A naive approach to now define a homotopy from this is to define
|
||||
|
||||
$$
|
||||
G(\bm{x}) = \left[\begin{array}{c}x_1\\ \vdots\\ x_n \\ x_1\\ \vdots\\ x_m\end{array}\right],
|
||||
$$
|
||||
|
||||
whose zeros are trivial to find, and then use
|
||||
$H(\bm{x}, t) = (1-t)G(\bm{x}) + tF(\bm{x})$. However, with this approach the
|
||||
system is overdefined, which leads to problems. Intuitively, what we are doing
|
||||
with the Euler predictor is locally linearize the solution curve of
|
||||
$H(\bm{x}, t) = \bm{0}$. If the system is overdefined, there may not be a
|
||||
solution.
|
||||
|
||||
We instead need to find a new set of fewer polynomials with the same set of
|
||||
zeros. We can find a Gröbner basis of our polynomial system. If the number of
|
||||
polynomials this yields is the same as the number of variable nodes, we can
|
||||
then use this new polynomial system $\tilde{F}(\bm{x})$ to define our homotopy
|
||||
as
|
||||
|
||||
$$
|
||||
\tilde{G}(\bm{x}) = \left[\begin{array}{c}x_1\\ \vdots\\ x_n \end{array}\right], \hspace{3mm} H(\bm{x}, t) = (1-t)\tilde{G}(\bm{x}) + t\tilde{F}(\bm{x})
|
||||
$$
|
||||
|
||||
### Decoding algorithm
|
||||
|
||||
```
|
||||
func decode(y) {
|
||||
for i in range(max_iterations) {
|
||||
y = perform_step(y);
|
||||
|
||||
x_hat = hard_decision(y);
|
||||
if (H @ x_hat == 0) return x_hat;
|
||||
}
|
||||
return x_hat; # If we don't converge, we still return the last
|
||||
# estimate in the hopes that it will limit the BER
|
||||
}
|
||||
```
|
||||
|
||||
______________________________________________________________________
|
||||
|
||||
## References
|
||||
|
||||
\[1\]: T. Chen and T.-Y. Li, “Homotopy continuation method for solving systems
|
||||
of nonlinear and polynomial equations,” Communications in Information and
|
||||
Systems, vol. 15, no. 2, pp. 119–307, 2015, doi: 10.4310/CIS.2015.v15.n2.a1.
|
||||
@ -1,6 +1,7 @@
|
||||
import argparse
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
# autopep8: off
|
||||
import sys
|
||||
@ -12,108 +13,41 @@ from hccd import path_tracker, homotopy_generator
|
||||
# autopep8: on
|
||||
|
||||
|
||||
# class RepetitionCodeHomotopy:
|
||||
# """Helper type implementing necessary functions for PathTracker.
|
||||
#
|
||||
# Repetiton code homotopy:
|
||||
# G = [[x1],
|
||||
# [x2],
|
||||
# [x1]]
|
||||
#
|
||||
# F = [[1 - x1**2],
|
||||
# [1 - x2**2],
|
||||
# [1 - x1*x2]]
|
||||
#
|
||||
# H = (1-t)*G + t*F
|
||||
#
|
||||
# Note that
|
||||
# y := [[x1],
|
||||
# [x2],
|
||||
# [t]]
|
||||
# """
|
||||
# @staticmethod
|
||||
# def evaluate_H(y: np.ndarray) -> np.ndarray:
|
||||
# """Evaluate H at y."""
|
||||
# x1 = y[0]
|
||||
# x2 = y[1]
|
||||
# t = y[2]
|
||||
#
|
||||
# print(y)
|
||||
#
|
||||
# result = np.zeros(shape=3)
|
||||
# result[0] = -t*x1**2 + x1*(1-t) + t
|
||||
# result[1] = -t*x2**2 + x2*(1-t) + t
|
||||
# result[2] = -t*x1*x2 + x1*(1-t) + t
|
||||
#
|
||||
# return result
|
||||
#
|
||||
# @staticmethod
|
||||
# def evaluate_DH(y: np.ndarray) -> np.ndarray:
|
||||
# """Evaluate Jacobian of H at y."""
|
||||
# x1 = y[0]
|
||||
# x2 = y[1]
|
||||
# t = y[2]
|
||||
#
|
||||
# result = np.zeros(shape=(3, 3))
|
||||
# result[0, 0] = -2*t*x1 + (1-t)
|
||||
# result[0, 1] = 0
|
||||
# result[0, 2] = -x1**2 - x1 + 1
|
||||
# result[1, 0] = 0
|
||||
# result[1, 1] = -2*t*x2 + (1-t)
|
||||
# result[1, 2] = -x2**2 - x2 + 1
|
||||
# result[1, 0] = -t*x2 + (1-t)
|
||||
# result[1, 1] = -t*x1
|
||||
# result[1, 2] = -x1*x2 - x1 + 1
|
||||
#
|
||||
# return result
|
||||
|
||||
|
||||
def track_path(args):
|
||||
H = np.array([[1, 1, 1]])
|
||||
H = np.array([[1, 1, 0, 0],
|
||||
[0, 1, 1, 0],
|
||||
[0, 0, 1, 1]])
|
||||
|
||||
# TODO: Make this work
|
||||
homotopy = homotopy_generator.HomotopyGenerator(H)
|
||||
|
||||
print(f"G: {homotopy.G}")
|
||||
print(f"F: {homotopy.F}")
|
||||
print(f"H: {homotopy.H}")
|
||||
print(f"DH: {homotopy.DH}")
|
||||
|
||||
tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
|
||||
args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
|
||||
|
||||
ys_start, ys_prime, ys_hat_e, ys = [], [], [], []
|
||||
|
||||
try:
|
||||
y = np.zeros(3)
|
||||
for i in range(args.num_iterations):
|
||||
y_start, y_prime, y_hat_e, y = tracker.transparent_step(y)
|
||||
ys_start.append(y_start)
|
||||
ys_prime.append(y_prime)
|
||||
ys_hat_e.append(y_hat_e)
|
||||
ys.append(y)
|
||||
print(f"Iteration {i}: {y}")
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
y = np.zeros(5) + np.array([0.5, 0.48, -0.1, 0.2, 0])
|
||||
for i in range(args.num_iterations):
|
||||
y_start, y_prime, y_hat_e, y = tracker.transparent_step(y)
|
||||
ys_start.append(y_start)
|
||||
ys_prime.append(y_prime)
|
||||
ys_hat_e.append(y_hat_e)
|
||||
ys.append(y)
|
||||
print(f"Iteration {i}: {y}")
|
||||
|
||||
ys_start = np.array(ys_start)
|
||||
ys_prime = np.array(ys_prime)
|
||||
ys_hat_e = np.array(ys_hat_e)
|
||||
ys = np.array(ys)
|
||||
|
||||
df = pd.DataFrame({"x1b": ys_start[:, 0],
|
||||
"x2b": ys_start[:, 1],
|
||||
"tb": ys_start[:, 2],
|
||||
"x1p": ys_prime[:, 0],
|
||||
"x2p": ys_prime[:, 1],
|
||||
"tp": ys_prime[:, 2],
|
||||
"x1e": ys_hat_e[:, 0],
|
||||
"x2e": ys_hat_e[:, 1],
|
||||
"te": ys_hat_e[:, 2],
|
||||
"x1n": ys[:, 0],
|
||||
"x2n": ys[:, 1],
|
||||
"tn": ys[:, 2]
|
||||
})
|
||||
for y in np.transpose(ys):
|
||||
plt.plot(y)
|
||||
|
||||
if args.output:
|
||||
df.to_csv(args.output, index=False)
|
||||
else:
|
||||
print(df)
|
||||
plt.show()
|
||||
|
||||
|
||||
def main():
|
||||
@ -131,7 +65,6 @@ def main():
|
||||
default=0.01, help="Convergence threshold for Newton corrector")
|
||||
parser.add_argument("-s", "--sigma", type=int, default=1,
|
||||
help="Direction in which the path is traced")
|
||||
parser.add_argument("-o", "--output", type=str, help="Output csv file")
|
||||
parser.add_argument("-n", "--num-iterations", type=int, default=20,
|
||||
help="Number of iterations of the example program to run")
|
||||
|
||||
|
||||
@ -17,78 +17,63 @@ class HomotopyGenerator:
|
||||
self.H_matrix = parity_check_matrix
|
||||
self.num_checks, self.num_vars = parity_check_matrix.shape
|
||||
|
||||
# Create symbolic variables
|
||||
self.x_vars = [sp.symbols(f'x{i+1}') for i in range(self.num_vars)]
|
||||
self.t = sp.symbols('t')
|
||||
|
||||
# Generate G, F, and H
|
||||
self.G = self._create_G()
|
||||
self.F = self._create_F()
|
||||
self.H = self._create_H()
|
||||
self.DH = self._create_DH(self.H)
|
||||
|
||||
# Convert to callable functions
|
||||
self._H_lambda = self._create_H_lambda()
|
||||
self._DH_lambda = self._create_DH_lambda()
|
||||
|
||||
def _create_G(self) -> List[sp.Expr]:
|
||||
"""Create G polynomial system (the starting system)."""
|
||||
# For each variable xi, add the polynomial [xi]
|
||||
G = []
|
||||
for var in self.x_vars:
|
||||
G.append(var)
|
||||
|
||||
return G
|
||||
|
||||
def _create_F(self) -> List[sp.Expr]:
|
||||
"""Create F polynomial system (the target system)."""
|
||||
def _create_F(self) -> sp.MutableMatrix:
|
||||
F = []
|
||||
|
||||
# Add 1 - xi^2 for each variable
|
||||
for var in self.x_vars:
|
||||
F.append(1 - var**2)
|
||||
|
||||
# Add parity check polynomials: 1 - x1*x2*...*xk for each parity check
|
||||
for row in self.H_matrix:
|
||||
# Create product of variables that participate in this check
|
||||
term = 1
|
||||
for i, bit in enumerate(row):
|
||||
if bit == 1:
|
||||
term *= self.x_vars[i]
|
||||
|
||||
if term != 1: # Only add if there are variables in this check
|
||||
F.append(1 - term)
|
||||
F.append(1 - term)
|
||||
|
||||
return F
|
||||
groebner_basis = sp.groebner(F)
|
||||
return sp.MutableMatrix(groebner_basis)
|
||||
|
||||
def _create_H(self) -> List[sp.Expr]:
|
||||
"""Create the homotopy H = (1-t)*G + t*F."""
|
||||
H = []
|
||||
|
||||
# Make sure G and F have the same length
|
||||
# Repeat variables from G if needed to match F's length
|
||||
G_extended = self.G.copy()
|
||||
while len(G_extended) < len(self.F):
|
||||
# Cycle through variables to repeat
|
||||
for i in range(min(self.num_vars, len(self.F) - len(G_extended))):
|
||||
G_extended.append(self.x_vars[i % self.num_vars])
|
||||
|
||||
# Create the homotopy
|
||||
for g, f in zip(G_extended, self.F):
|
||||
for g, f in zip(self.G, self.F):
|
||||
H.append((1 - self.t) * g + self.t * f)
|
||||
|
||||
return H
|
||||
|
||||
def _create_DH(self, H: List[sp.Expr]) -> sp.MutableMatrix:
|
||||
all_vars = self.x_vars + [self.t]
|
||||
DH = sp.Matrix([[sp.diff(expr, var)
|
||||
for var in all_vars] for expr in self.H])
|
||||
|
||||
return DH
|
||||
|
||||
def _create_H_lambda(self) -> Callable:
|
||||
"""Create a lambda function to evaluate H."""
|
||||
all_vars = self.x_vars + [self.t]
|
||||
return sp.lambdify(all_vars, self.H, 'numpy')
|
||||
|
||||
def _create_DH_lambda(self) -> Callable:
|
||||
"""Create a lambda function to evaluate the Jacobian of H."""
|
||||
all_vars = self.x_vars + [self.t]
|
||||
jacobian = sp.Matrix([[sp.diff(expr, var)
|
||||
for var in all_vars] for expr in self.H])
|
||||
return sp.lambdify(all_vars, jacobian, 'numpy')
|
||||
return sp.lambdify(all_vars, self.DH, 'numpy')
|
||||
|
||||
def evaluate_H(self, y: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
|
||||
Loading…
Reference in New Issue
Block a user