Compare commits

4 Commits

10 changed files with 438 additions and 174 deletions

View File

@@ -1,6 +1,5 @@
from banking_breakdown import document_builder
from banking_breakdown import statement_parser
from banking_breakdown import ui
from banking_breakdown import ui, regex_categorizer, statement_parser, \
document_builder
import argparse
@@ -10,6 +9,9 @@ def categorize_func(args):
df = pd.read_csv(args.i, delimiter=args.d)
if args.f is not None:
df = regex_categorizer.assign_categories(df, args.f)
import signal
signal.signal(signal.SIGINT, signal.SIG_DFL)
@@ -17,7 +19,8 @@ def categorize_func(args):
def report_func(args):
print("Report")
report_data = statement_parser.parse_statement(args.i)
document_builder.build_document(report_data)
#

View File

@@ -14,6 +14,8 @@ def _serialize_report_data(report_data: types.ReportData):
report_data.net_income.to_csv('build/net_income.csv', index=False)
report_data.category_overview.to_csv('build/category_overview.csv',
index=False)
report_data.expenses_by_category.to_csv('build/expenses_by_category.csv',
index=False)
report_data.total_value.to_csv('build/total_value.csv', index=False)
report_data.detailed_balance.to_csv('build/detailed_balance.csv',
index=False)

View File

@@ -0,0 +1,54 @@
import pandas as pd
import json
def _is_str_column(s: pd.Series):
"""Check if the type of a pandas DataFrame column is str.
Taken from https://stackoverflow.com/a/67001213/3433817.
"""
if isinstance(s.dtype, pd.StringDtype):
# The series was explicitly created as a string series (Pandas>=1.0.0)
return True
elif s.dtype == 'object':
# Object series, check each value
return all((v is None) or isinstance(v, str) for v in s)
else:
return False
def _read_regex_dict(regex_file: str):
with open(regex_file, 'r') as f:
return json.load(f)
def assign_categories(df: pd.DataFrame, regex_file: str) -> pd.DataFrame:
if 'category' not in df.columns:
df['category'] = [' '] * len(df.index)
regex_dict = _read_regex_dict(regex_file)
df = df.fillna('')
for column in df.columns:
if not _is_str_column(df[column]):
continue
for category in regex_dict:
for regex in regex_dict[category]:
matched = df[column].str.contains(regex, regex=True)
df.loc[matched, 'category'] = category
return df
def main():
df = pd.read_csv('../res/bank_statement_2023_categorized.csv')
df = assign_categories(df, regex_file='../res/regexes.json')
print(df['category'])
if __name__ == "__main__":
main()

View File

@@ -6,94 +6,117 @@ import re
import numpy as np
# def _read_regex_dict(regex_file: str = "res/category_regexes.json"):
# with open(regex_file, 'r') as f:
# return json.load(f)
#
#
# def _tag_with_category(df: pd.DataFrame) -> pd.DataFrame:
# regex_dict = _read_regex_dict()
#
# return df
#
#
# def _compute_total_balance(df: pd.DataFrame) -> pd.DataFrame:
# stripped_df = pd.DataFrame(
# {'t': df["Valutadatum"], 'value': df["Saldo nach Buchung"]})
#
# stripped_df.index = stripped_df['t']
# gb = stripped_df.groupby(pd.Grouper(freq='M'))
#
# result = gb.tail(1)['value'].reset_index()
#
# return result
#
#
# def _compute_net_income(df: pd.DataFrame) -> pd.DataFrame:
# stripped_df = pd.DataFrame({'t': df["Valutadatum"], 'value': df["Betrag"]})
#
# stripped_df.index = stripped_df['t']
# gb = stripped_df.groupby(pd.Grouper(freq='M'))
#
# result = gb["value"].sum().reset_index()
# return result
#
#
# def _compute_category_overview(df: pd.DataFrame) -> pd.DataFrame:
# categories = ["Social life", "Other", "Food", "Hobbies",
# "Rent \\& Utilities", "Education", "Transportation"]
# values = np.array([10, 12, 53, 12, 90, 23, 32])
# values = values / values.sum() * 100
# values = np.round(values, decimals=1)
# values[-1] += 100 - np.sum(values)
#
# category_overview_df = pd.DataFrame(
# {"category": categories, "value": values})
#
# return category_overview_df
#
#
# def _compute_detailed_balance(df: pd.DataFrame) -> pd.DataFrame:
# return pd.DataFrame({'t': df["Valutadatum"],
# 'value': df["Saldo nach Buchung"]})
#
#
# def parse_statement(filename: str) -> types.ReportData:
# df = pd.read_csv(filename, delimiter=';', decimal=",")
# df["Valutadatum"] = pd.to_datetime(df["Valutadatum"], format='%d.%m.%Y')
#
# category_overview_df = _compute_category_overview(df)
# total_balance_df = _compute_total_balance(df)
# net_income_df = _compute_net_income(df)
# detailed_balance_df = _compute_detailed_balance(df)
#
# return types.ReportData(category_overview_df,
# net_income_df,
# total_balance_df,
# detailed_balance_df)
#
#
# def main():
# report_data = parse_statement("../res/bank_statement_2023.csv")
#
#
# if __name__ == "__main__":
# main()
def _escape_string(to_escape: str):
return to_escape.translate(str.maketrans({"&": r"\&"}))
def get_stripped_statement(filename: str) -> pd.DataFrame:
# df = pd.read_csv(filename, delimiter=';', decimal=",")
df = pd.read_csv(filename, delimiter=';')
df["Valutadatum"] = (pd.to_datetime(df["Valutadatum"], format='%d.%m.%Y')
.dt.strftime('%Y-%m-%d'))
def _compute_total_balance(df: pd.DataFrame) -> pd.DataFrame:
stripped_df = pd.DataFrame(
{'t': df["t"], 'value': df["balance"]})
result = pd.DataFrame({'t': df["Valutadatum"],
'other party': df["Name Zahlungsbeteiligter"],
'value': df["Betrag"],
'balance': df["Saldo nach Buchung"],
'category': [''] * len(df["Valutadatum"]),
'description': df["Buchungstext"],
'purpose': df["Verwendungszweck"]
})
stripped_df.index = stripped_df['t']
gb = stripped_df.groupby(pd.Grouper(freq='M'))
result = gb.tail(1)['value'].reset_index()
return result
def _compute_net_income(df: pd.DataFrame) -> pd.DataFrame:
df.index = df['t']
income_df = df.loc[df['value'] > 0]
expenses_df = df.loc[df['value'] < 0]
income_df = income_df.groupby(pd.Grouper(freq='M'))[
'value'].sum().reset_index().round(decimals=2)
expenses_df = expenses_df.groupby(pd.Grouper(freq='M'))[
'value'].sum().reset_index().round(decimals=2)
t = income_df['t']
income = income_df['value'].round(decimals=2)
expenses = expenses_df['value'].round(decimals=2)
net = (income + expenses).round(decimals=2)
result_df = pd.DataFrame(
{'t': t, 'income': income, 'expenses': expenses, 'net': net})
return result_df
def _compute_category_overview(df: pd.DataFrame) -> pd.DataFrame:
df = df.loc[df['value'] < 0]
df = df.drop('t', axis=1)
df = df.groupby(['category']).sum().reset_index()
values = (df['value'] / df['value'].sum() * 100).to_numpy()
values[-1] += 100 - np.sum(values)
values = np.round(values, decimals=1)
categories = [_escape_string(category) for category in df['category']]
category_overview_df = pd.DataFrame(
{"category": categories, "value": values})
category_overview_df = category_overview_df.sort_values('value',
ascending=False)
return category_overview_df
def _compute_expenses_by_category(complete_df: pd.DataFrame) -> pd.DataFrame:
complete_df = complete_df.loc[complete_df['value'] < 0].copy()
complete_df['value'] = -complete_df['value']
complete_df.index = complete_df['t']
complete_gb = complete_df.groupby(pd.Grouper(freq='M'))
categories = complete_df['category'].unique()
data_dict = {category: [] for category in categories}
for (month_date, month_df) in complete_gb:
month_df = month_df.drop('t', axis=1).reset_index().drop('t', axis=1)
category_df = month_df.groupby(['category']).sum().reset_index()
for _, row in category_df.iterrows():
data_dict[row['category']].append(row['value'])
non_listed = list(set(categories) - set(category_df['category']))
for category in non_listed:
data_dict[category].append(0)
result = pd.DataFrame(data_dict)
result = result.reindex(result.mean().sort_values(ascending=False).index,
axis=1)
result = result.round(decimals=2)
result['t'] = complete_gb.tail(1).drop('t', axis=1).reset_index()['t']
return result
def _compute_detailed_balance(df: pd.DataFrame) -> pd.DataFrame:
return pd.DataFrame({'t': df["t"],
'value': df["balance"]})
def parse_statement(filename: str) -> types.ReportData:
df = pd.read_csv(filename)
df["t"] = pd.to_datetime(df["t"], format='%Y-%m-%d')
category_overview_df = _compute_category_overview(df)
total_balance_df = _compute_total_balance(df)
net_income_df = _compute_net_income(df)
detailed_balance_df = _compute_detailed_balance(df)
expenses_by_category_df = _compute_expenses_by_category(df)
return types.ReportData(category_overview_df,
expenses_by_category_df,
net_income_df,
total_balance_df,
detailed_balance_df, )
def main():
report_data = parse_statement("../res/bank_statement_2023_categorized.csv")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,5 @@
{
"asdf": [
"Kinemic"
]
}

View File

@@ -5,6 +5,7 @@ import pandas as pd
@dataclass
class ReportData:
category_overview: pd.DataFrame
expenses_by_category: pd.DataFrame
net_income: pd.DataFrame
total_value: pd.DataFrame
detailed_balance: pd.DataFrame

View File

@@ -35,16 +35,16 @@ class HeaderContextMenu(QMenu):
"""Context menu appearing when right-clicking the header of the QTableView.
"""
def __init__(self, column, pandas_model: PandasModel, callback=None,
def __init__(self, column_index, pandas_model: PandasModel, callback=None,
parent=None):
super(HeaderContextMenu, self).__init__()
self._column = column
self._pandas_model = pandas_model
self._callback = callback
self._column_index = column_index
self._column_text \
= self._pandas_model.headerData(self._column,
= self._pandas_model.headerData(self._column_index,
Qt.Orientation.Horizontal)
# Define assign action
@@ -85,9 +85,11 @@ class HeaderContextMenu(QMenu):
return
if (new_name != self._column_text) and (new_name != ''):
df = self._pandas_model.get_dataframe()
df = df.rename(columns={self._column_text: new_name})
self._pandas_model.set_dataframe(df)
try:
self._pandas_model.rename_column(self._column_text, new_name)
except:
QMessageBox.warning(self, "No action performed",
"An error occurred.")
if self._callback:
self._callback()
@@ -98,10 +100,7 @@ class HeaderContextMenu(QMenu):
f" column '{self._column_text}'?")
if button == QMessageBox.StandardButton.Yes:
df = self._pandas_model.get_dataframe()
df = df.iloc[:, [j for j, c
in enumerate(df.columns) if j != self._column]]
self._pandas_model.set_dataframe(df)
self._pandas_model.delete_column_by_index(self._column_index)
if self._callback:
self._callback()
@@ -119,14 +118,7 @@ class HeaderContextMenu(QMenu):
if not flag:
return
column_titles = list(df.columns)
index1, index2 = column_titles.index(
self._column_text), column_titles.index(other_name)
column_titles[index1], column_titles[index2] \
= column_titles[index2], column_titles[index1]
df = df.reindex(columns=column_titles)
self._pandas_model.set_dataframe(df)
self._pandas_model.switch_columns(self._column_text, other_name)
if self._callback:
self._callback()
@@ -139,14 +131,12 @@ class HeaderContextMenu(QMenu):
if not flag:
return
df = self._pandas_model.get_dataframe()
try:
df[self._column_text] \
= pd.to_datetime(df[self._column_text], format=date_format)
self._pandas_model.assign_date_column(self._column_text,
date_format)
except:
QMessageBox.warning(self, "No action performed",
"An error occurred.")
self._pandas_model.set_dataframe(df)
if self._callback:
self._callback()
@@ -160,19 +150,12 @@ class HeaderContextMenu(QMenu):
if not flag:
return
df = self._pandas_model.get_dataframe()
try:
if decimal_sep == ',':
df[self._column_text] \
= df[self._column_text].str.replace(',', '.').astype(float)
else:
df[self._column_text] = df[self._column_text].astype(float)
self._pandas_model.assign_float_column(self._column_text,
decimal_sep)
except:
QMessageBox.warning(self, "No action performed",
"An error occurred.")
self._pandas_model.set_dataframe(df)
if self._callback:
self._callback()

View File

@@ -45,6 +45,7 @@ class MainWindow(QMainWindow):
self._proxy_model.setSourceModel(self._pandas_model)
self._table_view.setModel(self._proxy_model)
self._proxy_model.setSortRole(Qt.ItemDataRole.EditRole)
self._proxy_model.setDynamicSortFilter(False)
# Set event handlers
@@ -98,6 +99,13 @@ class MainWindow(QMainWindow):
len(col))
self._table_view.setColumnWidth(i, max_char * 10)
def _assign_category_to_selected_transactions(self, category: str):
indexes = self._table_view.selectionModel().selectedRows()
row_indices = [self._table_view.model().mapToSource(index).row()
for index in indexes]
self._pandas_model.assign_category(category, row_indices)
#
# List data updates
#
@@ -107,16 +115,11 @@ class MainWindow(QMainWindow):
self._list_widget.addItem(category)
def _update_categories_from_dataframe(self):
df = self._pandas_model.get_dataframe()
if 'category' not in df.columns:
df['category'] = [' '] * len(df.index)
df_categories = df['category'].unique()
df_categories = self._pandas_model.get_categories()
current_categories = [self._list_widget.item(x).text() for x
in range(self._list_widget.count())]
missing = list(set(df_categories) - set(current_categories))
self._add_categories([category for category
in missing if category != ' '])
@@ -135,19 +138,19 @@ class MainWindow(QMainWindow):
warning_item.hide()
self._warning_layout.removeItem(warning_item)
df = self._pandas_model.get_dataframe()
columns = self._pandas_model.get_columns()
if 't' not in df.columns:
if 't' not in columns:
self._add_warning_item(
"The column 't' does not exist. Please rename the column"
" containing the dates of the transactions to 't'.")
if 'value' not in df.columns:
if 'value' not in columns:
self._add_warning_item(
"The column 'value' does not exist. Please rename the column"
" containing the values of the transactions to 'value'.")
if 'balance' not in df.columns:
if 'balance' not in columns:
self._add_warning_item(
"The column 'balance' does not exist. Please rename the column"
" containing the balance after each transaction to 'balance'")
@@ -159,7 +162,7 @@ class MainWindow(QMainWindow):
def _handle_header_right_click(self, pos):
column = self._table_view.horizontalHeader().logicalIndexAt(pos)
context = HeaderContextMenu(parent=self, column=column,
context = HeaderContextMenu(parent=self, column_index=column,
pandas_model=self._pandas_model,
callback=self._dataframe_update_callback)
context.exec(self.sender().mapToGlobal(pos))
@@ -187,35 +190,19 @@ class MainWindow(QMainWindow):
f"Are you sure you want to delete"
f" category '{selected_item.text()}'?")
df = self._pandas_model.get_dataframe()
if 'category' not in df.columns:
df['category'] = [' '] * len(df.index)
if button == QMessageBox.StandardButton.Yes:
df.loc[df['category'] == selected_item.text(), 'category'] = ' '
self._pandas_model.delete_category(selected_item.text())
self._list_widget.takeItem(self._list_widget.row(selected_item))
self._pandas_model.set_dataframe(df)
def _handle_clear_click(self):
self._assign_category(' ')
def _assign_category(self, category: str):
indexes = self._table_view.selectionModel().selectedRows()
row_indices = [self._table_view.model().mapToSource(index).row()
for index in indexes]
df = self._pandas_model.get_dataframe()
df.loc[row_indices, 'category'] = category
self._pandas_model.set_dataframe(df)
self._assign_category_to_selected_transactions(' ')
def _handle_apply_click(self):
category = self._list_widget.selectedItems()[0].text()
self._assign_category(category)
self._assign_category_to_selected_transactions(category)
def _handle_item_double_click(self, item):
self._assign_category(item.text())
self._assign_category_to_selected_transactions(item.text())
def _handle_save(self):
filename, _ = QFileDialog.getSaveFileName(self, 'Save File')

View File

@@ -1,3 +1,5 @@
import typing
import numpy
import pandas as pd
from PyQt6 import QtCore
@@ -20,9 +22,10 @@ class PandasModel(QtCore.QAbstractTableModel):
self._data = pd.DataFrame()
self._data_str = pd.DataFrame()
self._horizontalHeaders = None
#
# Overloaded functions
#
def rowCount(self, parent=None):
return len(self._data_str.values)
@@ -55,14 +58,112 @@ class PandasModel(QtCore.QAbstractTableModel):
and (role == Qt.ItemDataRole.DisplayRole)):
return super().headerData(section, orientation, role)
return self._horizontalHeaders[section]
return self._data_str.columns[section]
# Other functions
#
# Manipulate categories
#
def assign_category(self, category, row_indices):
if 'category' not in self._data.columns:
self.create_column('category')
self._data.loc[row_indices, 'category'] = category
self._data_str = _get_str_dataframe(self._data)
for row_index in row_indices:
start_index = self.index(row_index, 0)
stop_index = self.index(row_index, len(self._data.columns) - 1)
self.dataChanged.emit(start_index, stop_index)
def delete_category(self, category):
if 'category' not in self._data.columns:
self.create_column('category')
row_indices = self._data.loc[self._data['category'] == category].index
self.assign_category(' ', row_indices)
def get_categories(self) -> typing.List[str]:
if 'category' not in self._data.columns:
self.create_column('category')
return self._data['category'].unique()
#
# Manipulate columns
#
def create_column(self, column, initial_value=' '):
self._data[column] = [initial_value] * len(self._data.index)
self._data_str = _get_str_dataframe(self._data)
self.layoutAboutToBeChanged.emit()
self.layoutChanged.emit()
def delete_column_by_index(self, column_index):
self._data \
= self._data.iloc[:, [j for j, c in enumerate(self._data.columns)
if j != column_index]]
self._data_str = _get_str_dataframe(self._data)
self.layoutAboutToBeChanged.emit()
self.layoutChanged.emit()
def rename_column(self, old_name, new_name):
if new_name in self._data.columns:
raise Exception(
f"A column with the name '{new_name}' already exists.")
self._data = self._data.rename(columns={old_name: new_name})
self._data_str = _get_str_dataframe(self._data)
column_index = self._data.columns.get_loc(new_name)
self.headerDataChanged.emit(Qt.Orientation.Horizontal,
column_index, column_index)
def switch_columns(self, column1, column2):
column_titles = list(self._data.columns)
index1, index2 \
= column_titles.index(column1), column_titles.index(column2)
column_titles[index1], column_titles[index2] \
= column_titles[index2], column_titles[index1]
self._data = self._data.reindex(columns=column_titles)
self._data_str = _get_str_dataframe(self._data)
self.layoutAboutToBeChanged.emit()
self.layoutChanged.emit()
def get_columns(self) -> typing.List[str]:
return list(self._data.columns)
def assign_float_column(self, column, decimal_sep):
if decimal_sep == ',':
self._data[column] \
= self._data[column].str.replace(',', '.').astype(float)
else:
self._data[column] = self._data[column].astype(float)
self._data_str = _get_str_dataframe(self._data)
column_index = self._data.columns.get_loc(column)
start_index = self.index(0, column_index)
stop_index = self.index(len(self._data.index), column_index)
self.dataChanged.emit(start_index, stop_index)
def assign_date_column(self, column, date_format):
self._data[column] \
= pd.to_datetime(self._data[column], format=date_format)
self._data_str = _get_str_dataframe(self._data)
#
# Directly access dataframe
#
def set_dataframe(self, df):
self._data = df
self._data_str = _get_str_dataframe(df)
self._horizontalHeaders = list(df.columns)
self.layoutAboutToBeChanged.emit()
self.layoutChanged.emit()

View File

@@ -157,7 +157,7 @@
]
% Dummy plot to set x axis ticks
\addplot[draw=none]
table[col sep=comma, x=t, y=value]
table[col sep=comma, x=t, y=net]
{net_income.csv};
% Dummy plot to set x axis scale
@@ -165,11 +165,17 @@
table[col sep=comma, x=t, y expr=0]
{detailed_balance.csv};
\addplot[ybar, color=scol2, fill=scol2, line width=1pt]
table[col sep=comma, x=t, y=value, discard if lt={value}{0}]
\addplot[ybar, bar width=0.4cm, draw=none, fill=scol2!30, line width=1pt]
table[col sep=comma, x=t, y=income]
{net_income.csv};
\addplot[ybar, color=scol0, fill=scol0, line width=1pt]
table[col sep=comma, x=t, y=value, discard if gt={value}{0}]
\addplot[ybar, bar width=0.4cm, draw=none, fill=scol0!30, line width=1pt]
table[col sep=comma, x=t, y=expenses]
{net_income.csv};
\addplot[ybar, bar width=0.3cm, draw=none, fill=scol0, line width=1pt]
table[col sep=comma, x=t, y=net, discard if gt={net}{0}]
{net_income.csv};
\addplot[ybar, bar width=0.3cm, draw=none, fill=scol2, line width=1pt]
table[col sep=comma, x=t, y=net, discard if lt={net}{0}]
{net_income.csv};
\end{axis}
\end{tikzpicture}
@@ -211,6 +217,105 @@
\caption{Development of account balance over time}
\end{figure}
\begin{figure}
\centering
\csvautotabular{net_income.csv}
\end{figure}
\begin{figure}[H]
\centering
% Read table
\pgfplotstableread[col sep=comma]{expenses_by_category.csv}\expbycattable
\pgfplotstablegetcolsof{\expbycattable}
\pgfmathtruncatemacro\NumCols{\pgfplotsretval-1}
\begin{subfigure}[c]{\textwidth}
\centering
\begin{tikzpicture}
\begin{axis}[
stack plots=y,
area style,
date coordinates in=x,
width=\textwidth,
height=0.375\textwidth,
xticklabel=\month.\shortyear{\year},
xtick=data,
enlargelimits=false,
xticklabel style={
rotate=60,
anchor=near xticklabel,
},
legend columns=5,
legend style={at={(0.5,-0.6)},anchor=south},
ylabel={Expenses in €},
ymin=0,
]
% For each
\pgfplotsinvokeforeach{0,...,\NumCols/2 -1}{
% Define color
\pgfmathparse{1000 / (\NumCols/2 -1) * #1}
\extractcolormapcolor{tempcol#1}{\pgfmathresult}
% Add plot
\addplot+[tempcol#1]
table[col sep=comma, x=t, y index=#1]
{\expbycattable} \closedcycle;
% Add legend entry (https://tex.stackexchange.com/a/405018)
\pgfplotstablegetcolumnnamebyindex{#1}\of{\expbycattable}\to\pgfplotsretval
\expandafter\addlegendentry\expandafter{\pgfplotsretval}
}
\end{axis}
\end{tikzpicture}
\end{subfigure}\\[1em]
\begin{subfigure}[c]{\textwidth}
\centering
\begin{tikzpicture}
\begin{axis}[
stack plots=y,
area style,
date coordinates in=x,
width=\textwidth,
height=0.375\textwidth,
xticklabel=\month.\shortyear{\year},
xtick=data,
enlargelimits=false,
xticklabel style={
rotate=60,
anchor=near xticklabel,
},
legend columns=5,
legend style={at={(0.5,-0.6)},anchor=south},
ylabel={Expenses in €},
ymin=0,
]
% For each
\pgfplotsinvokeforeach{\NumCols/2,...,\NumCols-1}{
% Define color
\pgfmathparse{1000 * (#1 - \NumCols/2) / (\NumCols-1 - \NumCols/2)}
\extractcolormapcolor{tempcol#1}{\pgfmathresult}
% Add plot
\addplot+[tempcol#1]
table[col sep=comma, x=t, y index=#1]
{\expbycattable} \closedcycle;
% Add legend entry (https://tex.stackexchange.com/a/405018)
\pgfplotstablegetcolumnnamebyindex{#1}\of{\expbycattable}\to\pgfplotsretval
\expandafter\addlegendentry\expandafter{\pgfplotsretval}
}
\end{axis}
\end{tikzpicture}
\end{subfigure}
\caption{Expenses by category}
\end{figure}
\end{document}