-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtaintedBlockState.py
209 lines (178 loc) · 8.71 KB
/
taintedBlockState.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
from taintedAddressBalances import TaintedAddressBalances
from decimal import *
from os.path import exists
import json
from web3 import Web3
from sortedcontainers import SortedDict
from datetime import datetime
from taintedTokenBalance import TaintedTokenBalance
class TaintedBlockState:
def __init__(self, blockNumber):
self.blockNumber = blockNumber
self.blockISODate = '0'
self.taintedEOAs = {}
self.taintedContracts = {}
self.taintedServices = {}
self.seedTaintedTxns = {}
self.blockStateHistory = SortedDict()
def loadState(self,jsonfile):
# check if the file exists
if not exists(jsonfile):
print("file does not exist")
return False
json_str = json.loads(open(jsonfile).read())
self.blockNumber = json_str['blockNumber']
self.seedTaintedTxns = json_str['seedTaintedTxns']
for ta in json_str['taintedEOAs']:
self.taintedEOAs[ta['address']] = TaintedAddressBalances('0x0')
self.taintedEOAs[ta['address']].loadTaintedAddressBalances(ta)
for ta in json_str['taintedServices']:
self.taintedServices[ta['address']] = TaintedAddressBalances('0x0')
self.taintedServices[ta['address']].loadTaintedAddressBalances(ta)
for ta in json_str['taintedContracts']:
self.taintedContracts[ta['address']] = TaintedAddressBalances('0x0')
self.taintedContracts[ta['address']].loadTaintedAddressBalances(ta)
def toString(self):
desc_str = '{}'.format(self.blockNumber)
for ta in self.taintedEOAs:
desc_str = '{}\n{}'.format(desc_str,self.taintedEOAs[ta].toString())
return desc_str
def addTaintedEOA(self,addr):
if not addr in self.taintedEOAs:
self.taintedEOAs[addr] = TaintedAddressBalances(addr)
def addTaintedService(self,addr):
if not addr in self.taintedServices:
self.taintedServices[addr] = TaintedAddressBalances(addr)
def addTaintedContract(self,addr):
if not addr in self.taintedContracts:
self.taintedContracts[addr] = TaintedAddressBalances(addr)
def updateAddressName(self,addr,name):
if addr in self.taintedEOAs:
self.taintedEOAs[addr].name = name
elif addr in self.taintedContracts:
self.taintedContracts[addr].name = name
elif addr in self.taintedServices:
self.taintedServices[addr].name = name
def updateAddressCategory(self,addr,category):
if addr in self.taintedEOAs:
self.taintedEOAs[addr].category = category
elif addr in self.taintedContracts:
self.taintedContracts[addr].category = category
elif addr in self.taintedServices:
self.taintedServices[addr].category = category
def updateBlockNumber(self,blocknumber):
self.blockNumber = blocknumber
def updateBlockISODate(self,timestamp):
self.blockISODate = datetime.fromtimestamp(timestamp).isoformat()
# update the block history to save the state at a given point
# in time
def updateBlockHistory(self):
# is this the latest block, if yes just write it as the history
if len(self.blockStateHistory) == 0:
self.blockStateHistory[self.blockNumber] = self.toJson()
elif self.blockNumber > self.blockStateHistory.peekitem()[0]:
self.blockStateHistory[self.blockNumber] = self.toJson()
# check if processing this transaction would taint the receiver
def isTaintedDebit(self,addr,token,value):
return self.taintedEOAs[Web3.to_checksum_address(addr)].token_balances[token].isTaintedDebit(value)
def addTokenToAddress(self,token,decimal,addr):
addr = Web3.to_checksum_address(addr)
if addr in self.taintedEOAs:
if not token in self.taintedEOAs[addr].token_balances:
self.taintedEOAs[addr].token_balances[token] = TaintedTokenBalance(token,int(decimal))
elif addr in self.taintedServices:
if not token in self.taintedServices[addr].token_balances:
self.taintedServices[addr].token_balances[token] = TaintedTokenBalance(token,int(decimal))
elif addr in self.taintedContracts:
if not token in self.taintedContracts[addr].token_balances:
self.taintedContracts[addr].token_balances[token] = TaintedTokenBalance(token,int(decimal))
def processDebit(self,addr,token,value):
addr = Web3.to_checksum_address(addr)
t_debit, u_debit, q = self.taintedEOAs[addr].token_balances[token].processDebit(value)
return t_debit, u_debit, q
def processCredit(self,addr,token,q):
addr = Web3.to_checksum_address(addr)
t_credit = 0
u_credit = 0
if addr in self.taintedEOAs:
t_credit, u_credit = self.taintedEOAs[addr].token_balances[token].processCredit(q)
elif addr in self.taintedContracts:
t_credit, u_credit = self.taintedContracts[addr].token_balances[token].processCredit(q)
elif addr in self.taintedServices:
t_credit, u_credit = self.taintedServices[addr].token_balances[token].processCredit(q)
else:
print('ADDRESS NOT IN ANY TAINTED LIST')
return t_credit, u_credit
def processGasDebit(self,addr,token,value):
addr = Web3.to_checksum_address(addr)
t_debit, u_debit, q = self.taintedEOAs[addr].token_balances[token].processDebit(value)
self.taintedEOAs[addr].recordGas(t_debit)
return t_debit, u_debit, q
def getTokenBalances(self,addr):
addr = Web3.to_checksum_address(addr)
if addr in self.taintedEOAs:
return self.taintedEOAs[addr].token_balances
if addr in self.taintedContracts:
return self.taintedContracts[addr].token_balances
if addr in self.taintedServices:
return self.taintedServices[addr].token_balances
def toJson(self):
ta_jsonstr = ''
ta_services_jsonstr = ''
ta_contracts_jsonstr = ''
for ta in self.taintedEOAs:
if ta_jsonstr == '':
ta_jsonstr = '{}'.format(self.taintedEOAs[ta].toJson())
else:
ta_jsonstr = '{},\n{}'.format(ta_jsonstr,self.taintedEOAs[ta].toJson())
for ta in self.taintedServices:
if ta_services_jsonstr == '':
ta_services_jsonstr = '{}'.format(self.taintedServices[ta].toJson())
else:
ta_services_jsonstr = '{},\n{}'.format(ta_services_jsonstr,self.taintedServices[ta].toJson())
for ta in self.taintedContracts:
if ta_contracts_jsonstr == '':
ta_contracts_jsonstr = '{}'.format(self.taintedContracts[ta].toJson())
else:
ta_contracts_jsonstr = '{},\n{}'.format(ta_contracts_jsonstr,self.taintedContracts[ta].toJson())
jsonstr = '{{"timeStamp":"{}","blockNumber":"{}", "seedTaintedTxns":[], "taintedServices":[\n{}\n], "taintedContracts":[\n{}\n], "taintedEOAs":[\n{}\n]}}'.format(self.blockISODate,self.blockNumber,ta_services_jsonstr,ta_contracts_jsonstr,ta_jsonstr)
return jsonstr
# check if there is a tainted txn q associated with this txn
def getTaintedTxnQueue(self,txn):
q = []
if not 'dummy' in txn:
for st in self.seedTaintedTxns:
if st['txn']['blockNumber'] == txn['blockNumber'] and st['txn']['hash'] == txn['hash'] and st['txn']['value'] == txn['value'] and st['txn']['transactionIndex'] == txn['transactionIndex']:
q = st['queue']
return q
def isSeedTxn(self,txn):
if not 'dummy' in txn:
for st in self.seedTaintedTxns:
if st['txn']['hash'] == txn['hash']:
return True
return False
def isTaintedEOA(self,addr):
if Web3.to_checksum_address(addr) in self.taintedEOAs:
return True
return False
def isTaintedAddr(self,addr):
addr = Web3.to_checksum_address(addr)
if addr in self.taintedEOAs or addr in self.taintedContracts or addr in self.taintedServices:
return True
return False
##--------------
# UNIT TESTS
def main():
testq = TaintedBlockState('0')
testq.loadState('./TestData/testbs.json')
print(testq.toString())
#testq.token_balances['ETH'].processCredit('50000000000000000000')
#print(testq.toString())
#testq.token_balances['ETH'].processCredit('50000000000000000000')
#print(testq.toString())
#testq.token_balances['ETH'].processDebit('5000000000')
#print(testq.toString())
#testq.token_balances['ETH'].processDebit('70000000000000000000000')
#print(testq.toString())
if __name__ == "__main__":
main()