Compare commits

...

16 Commits

10 changed files with 585 additions and 180 deletions


@@ -1,6 +1,8 @@
# Homotopy Continuation
## Introduction
### Overview
The aim of a homotopy method is to solve a system of N nonlinear
equations in N variables \[1, p.1\]:
@@ -85,6 +87,26 @@ between successive points produced by the iterations can be used as a criterion
for convergence. Of course, if the iterations fail to converge, one must go
back to adjust the step size for the Euler predictor." [2, p.130]
## Application to Channel Decoding
We can describe the decoding problem using the code constraint polynomial [3]
$$
h(\bm{x}) = \underbrace{\sum_{i=1}^{n}\left(1-x_i^2\right)^2}_{\text{Bipolar constraint}} + \underbrace{\sum_{j=1}^{m}\left(1 - \left(\prod_{i\in A(j)}x_i\right)\right)^2}_{\text{Parity constraint}},
$$
where $A(j) = \left\{i \in [1:n]: H_{j,i} = 1\right\}$ represents the set of
variable nodes involved in parity check $j$. The first sum penalizes
non-bipolar values of $\bm{x}$; the second penalizes violated parity checks.
In a similar vein, we can define the following set of polynomial equations
whose common zeros are exactly the codewords:
$$
F(\bm{x}) = \left[\begin{array}{c}1 - x_1^2 \\ \vdots\\ 1 - x_n^2 \\ 1 - \prod_{i \in A(1)}x_i \\ \vdots\\ 1 - \prod_{i \in A(m)}x_i\end{array}\right] \overset{!}{=} \bm{0}.
$$
This is a problem we can solve using homotopy continuation.
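To make this concrete, both constraint systems can be evaluated numerically. The sketch below is only an illustration: the parity-check matrix and the function names `h` and `F` are made up for this example and are not part of the implementation. It checks that both constructions vanish exactly on a codeword:

```python
import numpy as np

# Toy parity-check matrix: 3 checks on 4 variable nodes (illustrative only)
H = np.array([[1, 1, 0, 0],
              [0, 1, 1, 0],
              [0, 0, 1, 1]])

def h(x):
    """Code constraint polynomial: bipolar terms plus parity terms."""
    bipolar = np.sum((1 - x**2)**2)
    parity = sum((1 - np.prod(x[row == 1]))**2 for row in H)
    return bipolar + parity

def F(x):
    """Polynomial system whose common zeros are exactly the bipolar codewords."""
    bipolar = 1 - x**2
    parity = np.array([1 - np.prod(x[row == 1]) for row in H])
    return np.concatenate([bipolar, parity])

x = np.ones(4)   # the all-ones bipolar vector encodes the all-zero codeword
print(h(x))      # 0.0
print(F(x))      # all zeros
```

Any non-codeword vector makes at least one term positive, so $h(\bm{x}) > 0$ there.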
______________________________________________________________________
## References
@@ -97,3 +119,7 @@ Philadelphia, PA 19104), 2003. doi: 10.1137/1.9780898719154.
\[2\]: T. Chen and T.-Y. Li, “Homotopy continuation method for solving systems
of nonlinear and polynomial equations,” Communications in Information and
Systems, vol. 15, no. 2, pp. 119-307, 2015, doi: 10.4310/CIS.2015.v15.n2.a1.
\[3\]: T. Wadayama and S. Takabe, “Proximal decoding for LDPC codes,” IEICE
Transactions on Fundamentals of Electronics, Communications and Computer
Sciences, vol. 106, no. 3, pp. 359-367, 2023.

docs/implementation.md

@@ -0,0 +1,92 @@
# Homotopy Continuation Decoder Implementation
## Path Tracing
As explained in the introduction, we use a predictor-corrector scheme, with an
Euler predictor and a Newton corrector.
We perform one prediction step, followed by corrector steps until some
convergence criterion is satisfied (or we fail). If convergence fails, we
adjust the predictor step size and try again, as described in [1, p.130]. The
implementation we chose can be described in pseudocode as follows:
```
func perform_prediction_step(y, step_size) {...}
func perform_correction_step(y) {...}

func perform_step(y0) {
    step_size = initial_step_size;
    for i in range(max_retries) {
        // Euler predictor
        y = perform_prediction_step(y0, step_size);
        // Newton corrector
        for k in range(max_corrector_iterations) {
            y = perform_correction_step(y);
        }
        if (corrector converged) break;
        // Halve the predictor step size and retry
        step_size = step_size / 2;
    }
    return y;
}
```
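As a sanity check of the scheme, here is a minimal single-variable sketch. It is a toy example, not the repository's API: the functions `H`, `Hx`, `Ht`, the fixed step count, and the fixed corrector iteration count are all assumptions. It traces the zero of $H(x, t) = (1-t)(x-1) + t(x^2-2)$ from the trivial root $x = 1$ at $t = 0$ to $\sqrt{2}$ at $t = 1$:

```python
import math

def track(H, Hx, Ht, x0, steps=100, newton_iters=5):
    """Trace the curve H(x, t) = 0 from t = 0 to t = 1 with an
    Euler predictor and a Newton corrector (fixed step size)."""
    x, t = x0, 0.0
    dt = 1.0 / steps
    for _ in range(steps):
        # Euler predictor: along the curve, dx/dt = -H_t / H_x
        x += dt * (-Ht(x, t) / Hx(x, t))
        t += dt
        # Newton corrector at the new value of t
        for _ in range(newton_iters):
            x -= H(x, t) / Hx(x, t)
    return x

# Toy homotopy: start system G(x) = x - 1, target system F(x) = x**2 - 2
H_fun = lambda x, t: (1 - t) * (x - 1) + t * (x**2 - 2)
Hx_fun = lambda x, t: (1 - t) + 2 * t * x
Ht_fun = lambda x, t: -(x - 1) + (x**2 - 2)
root = track(H_fun, Hx_fun, Ht_fun, x0=1.0)
print(abs(root - math.sqrt(2)))  # tiny: the tracker lands on sqrt(2)
```

The actual decoder differs in that it works on systems of equations (the tangent comes from the null space of the Jacobian) and adapts the predictor step size, but the predictor-corrector structure is the same.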
## Channel decoding
### Homotopy definition
We previously introduced the following set of polynomial equations:
$$
F(\bm{x}) = \left[\begin{array}{c}1 - x_1^2 \\ \vdots\\ 1 - x_n^2 \\ 1 - \prod_{i \in A(1)}x_i \\ \vdots\\ 1 - \prod_{i \in A(m)}x_i\end{array}\right] \overset{!}{=} \bm{0}.
$$
A naive way to define a homotopy from this system is to take
$$
G(\bm{x}) = \left[\begin{array}{c}x_1\\ \vdots\\ x_n \\ x_1\\ \vdots\\ x_m\end{array}\right],
$$
whose zeros are trivial to find, and then use
$H(\bm{x}, t) = (1-t)G(\bm{x}) + tF(\bm{x})$. However, with this approach the
system is overdetermined, which leads to problems. Intuitively, the Euler
predictor locally linearizes the solution curve of $H(\bm{x}, t) = \bm{0}$;
if the system is overdetermined, this curve may not exist.
We instead need a new set of fewer polynomials with the same zero set. One way
to obtain it is to compute a Gröbner basis of our polynomial system. If the
number of polynomials this yields equals the number of variable nodes, we can
use the resulting system $\tilde{F}(\bm{x})$ to define our homotopy as
$$
\tilde{G}(\bm{x}) = \left[\begin{array}{c}x_1\\ \vdots\\ x_n \end{array}\right], \hspace{3mm} H(\bm{x}, t) = (1-t)\tilde{G}(\bm{x}) + t\tilde{F}(\bm{x}).
$$
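For a small illustration of this reduction, consider a hypothetical toy code with a single parity check on two variables. The overdetermined system of three polynomials collapses to two, one per variable node, with the same zero set $\{(1, 1), (-1, -1)\}$ (the repository likewise uses SymPy's `groebner` for this step):

```python
import sympy as sp

x1, x2 = sp.symbols("x1 x2")
# Overdetermined target system: two bipolar constraints plus one parity check
F = [1 - x1**2, 1 - x2**2, 1 - x1*x2]

# A Groebner basis generates the same ideal (same zeros),
# here with as many polynomials as variables
basis = sp.groebner(F, x1, x2)
print(list(basis))  # e.g. [x1 - x2, x2**2 - 1]
```

Note that the basis having exactly $n$ elements is not guaranteed in general; the text above makes it a precondition for defining the homotopy.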
### Decoding algorithm
```
func decode(y) {
    for i in range(max_iterations) {
        y = perform_step(y);
        x_hat = hard_decision(y);
        if (H @ x_hat == 0) return x_hat;
    }
    // If we do not converge, we still return the last
    // estimate in the hope that it limits the BER.
    return x_hat;
}
```
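The `hard_decision` and syndrome test used above can be sketched as follows (hypothetical helper names matching the pseudocode; with the bipolar mapping $x = 1 - 2c$, positive soft values decode to bit 0):

```python
import numpy as np

# Toy parity-check matrix (illustrative only)
H = np.array([[1, 1, 0, 0],
              [0, 1, 1, 0],
              [0, 0, 1, 1]])

def hard_decision(y):
    """Map bipolar soft values to bits: positive -> 0, negative -> 1."""
    return np.where(y >= 0, 0, 1)

def is_codeword(H, x_hat):
    """All parity checks hold iff the syndrome H @ x_hat is zero mod 2."""
    return not np.any(np.mod(H @ x_hat, 2))

y = np.array([0.9, 1.2, 0.7, 1.1])       # noisy all-zero codeword
print(is_codeword(H, hard_decision(y)))  # True
```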
______________________________________________________________________
## References
\[1\]: T. Chen and T.-Y. Li, “Homotopy continuation method for solving systems
of nonlinear and polynomial equations,” Communications in Information and
Systems, vol. 15, no. 2, pp. 119-307, 2015, doi: 10.4310/CIS.2015.v15.n2.a1.


@@ -1,6 +1,7 @@
import argparse
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# autopep8: off
import sys
@@ -12,108 +13,43 @@ from hccd import path_tracker, homotopy_generator
# autopep8: on
# class RepetitionCodeHomotopy:
# """Helper type implementing necessary functions for PathTracker.
#
# Repetiton code homotopy:
# G = [[x1],
# [x2],
# [x1]]
#
# F = [[1 - x1**2],
# [1 - x2**2],
# [1 - x1*x2]]
#
# H = (1-t)*G + t*F
#
# Note that
# y := [[x1],
# [x2],
# [t]]
# """
# @staticmethod
# def evaluate_H(y: np.ndarray) -> np.ndarray:
# """Evaluate H at y."""
# x1 = y[0]
# x2 = y[1]
# t = y[2]
#
# print(y)
#
# result = np.zeros(shape=3)
# result[0] = -t*x1**2 + x1*(1-t) + t
# result[1] = -t*x2**2 + x2*(1-t) + t
# result[2] = -t*x1*x2 + x1*(1-t) + t
#
# return result
#
# @staticmethod
# def evaluate_DH(y: np.ndarray) -> np.ndarray:
# """Evaluate Jacobian of H at y."""
# x1 = y[0]
# x2 = y[1]
# t = y[2]
#
# result = np.zeros(shape=(3, 3))
# result[0, 0] = -2*t*x1 + (1-t)
# result[0, 1] = 0
# result[0, 2] = -x1**2 - x1 + 1
# result[1, 0] = 0
# result[1, 1] = -2*t*x2 + (1-t)
# result[1, 2] = -x2**2 - x2 + 1
# result[1, 0] = -t*x2 + (1-t)
# result[1, 1] = -t*x1
# result[1, 2] = -x1*x2 - x1 + 1
#
# return result
def track_path(args):
    H = np.array([[1, 1, 0, 0],
                  [0, 1, 1, 0],
                  [0, 0, 1, 1]])
    received = np.array([0, 0, 0, 0])
    homotopy = homotopy_generator.HomotopyGenerator(H, received)
    tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
                                       args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
    ys_start, ys_prime, ys_hat_e, ys = [], [], [], []
    y = np.zeros(5) + np.array([0.5, 0.48, -0.1, 0.2, 0])
    for i in range(args.num_iterations):
        y_start, y_prime, y_hat_e, y = tracker.transparent_step(y)
        ys_start.append(y_start)
        ys_prime.append(y_prime)
        ys_hat_e.append(y_hat_e)
        ys.append(y)
        print(f"Iteration {i}: {y}")
    ys_start = np.array(ys_start)
    ys_prime = np.array(ys_prime)
    ys_hat_e = np.array(ys_hat_e)
    ys = np.array(ys)
    for y in np.transpose(ys):
        plt.plot(y)
    plt.show()
def main():
@@ -131,7 +67,6 @@ def main():
                        default=0.01, help="Convergence threshold for Newton corrector")
    parser.add_argument("-s", "--sigma", type=int, default=1,
                        help="Direction in which the path is traced")
parser.add_argument("-o", "--output", type=str, help="Output csv file")
    parser.add_argument("-n", "--num-iterations", type=int, default=20,
                        help="Number of iterations of the example program to run")


@@ -0,0 +1,172 @@
import typing
import numpy as np
import galois
import argparse
from dataclasses import dataclass
from tqdm import tqdm
import pandas as pd
# autopep8: off
import sys
import os
sys.path.append(f"{os.path.dirname(os.path.abspath(__file__))}/../")
from hccd import utility, homotopy_generator, path_tracker
# autopep8: on


@dataclass
class SimulationArgs:
    euler_step_size: float
    euler_max_tries: int
    newton_max_iter: int
    newton_convergence_threshold: float
    sigma: int
    homotopy_iter: int
    max_frames: int
    target_frame_errors: int


def decode(tracker, y, H, args: SimulationArgs) -> np.ndarray:
    x_hat = np.where(y >= 0, 0, 1)
    s = np.concatenate([y, np.array([0])])
    for i in range(args.homotopy_iter):
        x_hat = np.where(s[:-1] >= 0, 0, 1)
        if not np.any(np.mod(H @ x_hat, 2)):
            return x_hat
        try:
            s = tracker.step(s)
        except RuntimeError:
            # Path tracking failed; fall back to the current estimate
            return x_hat
    return x_hat


def simulate_error_rates_for_SNR(H, Eb_N0, args: SimulationArgs) -> typing.Tuple[float, float, float, int]:
    GF = galois.GF(2)
    H_GF = GF(H)
    G = H_GF.null_space()
    k, n = G.shape
    num_frames = 0
    bit_errors = 0
    frame_errors = 0
    decoding_failures = 0
    homotopy = homotopy_generator.HomotopyGenerator(H)
    tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
                                       args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
    for _ in tqdm(range(args.max_frames)):
        Eb_N0_lin = 10**(Eb_N0 / 10)
        N0 = 1 / (2 * k / n * Eb_N0_lin)
        u = np.random.randint(2, size=k)
        # u = np.zeros(shape=k).astype(np.int32)
        c = np.array(GF(u) @ G)
        x = 1 - 2*c
        y = x + np.sqrt(N0) * np.random.normal(size=n)
        homotopy.update_received(y)
        c_hat = decode(tracker, y, H, args)
        if np.any(np.mod(H @ c_hat, 2)):
            # Retry, tracing the path in the opposite direction
            tracker.set_sigma(-1 * args.sigma)
            c_hat = decode(tracker, y, H, args)
            tracker.set_sigma(args.sigma)
        if np.any(np.mod(H @ c_hat, 2)):
            decoding_failures += 1
        bit_errors += np.sum(c_hat != c)
        frame_errors += np.any(c_hat != c)
        num_frames += 1
        if frame_errors >= args.target_frame_errors:
            break
    BER = bit_errors / (num_frames * n)
    FER = frame_errors / num_frames
    DFR = decoding_failures / num_frames
    return FER, BER, DFR, frame_errors


def simulate_error_rates(H, Eb_N0_list, args: SimulationArgs) -> pd.DataFrame:
    FERs = []
    BERs = []
    DFRs = []
    frame_errors_list = []
    for SNR in Eb_N0_list:
        FER, BER, DFR, num_frame_errors = simulate_error_rates_for_SNR(H, SNR, args)
        FERs.append(FER)
        BERs.append(BER)
        DFRs.append(DFR)
        frame_errors_list.append(num_frame_errors)
        print(pd.DataFrame({"SNR": Eb_N0_list[:len(FERs)], "FER": FERs, "BER": BERs,
                            "DFR": DFRs, "frame_errors": frame_errors_list}))
    return pd.DataFrame({"SNR": Eb_N0_list, "FER": FERs, "BER": BERs,
                         "DFR": DFRs, "frame_errors": frame_errors_list})


def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--code", type=str, required=True,
                        help="Path to the alist file containing the parity check matrix of the code")
    parser.add_argument("--snr", type=float, required=True, nargs=3,
                        metavar=('start', 'stop', 'step'),
                        help="SNRs to simulate (Eb/N0) in dB")
    parser.add_argument("--max-frames", type=int, default=int(1e6),
                        help="Maximum number of frames to simulate")
    parser.add_argument("--target-frame-errors", type=int, default=200,
                        help="Number of frame errors after which to stop the simulation")
    parser.add_argument("--euler-step-size", type=float,
                        default=0.05, help="Step size for Euler predictor")
    parser.add_argument("--euler-max-tries", type=int, default=5,
                        help="Maximum number of tries for Euler predictor")
    parser.add_argument("--newton-max-iter", type=int, default=5,
                        help="Maximum number of iterations for Newton corrector")
    parser.add_argument("--newton-convergence-threshold", type=float,
                        default=0.01, help="Convergence threshold for Newton corrector")
    parser.add_argument("-s", "--sigma", type=int, default=1,
                        help="Direction in which the path is traced")
    parser.add_argument("-n", "--homotopy-iter", type=int, default=20,
                        help="Number of iterations of the homotopy continuation method to perform for each decoding")
    args = parser.parse_args()

    # Set up and run the simulation
    H = utility.read_alist_file(args.code)
    simulation_args = SimulationArgs(
        euler_step_size=args.euler_step_size,
        euler_max_tries=args.euler_max_tries,
        newton_max_iter=args.newton_max_iter,
        newton_convergence_threshold=args.newton_convergence_threshold,
        sigma=args.sigma,
        homotopy_iter=args.homotopy_iter,
        max_frames=args.max_frames,
        target_frame_errors=args.target_frame_errors)
    SNRs = np.arange(args.snr[0], args.snr[1], args.snr[2])
    df = simulate_error_rates(H, SNRs, simulation_args)
    print(df)


if __name__ == "__main__":
    main()


@@ -0,0 +1,171 @@
from pathlib import Path
from ray import tune
import ray
import typing
import numpy as np
import galois
import argparse
from dataclasses import dataclass
import pandas as pd
# autopep8: off
import sys
import os
sys.path.append(f"{os.path.dirname(os.path.abspath(__file__))}/../")
from hccd import utility, homotopy_generator, path_tracker
# autopep8: on


@dataclass
class SimulationArgs:
    euler_step_size: float
    euler_max_tries: int
    newton_max_iter: int
    newton_convergence_threshold: float
    sigma: int
    homotopy_iter: int
    max_frames: int
    target_frame_errors: int


def decode(tracker, y, H, args: SimulationArgs) -> np.ndarray:
    x_hat = np.where(y >= 0, 0, 1)
    s = np.concatenate([y, np.array([0])])
    for i in range(args.homotopy_iter):
        x_hat = np.where(s[:-1] >= 0, 0, 1)
        if not np.any(np.mod(H @ x_hat, 2)):
            return x_hat
        try:
            s = tracker.step(s)
        except RuntimeError:
            # Path tracking failed; fall back to the current estimate
            return x_hat
    return x_hat


def simulate_error_rates_for_SNR(H, Eb_N0, args: SimulationArgs) -> typing.Tuple[float, float, float, int]:
    GF = galois.GF(2)
    H_GF = GF(H)
    G = H_GF.null_space()
    k, n = G.shape
    num_frames = 0
    bit_errors = 0
    frame_errors = 0
    decoding_failures = 0
    homotopy = homotopy_generator.HomotopyGenerator(H)
    tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
                                       args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
    for _ in range(args.max_frames):
        Eb_N0_lin = 10**(Eb_N0 / 10)
        N0 = 1 / (2 * k / n * Eb_N0_lin)
        u = np.random.randint(2, size=k)
        # u = np.zeros(shape=k).astype(np.int32)
        c = np.array(GF(u) @ G)
        x = 1 - 2*c
        y = x + np.sqrt(N0) * np.random.normal(size=n)
        homotopy.update_received(y)
        c_hat = decode(tracker, y, H, args)
        if np.any(np.mod(H @ c_hat, 2)):
            # Retry, tracing the path in the opposite direction
            tracker.set_sigma(-1 * args.sigma)
            c_hat = decode(tracker, y, H, args)
            tracker.set_sigma(args.sigma)
        if np.any(np.mod(H @ c_hat, 2)):
            decoding_failures += 1
        bit_errors += np.sum(c_hat != c)
        frame_errors += np.any(c_hat != c)
        num_frames += 1
        if frame_errors >= args.target_frame_errors:
            break
    BER = bit_errors / (num_frames * n)
    FER = frame_errors / num_frames
    DFR = decoding_failures / num_frames
    return FER, BER, DFR, frame_errors


def simulation(config):
    simulation_args = SimulationArgs(
        euler_step_size=config["euler_step_size"],
        euler_max_tries=20,
        newton_max_iter=20,
        newton_convergence_threshold=config["newton_convergence_threshold"],
        sigma=config["sigma"],
        homotopy_iter=config["homotopy_iter"],
        max_frames=100000,
        target_frame_errors=200)
    FER, BER, DFR, frame_errors = simulate_error_rates_for_SNR(
        config["H"], config["Eb_N0"], simulation_args)
    return {"FER": FER, "BER": BER, "DFR": DFR, "frame_errors": frame_errors}


def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--code", type=str, required=True,
                        help="Path to the alist file containing the parity check matrix of the code")
    args = parser.parse_args()
    H = utility.read_alist_file(args.code)

    # Run parameter sweep
    ray.init()
    search_space = {
        "Eb_N0": tune.grid_search([6]),
        "newton_convergence_threshold": tune.grid_search(np.array([0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1])),
        "euler_step_size": tune.grid_search(np.array([0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1])),
        "homotopy_iter": tune.grid_search(np.array([500])),
        "sigma": tune.grid_search(np.array([1, -1])),
        "H": tune.grid_search([H])
    }
    tuner = tune.Tuner(
        simulation,
        param_space=search_space,
        tune_config=tune.TuneConfig(
            max_concurrent_trials=12,
        ),
        run_config=tune.RunConfig(
            name="param_sweep",
            storage_path=Path.cwd() / "sim_results"
        )
    )
    results = tuner.fit()
    df = results.get_dataframe()
    keep_columns = ["FER", "BER", "DFR", "frame_errors"]
    config_columns = [col for col in df.columns if col.startswith("config/")]
    columns_to_keep = keep_columns + config_columns
    df = df[columns_to_keep].drop(columns=["config/H"])
    df.to_csv("sweep_results.csv", index=False)
    print(df.head())


if __name__ == "__main__":
    main()


@@ -30,7 +30,7 @@ class ToyHomotopy:
          [t]]
    """

    @staticmethod
    def H(y: np.ndarray) -> np.ndarray:
        """Evaluate H at y."""
        x1 = y[0]
        x2 = y[1]
@@ -43,7 +43,7 @@ class ToyHomotopy:
        return result

    @staticmethod
    def DH(y: np.ndarray) -> np.ndarray:
        """Evaluate Jacobian of H at y."""
        x1 = y[0]
        x2 = y[1]


@@ -1,6 +0,0 @@
def main():
    pass


if __name__ == "__main__":
    main()


@@ -17,101 +17,95 @@ class HomotopyGenerator:
        self.H_matrix = parity_check_matrix
        self.num_checks, self.num_vars = parity_check_matrix.shape
        self.x_vars = [sp.symbols(f'x{i+1}') for i in range(self.num_vars)]
        self.t = sp.symbols('t')
        self._F = self._create_F()
        self._F_lambda = self._create_F_lambda(self._F)

    def update_received(self, received: np.ndarray):
        """Update the received vector and recompute G."""
        self._G = self._create_G(received)
        self._G_lambda = self._create_G_lambda(self._G)
        self._H = self._create_H()
        self._H_lambda = self._create_H_lambda(self._H)
        self._DH = self._create_DH()
        self._DH_lambda = self._create_DH_lambda(self._DH)

    def _create_F(self) -> sp.MutableMatrix:
        F = []
        for var in self.x_vars:
            F.append(1 - var**2)
        for row in self.H_matrix:
            term = 1
            for i, bit in enumerate(row):
                if bit == 1:
                    term *= self.x_vars[i]
            F.append(1 - term)
        groebner_basis = sp.groebner(F)
        return sp.MutableMatrix(groebner_basis)

    def _create_G(self, received) -> sp.MutableMatrix:
        G = []
        F_y = self._F_lambda(*received)
        for f, f_y_i in zip(self._F, F_y):
            G.append(f - f_y_i)
        return sp.MutableMatrix(G)

    def _create_H(self) -> sp.MutableMatrix:
        H = []
        for f, g in zip(self._F, self._G):
            H.append((1 - self.t) * g + self.t * f)
        return sp.MutableMatrix(H)

    def _create_DH(self) -> sp.MutableMatrix:
        all_vars = self.x_vars + [self.t]
        DH = sp.Matrix([[sp.diff(expr, var)
                         for var in all_vars] for expr in self._H])
        return DH

    def _create_F_lambda(self, expr) -> Callable:
        all_vars = self.x_vars
        return sp.lambdify(all_vars, expr, 'numpy')

    def _create_G_lambda(self, expr) -> Callable:
        all_vars = self.x_vars
        return sp.lambdify(all_vars, expr, 'numpy')

    def _create_H_lambda(self, expr) -> Callable:
        all_vars = self.x_vars + [self.t]
        return sp.lambdify(all_vars, expr, 'numpy')

    def _create_DH_lambda(self, expr) -> Callable:
        all_vars = self.x_vars + [self.t]
        return sp.lambdify(all_vars, expr, 'numpy')

    def H(self, y):
        return np.array(self._H_lambda(*y))

    def DH(self, y):
        return np.array(self._DH_lambda(*y))


def main():
    import utility
    H = utility.read_alist_file(
        "/home/andreas/workspace/work/hiwi/ba-sw/codes/BCH_7_4.alist")
    a = HomotopyGenerator(H)
    y = np.array([0.1, 0.2, 0.9, 0.1, -0.8, -0.5, -1.0, 0])
    print(a.DH(y))


if __name__ == "__main__":
    main()


@@ -3,10 +3,6 @@ import typing
import scipy
def _sign(val):
return -1 * (val < 0) + 1 * (val >= 0)
class PathTracker:
    """
    Path tracker for the homotopy continuation method. Uses a
@@ -18,9 +14,9 @@ class PathTracker:
    Information and Systems, vol. 15, no. 2, pp. 119-307, 2015
    """
    def __init__(self, homotopy, euler_step_size=0.05, euler_max_tries=10, newton_max_iter=5,
                 newton_convergence_threshold=0.001, sigma=1):
        self._homotopy = homotopy
        self._euler_step_size = euler_step_size
        self._euler_max_tries = euler_max_tries
        self._newton_max_iter = newton_max_iter
@@ -29,7 +25,7 @@ class PathTracker:
    def step(self, y):
        """Perform one predictor-corrector step."""
        return self.transparent_step(y)[3]

    def transparent_step(self, y) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Perform one predictor-corrector step, returning intermediate results."""
@@ -43,21 +39,14 @@ class PathTracker:
        raise RuntimeError("Newton corrector did not converge")

    # TODO: Make sure the implementation with null_space instead of QR is correct
    def _perform_euler_predictor_step(self, y, step_size) -> typing.Tuple[np.ndarray, np.ndarray]:
        # Obtain y_prime
        DH = self._homotopy.DH(y)
        ns = scipy.linalg.null_space(DH)
        y_prime = ns[:, 0] * self._sigma
        # Q, R = np.linalg.qr(np.transpose(DH), mode="complete")
        # y_prime = Q[:, 2]
        #
        # if _sign(np.linalg.det(Q)*np.linalg.det(R[:2, :])) != _sign(self._sigma):
        #     y_prime = -y_prime

        # Perform prediction
        y_hat = y + step_size*y_prime
@@ -70,10 +59,10 @@ class PathTracker:
        for _ in range(self._newton_max_iter):
            # Perform correction
            DH = self._homotopy.DH(y)
            DH_pinv = np.linalg.pinv(DH)
            y = y - np.transpose(DH_pinv @ self._homotopy.H(y))[0]

            # Check stopping criterion
@@ -83,3 +72,6 @@ class PathTracker:
            prev_y = y
        raise RuntimeError("Newton corrector did not converge")

    def set_sigma(self, sigma):
        self._sigma = sigma

python/hccd/utility.py

@@ -0,0 +1,29 @@
import numpy as np


def _parse_alist_header(header):
    size = header.split()
    return int(size[0]), int(size[1])


def read_alist_file(filename):
    """
    This function reads in an alist file and creates the
    corresponding parity check matrix H. The format of alist
    files is described at:
    http://www.inference.phy.cam.ac.uk/mackay/codes/alist.html
    """
    with open(filename, 'r') as myfile:
        data = myfile.readlines()
    numCols, numRows = _parse_alist_header(data[0])
    H = np.zeros((numRows, numCols))
    # The locations of the 1s start at the 5th line of the file
    for lineNumber in np.arange(4, 4 + numCols):
        indices = data[lineNumber].split()
        for index in indices:
            H[int(index) - 1, lineNumber - 4] = 1
    return H.astype(np.int32)