Files
rohde-engineering-competition/fft/vector_fft.py
2026-02-18 21:32:22 +01:00

223 lines
7.7 KiB
Python

import numpy as np
import sys
import typing
buffer = np.zeros(1024).astype('float32')
# TODO temp memory
class MemoryWrapper:
"""Class allowing for easier access to the memory buffer."""
def __init__(self, N0: int, imag_offset: int):
self._current_offset = 0
self._num_sub_ffts = 0
self._N0 = N0
self._imag_offset = imag_offset
def set_kN(self, k, num_sub_ffts):
N = self._N0 / num_sub_ffts
i = bit_reverse(k, int(np.log2(N)))
self._current_offset = i * num_sub_ffts
self._num_sub_ffts = num_sub_ffts
@property
def OddReal(self) -> np.array:
start_index = self._current_offset + self._num_sub_ffts
end_index = self._current_offset + self._num_sub_ffts * 2
return buffer[start_index:end_index]
@OddReal.setter
def OddReal(self, o):
start_index = self._current_offset + self._num_sub_ffts
end_index = self._current_offset + self._num_sub_ffts * 2
buffer[start_index:end_index] = o
@property
def OddImag(self) -> np.array:
start_index = self._imag_offset \
+ self._current_offset + self._num_sub_ffts
end_index = self._imag_offset \
+ self._current_offset + self._num_sub_ffts * 2
return buffer[start_index:end_index]
@OddImag.setter
def OddImag(self, o):
start_index = self._imag_offset \
+ self._current_offset + self._num_sub_ffts
end_index = self._imag_offset \
+ self._current_offset + self._num_sub_ffts * 2
buffer[start_index: end_index] = o
@property
def EvenReal(self) -> np.array:
start_index = self._current_offset
end_index = self._current_offset + self._num_sub_ffts
return buffer[start_index:end_index]
@EvenReal.setter
def EvenReal(self, e):
start_index = self._current_offset
end_index = self._current_offset + self._num_sub_ffts
buffer[start_index:end_index] = e
@property
def EvenImag(self) -> np.array:
start_index = self._imag_offset + self._current_offset
end_index = self._imag_offset \
+ self._current_offset + self._num_sub_ffts
return buffer[start_index:end_index]
@EvenImag.setter
def EvenImag(self, e):
start_index = self._imag_offset + self._current_offset
end_index = self._imag_offset \
+ self._current_offset + self._num_sub_ffts
buffer[start_index:end_index] = e
# def mul_n(l_start: int, r_start: int, out_start, num: int):
# for i in range(num):
# buffer[out_start + i] = buffer[l_start + i] * buffer[r_start * i]
def mul_32_factor(l_start: int, factor: float, out_start, num: int):
if num > 32:
raise ValueError('A maximum of 32 elements can be multiplied')
for i in range(num):
buffer[out_start + i] = buffer[l_start + i] * factor
def add_32(l_start: int, r_start: int, out_start, num: int):
if num > 32:
raise ValueError('A maximum of 32 elements can be added')
for i in range(num):
buffer[out_start + i] = buffer[l_start + i] + buffer[r_start * i]
def sub_32(l_start: int, r_start: int, out_start: int, num: int):
if num > 32:
raise ValueError('A maximum of 32 elements can be subtracted')
for i in range(num):
buffer[out_start + i] = buffer[l_start + i] - buffer[r_start * i]
def bit_reverse(val: int, num_bits: int):
"""Reverse the bits of an input integer.
:param val: Value to reverse
:param num_bits: Number of bits to consider
"""
b = '{:0{width}b}'.format(val, width=num_bits)
return int(b[::-1], 2)
def fft_reorder(x: np.array, N: int) -> np.array:
"""Reorder the elements of an array so that the indices are bit-inverse
values of the original ones.
"""
result = np.zeros(x.size)
for i in range(x.size):
result[i] = x[bit_reverse(i, int(np.log2(N)))]
return result
def fft(x_re: np.array, x_im: np.array) -> typing.Tuple[np.array, np.array]:
"""Perform the fft algorithm on a given vector.
"""
imag_mem_offset = x_re.size
temp_mem_offset = x_re.size * 2
# Copy data to hardware buffer
for i in range(x_re.size):
buffer[i] = x_re[i]
# for i in range(x_im.size):
# buffer[i + imag_offset] = x_im[i]
N0 = x_re.size
mem_wrapper = MemoryWrapper(N0, x_re.size)
num_sub_ffts = N0
while num_sub_ffts > 0:
N = N0 / num_sub_ffts
for k in range(int(N // 2)):
# mem_wrapper.set_kN(k, num_sub_ffts)
#
# factor_re = np.exp(-2j * np.pi * k / N).real
# factor_im = np.exp(-2j * np.pi * k / N).imag
#
# temp_re = factor_re * mem_wrapper.OddReal \
# - factor_im * mem_wrapper.OddImag
# temp_im = factor_re * mem_wrapper.OddImag \
# + factor_im * mem_wrapper.OddReal
#
# mem_wrapper.OddReal = mem_wrapper.EvenReal - temp_re
# mem_wrapper.OddImag = mem_wrapper.EvenImag - temp_im
# mem_wrapper.EvenReal = mem_wrapper.EvenReal + temp_re
# mem_wrapper.EvenImag = mem_wrapper.EvenImag + temp_im
i = bit_reverse(k, int(np.log2(N)))
fft_offset = i * num_sub_ffts
odd_offset = num_sub_ffts
factor_re = np.exp(-2j * np.pi * k / N).real
factor_im = np.exp(-2j * np.pi * k / N).imag
# Compute temp variables
# TODO: Don't hardcode 32
temp_re_start = temp_mem_offset
temp_im_start = temp_re_start + imag_mem_offset
even_re_start = fft_offset
even_im_start = fft_offset + imag_mem_offset
odd_re_start = fft_offset + odd_offset
odd_im_start = fft_offset + imag_mem_offset
mul_32_factor(l_start=odd_re_start, factor=factor_re,
out_start=temp_re_start, num=4)
mul_32_factor(l_start=odd_im_start, factor=-factor_im,
out_start=temp_im_start, num=4)
add_32(l_start=temp_re_start, r_start=temp_mem_offset + 32,
out_start=temp_mem_offset, num=4)
mul_32_factor(l_start=(fft_offset + odd_offset + imag_mem_offset),
factor=factor_re,
out_start=temp_mem_offset + 32, num=4)
mul_32_factor(l_start=(fft_offset + odd_offset + imag_mem_offset),
factor=factor_im,
out_start=temp_mem_offset + 64, num=4)
add_32(l_start=temp_mem_offset + 32, r_start=temp_mem_offset + 64,
out_start=temp_mem_offset + 32, num=4)
num_sub_ffts = num_sub_ffts // 2
buffer[0:imag_mem_offset] = fft_reorder(buffer[0:imag_mem_offset],
imag_mem_offset)
buffer[imag_mem_offset:2 * imag_mem_offset] = fft_reorder(
buffer[imag_mem_offset:2 * imag_mem_offset], imag_mem_offset)
return buffer[0:imag_mem_offset], buffer[
imag_mem_offset:imag_mem_offset * 2]
def main():
x_re = np.arange(8)
x_im = np.zeros(x_re.size)
X_re, X_im = fft(x_re, x_im)
# for k, (X_k, X_exp_k) in enumerate(zip(X_re, np.fft.fft(x_re))):
# print(
# f"X[{k:02}]: {X_k.real:07.2f} + j({X_k.imag:07.2f})\t\tX_exp["
# f"{k:02}]: {X_exp_k.real:07.2f} + j({X_exp_k.imag:07.2f})")
for k, ((X_re_k, X_im_k), X_exp_k) in enumerate(
zip(zip(X_re, X_im), np.fft.fft(x_re))):
print(
f"X[{k:02}]: {X_re_k:07.2f} + j({X_im_k:07.2f})\t\tX_exp["
f"{k:02}]: {X_exp_k.real:07.2f} + j({X_exp_k.imag:07.2f})")
if __name__ == "__main__":
main()