-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathelasticdatastore.py
79 lines (61 loc) · 2.59 KB
/
elasticdatastore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import datetime
import logging
from datastore import Datastore
from elasticsearch import Elasticsearch
from elasticsearch import helpers
class ElasticDatastore(Datastore):
TX_INDEX_NAME = "ethereum-transaction"
B_INDEX_NAME = "ethereum-block"
def __init__(self):
super().__init__()
self.elastic = Elasticsearch([self.es_url], maxsize=self.es_maxsize,
timeout=30, max_retries=10, retry_on_timeout=True)
@classmethod
def config(cls, es_url, es_maxsize):
cls.es_url = es_url
cls.es_maxsize = es_maxsize
def extract(self, rpc_block):
block = rpc_block["result"]
transactions = block["transactions"]
tx_hashes = list()
tx_value_sum = 0
block_nb = int(block["number"], 0)
block_timestamp = datetime.datetime.fromtimestamp(int(block["timestamp"], 0))
for tx in transactions:
tx["blockNumber"] = block_nb
tx["blockTimestamp"] = block_timestamp
# Convert wei into ether
tx["value"] = int(tx["value"], 0) / self.WEI_ETH_FACTOR
tx_value_sum += tx["value"]
self.actions.append(
{"_index": self.TX_INDEX_NAME, "_type": "tx", "_id": tx["hash"], "_source": tx}
)
tx_hashes.append(tx["hash"])
block["transactions"] = tx_hashes
block["number"] = block_nb
block["timestamp"] = block_timestamp
block["gasLimit"] = int(block["gasLimit"], 0)
block["gasUsed"] = int(block["gasUsed"], 0)
block["size"] = int(block["size"], 0)
block["transactionCount"] = len(tx_hashes)
block["txValueSum"] = tx_value_sum
self.actions.append(
{"_index": self.B_INDEX_NAME, "_type": "b", "_id": block_nb, "_source": block}
)
def save(self):
nb_blocks = sum(act["_type"] == "b" for act in self.actions)
nb_txs = sum(act["_type"] == "tx" for act in self.actions)
if self.actions:
try:
helpers.bulk(self.elastic, self.actions)
return "{} blocks and {} transactions indexed".format(
nb_blocks, nb_txs
)
except helpers.BulkIndexError as exception:
print("Issue with {} blocks:\n{}\n".format(nb_blocks, exception))
blocks = (act for act in self.actions if act["_type"] == "b")
for block in blocks:
logging.error("block: " + str(block["_id"]))
@staticmethod
def request(url, **kwargs):
return Elasticsearch([url]).search(**kwargs)