Compare commits

..

12 Commits

9 changed files with 469 additions and 74 deletions

View File

@@ -1,6 +1,8 @@
# Homotopy Continuation # Homotopy Continuation
### Introduction ## Introduction
### Overview
The aim of a homotopy method consists in solving a system of N nonlinear The aim of a homotopy method consists in solving a system of N nonlinear
equations in N variables \[1, p.1\]: equations in N variables \[1, p.1\]:
@@ -85,6 +87,26 @@ between successive points produced by the iterations can be used as a criterion
for convergence. Of course, if the iterations fail to converge, one must go for convergence. Of course, if the iterations fail to converge, one must go
back to adjust the step size for the Eulers predictor." [2, p.130] back to adjust the step size for the Eulers predictor." [2, p.130]
## Application to Channel Decoding
We can describe the decoding problem using the code constraint polynomial [3]
$$
h(\bm{x}) = \underbrace{\sum_{i=1}^{n}\left(1-x_i^2\right)^2}_{\text{Bipolar constraint}} + \underbrace{\sum_{j=1}^{m}\left(1 - \left(\prod_{i\in A(j)}x_i\right)\right)^2}_{\text{Parity constraint}},
$$
where $A(j) = \left\{i \in [1:n]: H_{j,i} = 1\right\}$ represents the set of
variable nodes involved in parity check j. This polynomial consists of a set of
terms representing the bipolar constraint and a set of terms representing the
parity constraint. In a similar vein, we can define the following set of
polynomial equations to describe codewords:
$$
F(\bm{x}) = \left[\begin{array}{c}1 - x_1^2 \\ \vdots\\ 1 - x_n^2 \\ 1 - \prod_{i \in A(1)}x_i \\ \vdots\\ 1 - \prod_{i \in A(m)}x_i\end{array}\right] \overset{!}{=} \bm{0}.
$$
This is a problem we can solve using homotopy continuation.
______________________________________________________________________ ______________________________________________________________________
## References ## References
@@ -97,3 +119,7 @@ Philadelphia, PA 19104), 2003. doi: 10.1137/1.9780898719154.
\[2\]: T. Chen and T.-Y. Li, “Homotopy continuation method for solving systems \[2\]: T. Chen and T.-Y. Li, “Homotopy continuation method for solving systems
of nonlinear and polynomial equations,” Communications in Information and of nonlinear and polynomial equations,” Communications in Information and
Systems, vol. 15, no. 2, pp. 119307, 2015, doi: 10.4310/CIS.2015.v15.n2.a1. Systems, vol. 15, no. 2, pp. 119307, 2015, doi: 10.4310/CIS.2015.v15.n2.a1.
\[3\]: Wadayama, Tadashi, and Satoshi Takabe. "Proximal decoding for LDPC
codes." IEICE Transactions on Fundamentals of Electronics, Communications and
Computer Sciences 106.3 (2023): 359-367.

View File

@@ -18,12 +18,14 @@ def track_path(args):
[0, 1, 1, 0], [0, 1, 1, 0],
[0, 0, 1, 1]]) [0, 0, 1, 1]])
homotopy = homotopy_generator.HomotopyGenerator(H) received = np.array([0, 0, 0, 0])
print(f"G: {homotopy.G}") homotopy = homotopy_generator.HomotopyGenerator(H, received)
print(f"F: {homotopy.F}")
print(f"H: {homotopy.H}") # print(f"G: {homotopy.G}")
print(f"DH: {homotopy.DH}") # print(f"F: {homotopy.F}")
# print(f"H: {homotopy.H}")
# print(f"DH: {homotopy.DH}")
tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries, tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
args.newton_max_iter, args.newton_convergence_threshold, args.sigma) args.newton_max_iter, args.newton_convergence_threshold, args.sigma)

View File

@@ -0,0 +1,172 @@
import typing
import numpy as np
import galois
import argparse
from dataclasses import dataclass
from tqdm import tqdm
import pandas as pd
# autopep8: off
import sys
import os
sys.path.append(f"{os.path.dirname(os.path.abspath(__file__))}/../")
from hccd import utility, homotopy_generator, path_tracker
# autopep8: on
@dataclass
class SimulationArgs:
euler_step_size: float
euler_max_tries: int
newton_max_iter: int
newton_convergence_threshold: float
sigma: int
homotopy_iter: int
max_frames: int
target_frame_errors: int
def decode(tracker, y, H, args: SimulationArgs) -> np.ndarray:
x_hat = np.where(y >= 0, 0, 1)
s = np.concatenate([y, np.array([0])])
for i in range(args.homotopy_iter):
x_hat = np.where(s[:-1] >= 0, 0, 1)
if not np.any(np.mod(H @ x_hat, 2)):
return x_hat
try:
s = tracker.step(s)
except:
return x_hat
return x_hat
def simulate_error_rates_for_SNR(H, Eb_N0, args: SimulationArgs) -> typing.Tuple[float, float, float, int]:
GF = galois.GF(2)
H_GF = GF(H)
G = H_GF.null_space()
k, n = G.shape
num_frames = 0
bit_errors = 0
frame_errors = 0
decoding_failures = 0
homotopy = homotopy_generator.HomotopyGenerator(H)
tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
for _ in tqdm(range(args.max_frames)):
Eb_N0_lin = 10**(Eb_N0 / 10)
N0 = 1 / (2 * k / n * Eb_N0_lin)
u = np.random.randint(2, size=k)
# u = np.zeros(shape=k).astype(np.int32)
c = np.array(GF(u) @ G)
x = 1 - 2*c
y = x + np.sqrt(N0) * np.random.normal(size=n)
homotopy.update_received(y)
c_hat = decode(tracker, y, H, args)
if np.any(np.mod(H @ c_hat, 2)):
tracker.set_sigma(-1 * args.sigma)
c_hat = decode(tracker, y, H, args)
tracker.set_sigma(args.sigma)
if np.any(np.mod(H @ c_hat, 2)):
decoding_failures += 1
bit_errors += np.sum(c_hat != c)
frame_errors += np.any(c_hat != c)
num_frames += 1
if frame_errors >= args.target_frame_errors:
break
BER = bit_errors / (num_frames * n)
FER = frame_errors / num_frames
DFR = decoding_failures / num_frames
return FER, BER, DFR, frame_errors
def simulate_error_rates(H, Eb_N0_list, args: SimulationArgs) -> pd.DataFrame:
FERs = []
BERs = []
DFRs = []
frame_errors_list = []
for SNR in Eb_N0_list:
FER, BER, DFR, num_frames = simulate_error_rates_for_SNR(H, SNR, args)
FERs.append(FER)
BERs.append(BER)
DFRs.append(DFR)
frame_errors_list.append(num_frames)
print(pd.DataFrame({"SNR": Eb_N0_list[:len(FERs)], "FER": FERs, "BER": BERs,
"DFR": DFRs, "frame_errors": frame_errors_list}))
return pd.DataFrame({"SNR": Eb_N0_list, "FER": FERs, "BER": BERs,
"DFR": DFRs, "frame_errors": frame_errors_list})
def main():
# Parse command line arguments
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--code", type=str, required=True,
help="Path to the alist file containing the parity check matrix of the code")
parser.add_argument("--snr", type=float, required=True, nargs=3,
metavar=('start', 'stop', 'step'),
help="SNRs to simulate (Eb/N0) in dB")
parser.add_argument("--max-frames", type=int, default=int(1e6),
help="Maximum number of frames to simulate")
parser.add_argument("--target-frame-errors", type=int, default=200,
help="Number of frame errors after which to stop the simulation")
parser.add_argument("--euler-step-size", type=float,
default=0.05, help="Step size for Euler predictor")
parser.add_argument("--euler-max-tries", type=int, default=5,
help="Maximum number of tries for Euler predictor")
parser.add_argument("--newton-max-iter", type=int, default=5,
help="Maximum number of iterations for Newton corrector")
parser.add_argument("--newton-convergence-threshold", type=float,
default=0.01, help="Convergence threshold for Newton corrector")
parser.add_argument("-s", "--sigma", type=int, default=1,
help="Direction in which the path is traced")
parser.add_argument("-n", "--homotopy-iter", type=int, default=20,
help="Number of iterations of the homotopy continuation method to perform for each decoding")
args = parser.parse_args()
# TODO: Name this section properly
# Do stuff
H = utility.read_alist_file(args.code)
simulation_args = SimulationArgs(
euler_step_size=args.euler_step_size,
euler_max_tries=args.euler_max_tries,
newton_max_iter=args.newton_max_iter,
newton_convergence_threshold=args.newton_convergence_threshold,
sigma=args.sigma,
homotopy_iter=args.homotopy_iter,
max_frames=args.max_frames,
target_frame_errors=args.target_frame_errors)
SNRs = np.arange(args.snr[0], args.snr[1], args.snr[2])
df = simulate_error_rates(H, SNRs, simulation_args)
print(df)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,171 @@
from pathlib import Path
from ray import tune
import ray
import typing
import numpy as np
import galois
import argparse
from dataclasses import dataclass
import pandas as pd
# autopep8: off
import sys
import os
sys.path.append(f"{os.path.dirname(os.path.abspath(__file__))}/../")
from hccd import utility, homotopy_generator, path_tracker
# autopep8: on
@dataclass
class SimulationArgs:
euler_step_size: float
euler_max_tries: int
newton_max_iter: int
newton_convergence_threshold: float
sigma: int
homotopy_iter: int
max_frames: int
target_frame_errors: int
def decode(tracker, y, H, args: SimulationArgs) -> np.ndarray:
x_hat = np.where(y >= 0, 0, 1)
s = np.concatenate([y, np.array([0])])
for i in range(args.homotopy_iter):
x_hat = np.where(s[:-1] >= 0, 0, 1)
if not np.any(np.mod(H @ x_hat, 2)):
return x_hat
try:
s = tracker.step(s)
except:
return x_hat
return x_hat
def simulate_error_rates_for_SNR(H, Eb_N0, args: SimulationArgs) -> typing.Tuple[float, float, float, int]:
GF = galois.GF(2)
H_GF = GF(H)
G = H_GF.null_space()
k, n = G.shape
num_frames = 0
bit_errors = 0
frame_errors = 0
decoding_failures = 0
homotopy = homotopy_generator.HomotopyGenerator(H)
tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
for _ in range(args.max_frames):
Eb_N0_lin = 10**(Eb_N0 / 10)
N0 = 1 / (2 * k / n * Eb_N0_lin)
u = np.random.randint(2, size=k)
# u = np.zeros(shape=k).astype(np.int32)
c = np.array(GF(u) @ G)
x = 1 - 2*c
y = x + np.sqrt(N0) * np.random.normal(size=n)
homotopy.update_received(y)
c_hat = decode(tracker, y, H, args)
if np.any(np.mod(H @ c_hat, 2)):
tracker.set_sigma(-1 * args.sigma)
c_hat = decode(tracker, y, H, args)
tracker.set_sigma(args.sigma)
if np.any(np.mod(H @ c_hat, 2)):
decoding_failures += 1
bit_errors += np.sum(c_hat != c)
frame_errors += np.any(c_hat != c)
num_frames += 1
if frame_errors >= args.target_frame_errors:
break
BER = bit_errors / (num_frames * n)
FER = frame_errors / num_frames
DFR = decoding_failures / num_frames
return FER, BER, DFR, frame_errors
def simulation(config):
simulation_args = SimulationArgs(
euler_step_size=config["euler_step_size"],
euler_max_tries=20,
newton_max_iter=20,
newton_convergence_threshold=config["newton_convergence_threshold"],
sigma=config["sigma"],
homotopy_iter=config["homotopy_iter"],
max_frames=100000,
target_frame_errors=200)
FER, BER, DFR, frame_errors = simulate_error_rates_for_SNR(
config["H"], config["Eb_N0"], simulation_args)
return {"FER": FER, "BER": BER, "DFR": DFR, "frame_errors": frame_errors}
def main():
# Parse command line arguments
parser = argparse.ArgumentParser()
parser.add_argument("-c", "--code", type=str, required=True,
help="Path to the alist file containing the parity check matrix of the code")
args = parser.parse_args()
H = utility.read_alist_file(args.code)
# Run parameter sweep
ray.init()
search_space = {
"Eb_N0": tune.grid_search([6]),
"newton_convergence_threshold": tune.grid_search(np.array([0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1])),
"euler_step_size": tune.grid_search(np.array([0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1])),
"homotopy_iter": tune.grid_search(np.array([500])),
"sigma": tune.grid_search(np.array([1, -1])),
"H": tune.grid_search([H])
}
tuner = tune.Tuner(
simulation,
param_space=search_space,
tune_config=tune.TuneConfig(
max_concurrent_trials=12,
),
run_config=tune.RunConfig(
name="param_sweep",
storage_path=Path.cwd() / "sim_results"
)
)
results = tuner.fit()
df = results.get_dataframe()
keep_columns = ["FER", "BER", "DFR", "frame_errors"]
config_columns = [col for col in df.columns if col.startswith("config/")]
columns_to_keep = keep_columns + config_columns
df = df[columns_to_keep].drop(columns=["config/H"])
df.to_csv("sweep_results.csv", index=False)
print(df.head())
if __name__ == "__main__":
main()

View File

@@ -30,7 +30,7 @@ class ToyHomotopy:
[t]] [t]]
""" """
@staticmethod @staticmethod
def evaluate_H(y: np.ndarray) -> np.ndarray: def H(y: np.ndarray) -> np.ndarray:
"""Evaluate H at y.""" """Evaluate H at y."""
x1 = y[0] x1 = y[0]
x2 = y[1] x2 = y[1]
@@ -43,7 +43,7 @@ class ToyHomotopy:
return result return result
@staticmethod @staticmethod
def evaluate_DH(y: np.ndarray) -> np.ndarray: def DH(y: np.ndarray) -> np.ndarray:
"""Evaluate Jacobian of H at y.""" """Evaluate Jacobian of H at y."""
x1 = y[0] x1 = y[0]
x2 = y[1] x2 = y[1]

View File

@@ -1,6 +0,0 @@
def main():
pass
if __name__ == "__main__":
main()

View File

@@ -20,20 +20,17 @@ class HomotopyGenerator:
self.x_vars = [sp.symbols(f'x{i+1}') for i in range(self.num_vars)] self.x_vars = [sp.symbols(f'x{i+1}') for i in range(self.num_vars)]
self.t = sp.symbols('t') self.t = sp.symbols('t')
self.G = self._create_G() self._F = self._create_F()
self.F = self._create_F() self._F_lambda = self._create_F_lambda(self._F)
self.H = self._create_H()
self.DH = self._create_DH(self.H)
self._H_lambda = self._create_H_lambda() def update_received(self, received: np.ndarray):
self._DH_lambda = self._create_DH_lambda() """Update the received vector and recompute G."""
self._G = self._create_G(received)
def _create_G(self) -> List[sp.Expr]: self._G_lambda = self._create_G_lambda(self._G)
G = [] self._H = self._create_H()
for var in self.x_vars: self._H_lambda = self._create_H_lambda(self._H)
G.append(var) self._DH = self._create_DH()
self._DH_lambda = self._create_DH_lambda(self._DH)
return G
def _create_F(self) -> sp.MutableMatrix: def _create_F(self) -> sp.MutableMatrix:
F = [] F = []
@@ -52,51 +49,63 @@ class HomotopyGenerator:
groebner_basis = sp.groebner(F) groebner_basis = sp.groebner(F)
return sp.MutableMatrix(groebner_basis) return sp.MutableMatrix(groebner_basis)
def _create_H(self) -> List[sp.Expr]: def _create_G(self, received) -> sp.MutableMatrix:
G = []
F_y = self._F_lambda(*received)
for f, f_y_i in zip(self._F, F_y):
G.append(f - f_y_i)
return sp.MutableMatrix(G)
def _create_H(self) -> sp.MutableMatrix:
H = [] H = []
for g, f in zip(self.G, self.F): for f, g in zip(self._F, self._G):
H.append((1 - self.t) * g + self.t * f) H.append((1 - self.t) * g + self.t * f)
return H return sp.MutableMatrix(H)
def _create_DH(self, H: List[sp.Expr]) -> sp.MutableMatrix: def _create_DH(self) -> sp.MutableMatrix:
all_vars = self.x_vars + [self.t] all_vars = self.x_vars + [self.t]
DH = sp.Matrix([[sp.diff(expr, var) DH = sp.Matrix([[sp.diff(expr, var)
for var in all_vars] for expr in self.H]) for var in all_vars] for expr in self._H])
return DH return DH
def _create_H_lambda(self) -> Callable: def _create_F_lambda(self, expr) -> Callable:
all_vars = self.x_vars
return sp.lambdify(all_vars, expr, 'numpy')
def _create_G_lambda(self, expr) -> Callable:
all_vars = self.x_vars
return sp.lambdify(all_vars, expr, 'numpy')
def _create_H_lambda(self, expr) -> Callable:
all_vars = self.x_vars + [self.t] all_vars = self.x_vars + [self.t]
return sp.lambdify(all_vars, self.H, 'numpy') return sp.lambdify(all_vars, expr, 'numpy')
def _create_DH_lambda(self) -> Callable: def _create_DH_lambda(self, expr) -> Callable:
all_vars = self.x_vars + [self.t] all_vars = self.x_vars + [self.t]
return sp.lambdify(all_vars, self.DH, 'numpy') return sp.lambdify(all_vars, expr, 'numpy')
def evaluate_H(self, y: np.ndarray) -> np.ndarray: def H(self, y):
"""
Evaluate H at point y.
Args:
y: Array of form [x1, x2, ..., xn, t] where xi are the variables
and t is the homotopy parameter.
Returns:
Array containing H evaluated at y.
"""
return np.array(self._H_lambda(*y)) return np.array(self._H_lambda(*y))
def evaluate_DH(self, y: np.ndarray) -> np.ndarray: def DH(self, y):
""" return np.array(self._DH_lambda(*y))
Evaluate the Jacobian of H at point y.
Args:
y: Array of form [x1, x2, ..., xn, t] where xi are the variables
and t is the homotopy parameter.
Returns: def main():
Matrix containing the Jacobian of H evaluated at y. import utility
""" H = utility.read_alist_file(
return np.array(self._DH_lambda(*y), dtype=float) "/home/andreas/workspace/work/hiwi/ba-sw/codes/BCH_7_4.alist")
a = HomotopyGenerator(H)
y = np.array([0.1, 0.2, 0.9, 0.1, -0.8, -0.5, -1.0, 0])
print(a.DH(y))
if __name__ == "__main__":
main()

View File

@@ -3,10 +3,6 @@ import typing
import scipy import scipy
def _sign(val):
return -1 * (val < 0) + 1 * (val >= 0)
class PathTracker: class PathTracker:
""" """
Path trakcer for the homotopy continuation method. Uses a Path trakcer for the homotopy continuation method. Uses a
@@ -18,9 +14,9 @@ class PathTracker:
Information and Systems, vol. 15, no. 2, pp. 119-307, 2015 Information and Systems, vol. 15, no. 2, pp. 119-307, 2015
""" """
def __init__(self, Homotopy, euler_step_size=0.05, euler_max_tries=10, newton_max_iter=5, def __init__(self, homotopy, euler_step_size=0.05, euler_max_tries=10, newton_max_iter=5,
newton_convergence_threshold=0.001, sigma=1): newton_convergence_threshold=0.001, sigma=1):
self.Homotopy = Homotopy self._homotopy = homotopy
self._euler_step_size = euler_step_size self._euler_step_size = euler_step_size
self._euler_max_tries = euler_max_tries self._euler_max_tries = euler_max_tries
self._newton_max_iter = newton_max_iter self._newton_max_iter = newton_max_iter
@@ -29,7 +25,7 @@ class PathTracker:
def step(self, y): def step(self, y):
"""Perform one predictor-corrector step.""" """Perform one predictor-corrector step."""
return self.transparent_step(y)[0] return self.transparent_step(y)[3]
def transparent_step(self, y) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray,]: def transparent_step(self, y) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray,]:
"""Perform one predictor-corrector step, returning intermediate results.""" """Perform one predictor-corrector step, returning intermediate results."""
@@ -43,21 +39,14 @@ class PathTracker:
raise RuntimeError("Newton corrector did not converge") raise RuntimeError("Newton corrector did not converge")
# TODO: Make sure the implementation with null_space instead of QR is correct
def _perform_euler_predictor_step(self, y, step_size) -> typing.Tuple[np.ndarray, np.ndarray]: def _perform_euler_predictor_step(self, y, step_size) -> typing.Tuple[np.ndarray, np.ndarray]:
# Obtain y_prime # Obtain y_prime
DH = self.Homotopy.evaluate_DH(y) DH = self._homotopy.DH(y)
ns = scipy.linalg.null_space(DH) ns = scipy.linalg.null_space(DH)
y_prime = ns[:, 0] * self._sigma y_prime = ns[:, 0] * self._sigma
# Q, R = np.linalg.qr(np.transpose(DH), mode="complete")
# y_prime = Q[:, 2]
#
# if _sign(np.linalg.det(Q)*np.linalg.det(R[:2, :])) != _sign(self._sigma):
# y_prime = -y_prime
# Perform prediction # Perform prediction
y_hat = y + step_size*y_prime y_hat = y + step_size*y_prime
@@ -70,10 +59,10 @@ class PathTracker:
for _ in range(self._newton_max_iter): for _ in range(self._newton_max_iter):
# Perform correction # Perform correction
DH = self.Homotopy.evaluate_DH(y) DH = self._homotopy.DH(y)
DH_pinv = np.linalg.pinv(DH) DH_pinv = np.linalg.pinv(DH)
y = y - DH_pinv @ self.Homotopy.evaluate_H(y) y = y - np.transpose(DH_pinv @ self._homotopy.H(y))[0]
# Check stopping criterion # Check stopping criterion
@@ -83,3 +72,6 @@ class PathTracker:
prev_y = y prev_y = y
raise RuntimeError("Newton corrector did not converge") raise RuntimeError("Newton corrector did not converge")
def set_sigma(self, sigma):
self._sigma = sigma

29
python/hccd/utility.py Normal file
View File

@@ -0,0 +1,29 @@
import numpy as np
def _parse_alist_header(header):
size = header.split()
return int(size[0]), int(size[1])
def read_alist_file(filename):
"""
This function reads in an alist file and creates the
corresponding parity check matrix H. The format of alist
files is described at:
http://www.inference.phy.cam.ac.uk/mackay/codes/alist.html
"""
with open(filename, 'r') as myfile:
data = myfile.readlines()
numCols, numRows = _parse_alist_header(data[0])
H = np.zeros((numRows, numCols))
# The locations of 1s starts in the 5th line of the file
for lineNumber in np.arange(4, 4 + numCols):
indices = data[lineNumber].split()
for index in indices:
H[int(index) - 1, lineNumber - 4] = 1
return H.astype(np.int32)