Compare commits
3 Commits
fffb15595b
...
f12339f89f
| Author | SHA1 | Date | |
|---|---|---|---|
| f12339f89f | |||
| ad274699a3 | |||
| 5529cd92cc |
@ -18,12 +18,14 @@ def track_path(args):
|
|||||||
[0, 1, 1, 0],
|
[0, 1, 1, 0],
|
||||||
[0, 0, 1, 1]])
|
[0, 0, 1, 1]])
|
||||||
|
|
||||||
homotopy = homotopy_generator.HomotopyGenerator(H)
|
received = np.array([0, 0, 0, 0])
|
||||||
|
|
||||||
print(f"G: {homotopy.G}")
|
homotopy = homotopy_generator.HomotopyGenerator(H, received)
|
||||||
print(f"F: {homotopy.F}")
|
|
||||||
print(f"H: {homotopy.H}")
|
# print(f"G: {homotopy.G}")
|
||||||
print(f"DH: {homotopy.DH}")
|
# print(f"F: {homotopy.F}")
|
||||||
|
# print(f"H: {homotopy.H}")
|
||||||
|
# print(f"DH: {homotopy.DH}")
|
||||||
|
|
||||||
tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
|
tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
|
||||||
args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
|
args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
|
||||||
|
|||||||
@ -29,17 +29,14 @@ class SimulationArgs:
|
|||||||
|
|
||||||
|
|
||||||
def decode(tracker, y, H, args: SimulationArgs) -> np.ndarray:
|
def decode(tracker, y, H, args: SimulationArgs) -> np.ndarray:
|
||||||
x_hat = np.mod(np.round(y), 2).astype('int32')
|
x_hat = np.where(y >= 0, 0, 1)
|
||||||
|
|
||||||
s = np.concatenate([y, np.array([0])])
|
s = np.concatenate([y, np.array([0])])
|
||||||
for i in range(args.homotopy_iter):
|
for i in range(args.homotopy_iter):
|
||||||
x_hat = np.mod(np.round(s[:-1]), 2).astype('int32')
|
x_hat = np.where(s[:-1] >= 0, 0, 1)
|
||||||
if not np.any(np.mod(H @ x_hat, 2)):
|
if not np.any(np.mod(H @ x_hat, 2)):
|
||||||
return x_hat
|
return x_hat
|
||||||
|
|
||||||
# if s[-1] > 1.5:
|
|
||||||
# return x_hat
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
s = tracker.step(s)
|
s = tracker.step(s)
|
||||||
except:
|
except:
|
||||||
@ -54,35 +51,41 @@ def simulate_error_rates_for_SNR(H, Eb_N0, args: SimulationArgs) -> typing.Tuple
|
|||||||
G = H_GF.null_space()
|
G = H_GF.null_space()
|
||||||
k, n = G.shape
|
k, n = G.shape
|
||||||
|
|
||||||
homotopy = homotopy_generator.HomotopyGenerator(H)
|
|
||||||
|
|
||||||
# print(f"G: {homotopy.G}")
|
|
||||||
# print(f"F: {homotopy.F}")
|
|
||||||
# print(f"H: {homotopy.H}")
|
|
||||||
# print(f"DH: {homotopy.DH}")
|
|
||||||
|
|
||||||
tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
|
|
||||||
args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
|
|
||||||
|
|
||||||
num_frames = 0
|
num_frames = 0
|
||||||
|
|
||||||
bit_errors = 0
|
bit_errors = 0
|
||||||
frame_errors = 0
|
frame_errors = 0
|
||||||
decoding_failures = 0
|
decoding_failures = 0
|
||||||
|
|
||||||
|
homotopy = homotopy_generator.HomotopyGenerator(H)
|
||||||
|
tracker = path_tracker.PathTracker(homotopy, args.euler_step_size, args.euler_max_tries,
|
||||||
|
args.newton_max_iter, args.newton_convergence_threshold, args.sigma)
|
||||||
|
|
||||||
for _ in tqdm(range(args.max_frames)):
|
for _ in tqdm(range(args.max_frames)):
|
||||||
Eb_N0_lin = 10**(Eb_N0 / 10)
|
Eb_N0_lin = 10**(Eb_N0 / 10)
|
||||||
N0 = 1 / (2 * k / n * Eb_N0_lin)
|
N0 = 1 / (2 * k / n * Eb_N0_lin)
|
||||||
y = np.zeros(n) + np.sqrt(N0) * np.random.normal(size=n)
|
|
||||||
|
|
||||||
x_hat = decode(tracker, y, H, args)
|
u = np.random.randint(2, size=k)
|
||||||
|
# u = np.zeros(shape=k).astype(np.int32)
|
||||||
|
c = np.array(GF(u) @ G)
|
||||||
|
x = 1 - 2*c
|
||||||
|
|
||||||
bit_errors += np.sum(x_hat != np.zeros(n))
|
y = x + np.sqrt(N0) * np.random.normal(size=n)
|
||||||
frame_errors += np.any(x_hat != np.zeros(n))
|
|
||||||
|
|
||||||
if np.any(np.mod(H @ x_hat, 2)):
|
homotopy.update_received(y)
|
||||||
decoding_failures += 1
|
|
||||||
|
|
||||||
|
c_hat = decode(tracker, y, H, args)
|
||||||
|
|
||||||
|
if np.any(np.mod(H @ c_hat, 2)):
|
||||||
|
tracker.set_sigma(-1 * args.sigma)
|
||||||
|
c_hat = decode(tracker, y, H, args)
|
||||||
|
tracker.set_sigma(args.sigma)
|
||||||
|
|
||||||
|
if np.any(np.mod(H @ c_hat, 2)):
|
||||||
|
decoding_failures += 1
|
||||||
|
|
||||||
|
bit_errors += np.sum(c_hat != c)
|
||||||
|
frame_errors += np.any(c_hat != c)
|
||||||
num_frames += 1
|
num_frames += 1
|
||||||
|
|
||||||
if frame_errors >= args.target_frame_errors:
|
if frame_errors >= args.target_frame_errors:
|
||||||
@ -95,7 +98,7 @@ def simulate_error_rates_for_SNR(H, Eb_N0, args: SimulationArgs) -> typing.Tuple
|
|||||||
return FER, BER, DFR, frame_errors
|
return FER, BER, DFR, frame_errors
|
||||||
|
|
||||||
|
|
||||||
def simulate_error_rates(H, Eb_N0_list, args: SimulationArgs) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
|
def simulate_error_rates(H, Eb_N0_list, args: SimulationArgs) -> pd.DataFrame:
|
||||||
FERs = []
|
FERs = []
|
||||||
BERs = []
|
BERs = []
|
||||||
DFRs = []
|
DFRs = []
|
||||||
@ -106,8 +109,11 @@ def simulate_error_rates(H, Eb_N0_list, args: SimulationArgs) -> typing.Tuple[np
|
|||||||
BERs.append(BER)
|
BERs.append(BER)
|
||||||
DFRs.append(DFR)
|
DFRs.append(DFR)
|
||||||
frame_errors_list.append(num_frames)
|
frame_errors_list.append(num_frames)
|
||||||
|
print(pd.DataFrame({"SNR": Eb_N0_list[:len(FERs)], "FER": FERs, "BER": BERs,
|
||||||
|
"DFR": DFRs, "frame_errors": frame_errors_list}))
|
||||||
|
|
||||||
return np.array(FERs), np.array(BERs), np.array(DFRs), np.array(frame_errors_list)
|
return pd.DataFrame({"SNR": Eb_N0_list, "FER": FERs, "BER": BERs,
|
||||||
|
"DFR": DFRs, "frame_errors": frame_errors_list})
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
@ -157,11 +163,10 @@ def main():
|
|||||||
target_frame_errors=args.target_frame_errors)
|
target_frame_errors=args.target_frame_errors)
|
||||||
|
|
||||||
SNRs = np.arange(args.snr[0], args.snr[1], args.snr[2])
|
SNRs = np.arange(args.snr[0], args.snr[1], args.snr[2])
|
||||||
FERs, BERs, DFRs, frame_errors_list = simulate_error_rates(
|
df = simulate_error_rates(H, SNRs, simulation_args)
|
||||||
H, SNRs, simulation_args)
|
|
||||||
|
|
||||||
df = pd.DataFrame({"SNR": SNRs, "FER": FERs, "BER": BERs, "DFR": DFRs, "frame_errors": frame_errors_list})
|
|
||||||
print(df)
|
print(df)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|||||||
@ -30,7 +30,7 @@ class ToyHomotopy:
|
|||||||
[t]]
|
[t]]
|
||||||
"""
|
"""
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def evaluate_H(y: np.ndarray) -> np.ndarray:
|
def H(y: np.ndarray) -> np.ndarray:
|
||||||
"""Evaluate H at y."""
|
"""Evaluate H at y."""
|
||||||
x1 = y[0]
|
x1 = y[0]
|
||||||
x2 = y[1]
|
x2 = y[1]
|
||||||
@ -43,7 +43,7 @@ class ToyHomotopy:
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def evaluate_DH(y: np.ndarray) -> np.ndarray:
|
def DH(y: np.ndarray) -> np.ndarray:
|
||||||
"""Evaluate Jacobian of H at y."""
|
"""Evaluate Jacobian of H at y."""
|
||||||
x1 = y[0]
|
x1 = y[0]
|
||||||
x2 = y[1]
|
x2 = y[1]
|
||||||
|
|||||||
@ -20,20 +20,23 @@ class HomotopyGenerator:
|
|||||||
self.x_vars = [sp.symbols(f'x{i+1}') for i in range(self.num_vars)]
|
self.x_vars = [sp.symbols(f'x{i+1}') for i in range(self.num_vars)]
|
||||||
self.t = sp.symbols('t')
|
self.t = sp.symbols('t')
|
||||||
|
|
||||||
self.G = self._create_G()
|
self._F = self._create_F()
|
||||||
self.F = self._create_F()
|
self._F_lambda = self._create_F_lambda(self._F)
|
||||||
self.H = self._create_H()
|
# self._G = self._create_G(received)
|
||||||
self.DH = self._create_DH(self.H)
|
# self._G_lambda = self._create_G_lambda(self._G)
|
||||||
|
# self._H = self._create_H()
|
||||||
|
# self._H_lambda = self._create_H_lambda(self._H)
|
||||||
|
# self._DH = self._create_DH()
|
||||||
|
# self._DH_lambda = self._create_DH_lambda(self._DH)
|
||||||
|
|
||||||
self._H_lambda = self._create_H_lambda()
|
def update_received(self, received: np.ndarray):
|
||||||
self._DH_lambda = self._create_DH_lambda()
|
"""Update the received vector and recompute G."""
|
||||||
|
self._G = self._create_G(received)
|
||||||
def _create_G(self) -> List[sp.Expr]:
|
self._G_lambda = self._create_G_lambda(self._G)
|
||||||
G = []
|
self._H = self._create_H()
|
||||||
for var in self.x_vars:
|
self._H_lambda = self._create_H_lambda(self._H)
|
||||||
G.append(var)
|
self._DH = self._create_DH()
|
||||||
|
self._DH_lambda = self._create_DH_lambda(self._DH)
|
||||||
return G
|
|
||||||
|
|
||||||
def _create_F(self) -> sp.MutableMatrix:
|
def _create_F(self) -> sp.MutableMatrix:
|
||||||
F = []
|
F = []
|
||||||
@ -52,51 +55,63 @@ class HomotopyGenerator:
|
|||||||
groebner_basis = sp.groebner(F)
|
groebner_basis = sp.groebner(F)
|
||||||
return sp.MutableMatrix(groebner_basis)
|
return sp.MutableMatrix(groebner_basis)
|
||||||
|
|
||||||
def _create_H(self) -> List[sp.Expr]:
|
def _create_G(self, received) -> sp.MutableMatrix:
|
||||||
|
G = []
|
||||||
|
F_y = self._F_lambda(*received)
|
||||||
|
|
||||||
|
for f, f_y_i in zip(self._F, F_y):
|
||||||
|
G.append(f - f_y_i)
|
||||||
|
|
||||||
|
return sp.MutableMatrix(G)
|
||||||
|
|
||||||
|
def _create_H(self) -> sp.MutableMatrix:
|
||||||
H = []
|
H = []
|
||||||
|
|
||||||
for g, f in zip(self.G, self.F):
|
for f, g in zip(self._F, self._G):
|
||||||
H.append((1 - self.t) * g + self.t * f)
|
H.append((1 - self.t) * g + self.t * f)
|
||||||
|
|
||||||
return H
|
return sp.MutableMatrix(H)
|
||||||
|
|
||||||
def _create_DH(self, H: List[sp.Expr]) -> sp.MutableMatrix:
|
def _create_DH(self) -> sp.MutableMatrix:
|
||||||
all_vars = self.x_vars + [self.t]
|
all_vars = self.x_vars + [self.t]
|
||||||
DH = sp.Matrix([[sp.diff(expr, var)
|
DH = sp.Matrix([[sp.diff(expr, var)
|
||||||
for var in all_vars] for expr in self.H])
|
for var in all_vars] for expr in self._H])
|
||||||
|
|
||||||
return DH
|
return DH
|
||||||
|
|
||||||
def _create_H_lambda(self) -> Callable:
|
def _create_F_lambda(self, expr) -> Callable:
|
||||||
|
all_vars = self.x_vars
|
||||||
|
return sp.lambdify(all_vars, expr, 'numpy')
|
||||||
|
|
||||||
|
def _create_G_lambda(self, expr) -> Callable:
|
||||||
|
all_vars = self.x_vars
|
||||||
|
return sp.lambdify(all_vars, expr, 'numpy')
|
||||||
|
|
||||||
|
def _create_H_lambda(self, expr) -> Callable:
|
||||||
all_vars = self.x_vars + [self.t]
|
all_vars = self.x_vars + [self.t]
|
||||||
return sp.lambdify(all_vars, self.H, 'numpy')
|
return sp.lambdify(all_vars, expr, 'numpy')
|
||||||
|
|
||||||
def _create_DH_lambda(self) -> Callable:
|
def _create_DH_lambda(self, expr) -> Callable:
|
||||||
all_vars = self.x_vars + [self.t]
|
all_vars = self.x_vars + [self.t]
|
||||||
return sp.lambdify(all_vars, self.DH, 'numpy')
|
return sp.lambdify(all_vars, expr, 'numpy')
|
||||||
|
|
||||||
def evaluate_H(self, y: np.ndarray) -> np.ndarray:
|
def H(self, y):
|
||||||
"""
|
|
||||||
Evaluate H at point y.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
y: Array of form [x1, x2, ..., xn, t] where xi are the variables
|
|
||||||
and t is the homotopy parameter.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Array containing H evaluated at y.
|
|
||||||
"""
|
|
||||||
return np.array(self._H_lambda(*y))
|
return np.array(self._H_lambda(*y))
|
||||||
|
|
||||||
def evaluate_DH(self, y: np.ndarray) -> np.ndarray:
|
def DH(self, y):
|
||||||
"""
|
return np.array(self._DH_lambda(*y))
|
||||||
Evaluate the Jacobian of H at point y.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
y: Array of form [x1, x2, ..., xn, t] where xi are the variables
|
|
||||||
and t is the homotopy parameter.
|
|
||||||
|
|
||||||
Returns:
|
def main():
|
||||||
Matrix containing the Jacobian of H evaluated at y.
|
import utility
|
||||||
"""
|
H = utility.read_alist_file(
|
||||||
return np.array(self._DH_lambda(*y), dtype=float)
|
"/home/andreas/workspace/work/hiwi/ba-sw/codes/BCH_7_4.alist")
|
||||||
|
a = HomotopyGenerator(H)
|
||||||
|
|
||||||
|
y = np.array([0.1, 0.2, 0.9, 0.1, -0.8, -0.5, -1.0, 0])
|
||||||
|
|
||||||
|
print(a.DH(y))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|||||||
@ -18,9 +18,9 @@ class PathTracker:
|
|||||||
Information and Systems, vol. 15, no. 2, pp. 119-307, 2015
|
Information and Systems, vol. 15, no. 2, pp. 119-307, 2015
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, Homotopy, euler_step_size=0.05, euler_max_tries=10, newton_max_iter=5,
|
def __init__(self, homotopy, euler_step_size=0.05, euler_max_tries=10, newton_max_iter=5,
|
||||||
newton_convergence_threshold=0.001, sigma=1):
|
newton_convergence_threshold=0.001, sigma=1):
|
||||||
self.Homotopy = Homotopy
|
self._homotopy = homotopy
|
||||||
self._euler_step_size = euler_step_size
|
self._euler_step_size = euler_step_size
|
||||||
self._euler_max_tries = euler_max_tries
|
self._euler_max_tries = euler_max_tries
|
||||||
self._newton_max_iter = newton_max_iter
|
self._newton_max_iter = newton_max_iter
|
||||||
@ -47,7 +47,7 @@ class PathTracker:
|
|||||||
def _perform_euler_predictor_step(self, y, step_size) -> typing.Tuple[np.ndarray, np.ndarray]:
|
def _perform_euler_predictor_step(self, y, step_size) -> typing.Tuple[np.ndarray, np.ndarray]:
|
||||||
# Obtain y_prime
|
# Obtain y_prime
|
||||||
|
|
||||||
DH = self.Homotopy.evaluate_DH(y)
|
DH = self._homotopy.DH(y)
|
||||||
ns = scipy.linalg.null_space(DH)
|
ns = scipy.linalg.null_space(DH)
|
||||||
|
|
||||||
y_prime = ns[:, 0] * self._sigma
|
y_prime = ns[:, 0] * self._sigma
|
||||||
@ -70,10 +70,10 @@ class PathTracker:
|
|||||||
for _ in range(self._newton_max_iter):
|
for _ in range(self._newton_max_iter):
|
||||||
# Perform correction
|
# Perform correction
|
||||||
|
|
||||||
DH = self.Homotopy.evaluate_DH(y)
|
DH = self._homotopy.DH(y)
|
||||||
DH_pinv = np.linalg.pinv(DH)
|
DH_pinv = np.linalg.pinv(DH)
|
||||||
|
|
||||||
y = y - DH_pinv @ self.Homotopy.evaluate_H(y)
|
y = y - np.transpose(DH_pinv @ self._homotopy.H(y))[0]
|
||||||
|
|
||||||
# Check stopping criterion
|
# Check stopping criterion
|
||||||
|
|
||||||
@ -83,3 +83,6 @@ class PathTracker:
|
|||||||
prev_y = y
|
prev_y = y
|
||||||
|
|
||||||
raise RuntimeError("Newton corrector did not converge")
|
raise RuntimeError("Newton corrector did not converge")
|
||||||
|
|
||||||
|
def set_sigma(self, sigma):
|
||||||
|
self._sigma = sigma
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user