diff --git a/python/examples/repetition_code.py b/python/examples/repetition_code.py index c2571fe..9a3bfbc 100644 --- a/python/examples/repetition_code.py +++ b/python/examples/repetition_code.py @@ -18,12 +18,14 @@ def track_path(args): [0, 1, 1, 0], [0, 0, 1, 1]]) - homotopy = homotopy_generator.HomotopyGenerator(H) + received = np.array([0, 0, 0, 0]) - print(f"G: {homotopy.G}") - print(f"F: {homotopy.F}") - print(f"H: {homotopy.H}") - print(f"DH: {homotopy.DH}") + homotopy = homotopy_generator.HomotopyGenerator(H, received) + + # print(f"G: {homotopy.G}") + # print(f"F: {homotopy.F}") + # print(f"H: {homotopy.H}") + # print(f"DH: {homotopy.DH}") tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries, args.newton_max_iter, args.newton_convergence_threshold, args.sigma) diff --git a/python/examples/simulate_error_rate.py b/python/examples/simulate_error_rate.py index e64a8a8..c71e3ad 100644 --- a/python/examples/simulate_error_rate.py +++ b/python/examples/simulate_error_rate.py @@ -54,35 +54,35 @@ def simulate_error_rates_for_SNR(H, Eb_N0, args: SimulationArgs) -> typing.Tuple G = H_GF.null_space() k, n = G.shape - homotopy = homotopy_generator.HomotopyGenerator(H) - - # print(f"G: {homotopy.G}") - # print(f"F: {homotopy.F}") - # print(f"H: {homotopy.H}") - # print(f"DH: {homotopy.DH}") - - tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries, - args.newton_max_iter, args.newton_convergence_threshold, args.sigma) - num_frames = 0 bit_errors = 0 frame_errors = 0 decoding_failures = 0 + homotopy = homotopy_generator.HomotopyGenerator(H) + tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries, + args.newton_max_iter, args.newton_convergence_threshold, args.sigma) + for _ in tqdm(range(args.max_frames)): Eb_N0_lin = 10**(Eb_N0 / 10) N0 = 1 / (2 * k / n * Eb_N0_lin) y = np.zeros(n) + np.sqrt(N0) * np.random.normal(size=n) + homotopy.update_received(y) + x_hat = decode(tracker, y, H, args) + if np.any(np.mod(H @ x_hat, 2)): + tracker.set_sigma(-1 * args.sigma) + x_hat = decode(tracker, y, H, args) + tracker.set_sigma(args.sigma) + + if np.any(np.mod(H @ x_hat, 2)): + decoding_failures += 1 + bit_errors += np.sum(x_hat != np.zeros(n)) frame_errors += np.any(x_hat != np.zeros(n)) - - if np.any(np.mod(H @ x_hat, 2)): - decoding_failures += 1 - num_frames += 1 if frame_errors >= args.target_frame_errors: @@ -160,8 +160,10 @@ def main(): FERs, BERs, DFRs, frame_errors_list = simulate_error_rates( H, SNRs, simulation_args) - df = pd.DataFrame({"SNR": SNRs, "FER": FERs, "BER": BERs, "DFR": DFRs, "frame_errors": frame_errors_list}) + df = pd.DataFrame({"SNR": SNRs, "FER": FERs, "BER": BERs, + "DFR": DFRs, "frame_errors": frame_errors_list}) print(df) + if __name__ == "__main__": main() diff --git a/python/examples/toy_homotopy.py b/python/examples/toy_homotopy.py index 1e6925b..e284b2d 100644 --- a/python/examples/toy_homotopy.py +++ b/python/examples/toy_homotopy.py @@ -30,7 +30,7 @@ class ToyHomotopy: [t]] """ @staticmethod - def evaluate_H(y: np.ndarray) -> np.ndarray: + def H(y: np.ndarray) -> np.ndarray: """Evaluate H at y.""" x1 = y[0] x2 = y[1] @@ -43,7 +43,7 @@ class ToyHomotopy: return result @staticmethod - def evaluate_DH(y: np.ndarray) -> np.ndarray: + def DH(y: np.ndarray) -> np.ndarray: """Evaluate Jacobian of H at y.""" x1 = y[0] x2 = y[1] diff --git a/python/hccd/homotopy_generator.py b/python/hccd/homotopy_generator.py index bd6f423..e6e3ff0 100644 --- a/python/hccd/homotopy_generator.py +++ b/python/hccd/homotopy_generator.py @@ -20,20 +20,23 @@ class HomotopyGenerator: self.x_vars = [sp.symbols(f'x{i+1}') for i in range(self.num_vars)] self.t = sp.symbols('t') - self.G = self._create_G() - self.F = self._create_F() - self.H = self._create_H() - self.DH = self._create_DH(self.H) + self._F = self._create_F() + self._F_lambda = self._create_F_lambda(self._F) + # self._G = self._create_G(received) + # self._G_lambda = self._create_G_lambda(self._G) + # self._H = self._create_H() + # self._H_lambda = self._create_H_lambda(self._H) + # self._DH = self._create_DH() + # self._DH_lambda = self._create_DH_lambda(self._DH) - self._H_lambda = self._create_H_lambda() - self._DH_lambda = self._create_DH_lambda() - - def _create_G(self) -> List[sp.Expr]: - G = [] - for var in self.x_vars: - G.append(var) - - return G + def update_received(self, received: np.ndarray): + """Update the received vector and recompute G.""" + self._G = self._create_G(received) + self._G_lambda = self._create_G_lambda(self._G) + self._H = self._create_H() + self._H_lambda = self._create_H_lambda(self._H) + self._DH = self._create_DH() + self._DH_lambda = self._create_DH_lambda(self._DH) def _create_F(self) -> sp.MutableMatrix: F = [] @@ -52,51 +55,63 @@ class HomotopyGenerator: groebner_basis = sp.groebner(F) return sp.MutableMatrix(groebner_basis) - def _create_H(self) -> List[sp.Expr]: + def _create_G(self, received) -> sp.MutableMatrix: + G = [] + F_y = self._F_lambda(*received) + + for f, f_y_i in zip(self._F, F_y): + G.append(f - (1 - self.t) * f_y_i) + + return sp.MutableMatrix(G) + + def _create_H(self) -> sp.MutableMatrix: H = [] - for g, f in zip(self.G, self.F): + for f, g in zip(self._F, self._G): H.append((1 - self.t) * g + self.t * f) - return H + return sp.MutableMatrix(H) - def _create_DH(self, H: List[sp.Expr]) -> sp.MutableMatrix: + def _create_DH(self) -> sp.MutableMatrix: all_vars = self.x_vars + [self.t] DH = sp.Matrix([[sp.diff(expr, var) - for var in all_vars] for expr in self.H]) + for var in all_vars] for expr in self._H]) return DH - def _create_H_lambda(self) -> Callable: + def _create_F_lambda(self, expr) -> Callable: + all_vars = self.x_vars + return sp.lambdify(all_vars, expr, 'numpy') + + def _create_G_lambda(self, expr) -> Callable: + all_vars = self.x_vars + return sp.lambdify(all_vars, expr, 'numpy') + + def _create_H_lambda(self, expr) -> Callable: all_vars = self.x_vars + [self.t] - return sp.lambdify(all_vars, self.H, 'numpy') + return sp.lambdify(all_vars, expr, 'numpy') - def _create_DH_lambda(self) -> Callable: + def _create_DH_lambda(self, expr) -> Callable: all_vars = self.x_vars + [self.t] - return sp.lambdify(all_vars, self.DH, 'numpy') + return sp.lambdify(all_vars, expr, 'numpy') - def evaluate_H(self, y: np.ndarray) -> np.ndarray: - """ - Evaluate H at point y. - - Args: - y: Array of form [x1, x2, ..., xn, t] where xi are the variables - and t is the homotopy parameter. - - Returns: - Array containing H evaluated at y. - """ + def H(self, y): return np.array(self._H_lambda(*y)) - def evaluate_DH(self, y: np.ndarray) -> np.ndarray: - """ - Evaluate the Jacobian of H at point y. + def DH(self, y): + return np.array(self._DH_lambda(*y)) - Args: - y: Array of form [x1, x2, ..., xn, t] where xi are the variables - and t is the homotopy parameter. - Returns: - Matrix containing the Jacobian of H evaluated at y. - """ - return np.array(self._DH_lambda(*y), dtype=float) +def main(): + import utility + H = utility.read_alist_file( + "/home/andreas/workspace/work/hiwi/ba-sw/codes/BCH_7_4.alist") + a = HomotopyGenerator(H) + + y = np.array([0.1, 0.2, 0.9, 0.1, -0.8, -0.5, -1.0, 0]) + + print(a.DH(y)) + + +if __name__ == "__main__": + main() diff --git a/python/hccd/path_tracker.py b/python/hccd/path_tracker.py index f42a98f..9635ff5 100644 --- a/python/hccd/path_tracker.py +++ b/python/hccd/path_tracker.py @@ -18,9 +18,9 @@ class PathTracker: Information and Systems, vol. 15, no. 2, pp. 119-307, 2015 """ - def __init__(self, Homotopy, euler_step_size=0.05, euler_max_tries=10, newton_max_iter=5, + def __init__(self, homotopy, euler_step_size=0.05, euler_max_tries=10, newton_max_iter=5, newton_convergence_threshold=0.001, sigma=1): - self.Homotopy = Homotopy + self._homotopy = homotopy self._euler_step_size = euler_step_size self._euler_max_tries = euler_max_tries self._newton_max_iter = newton_max_iter @@ -47,7 +47,7 @@ class PathTracker: def _perform_euler_predictor_step(self, y, step_size) -> typing.Tuple[np.ndarray, np.ndarray]: # Obtain y_prime - DH = self.Homotopy.evaluate_DH(y) + DH = self._homotopy.DH(y) ns = scipy.linalg.null_space(DH) y_prime = ns[:, 0] * self._sigma @@ -70,10 +70,10 @@ class PathTracker: for _ in range(self._newton_max_iter): # Perform correction - DH = self.Homotopy.evaluate_DH(y) + DH = self._homotopy.DH(y) DH_pinv = np.linalg.pinv(DH) - y = y - DH_pinv @ self.Homotopy.evaluate_H(y) + y = y - np.transpose(DH_pinv @ self._homotopy.H(y))[0] # Check stopping criterion @@ -83,3 +83,6 @@ class PathTracker: prev_y = y raise RuntimeError("Newton corrector did not converge") + + def set_sigma(self, sigma): + self._sigma = sigma