Add files from test project
This commit is contained in:
142
python/examples/repetition_code.py
Normal file
142
python/examples/repetition_code.py
Normal file
@@ -0,0 +1,142 @@
|
||||
import argparse
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
# autopep8: off
|
||||
import sys
|
||||
import os
|
||||
sys.path.append(f"{os.path.dirname(os.path.abspath(__file__))}/../")
|
||||
|
||||
# TODO: How do I import PathTracker and HomotopyGenerator properly?
|
||||
from hccd import path_tracker, homotopy_generator
|
||||
# autopep8: on
|
||||
|
||||
|
||||
# class RepetitionCodeHomotopy:
|
||||
# """Helper type implementing necessary functions for PathTracker.
|
||||
#
|
||||
# Repetiton code homotopy:
|
||||
# G = [[x1],
|
||||
# [x2],
|
||||
# [x1]]
|
||||
#
|
||||
# F = [[1 - x1**2],
|
||||
# [1 - x2**2],
|
||||
# [1 - x1*x2]]
|
||||
#
|
||||
# H = (1-t)*G + t*F
|
||||
#
|
||||
# Note that
|
||||
# y := [[x1],
|
||||
# [x2],
|
||||
# [t]]
|
||||
# """
|
||||
# @staticmethod
|
||||
# def evaluate_H(y: np.ndarray) -> np.ndarray:
|
||||
# """Evaluate H at y."""
|
||||
# x1 = y[0]
|
||||
# x2 = y[1]
|
||||
# t = y[2]
|
||||
#
|
||||
# print(y)
|
||||
#
|
||||
# result = np.zeros(shape=3)
|
||||
# result[0] = -t*x1**2 + x1*(1-t) + t
|
||||
# result[1] = -t*x2**2 + x2*(1-t) + t
|
||||
# result[2] = -t*x1*x2 + x1*(1-t) + t
|
||||
#
|
||||
# return result
|
||||
#
|
||||
# @staticmethod
|
||||
# def evaluate_DH(y: np.ndarray) -> np.ndarray:
|
||||
# """Evaluate Jacobian of H at y."""
|
||||
# x1 = y[0]
|
||||
# x2 = y[1]
|
||||
# t = y[2]
|
||||
#
|
||||
# result = np.zeros(shape=(3, 3))
|
||||
# result[0, 0] = -2*t*x1 + (1-t)
|
||||
# result[0, 1] = 0
|
||||
# result[0, 2] = -x1**2 - x1 + 1
|
||||
# result[1, 0] = 0
|
||||
# result[1, 1] = -2*t*x2 + (1-t)
|
||||
# result[1, 2] = -x2**2 - x2 + 1
|
||||
# result[1, 0] = -t*x2 + (1-t)
|
||||
# result[1, 1] = -t*x1
|
||||
# result[1, 2] = -x1*x2 - x1 + 1
|
||||
#
|
||||
# return result
|
||||
|
||||
|
||||
def track_path(args):
|
||||
H = np.array([[1, 1, 1]])
|
||||
homotopy = homotopy_generator.HomotopyGenerator(H)
|
||||
|
||||
tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
|
||||
args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
|
||||
|
||||
ys_start, ys_prime, ys_hat_e, ys = [], [], [], []
|
||||
|
||||
try:
|
||||
y = np.zeros(3)
|
||||
for i in range(args.num_iterations):
|
||||
y_start, y_prime, y_hat_e, y = tracker.transparent_step(y)
|
||||
ys_start.append(y_start)
|
||||
ys_prime.append(y_prime)
|
||||
ys_hat_e.append(y_hat_e)
|
||||
ys.append(y)
|
||||
print(f"Iteration {i}: {y}")
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
|
||||
ys_start = np.array(ys_start)
|
||||
ys_prime = np.array(ys_prime)
|
||||
ys_hat_e = np.array(ys_hat_e)
|
||||
ys = np.array(ys)
|
||||
|
||||
df = pd.DataFrame({"x1b": ys_start[:, 0],
|
||||
"x2b": ys_start[:, 1],
|
||||
"tb": ys_start[:, 2],
|
||||
"x1p": ys_prime[:, 0],
|
||||
"x2p": ys_prime[:, 1],
|
||||
"tp": ys_prime[:, 2],
|
||||
"x1e": ys_hat_e[:, 0],
|
||||
"x2e": ys_hat_e[:, 1],
|
||||
"te": ys_hat_e[:, 2],
|
||||
"x1n": ys[:, 0],
|
||||
"x2n": ys[:, 1],
|
||||
"tn": ys[:, 2]
|
||||
})
|
||||
|
||||
if args.output:
|
||||
df.to_csv(args.output, index=False)
|
||||
else:
|
||||
print(df)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Homotopy continuation path tracker')
|
||||
|
||||
parser.add_argument("--verbose", default=False, action='store_true')
|
||||
parser.add_argument("--euler-step-size", type=float,
|
||||
default=0.05, help="Step size for Euler predictor")
|
||||
parser.add_argument("--euler-max-tries", type=int, default=5,
|
||||
help="Maximum number of tries for Euler predictor")
|
||||
parser.add_argument("--newton-max-iter", type=int, default=5,
|
||||
help="Maximum number of iterations for Newton corrector")
|
||||
parser.add_argument("--newton-convergence-threshold", type=float,
|
||||
default=0.01, help="Convergence threshold for Newton corrector")
|
||||
parser.add_argument("-s", "--sigma", type=int, default=1,
|
||||
help="Direction in which the path is traced")
|
||||
parser.add_argument("-o", "--output", type=str, help="Output csv file")
|
||||
parser.add_argument("-n", "--num-iterations", type=int, default=20,
|
||||
help="Number of iterations of the example program to run")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
track_path(args)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
131
python/examples/toy_homotopy.py
Normal file
131
python/examples/toy_homotopy.py
Normal file
@@ -0,0 +1,131 @@
|
||||
import argparse
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
# autopep8: off
|
||||
import sys
|
||||
import os
|
||||
sys.path.append(f"{os.path.dirname(os.path.abspath(__file__))}/../")
|
||||
|
||||
# TODO: How do I import PathTracker and HomotopyGenerator properly?
|
||||
from hccd import path_tracker, homotopy_generator
|
||||
# autopep8: on
|
||||
|
||||
|
||||
class ToyHomotopy:
|
||||
"""Helper type implementing necessary functions for PathTracker.
|
||||
|
||||
Toy example homotopy:
|
||||
G = [[x1],
|
||||
[x2]]
|
||||
|
||||
F = [[x1 + x2 ],
|
||||
[x2 + 0.5]]
|
||||
|
||||
H = (1-t)*G + t*F
|
||||
|
||||
Note that
|
||||
y := [[x1],
|
||||
[x2],
|
||||
[t]]
|
||||
"""
|
||||
@staticmethod
|
||||
def evaluate_H(y: np.ndarray) -> np.ndarray:
|
||||
"""Evaluate H at y."""
|
||||
x1 = y[0]
|
||||
x2 = y[1]
|
||||
t = y[2]
|
||||
|
||||
result = np.zeros(shape=2)
|
||||
result[0] = x1 + t * x2
|
||||
result[1] = x2 + t * 0.5
|
||||
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def evaluate_DH(y: np.ndarray) -> np.ndarray:
|
||||
"""Evaluate Jacobian of H at y."""
|
||||
x1 = y[0]
|
||||
x2 = y[1]
|
||||
t = y[2]
|
||||
|
||||
result = np.zeros(shape=(2, 3))
|
||||
result[0, 0] = 1
|
||||
result[0, 1] = t
|
||||
result[0, 2] = x2
|
||||
result[1, 0] = 0
|
||||
result[1, 1] = 1
|
||||
result[1, 2] = 0.5
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def track_path(args):
|
||||
tracker = path_tracker.PathTracker(ToyHomotopy, args.euler_step_size, args.euler_max_tries,
|
||||
args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
|
||||
|
||||
ys_start, ys_prime, ys_hat_e, ys = [], [], [], []
|
||||
|
||||
try:
|
||||
y = np.zeros(3)
|
||||
for i in range(args.num_iterations):
|
||||
y_start, y_prime, y_hat_e, y = tracker.transparent_step(y)
|
||||
ys_start.append(y_start)
|
||||
ys_prime.append(y_prime)
|
||||
ys_hat_e.append(y_hat_e)
|
||||
ys.append(y)
|
||||
print(f"Iteration {i}: {y}")
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
|
||||
ys_start = np.array(ys_start)
|
||||
ys_prime = np.array(ys_prime)
|
||||
ys_hat_e = np.array(ys_hat_e)
|
||||
ys = np.array(ys)
|
||||
|
||||
df = pd.DataFrame({"x1b": ys_start[:, 0],
|
||||
"x2b": ys_start[:, 1],
|
||||
"tb": ys_start[:, 2],
|
||||
"x1p": ys_prime[:, 0],
|
||||
"x2p": ys_prime[:, 1],
|
||||
"tp": ys_prime[:, 2],
|
||||
"x1e": ys_hat_e[:, 0],
|
||||
"x2e": ys_hat_e[:, 1],
|
||||
"te": ys_hat_e[:, 2],
|
||||
"x1n": ys[:, 0],
|
||||
"x2n": ys[:, 1],
|
||||
"tn": ys[:, 2]
|
||||
})
|
||||
|
||||
if args.output:
|
||||
df.to_csv(args.output, index=False)
|
||||
else:
|
||||
print(df)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Homotopy continuation path tracker')
|
||||
|
||||
parser.add_argument("--verbose", default=False, action='store_true')
|
||||
parser.add_argument("--euler-step-size", type=float,
|
||||
default=0.05, help="Step size for Euler predictor")
|
||||
parser.add_argument("--euler-max-tries", type=int, default=5,
|
||||
help="Maximum number of tries for Euler predictor")
|
||||
parser.add_argument("--newton-max-iter", type=int, default=5,
|
||||
help="Maximum number of iterations for Newton corrector")
|
||||
parser.add_argument("--newton-convergence-threshold", type=float,
|
||||
default=0.01, help="Convergence threshold for Newton corrector")
|
||||
parser.add_argument("-s", "--sigma", type=int, default=1,
|
||||
help="Direction in which the path is traced")
|
||||
parser.add_argument("-o", "--output", type=str, help="Output csv file")
|
||||
parser.add_argument("-n", "--num-iterations", type=int, default=20,
|
||||
help="Number of iterations of the example program to run")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
track_path(args)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
0
python/hccd/__init__.py
Normal file
0
python/hccd/__init__.py
Normal file
6
python/hccd/__main__.py
Normal file
6
python/hccd/__main__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
def main():
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
117
python/hccd/homotopy_generator.py
Normal file
117
python/hccd/homotopy_generator.py
Normal file
@@ -0,0 +1,117 @@
|
||||
import numpy as np
|
||||
import sympy as sp
|
||||
from typing import List, Callable
|
||||
|
||||
|
||||
class HomotopyGenerator:
|
||||
"""Generates homotopy functions from a binary parity check matrix."""
|
||||
|
||||
def __init__(self, parity_check_matrix: np.ndarray):
|
||||
"""
|
||||
Initialize with a parity check matrix.
|
||||
|
||||
Args:
|
||||
parity_check_matrix: Binary matrix where rows represent parity checks
|
||||
and columns represent variables.
|
||||
"""
|
||||
self.H_matrix = parity_check_matrix
|
||||
self.num_checks, self.num_vars = parity_check_matrix.shape
|
||||
|
||||
# Create symbolic variables
|
||||
self.x_vars = [sp.symbols(f'x{i+1}') for i in range(self.num_vars)]
|
||||
self.t = sp.symbols('t')
|
||||
|
||||
# Generate G, F, and H
|
||||
self.G = self._create_G()
|
||||
self.F = self._create_F()
|
||||
self.H = self._create_H()
|
||||
|
||||
# Convert to callable functions
|
||||
self._H_lambda = self._create_H_lambda()
|
||||
self._DH_lambda = self._create_DH_lambda()
|
||||
|
||||
def _create_G(self) -> List[sp.Expr]:
|
||||
"""Create G polynomial system (the starting system)."""
|
||||
# For each variable xi, add the polynomial [xi]
|
||||
G = []
|
||||
for var in self.x_vars:
|
||||
G.append(var)
|
||||
|
||||
return G
|
||||
|
||||
def _create_F(self) -> List[sp.Expr]:
|
||||
"""Create F polynomial system (the target system)."""
|
||||
F = []
|
||||
|
||||
# Add 1 - xi^2 for each variable
|
||||
for var in self.x_vars:
|
||||
F.append(1 - var**2)
|
||||
|
||||
# Add parity check polynomials: 1 - x1*x2*...*xk for each parity check
|
||||
for row in self.H_matrix:
|
||||
# Create product of variables that participate in this check
|
||||
term = 1
|
||||
for i, bit in enumerate(row):
|
||||
if bit == 1:
|
||||
term *= self.x_vars[i]
|
||||
|
||||
if term != 1: # Only add if there are variables in this check
|
||||
F.append(1 - term)
|
||||
|
||||
return F
|
||||
|
||||
def _create_H(self) -> List[sp.Expr]:
|
||||
"""Create the homotopy H = (1-t)*G + t*F."""
|
||||
H = []
|
||||
|
||||
# Make sure G and F have the same length
|
||||
# Repeat variables from G if needed to match F's length
|
||||
G_extended = self.G.copy()
|
||||
while len(G_extended) < len(self.F):
|
||||
# Cycle through variables to repeat
|
||||
for i in range(min(self.num_vars, len(self.F) - len(G_extended))):
|
||||
G_extended.append(self.x_vars[i % self.num_vars])
|
||||
|
||||
# Create the homotopy
|
||||
for g, f in zip(G_extended, self.F):
|
||||
H.append((1 - self.t) * g + self.t * f)
|
||||
|
||||
return H
|
||||
|
||||
def _create_H_lambda(self) -> Callable:
|
||||
"""Create a lambda function to evaluate H."""
|
||||
all_vars = self.x_vars + [self.t]
|
||||
return sp.lambdify(all_vars, self.H, 'numpy')
|
||||
|
||||
def _create_DH_lambda(self) -> Callable:
|
||||
"""Create a lambda function to evaluate the Jacobian of H."""
|
||||
all_vars = self.x_vars + [self.t]
|
||||
jacobian = sp.Matrix([[sp.diff(expr, var)
|
||||
for var in all_vars] for expr in self.H])
|
||||
return sp.lambdify(all_vars, jacobian, 'numpy')
|
||||
|
||||
def evaluate_H(self, y: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Evaluate H at point y.
|
||||
|
||||
Args:
|
||||
y: Array of form [x1, x2, ..., xn, t] where xi are the variables
|
||||
and t is the homotopy parameter.
|
||||
|
||||
Returns:
|
||||
Array containing H evaluated at y.
|
||||
"""
|
||||
return np.array(self._H_lambda(*y))
|
||||
|
||||
def evaluate_DH(self, y: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Evaluate the Jacobian of H at point y.
|
||||
|
||||
Args:
|
||||
y: Array of form [x1, x2, ..., xn, t] where xi are the variables
|
||||
and t is the homotopy parameter.
|
||||
|
||||
Returns:
|
||||
Matrix containing the Jacobian of H evaluated at y.
|
||||
"""
|
||||
return np.array(self._DH_lambda(*y), dtype=float)
|
||||
84
python/hccd/path_tracker.py
Normal file
84
python/hccd/path_tracker.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import numpy as np
|
||||
import typing
|
||||
import scipy
|
||||
|
||||
|
||||
def _sign(val):
|
||||
return -1 * (val < 0) + 1 * (val >= 0)
|
||||
|
||||
|
||||
class PathTracker:
|
||||
"""
|
||||
Path trakcer for the homotopy continuation method. Uses a
|
||||
predictor-corrector scheme to trace a path defined by a homotopy.
|
||||
|
||||
References:
|
||||
[1] T. Chen and T.-Y. Li, “Homotopy continuation method for solving
|
||||
systems of nonlinear and polynomial equations,” Communications in
|
||||
Information and Systems, vol. 15, no. 2, pp. 119–307, 2015
|
||||
"""
|
||||
|
||||
def __init__(self, Homotopy, euler_step_size=0.05, euler_max_tries=10, newton_max_iter=5,
|
||||
newton_convergence_threshold=0.001, sigma=1):
|
||||
self.Homotopy = Homotopy
|
||||
self._euler_step_size = euler_step_size
|
||||
self._euler_max_tries = euler_max_tries
|
||||
self._newton_max_iter = newton_max_iter
|
||||
self._newton_convergence_threshold = newton_convergence_threshold
|
||||
self._sigma = sigma
|
||||
|
||||
def step(self, y):
|
||||
"""Perform one predictor-corrector step."""
|
||||
return self.transparent_step(y)[0]
|
||||
|
||||
def transparent_step(self, y) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray,]:
|
||||
"""Perform one predictor-corrector step, returning intermediate results."""
|
||||
for i in range(self._euler_max_tries):
|
||||
step_size = self._euler_step_size / (1 << i)
|
||||
|
||||
y_hat, y_prime = self._perform_euler_predictor_step(y, step_size)
|
||||
y_hat_n = self._perform_newtown_corrector_step(y_hat)
|
||||
|
||||
return y, y_prime, y_hat, y_hat_n
|
||||
|
||||
raise RuntimeError("Newton corrector did not converge")
|
||||
|
||||
def _perform_euler_predictor_step(self, y, step_size) -> typing.Tuple[np.ndarray, np.ndarray]:
|
||||
# Obtain y_prime
|
||||
|
||||
DH = self.Homotopy.evaluate_DH(y)
|
||||
ns = scipy.linalg.null_space(DH)
|
||||
|
||||
y_prime = ns[:, 0] * self._sigma
|
||||
|
||||
# Q, R = np.linalg.qr(np.transpose(DH), mode="complete")
|
||||
# y_prime = Q[:, 2]
|
||||
#
|
||||
# if _sign(np.linalg.det(Q)*np.linalg.det(R[:2, :])) != _sign(self._sigma):
|
||||
# y_prime = -y_prime
|
||||
|
||||
# Perform prediction
|
||||
|
||||
y_hat = y + step_size*y_prime
|
||||
|
||||
return y_hat, y_prime
|
||||
|
||||
def _perform_newtown_corrector_step(self, y) -> np.ndarray:
|
||||
prev_y = y
|
||||
|
||||
for _ in range(self._newton_max_iter):
|
||||
# Perform correction
|
||||
|
||||
DH = self.Homotopy.evaluate_DH(y)
|
||||
DH_pinv = np.linalg.pinv(DH)
|
||||
|
||||
y = y - DH_pinv @ self.Homotopy.evaluate_H(y)
|
||||
|
||||
# Check stopping criterion
|
||||
|
||||
if np.linalg.norm(y - prev_y) < self._newton_convergence_threshold:
|
||||
return y
|
||||
|
||||
prev_y = y
|
||||
|
||||
raise RuntimeError("Newton corrector did not converge")
|
||||
Reference in New Issue
Block a user