Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 79 additions & 84 deletions echonet/l1_client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional
from typing import Any, Callable, Dict, List, Optional

import functools
import inspect
import logging
import requests

logger = logging.getLogger(__name__)


class L1Client:
L1_MAINNET_URL = "https://eth-mainnet.g.alchemy.com/v2/{api_key}"
Expand All @@ -19,7 +19,6 @@ class L1Client:
)
# Taken from ethereum_base_layer_contracts.rs
STARKNET_L1_CONTRACT_ADDRESS = "0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4"
RETRIES_COUNT = 2

@dataclass(frozen=True)
class Log:
Expand All @@ -38,52 +37,83 @@ class Log:
removed: bool
block_timestamp: int

@staticmethod
def get_logs(from_block: int, to_block: int, api_key: str) -> List["L1Client.Log"]:
def __init__(
self,
api_key: str,
timeout: int = 10,
retries_count: int = 2,
):
self.api_key = api_key
self.logger = logging.Logger("L1Client")
self.timeout = timeout
self.retries_count = retries_count
self.rpc_url = self.L1_MAINNET_URL.format(api_key=api_key)
self.data_api_url = self.DATA_BLOCKS_BY_TIMESTAMP_URL_FMT.format(api_key=api_key)

def _run_request_with_retry(
self,
request_func: Callable,
additional_log_context: Dict[str, Any],
) -> Optional[Dict]:
caller_name = inspect.currentframe().f_back.f_code.co_name

for attempt in range(self.retries_count):
try:
response = request_func(timeout=self.timeout)
response.raise_for_status()
result = response.json()
self.logger.debug(
f"{caller_name} succeeded on attempt {attempt + 1}",
extra=additional_log_context,
)
return result
except (requests.RequestException, ValueError):
self.logger.debug(
f"{caller_name} attempt {attempt + 1}/{self.retries_count} failed",
extra=additional_log_context,
exc_info=True,
)

self.logger.error(
f"{caller_name} failed after {self.retries_count} attempts, returning None",
extra=additional_log_context,
)

return None

def get_logs(self, from_block: int, to_block: int) -> List["L1Client.Log"]:
"""
Get logs from Ethereum using eth_getLogs RPC method.
Tries up to RETRIES_COUNT times. On failure, logs an error and returns [].
Tries up to retries_count times. On failure, logs an error and returns [].
"""
if from_block > to_block:
raise ValueError("from_block must be less than or equal to to_block")

rpc_url = L1Client.L1_MAINNET_URL.format(api_key=api_key)

payload = {
"jsonrpc": "2.0",
"method": "eth_getLogs",
"params": [
{
"fromBlock": hex(from_block),
"toBlock": hex(to_block),
"address": L1Client.STARKNET_L1_CONTRACT_ADDRESS,
"topics": [L1Client.LOG_MESSAGE_TO_L2_EVENT_SIGNATURE],
"address": self.STARKNET_L1_CONTRACT_ADDRESS,
"topics": [self.LOG_MESSAGE_TO_L2_EVENT_SIGNATURE],
}
],
"id": 1,
}

for attempt in range(L1Client.RETRIES_COUNT):
try:
response = requests.post(rpc_url, json=payload, timeout=10)
response.raise_for_status()
data = response.json()
logger.debug(
f"get_logs succeeded on attempt {attempt + 1}",
extra={"url": rpc_url, "from_block": from_block, "to_block": to_block},
)
break
except (requests.RequestException, ValueError):
logger.debug(
f"get_logs attempt {attempt + 1}/{L1Client.RETRIES_COUNT} failed",
extra={"url": rpc_url, "from_block": from_block, "to_block": to_block},
exc_info=True,
)
else:
logger.error(
f"get_logs failed after {L1Client.RETRIES_COUNT} attempts, returning []",
extra={"url": rpc_url, "from_block": from_block, "to_block": to_block},
)
request_func = functools.partial(requests.post, self.rpc_url, json=payload)
data = self._run_request_with_retry(
request_func=request_func,
additional_log_context={
"url": self.rpc_url,
"from_block": from_block,
"to_block": to_block,
},
)

if data is None:
return []

results = data.get("result", [])
Expand All @@ -104,43 +134,25 @@ def get_logs(from_block: int, to_block: int, api_key: str) -> List["L1Client.Log
for result in results
]

@staticmethod
def get_timestamp_of_block(block_number: int, api_key: str) -> Optional[int]:
def get_timestamp_of_block(self, block_number: int) -> Optional[int]:
"""
Get block timestamp by block number using eth_getBlockByNumber RPC method.
Tries up to RETRIES_COUNT times. On failure, logs an error and returns None.
Tries up to retries_count times. On failure, logs an error and returns None.
"""
rpc_url = L1Client.L1_MAINNET_URL.format(api_key=api_key)

payload = {
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": [hex(block_number), False],
"id": 1,
}

for attempt in range(L1Client.RETRIES_COUNT):
try:
response = requests.post(rpc_url, json=payload, timeout=10)
response.raise_for_status()
result = response.json()
logger.debug(
f"get_timestamp_of_block succeeded on attempt {attempt + 1}",
extra={"url": rpc_url, "block_number": block_number},
)
break # success -> exit loop
except (requests.RequestException, ValueError) as exc:
logger.debug(
f"get_timestamp_of_block attempt {attempt + 1}/{L1Client.RETRIES_COUNT} failed",
extra={"url": rpc_url, "block_number": block_number},
exc_info=True,
)
request_func = functools.partial(requests.post, self.rpc_url, json=payload)
result = self._run_request_with_retry(
request_func=request_func,
additional_log_context={"url": self.rpc_url, "block_number": block_number},
)

else:
logger.error(
f"get_timestamp_of_block failed after {L1Client.RETRIES_COUNT} attempts, returning None",
extra={"url": rpc_url, "block_number": block_number},
)
if result is None:
return None

block = result.get("result")
Expand All @@ -151,14 +163,11 @@ def get_timestamp_of_block(block_number: int, api_key: str) -> Optional[int]:
# Timestamp is hex string, convert to int.
return int(block["timestamp"], 16)

@staticmethod
def get_block_number_by_timestamp(timestamp: int, api_key: str) -> Optional[int]:
def get_block_number_by_timestamp(self, timestamp: int) -> Optional[int]:
"""
Get the block number at/after a given timestamp using blocks-by-timestamp API.
Tries up to RETRIES_COUNT times. On failure, logs an error and returns None.
Tries up to retries_count times. On failure, logs an error and returns None.
"""
rpc_url = L1Client.DATA_BLOCKS_BY_TIMESTAMP_URL_FMT.format(api_key=api_key)

timestamp_iso = (
datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat().replace("+00:00", "Z")
)
Expand All @@ -169,27 +178,13 @@ def get_block_number_by_timestamp(timestamp: int, api_key: str) -> Optional[int]
"direction": "AFTER",
}

for attempt in range(L1Client.RETRIES_COUNT):
try:
response = requests.get(rpc_url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
logger.debug(
f"get_block_number_by_timestamp succeeded on attempt {attempt + 1}",
extra={"url": rpc_url, "timestamp": timestamp},
)
break # success -> exit loop
except (requests.RequestException, ValueError) as exc:
logger.debug(
f"get_block_number_by_timestamp attempt {attempt + 1}/{L1Client.RETRIES_COUNT} failed",
extra={"url": rpc_url, "timestamp": timestamp},
exc_info=True,
)
else:
logger.error(
f"get_block_number_by_timestamp failed after {L1Client.RETRIES_COUNT} attempts, returning None",
extra={"url": rpc_url, "timestamp": timestamp},
)
request_func = functools.partial(requests.get, self.data_api_url, params=params)
data = self._run_request_with_retry(
request_func=request_func,
additional_log_context={"url": self.data_api_url, "timestamp": timestamp},
)

if data is None:
return None

items = data.get("data", [])
Expand Down
32 changes: 16 additions & 16 deletions echonet/tests/test_l1_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,24 +61,24 @@ def test_get_logs_retries_after_exception_and_succeeds_on_second_attempt(self, m

mock_post.side_effect = [request_exception, successful_response]

logs = L1Client.get_logs(
client = L1Client(api_key="api_key")
logs = client.get_logs(
from_block=self.BLOCK_NUMBER_SAMPLE,
to_block=self.BLOCK_NUMBER_SAMPLE,
api_key="api_key",
)

self.assertEqual(mock_post.call_count, 2)
self.assertEqual(logs, [self.EXPECTED_LOG_SAMPLE])

def test_get_logs_raises_on_invalid_block_range(self):
client = L1Client(api_key="api_key")
with self.assertRaisesRegex(
ValueError,
"from_block must be less than or equal to to_block",
):
L1Client.get_logs(
client.get_logs(
from_block=11,
to_block=10,
api_key="api_key",
)

@patch("l1_client.requests.post")
Expand Down Expand Up @@ -116,10 +116,10 @@ def test_get_logs_parses_several_results(self, mock_post):

mock_post.return_value = response_ok

logs = L1Client.get_logs(
client = L1Client(api_key="api_key")
logs = client.get_logs(
from_block=1,
to_block=2,
api_key="api_key",
)

self.assertEqual(mock_post.call_count, 1)
Expand Down Expand Up @@ -161,10 +161,10 @@ def test_get_logs_when_rpc_result_is_empty(self, mock_post):

mock_post.return_value = response_ok

logs = L1Client.get_logs(
client = L1Client(api_key="api_key")
logs = client.get_logs(
from_block=1,
to_block=1,
api_key="api_key",
)

self.assertEqual(mock_post.call_count, 1)
Expand All @@ -180,9 +180,9 @@ def test_get_timestamp_of_block_retries_after_failure_and_succeeds(self, mock_po

mock_post.side_effect = [request_exception, successful_response]

result = L1Client.get_timestamp_of_block(
client = L1Client(api_key="api_key")
result = client.get_timestamp_of_block(
block_number=123,
api_key="api_key",
)

self.assertEqual(mock_post.call_count, 2)
Expand All @@ -196,9 +196,9 @@ def test_get_timestamp_of_block_returns_none_when_rpc_result_is_empty(self, mock

mock_post.return_value = response_ok

result = L1Client.get_timestamp_of_block(
client = L1Client(api_key="api_key")
result = client.get_timestamp_of_block(
block_number=123,
api_key="api_key",
)

self.assertEqual(mock_post.call_count, 1)
Expand All @@ -216,9 +216,9 @@ def test_get_block_number_by_timestamp_retries_after_failure_and_succeeds(self,

mock_get.side_effect = [request_exception, successful_response]

result = L1Client.get_block_number_by_timestamp(
client = L1Client(api_key="api_key")
result = client.get_block_number_by_timestamp(
timestamp=1_600_000_000,
api_key="api_key",
)

self.assertEqual(mock_get.call_count, 2)
Expand All @@ -232,9 +232,9 @@ def test_get_block_number_by_timestamp_returns_none_when_rpc_result_is_empty(sel

mock_get.return_value = response_ok

result = L1Client.get_block_number_by_timestamp(
client = L1Client(api_key="api_key")
result = client.get_block_number_by_timestamp(
timestamp=1_600_000_000,
api_key="api_key",
)

self.assertEqual(mock_get.call_count, 1)
Expand Down