Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,21 @@ Usage
You can also set the optional ``ETHEREUM_LOGS_BATCH_SIZE`` setting which limits the maximum amount of the blocks that can be read at a time from the celery task.


*******************
Using event filters
*******************
***************
Operation modes
***************

The event listener can operate in different modes:

* ``blocks``: this is the default mode, and goes block by block and tx by tx fetching the receipts and searching for logs matching the monitored ones.
* ``filters``: this uses ``web3.eth.filter`` to search each of the monitored events in the range.
* ``auto``: in this mode, the listener automatically chooses whether to use ``blocks`` mode or ``filters`` mode, by checking if the number of blocks to process exceeds a given threshold (``ETHEREUM_LOGS_AUTO_THRESHOLD``, default 5000). If it exceeds that threshold, it uses ``filters`` mode, otherwise, it uses ``blocks`` mode.

If your Ethereum Node supports log filters, you can activate it in the Django settings and it will use filters instead of iterating thru all blocks and all transactions.

.. code-block:: python

ETHEREUM_LOGS_FILTER_AVAILABLE = True
ETHEREUM_LOGS_MODE = "auto"
ETHEREUM_LOGS_AUTO_THRESHOLD = 500



Expand Down
1 change: 0 additions & 1 deletion django_ethereum_events/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
default_app_config = 'django_ethereum_events.apps.EthereumEventsConfig'
42 changes: 29 additions & 13 deletions django_ethereum_events/event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ def __init__(self, *args, **kwargs):
def _get_block_range(self):
current = self.web3.eth.blockNumber
step = getattr(settings, "ETHEREUM_LOGS_BATCH_SIZE", 10000)
stay_behind = getattr(settings, "ETHEREUM_LOGS_STAY_BEHIND_BLOCKS", None)
if stay_behind:
current -= stay_behind
if self.daemon.block_number < current:
start = self.daemon.block_number + 1
return start, min(current, start + step)

return None, None

def get_pending_blocks(self):
def get_pending_blocks(self, start=None, end=None):
"""
Retrieve the blocks that have not been processed.

Expand All @@ -42,7 +45,8 @@ def get_pending_blocks(self):
the unprocessed block numbers.

"""
start, end = self._get_block_range()
if start is None and end is None:
start, end = self._get_block_range()
if start is None:
return []
else:
Expand Down Expand Up @@ -73,6 +77,8 @@ def get_block_logs(self, block_number):

for log in receipt.get('logs', []):
address = log['address']
if not log['topics']:
continue
topic = log['topics'][0].hex()
if (address, topic) in self.decoder.monitored_events:
relevant_logs.append(log)
Expand Down Expand Up @@ -139,18 +145,26 @@ def check_for_state_updates(self, block_number):
self.decoder.refresh_state(block_number=block_number)
refresh_cache_update_value(update_required=False)

def execute(self):
def execute(self, mode=None):
"""Program loop, does all the underlying work."""
if getattr(settings, "ETHEREUM_LOGS_FILTER_AVAILABLE", False):
self._execute_using_filters()
else:
self._execute_iterating_all_blocks()

def _execute_using_filters(self):
"""Uses filters to fetch required logs"""
start, end = self._get_block_range()
if start is None:
return

mode = mode or getattr(settings, "ETHEREUM_LOGS_MODE", None)
if mode is None:
mode = "filters" if getattr(settings, "ETHEREUM_LOGS_FILTER_AVAILABLE", False) else "blocks"
elif mode == "auto":
threshold = getattr(settings, "ETHEREUM_LOGS_AUTO_THRESHOLD", 5000)
mode = "blocks" if (end - start) <= threshold else "filters"

if mode == "filters":
self._execute_using_filters(start, end)
else:
self._execute_iterating_all_blocks(start, end)

def _execute_using_filters(self, start, end):
"""Uses filters to fetch required logs"""
all_logs = []

for (address, topic) in self.decoder.monitored_events.keys():
Expand All @@ -160,16 +174,18 @@ def _execute_using_filters(self):
"fromBlock": start,
"toBlock": end,
})
all_logs.extend(log_filter.get_all_entries())
all_logs.extend(
[evt for evt in log_filter.get_all_entries() if not evt.get("removed", False)]
)

all_logs.sort(key=lambda log: (log["blockNumber"], log["logIndex"]))
decoded_logs = self.decoder.decode_logs(all_logs)
self.save_events(decoded_logs)
self.update_block_number(end)

def _execute_iterating_all_blocks(self):
def _execute_iterating_all_blocks(self, start, end):
"""Executes iterating thru all blocks and txs"""
pending_blocks = self.get_pending_blocks()
pending_blocks = self.get_pending_blocks(start, end)
for block in pending_blocks:
self.check_for_state_updates(block)
logs = self.get_block_logs(block)
Expand Down
5 changes: 1 addition & 4 deletions django_ethereum_events/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

from django.core.validators import MinLengthValidator
from django.db import models
try:
from django.utils.translation import ugettext_lazy as _
except ImportError:
from django.utils.translation import gettext_lazy as _
from django.utils.translation import gettext_lazy as _

from solo.models import SingletonModel

Expand Down