Compare commits
No commits in common. "d5cf6dacf72fbfc1e791f3d5ea859a78196173ef" and "38c28caf877c962b67f56fbb1fa53d4480edbeb9" have entirely different histories.
d5cf6dacf7
...
38c28caf87
@ -1,92 +0,0 @@
|
|||||||
# Homotopy Continuation Decoder Implementation
|
|
||||||
|
|
||||||
## Path Tracing
|
|
||||||
|
|
||||||
As explained in the introduction, we use a predictor-corrector scheme, with an
|
|
||||||
Euler predictor and a Newton corrector.
|
|
||||||
|
|
||||||
We perform one prediction step, followed by corrector steps until some
|
|
||||||
convergence criterion is satisfied (or we fail). If convergence fails, we
|
|
||||||
adjust the predictor step size and try again, as described in [1, p.130]. The
|
|
||||||
implementation of this process we chose can be described in pseudo code as
|
|
||||||
follows:
|
|
||||||
|
|
||||||
```
|
|
||||||
func perform_prediction_step(y) {...}
|
|
||||||
func perform_correction_step(y, step_size) {...}
|
|
||||||
|
|
||||||
func perform_step(y0) {
|
|
||||||
for i in range(max_retries) {
|
|
||||||
step_size = step_size / 2;
|
|
||||||
|
|
||||||
// Euler predictor
|
|
||||||
|
|
||||||
y = perform_prediction_step(y0);
|
|
||||||
|
|
||||||
// Newton corrector
|
|
||||||
|
|
||||||
for k in range(max_corrector_iterations) {
|
|
||||||
y = perform_correction_step(y, step_size);
|
|
||||||
}
|
|
||||||
if (corrector converged) break;
|
|
||||||
}
|
|
||||||
|
|
||||||
return y;
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## Channel decoding
|
|
||||||
|
|
||||||
### Homotopy definition
|
|
||||||
|
|
||||||
We previously introduced the following set of polynomial equations:
|
|
||||||
|
|
||||||
$$
|
|
||||||
F(\bm{x}) = \left[\begin{array}{c}1 - x_1^2 \\ \vdots\\ 1 - x_n^2 \\ 1 - \prod_{i \in A(1)}x_i \\ \vdots\\ 1 - \prod_{i \in A(m)}x_i\end{array}\right] \overset{!}{=} \bm{0}.
|
|
||||||
$$
|
|
||||||
|
|
||||||
A naive approach to now define a homotopy from this is to define
|
|
||||||
|
|
||||||
$$
|
|
||||||
G(\bm{x}) = \left[\begin{array}{c}x_1\\ \vdots\\ x_n \\ x_1\\ \vdots\\ x_m\end{array}\right],
|
|
||||||
$$
|
|
||||||
|
|
||||||
whose zeros are trivial to find, and then use
|
|
||||||
$H(\bm{x}, t) = (1-t)G(\bm{x}) + tF(\bm{x})$. However, with this approach the
|
|
||||||
system is overdefined, which leads to problems. Intuitively, what we are doing
|
|
||||||
with the Euler predictor is locally linearize the solution curve of
|
|
||||||
$H(\bm{x}, t) = \bm{0}$. If the system is overdefined, there may not be a
|
|
||||||
solution.
|
|
||||||
|
|
||||||
We instead need to find a new set of fewer polynomials with the same set of
|
|
||||||
zeros. We can find a Gröbner basis of our polynomial system. If the number of
|
|
||||||
polynomials this yields is the same as the number of variable nodes, we can
|
|
||||||
then use this new polynomial system $\tilde{F}(\bm{x})$ to define our homotopy
|
|
||||||
as
|
|
||||||
|
|
||||||
$$
|
|
||||||
\tilde{G}(\bm{x}) = \left[\begin{array}{c}x_1\\ \vdots\\ x_n \end{array}\right], \hspace{3mm} H(\bm{x}, t) = (1-t)\tilde{G}(\bm{x}) + t\tilde{F}(\bm{x})
|
|
||||||
$$
|
|
||||||
|
|
||||||
### Decoding algorithm
|
|
||||||
|
|
||||||
```
|
|
||||||
func decode(y) {
|
|
||||||
for i in range(max_iterations) {
|
|
||||||
y = perform_step(y);
|
|
||||||
|
|
||||||
x_hat = hard_decision(y);
|
|
||||||
if (H @ x_hat == 0) return x_hat;
|
|
||||||
}
|
|
||||||
return x_hat; # If we don't converge, we still return the last
|
|
||||||
# estimate in the hopes that it will limit the BER
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
______________________________________________________________________
|
|
||||||
|
|
||||||
## References
|
|
||||||
|
|
||||||
\[1\]: T. Chen and T.-Y. Li, “Homotopy continuation method for solving systems
|
|
||||||
of nonlinear and polynomial equations,” Communications in Information and
|
|
||||||
Systems, vol. 15, no. 2, pp. 119–307, 2015, doi: 10.4310/CIS.2015.v15.n2.a1.
|
|
||||||
@ -1,7 +1,6 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import matplotlib.pyplot as plt
|
|
||||||
|
|
||||||
# autopep8: off
|
# autopep8: off
|
||||||
import sys
|
import sys
|
||||||
@ -13,41 +12,108 @@ from hccd import path_tracker, homotopy_generator
|
|||||||
# autopep8: on
|
# autopep8: on
|
||||||
|
|
||||||
|
|
||||||
|
# class RepetitionCodeHomotopy:
|
||||||
|
# """Helper type implementing necessary functions for PathTracker.
|
||||||
|
#
|
||||||
|
# Repetiton code homotopy:
|
||||||
|
# G = [[x1],
|
||||||
|
# [x2],
|
||||||
|
# [x1]]
|
||||||
|
#
|
||||||
|
# F = [[1 - x1**2],
|
||||||
|
# [1 - x2**2],
|
||||||
|
# [1 - x1*x2]]
|
||||||
|
#
|
||||||
|
# H = (1-t)*G + t*F
|
||||||
|
#
|
||||||
|
# Note that
|
||||||
|
# y := [[x1],
|
||||||
|
# [x2],
|
||||||
|
# [t]]
|
||||||
|
# """
|
||||||
|
# @staticmethod
|
||||||
|
# def evaluate_H(y: np.ndarray) -> np.ndarray:
|
||||||
|
# """Evaluate H at y."""
|
||||||
|
# x1 = y[0]
|
||||||
|
# x2 = y[1]
|
||||||
|
# t = y[2]
|
||||||
|
#
|
||||||
|
# print(y)
|
||||||
|
#
|
||||||
|
# result = np.zeros(shape=3)
|
||||||
|
# result[0] = -t*x1**2 + x1*(1-t) + t
|
||||||
|
# result[1] = -t*x2**2 + x2*(1-t) + t
|
||||||
|
# result[2] = -t*x1*x2 + x1*(1-t) + t
|
||||||
|
#
|
||||||
|
# return result
|
||||||
|
#
|
||||||
|
# @staticmethod
|
||||||
|
# def evaluate_DH(y: np.ndarray) -> np.ndarray:
|
||||||
|
# """Evaluate Jacobian of H at y."""
|
||||||
|
# x1 = y[0]
|
||||||
|
# x2 = y[1]
|
||||||
|
# t = y[2]
|
||||||
|
#
|
||||||
|
# result = np.zeros(shape=(3, 3))
|
||||||
|
# result[0, 0] = -2*t*x1 + (1-t)
|
||||||
|
# result[0, 1] = 0
|
||||||
|
# result[0, 2] = -x1**2 - x1 + 1
|
||||||
|
# result[1, 0] = 0
|
||||||
|
# result[1, 1] = -2*t*x2 + (1-t)
|
||||||
|
# result[1, 2] = -x2**2 - x2 + 1
|
||||||
|
# result[1, 0] = -t*x2 + (1-t)
|
||||||
|
# result[1, 1] = -t*x1
|
||||||
|
# result[1, 2] = -x1*x2 - x1 + 1
|
||||||
|
#
|
||||||
|
# return result
|
||||||
|
|
||||||
|
|
||||||
def track_path(args):
|
def track_path(args):
|
||||||
H = np.array([[1, 1, 0, 0],
|
H = np.array([[1, 1, 1]])
|
||||||
[0, 1, 1, 0],
|
|
||||||
[0, 0, 1, 1]])
|
|
||||||
|
|
||||||
|
# TODO: Make this work
|
||||||
homotopy = homotopy_generator.HomotopyGenerator(H)
|
homotopy = homotopy_generator.HomotopyGenerator(H)
|
||||||
|
|
||||||
print(f"G: {homotopy.G}")
|
|
||||||
print(f"F: {homotopy.F}")
|
|
||||||
print(f"H: {homotopy.H}")
|
|
||||||
print(f"DH: {homotopy.DH}")
|
|
||||||
|
|
||||||
tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
|
tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
|
||||||
args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
|
args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
|
||||||
|
|
||||||
ys_start, ys_prime, ys_hat_e, ys = [], [], [], []
|
ys_start, ys_prime, ys_hat_e, ys = [], [], [], []
|
||||||
|
|
||||||
y = np.zeros(5) + np.array([0.5, 0.48, -0.1, 0.2, 0])
|
try:
|
||||||
for i in range(args.num_iterations):
|
y = np.zeros(3)
|
||||||
y_start, y_prime, y_hat_e, y = tracker.transparent_step(y)
|
for i in range(args.num_iterations):
|
||||||
ys_start.append(y_start)
|
y_start, y_prime, y_hat_e, y = tracker.transparent_step(y)
|
||||||
ys_prime.append(y_prime)
|
ys_start.append(y_start)
|
||||||
ys_hat_e.append(y_hat_e)
|
ys_prime.append(y_prime)
|
||||||
ys.append(y)
|
ys_hat_e.append(y_hat_e)
|
||||||
print(f"Iteration {i}: {y}")
|
ys.append(y)
|
||||||
|
print(f"Iteration {i}: {y}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error: {e}")
|
||||||
|
|
||||||
ys_start = np.array(ys_start)
|
ys_start = np.array(ys_start)
|
||||||
ys_prime = np.array(ys_prime)
|
ys_prime = np.array(ys_prime)
|
||||||
ys_hat_e = np.array(ys_hat_e)
|
ys_hat_e = np.array(ys_hat_e)
|
||||||
ys = np.array(ys)
|
ys = np.array(ys)
|
||||||
|
|
||||||
for y in np.transpose(ys):
|
df = pd.DataFrame({"x1b": ys_start[:, 0],
|
||||||
plt.plot(y)
|
"x2b": ys_start[:, 1],
|
||||||
|
"tb": ys_start[:, 2],
|
||||||
|
"x1p": ys_prime[:, 0],
|
||||||
|
"x2p": ys_prime[:, 1],
|
||||||
|
"tp": ys_prime[:, 2],
|
||||||
|
"x1e": ys_hat_e[:, 0],
|
||||||
|
"x2e": ys_hat_e[:, 1],
|
||||||
|
"te": ys_hat_e[:, 2],
|
||||||
|
"x1n": ys[:, 0],
|
||||||
|
"x2n": ys[:, 1],
|
||||||
|
"tn": ys[:, 2]
|
||||||
|
})
|
||||||
|
|
||||||
plt.show()
|
if args.output:
|
||||||
|
df.to_csv(args.output, index=False)
|
||||||
|
else:
|
||||||
|
print(df)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@ -65,6 +131,7 @@ def main():
|
|||||||
default=0.01, help="Convergence threshold for Newton corrector")
|
default=0.01, help="Convergence threshold for Newton corrector")
|
||||||
parser.add_argument("-s", "--sigma", type=int, default=1,
|
parser.add_argument("-s", "--sigma", type=int, default=1,
|
||||||
help="Direction in which the path is traced")
|
help="Direction in which the path is traced")
|
||||||
|
parser.add_argument("-o", "--output", type=str, help="Output csv file")
|
||||||
parser.add_argument("-n", "--num-iterations", type=int, default=20,
|
parser.add_argument("-n", "--num-iterations", type=int, default=20,
|
||||||
help="Number of iterations of the example program to run")
|
help="Number of iterations of the example program to run")
|
||||||
|
|
||||||
|
|||||||
@ -17,63 +17,78 @@ class HomotopyGenerator:
|
|||||||
self.H_matrix = parity_check_matrix
|
self.H_matrix = parity_check_matrix
|
||||||
self.num_checks, self.num_vars = parity_check_matrix.shape
|
self.num_checks, self.num_vars = parity_check_matrix.shape
|
||||||
|
|
||||||
|
# Create symbolic variables
|
||||||
self.x_vars = [sp.symbols(f'x{i+1}') for i in range(self.num_vars)]
|
self.x_vars = [sp.symbols(f'x{i+1}') for i in range(self.num_vars)]
|
||||||
self.t = sp.symbols('t')
|
self.t = sp.symbols('t')
|
||||||
|
|
||||||
|
# Generate G, F, and H
|
||||||
self.G = self._create_G()
|
self.G = self._create_G()
|
||||||
self.F = self._create_F()
|
self.F = self._create_F()
|
||||||
self.H = self._create_H()
|
self.H = self._create_H()
|
||||||
self.DH = self._create_DH(self.H)
|
|
||||||
|
|
||||||
|
# Convert to callable functions
|
||||||
self._H_lambda = self._create_H_lambda()
|
self._H_lambda = self._create_H_lambda()
|
||||||
self._DH_lambda = self._create_DH_lambda()
|
self._DH_lambda = self._create_DH_lambda()
|
||||||
|
|
||||||
def _create_G(self) -> List[sp.Expr]:
|
def _create_G(self) -> List[sp.Expr]:
|
||||||
|
"""Create G polynomial system (the starting system)."""
|
||||||
|
# For each variable xi, add the polynomial [xi]
|
||||||
G = []
|
G = []
|
||||||
for var in self.x_vars:
|
for var in self.x_vars:
|
||||||
G.append(var)
|
G.append(var)
|
||||||
|
|
||||||
return G
|
return G
|
||||||
|
|
||||||
def _create_F(self) -> sp.MutableMatrix:
|
def _create_F(self) -> List[sp.Expr]:
|
||||||
|
"""Create F polynomial system (the target system)."""
|
||||||
F = []
|
F = []
|
||||||
|
|
||||||
|
# Add 1 - xi^2 for each variable
|
||||||
for var in self.x_vars:
|
for var in self.x_vars:
|
||||||
F.append(1 - var**2)
|
F.append(1 - var**2)
|
||||||
|
|
||||||
|
# Add parity check polynomials: 1 - x1*x2*...*xk for each parity check
|
||||||
for row in self.H_matrix:
|
for row in self.H_matrix:
|
||||||
|
# Create product of variables that participate in this check
|
||||||
term = 1
|
term = 1
|
||||||
for i, bit in enumerate(row):
|
for i, bit in enumerate(row):
|
||||||
if bit == 1:
|
if bit == 1:
|
||||||
term *= self.x_vars[i]
|
term *= self.x_vars[i]
|
||||||
|
|
||||||
F.append(1 - term)
|
if term != 1: # Only add if there are variables in this check
|
||||||
|
F.append(1 - term)
|
||||||
|
|
||||||
groebner_basis = sp.groebner(F)
|
return F
|
||||||
return sp.MutableMatrix(groebner_basis)
|
|
||||||
|
|
||||||
def _create_H(self) -> List[sp.Expr]:
|
def _create_H(self) -> List[sp.Expr]:
|
||||||
|
"""Create the homotopy H = (1-t)*G + t*F."""
|
||||||
H = []
|
H = []
|
||||||
|
|
||||||
for g, f in zip(self.G, self.F):
|
# Make sure G and F have the same length
|
||||||
|
# Repeat variables from G if needed to match F's length
|
||||||
|
G_extended = self.G.copy()
|
||||||
|
while len(G_extended) < len(self.F):
|
||||||
|
# Cycle through variables to repeat
|
||||||
|
for i in range(min(self.num_vars, len(self.F) - len(G_extended))):
|
||||||
|
G_extended.append(self.x_vars[i % self.num_vars])
|
||||||
|
|
||||||
|
# Create the homotopy
|
||||||
|
for g, f in zip(G_extended, self.F):
|
||||||
H.append((1 - self.t) * g + self.t * f)
|
H.append((1 - self.t) * g + self.t * f)
|
||||||
|
|
||||||
return H
|
return H
|
||||||
|
|
||||||
def _create_DH(self, H: List[sp.Expr]) -> sp.MutableMatrix:
|
|
||||||
all_vars = self.x_vars + [self.t]
|
|
||||||
DH = sp.Matrix([[sp.diff(expr, var)
|
|
||||||
for var in all_vars] for expr in self.H])
|
|
||||||
|
|
||||||
return DH
|
|
||||||
|
|
||||||
def _create_H_lambda(self) -> Callable:
|
def _create_H_lambda(self) -> Callable:
|
||||||
|
"""Create a lambda function to evaluate H."""
|
||||||
all_vars = self.x_vars + [self.t]
|
all_vars = self.x_vars + [self.t]
|
||||||
return sp.lambdify(all_vars, self.H, 'numpy')
|
return sp.lambdify(all_vars, self.H, 'numpy')
|
||||||
|
|
||||||
def _create_DH_lambda(self) -> Callable:
|
def _create_DH_lambda(self) -> Callable:
|
||||||
|
"""Create a lambda function to evaluate the Jacobian of H."""
|
||||||
all_vars = self.x_vars + [self.t]
|
all_vars = self.x_vars + [self.t]
|
||||||
return sp.lambdify(all_vars, self.DH, 'numpy')
|
jacobian = sp.Matrix([[sp.diff(expr, var)
|
||||||
|
for var in all_vars] for expr in self.H])
|
||||||
|
return sp.lambdify(all_vars, jacobian, 'numpy')
|
||||||
|
|
||||||
def evaluate_H(self, y: np.ndarray) -> np.ndarray:
|
def evaluate_H(self, y: np.ndarray) -> np.ndarray:
|
||||||
"""
|
"""
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user