Compare commits

..

12 Commits

9 changed files with 469 additions and 74 deletions

View File

@@ -1,6 +1,8 @@
# Homotopy Continuation
### Introduction
## Introduction
### Overview
The aim of a homotopy method is to solve a system of N nonlinear
equations in N variables \[1, p.1\]:
@@ -85,6 +87,26 @@ between successive points produced by the iterations can be used as a criterion
for convergence. Of course, if the iterations fail to converge, one must go
back to adjust the step size for the Eulers predictor." [2, p.130]
## Application to Channel Decoding
We can describe the decoding problem using the code constraint polynomial [3]
$$
h(\bm{x}) = \underbrace{\sum_{i=1}^{n}\left(1-x_i^2\right)^2}_{\text{Bipolar constraint}} + \underbrace{\sum_{j=1}^{m}\left(1 - \left(\prod_{i\in A(j)}x_i\right)\right)^2}_{\text{Parity constraint}},
$$
where $A(j) = \left\{i \in [1:n]: H_{j,i} = 1\right\}$ represents the set of
variable nodes involved in parity check $j$. This polynomial consists of a set of
terms representing the bipolar constraint and a set of terms representing the
parity constraint. In a similar vein, we can define the following set of
polynomial equations to describe codewords:
$$
F(\bm{x}) = \left[\begin{array}{c}1 - x_1^2 \\ \vdots\\ 1 - x_n^2 \\ 1 - \prod_{i \in A(1)}x_i \\ \vdots\\ 1 - \prod_{i \in A(m)}x_i\end{array}\right] \overset{!}{=} \bm{0}.
$$
This is a problem we can solve using homotopy continuation.
______________________________________________________________________
## References
@@ -97,3 +119,7 @@ Philadelphia, PA 19104), 2003. doi: 10.1137/1.9780898719154.
\[2\]: T. Chen and T.-Y. Li, “Homotopy continuation method for solving systems
of nonlinear and polynomial equations,” Communications in Information and
Systems, vol. 15, no. 2, pp. 119–307, 2015, doi: 10.4310/CIS.2015.v15.n2.a1.
\[3\]: Wadayama, Tadashi, and Satoshi Takabe. "Proximal decoding for LDPC
codes." IEICE Transactions on Fundamentals of Electronics, Communications and
Computer Sciences 106.3 (2023): 359-367.

View File

@@ -18,12 +18,14 @@ def track_path(args):
[0, 1, 1, 0],
[0, 0, 1, 1]])
homotopy = homotopy_generator.HomotopyGenerator(H)
received = np.array([0, 0, 0, 0])
print(f"G: {homotopy.G}")
print(f"F: {homotopy.F}")
print(f"H: {homotopy.H}")
print(f"DH: {homotopy.DH}")
homotopy = homotopy_generator.HomotopyGenerator(H, received)
# print(f"G: {homotopy.G}")
# print(f"F: {homotopy.F}")
# print(f"H: {homotopy.H}")
# print(f"DH: {homotopy.DH}")
tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
args.newton_max_iter, args.newton_convergence_threshold, args.sigma)

View File

@@ -0,0 +1,172 @@
import typing
import numpy as np
import galois
import argparse
from dataclasses import dataclass
from tqdm import tqdm
import pandas as pd
# autopep8: off
import sys
import os
sys.path.append(f"{os.path.dirname(os.path.abspath(__file__))}/../")
from hccd import utility, homotopy_generator, path_tracker
# autopep8: on
@dataclass
class SimulationArgs:
    """Bundle of decoder and simulation parameters passed through the simulation."""
    # Step size for the Euler predictor of the path tracker.
    euler_step_size: float
    # Maximum number of tries for the Euler predictor.
    euler_max_tries: int
    # Maximum number of iterations for the Newton corrector.
    newton_max_iter: int
    # Convergence threshold for the Newton corrector.
    newton_convergence_threshold: float
    # Direction in which the homotopy path is traced (+1 or -1).
    sigma: int
    # Number of homotopy continuation iterations to perform per decoding.
    homotopy_iter: int
    # Maximum number of frames to simulate.
    max_frames: int
    # Number of frame errors after which to stop the simulation.
    target_frame_errors: int
def decode(tracker, y, H, args: SimulationArgs) -> np.ndarray:
    """Decode a received vector by tracking the homotopy path.

    Args:
        tracker: Path tracker performing one predictor-corrector step per call.
        y: Received channel values (length n).
        H: Parity check matrix.
        args: Simulation parameters; only ``homotopy_iter`` is used here.

    Returns:
        Hard-decision estimate of the codeword (0/1 array of length n).
    """
    x_hat = np.where(y >= 0, 0, 1)
    # Path state is [x1, ..., xn, t]; start at t = 0 with the received values.
    s = np.concatenate([y, np.array([0])])
    for _ in range(args.homotopy_iter):
        x_hat = np.where(s[:-1] >= 0, 0, 1)
        # Stop as soon as the hard decision satisfies all parity checks.
        if not np.any(np.mod(H @ x_hat, 2)):
            return x_hat
        try:
            s = tracker.step(s)
        except Exception:
            # Best effort: the tracker may fail to converge (it raises
            # RuntimeError); return the current estimate instead of aborting.
            # Deliberately narrower than the previous bare `except:`, which
            # also swallowed KeyboardInterrupt/SystemExit.
            return x_hat
    return x_hat
def simulate_error_rates_for_SNR(H, Eb_N0, args: SimulationArgs) -> typing.Tuple[float, float, float, int]:
    """Monte-Carlo simulate error rates of the homotopy decoder at one Eb/N0 point.

    Args:
        H: Parity check matrix.
        Eb_N0: Signal-to-noise ratio Eb/N0 in dB.
        args: Simulation parameters.

    Returns:
        Tuple (FER, BER, DFR, frame_errors): frame/bit error rate, decoding
        failure rate, and the absolute number of frame errors observed.
    """
    GF = galois.GF(2)
    H_GF = GF(H)
    G = H_GF.null_space()
    k, n = G.shape
    num_frames = 0
    bit_errors = 0
    frame_errors = 0
    decoding_failures = 0
    homotopy = homotopy_generator.HomotopyGenerator(H)
    tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
                                       args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
    # Noise variance is constant for a fixed Eb/N0 -- compute it once instead
    # of once per frame.
    Eb_N0_lin = 10**(Eb_N0 / 10)
    N0 = 1 / (2 * k / n * Eb_N0_lin)
    for _ in tqdm(range(args.max_frames)):
        u = np.random.randint(2, size=k)
        c = np.array(GF(u) @ G)
        # BPSK mapping: bit 0 -> +1, bit 1 -> -1.
        x = 1 - 2*c
        y = x + np.sqrt(N0) * np.random.normal(size=n)
        homotopy.update_received(y)
        c_hat = decode(tracker, y, H, args)
        if np.any(np.mod(H @ c_hat, 2)):
            # First attempt did not reach a codeword: retry tracing the path
            # in the opposite direction, then restore the original direction.
            tracker.set_sigma(-1 * args.sigma)
            c_hat = decode(tracker, y, H, args)
            tracker.set_sigma(args.sigma)
        if np.any(np.mod(H @ c_hat, 2)):
            decoding_failures += 1
        bit_errors += np.sum(c_hat != c)
        frame_errors += np.any(c_hat != c)
        num_frames += 1
        if frame_errors >= args.target_frame_errors:
            break
    BER = bit_errors / (num_frames * n)
    FER = frame_errors / num_frames
    DFR = decoding_failures / num_frames
    return FER, BER, DFR, frame_errors
def simulate_error_rates(H, Eb_N0_list, args: SimulationArgs) -> pd.DataFrame:
    """Simulate error rates for a list of Eb/N0 points.

    Args:
        H: Parity check matrix.
        Eb_N0_list: Eb/N0 values (dB) to simulate.
        args: Simulation parameters.

    Returns:
        DataFrame with one row per SNR containing FER, BER, DFR and the
        number of frame errors observed.
    """
    FERs = []
    BERs = []
    DFRs = []
    frame_errors_list = []
    for SNR in Eb_N0_list:
        # The 4th returned value is the frame error count (previously bound to
        # a misleading local named `num_frames`).
        FER, BER, DFR, frame_errors = simulate_error_rates_for_SNR(H, SNR, args)
        FERs.append(FER)
        BERs.append(BER)
        DFRs.append(DFR)
        frame_errors_list.append(frame_errors)
        # Print intermediate results after each SNR point so long runs can be
        # monitored; only the SNRs simulated so far are shown.
        print(pd.DataFrame({"SNR": Eb_N0_list[:len(FERs)], "FER": FERs, "BER": BERs,
                            "DFR": DFRs, "frame_errors": frame_errors_list}))
    return pd.DataFrame({"SNR": Eb_N0_list, "FER": FERs, "BER": BERs,
                         "DFR": DFRs, "frame_errors": frame_errors_list})
def main():
    """Command line entry point: parse arguments and run the error rate simulation."""
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("-c", "--code", type=str, required=True,
                            help="Path to the alist file containing the parity check matrix of the code")
    arg_parser.add_argument("--snr", type=float, required=True, nargs=3,
                            metavar=('start', 'stop', 'step'),
                            help="SNRs to simulate (Eb/N0) in dB")
    arg_parser.add_argument("--max-frames", type=int, default=int(1e6),
                            help="Maximum number of frames to simulate")
    arg_parser.add_argument("--target-frame-errors", type=int, default=200,
                            help="Number of frame errors after which to stop the simulation")
    arg_parser.add_argument("--euler-step-size", type=float,
                            default=0.05, help="Step size for Euler predictor")
    arg_parser.add_argument("--euler-max-tries", type=int, default=5,
                            help="Maximum number of tries for Euler predictor")
    arg_parser.add_argument("--newton-max-iter", type=int, default=5,
                            help="Maximum number of iterations for Newton corrector")
    arg_parser.add_argument("--newton-convergence-threshold", type=float,
                            default=0.01, help="Convergence threshold for Newton corrector")
    arg_parser.add_argument("-s", "--sigma", type=int, default=1,
                            help="Direction in which the path is traced")
    arg_parser.add_argument("-n", "--homotopy-iter", type=int, default=20,
                            help="Number of iterations of the homotopy continuation method to perform for each decoding")
    cli_args = arg_parser.parse_args()

    # Load the parity check matrix and bundle the decoder parameters.
    parity_check_matrix = utility.read_alist_file(cli_args.code)
    simulation_args = SimulationArgs(
        euler_step_size=cli_args.euler_step_size,
        euler_max_tries=cli_args.euler_max_tries,
        newton_max_iter=cli_args.newton_max_iter,
        newton_convergence_threshold=cli_args.newton_convergence_threshold,
        sigma=cli_args.sigma,
        homotopy_iter=cli_args.homotopy_iter,
        max_frames=cli_args.max_frames,
        target_frame_errors=cli_args.target_frame_errors)

    # Simulate the requested Eb/N0 range and report the results.
    snr_points = np.arange(cli_args.snr[0], cli_args.snr[1], cli_args.snr[2])
    results = simulate_error_rates(parity_check_matrix, snr_points, simulation_args)
    print(results)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,171 @@
from pathlib import Path
from ray import tune
import ray
import typing
import numpy as np
import galois
import argparse
from dataclasses import dataclass
import pandas as pd
# autopep8: off
import sys
import os
sys.path.append(f"{os.path.dirname(os.path.abspath(__file__))}/../")
from hccd import utility, homotopy_generator, path_tracker
# autopep8: on
@dataclass
class SimulationArgs:
    """Bundle of decoder and simulation parameters used by the parameter sweep."""
    # Step size for the Euler predictor of the path tracker.
    euler_step_size: float
    # Maximum number of tries for the Euler predictor.
    euler_max_tries: int
    # Maximum number of iterations for the Newton corrector.
    newton_max_iter: int
    # Convergence threshold for the Newton corrector.
    newton_convergence_threshold: float
    # Direction in which the homotopy path is traced (+1 or -1).
    sigma: int
    # Number of homotopy continuation iterations to perform per decoding.
    homotopy_iter: int
    # Maximum number of frames to simulate.
    max_frames: int
    # Number of frame errors after which to stop the simulation.
    target_frame_errors: int
def decode(tracker, y, H, args: SimulationArgs) -> np.ndarray:
    """Decode a received vector by tracking the homotopy path.

    Args:
        tracker: Path tracker performing one predictor-corrector step per call.
        y: Received channel values (length n).
        H: Parity check matrix.
        args: Simulation parameters; only ``homotopy_iter`` is used here.

    Returns:
        Hard-decision estimate of the codeword (0/1 array of length n).
    """
    x_hat = np.where(y >= 0, 0, 1)
    # Path state is [x1, ..., xn, t]; start at t = 0 with the received values.
    s = np.concatenate([y, np.array([0])])
    for _ in range(args.homotopy_iter):
        x_hat = np.where(s[:-1] >= 0, 0, 1)
        # Stop as soon as the hard decision satisfies all parity checks.
        if not np.any(np.mod(H @ x_hat, 2)):
            return x_hat
        try:
            s = tracker.step(s)
        except Exception:
            # Best effort: the tracker may fail to converge (it raises
            # RuntimeError); return the current estimate instead of aborting.
            # Deliberately narrower than the previous bare `except:`, which
            # also swallowed KeyboardInterrupt/SystemExit.
            return x_hat
    return x_hat
def simulate_error_rates_for_SNR(H, Eb_N0, args: SimulationArgs) -> typing.Tuple[float, float, float, int]:
    """Monte-Carlo simulate error rates of the homotopy decoder at one Eb/N0 point.

    Args:
        H: Parity check matrix.
        Eb_N0: Signal-to-noise ratio Eb/N0 in dB.
        args: Simulation parameters.

    Returns:
        Tuple (FER, BER, DFR, frame_errors): frame/bit error rate, decoding
        failure rate, and the absolute number of frame errors observed.
    """
    GF = galois.GF(2)
    H_GF = GF(H)
    G = H_GF.null_space()
    k, n = G.shape
    num_frames = 0
    bit_errors = 0
    frame_errors = 0
    decoding_failures = 0
    homotopy = homotopy_generator.HomotopyGenerator(H)
    tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
                                       args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
    # Noise variance is constant for a fixed Eb/N0 -- compute it once instead
    # of once per frame.
    Eb_N0_lin = 10**(Eb_N0 / 10)
    N0 = 1 / (2 * k / n * Eb_N0_lin)
    for _ in range(args.max_frames):
        u = np.random.randint(2, size=k)
        c = np.array(GF(u) @ G)
        # BPSK mapping: bit 0 -> +1, bit 1 -> -1.
        x = 1 - 2*c
        y = x + np.sqrt(N0) * np.random.normal(size=n)
        homotopy.update_received(y)
        c_hat = decode(tracker, y, H, args)
        if np.any(np.mod(H @ c_hat, 2)):
            # First attempt did not reach a codeword: retry tracing the path
            # in the opposite direction, then restore the original direction.
            tracker.set_sigma(-1 * args.sigma)
            c_hat = decode(tracker, y, H, args)
            tracker.set_sigma(args.sigma)
        if np.any(np.mod(H @ c_hat, 2)):
            decoding_failures += 1
        bit_errors += np.sum(c_hat != c)
        frame_errors += np.any(c_hat != c)
        num_frames += 1
        if frame_errors >= args.target_frame_errors:
            break
    BER = bit_errors / (num_frames * n)
    FER = frame_errors / num_frames
    DFR = decoding_failures / num_frames
    return FER, BER, DFR, frame_errors
def simulation(config):
    """Ray Tune trial function: simulate one parameter combination.

    Reads the swept parameters from ``config`` and reports the resulting
    error rates as the trial metrics.
    """
    sim_args = SimulationArgs(
        euler_step_size=config["euler_step_size"],
        euler_max_tries=20,
        newton_max_iter=20,
        newton_convergence_threshold=config["newton_convergence_threshold"],
        sigma=config["sigma"],
        homotopy_iter=config["homotopy_iter"],
        max_frames=100000,
        target_frame_errors=200,
    )
    fer, ber, dfr, frame_errors = simulate_error_rates_for_SNR(
        config["H"], config["Eb_N0"], sim_args)
    return {"FER": fer, "BER": ber, "DFR": dfr, "frame_errors": frame_errors}
def main():
    """Command line entry point: run a Ray Tune parameter sweep of the decoder."""
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("-c", "--code", type=str, required=True,
                            help="Path to the alist file containing the parity check matrix of the code")
    cli_args = arg_parser.parse_args()
    parity_check_matrix = utility.read_alist_file(cli_args.code)

    # Run parameter sweep
    ray.init()
    search_space = {
        "Eb_N0": tune.grid_search([6]),
        "newton_convergence_threshold": tune.grid_search(np.array([0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1])),
        "euler_step_size": tune.grid_search(np.array([0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1])),
        "homotopy_iter": tune.grid_search(np.array([500])),
        "sigma": tune.grid_search(np.array([1, -1])),
        "H": tune.grid_search([parity_check_matrix]),
    }
    tuner = tune.Tuner(
        simulation,
        param_space=search_space,
        tune_config=tune.TuneConfig(
            max_concurrent_trials=12,
        ),
        run_config=tune.RunConfig(
            name="param_sweep",
            storage_path=Path.cwd() / "sim_results"
        )
    )
    results = tuner.fit()

    # Keep only the metric columns and the swept configuration columns; the
    # parity check matrix itself is dropped from the report.
    results_df = results.get_dataframe()
    metric_columns = ["FER", "BER", "DFR", "frame_errors"]
    config_columns = [column for column in results_df.columns
                      if column.startswith("config/")]
    results_df = results_df[metric_columns + config_columns].drop(columns=["config/H"])
    results_df.to_csv("sweep_results.csv", index=False)
    print(results_df.head())


if __name__ == "__main__":
    main()

View File

@@ -30,7 +30,7 @@ class ToyHomotopy:
[t]]
"""
@staticmethod
def evaluate_H(y: np.ndarray) -> np.ndarray:
def H(y: np.ndarray) -> np.ndarray:
"""Evaluate H at y."""
x1 = y[0]
x2 = y[1]
@@ -43,7 +43,7 @@ class ToyHomotopy:
return result
@staticmethod
def evaluate_DH(y: np.ndarray) -> np.ndarray:
def DH(y: np.ndarray) -> np.ndarray:
"""Evaluate Jacobian of H at y."""
x1 = y[0]
x2 = y[1]

View File

@@ -1,6 +0,0 @@
def main():
pass
if __name__ == "__main__":
main()

View File

@@ -20,20 +20,17 @@ class HomotopyGenerator:
self.x_vars = [sp.symbols(f'x{i+1}') for i in range(self.num_vars)]
self.t = sp.symbols('t')
self.G = self._create_G()
self.F = self._create_F()
self.H = self._create_H()
self.DH = self._create_DH(self.H)
self._F = self._create_F()
self._F_lambda = self._create_F_lambda(self._F)
self._H_lambda = self._create_H_lambda()
self._DH_lambda = self._create_DH_lambda()
def _create_G(self) -> List[sp.Expr]:
G = []
for var in self.x_vars:
G.append(var)
return G
def update_received(self, received: np.ndarray):
"""Update the received vector and recompute G."""
self._G = self._create_G(received)
self._G_lambda = self._create_G_lambda(self._G)
self._H = self._create_H()
self._H_lambda = self._create_H_lambda(self._H)
self._DH = self._create_DH()
self._DH_lambda = self._create_DH_lambda(self._DH)
def _create_F(self) -> sp.MutableMatrix:
F = []
@@ -52,51 +49,63 @@ class HomotopyGenerator:
groebner_basis = sp.groebner(F)
return sp.MutableMatrix(groebner_basis)
def _create_H(self) -> List[sp.Expr]:
def _create_G(self, received) -> sp.MutableMatrix:
G = []
F_y = self._F_lambda(*received)
for f, f_y_i in zip(self._F, F_y):
G.append(f - f_y_i)
return sp.MutableMatrix(G)
def _create_H(self) -> sp.MutableMatrix:
H = []
for g, f in zip(self.G, self.F):
for f, g in zip(self._F, self._G):
H.append((1 - self.t) * g + self.t * f)
return H
return sp.MutableMatrix(H)
def _create_DH(self, H: List[sp.Expr]) -> sp.MutableMatrix:
def _create_DH(self) -> sp.MutableMatrix:
all_vars = self.x_vars + [self.t]
DH = sp.Matrix([[sp.diff(expr, var)
for var in all_vars] for expr in self.H])
for var in all_vars] for expr in self._H])
return DH
def _create_H_lambda(self) -> Callable:
def _create_F_lambda(self, expr) -> Callable:
all_vars = self.x_vars
return sp.lambdify(all_vars, expr, 'numpy')
def _create_G_lambda(self, expr) -> Callable:
all_vars = self.x_vars
return sp.lambdify(all_vars, expr, 'numpy')
def _create_H_lambda(self, expr) -> Callable:
all_vars = self.x_vars + [self.t]
return sp.lambdify(all_vars, self.H, 'numpy')
return sp.lambdify(all_vars, expr, 'numpy')
def _create_DH_lambda(self) -> Callable:
def _create_DH_lambda(self, expr) -> Callable:
all_vars = self.x_vars + [self.t]
return sp.lambdify(all_vars, self.DH, 'numpy')
return sp.lambdify(all_vars, expr, 'numpy')
def evaluate_H(self, y: np.ndarray) -> np.ndarray:
"""
Evaluate H at point y.
Args:
y: Array of form [x1, x2, ..., xn, t] where xi are the variables
and t is the homotopy parameter.
Returns:
Array containing H evaluated at y.
"""
def H(self, y):
return np.array(self._H_lambda(*y))
def evaluate_DH(self, y: np.ndarray) -> np.ndarray:
"""
Evaluate the Jacobian of H at point y.
def DH(self, y):
return np.array(self._DH_lambda(*y))
Args:
y: Array of form [x1, x2, ..., xn, t] where xi are the variables
and t is the homotopy parameter.
Returns:
Matrix containing the Jacobian of H evaluated at y.
"""
return np.array(self._DH_lambda(*y), dtype=float)
def main():
    """Manual smoke test: build a homotopy for a small code and print DH.

    The alist file path may be supplied as the first command line argument;
    it defaults to the previously hard-coded development path so existing
    invocations keep working.
    """
    import sys
    import utility
    # Previously this path was hard-coded and machine-specific; allow an
    # override from the command line.
    default_path = "/home/andreas/workspace/work/hiwi/ba-sw/codes/BCH_7_4.alist"
    alist_path = sys.argv[1] if len(sys.argv) > 1 else default_path
    H = utility.read_alist_file(alist_path)
    generator = HomotopyGenerator(H)
    # Evaluation point of the form [x1, ..., xn, t].
    y = np.array([0.1, 0.2, 0.9, 0.1, -0.8, -0.5, -1.0, 0])
    print(generator.DH(y))


if __name__ == "__main__":
    main()

View File

@@ -3,10 +3,6 @@ import typing
import scipy
def _sign(val):
return -1 * (val < 0) + 1 * (val >= 0)
class PathTracker:
"""
Path trakcer for the homotopy continuation method. Uses a
@@ -18,9 +14,9 @@ class PathTracker:
Information and Systems, vol. 15, no. 2, pp. 119-307, 2015
"""
def __init__(self, Homotopy, euler_step_size=0.05, euler_max_tries=10, newton_max_iter=5,
def __init__(self, homotopy, euler_step_size=0.05, euler_max_tries=10, newton_max_iter=5,
newton_convergence_threshold=0.001, sigma=1):
self.Homotopy = Homotopy
self._homotopy = homotopy
self._euler_step_size = euler_step_size
self._euler_max_tries = euler_max_tries
self._newton_max_iter = newton_max_iter
@@ -29,7 +25,7 @@ class PathTracker:
def step(self, y):
"""Perform one predictor-corrector step."""
return self.transparent_step(y)[0]
return self.transparent_step(y)[3]
def transparent_step(self, y) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray,]:
"""Perform one predictor-corrector step, returning intermediate results."""
@@ -43,21 +39,14 @@ class PathTracker:
raise RuntimeError("Newton corrector did not converge")
# TODO: Make sure the implementation with null_space instead of QR is correct
def _perform_euler_predictor_step(self, y, step_size) -> typing.Tuple[np.ndarray, np.ndarray]:
# Obtain y_prime
DH = self.Homotopy.evaluate_DH(y)
DH = self._homotopy.DH(y)
ns = scipy.linalg.null_space(DH)
y_prime = ns[:, 0] * self._sigma
# Q, R = np.linalg.qr(np.transpose(DH), mode="complete")
# y_prime = Q[:, 2]
#
# if _sign(np.linalg.det(Q)*np.linalg.det(R[:2, :])) != _sign(self._sigma):
# y_prime = -y_prime
# Perform prediction
y_hat = y + step_size*y_prime
@@ -70,10 +59,10 @@ class PathTracker:
for _ in range(self._newton_max_iter):
# Perform correction
DH = self.Homotopy.evaluate_DH(y)
DH = self._homotopy.DH(y)
DH_pinv = np.linalg.pinv(DH)
y = y - DH_pinv @ self.Homotopy.evaluate_H(y)
y = y - np.transpose(DH_pinv @ self._homotopy.H(y))[0]
# Check stopping criterion
@@ -83,3 +72,6 @@ class PathTracker:
prev_y = y
raise RuntimeError("Newton corrector did not converge")
def set_sigma(self, sigma):
self._sigma = sigma

29
python/hccd/utility.py Normal file
View File

@@ -0,0 +1,29 @@
import numpy as np
def _parse_alist_header(header):
size = header.split()
return int(size[0]), int(size[1])
def read_alist_file(filename):
    """
    Read an alist file and build the corresponding parity check matrix H.

    The alist format is described at:
    http://www.inference.phy.cam.ac.uk/mackay/codes/alist.html

    Args:
        filename: Path to the alist file.

    Returns:
        Parity check matrix H as an int32 numpy array of shape
        (num_rows, num_cols).
    """
    with open(filename, 'r') as alist_file:
        data = alist_file.readlines()
    header = data[0].split()
    num_cols, num_rows = int(header[0]), int(header[1])
    H = np.zeros((num_rows, num_cols))
    # The per-column lists of row indices start in the 5th line of the file.
    for line_number in range(4, 4 + num_cols):
        for index in data[line_number].split():
            row = int(index)
            # For irregular codes the alist format pads the per-column lists
            # with zeros; a zero is a placeholder, not a (1-based) row index.
            # Previously a zero entry wrongly set H[-1, col] = 1.
            if row != 0:
                H[row - 1, line_number - 4] = 1
    return H.astype(np.int32)