-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathservice.py
142 lines (121 loc) · 4.56 KB
/
service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from datetime import datetime
from typing import List
from sqlmodel import Session, text
from core import engine
from model import Transaction, User
from schema import Balance, CreateTransaction, TransactionResponse, UserSplit
from util import convert_transaction_to_ledger
def create_user(user_email: str) -> dict:
new_user = User(email=user_email)
with Session(engine) as session:
try:
session.add(new_user)
session.commit()
session.refresh(new_user)
return {"id": new_user.id, "email": new_user.email}
except Exception:
session.rollback()
raise ValueError("Email already registered")
def get_user_balance(user_id: int) -> List[Balance]:
with Session(engine) as session:
query = text(
"""
SELECT
user_id,
SUM(total_amount) AS total_amount
FROM (
SELECT
l.to_user_id AS user_id,
l.amount AS total_amount
FROM ledgers l
INNER JOIN transactions t ON l.transaction_id = t.id
WHERE t.deleted_at IS NULL
AND l.from_user_id = :user_id
UNION ALL
SELECT
l.from_user_id AS user_id,
-l.amount AS total_amount
FROM ledgers l
INNER JOIN transactions t ON l.transaction_id = t.id
WHERE t.deleted_at IS NULL
AND l.to_user_id = :user_id
) subquery
GROUP BY user_id
HAVING SUM(total_amount) != 0;
"""
)
result = session.execute(query, {"user_id": user_id})
balance_rows = result.fetchall()
return [
Balance(user_id=row.user_id, total_amount=row.total_amount)
for row in balance_rows
]
def clear_user_balance(user_id: int) -> None:
balances = get_user_balance(user_id)
for balance in balances:
if balance.total_amount > 0:
create_transaction(
CreateTransaction(
description="Clearing Balance",
total_amount=balance.total_amount,
split_type="uneven",
computation_type="amount",
from_users=[
UserSplit(user_id=balance.user_id, value=balance.total_amount)
],
to_users=[UserSplit(user_id=user_id, value=balance.total_amount)],
)
)
elif balance.total_amount < 0:
create_transaction(
CreateTransaction(
description="Clearing Balance",
total_amount=-balance.total_amount,
split_type="uneven",
computation_type="amount",
from_users=[
UserSplit(user_id=user_id, value=-balance.total_amount)
],
to_users=[
UserSplit(user_id=balance.user_id, value=-balance.total_amount)
],
)
)
def update_transaction(
transaction_id: int,
transaction: CreateTransaction,
) -> TransactionResponse:
with Session(engine) as session:
old_transaction = session.get(Transaction, transaction_id)
if not old_transaction:
raise ValueError("Transaction not found")
if old_transaction.deleted_at is not None:
raise ValueError("Transaction already deleted")
old_transaction.deleted_at = datetime.utcnow()
session.add(old_transaction)
session.commit()
return create_transaction(transaction)
def create_transaction(
transaction: CreateTransaction,
) -> TransactionResponse:
ledger_entries = convert_transaction_to_ledger(transaction)
with Session(engine) as session:
transaction = Transaction(
description=transaction.description,
total_amount=transaction.total_amount,
)
session.add(transaction)
session.commit()
session.refresh(transaction)
for entry in ledger_entries:
entry.transaction_id = transaction.id
session.add(entry)
session.commit()
for ledger_entry in ledger_entries:
session.refresh(ledger_entry)
return TransactionResponse(
id=transaction.id,
description=transaction.description,
total_amount=transaction.total_amount,
ledgers=ledger_entries,
)