Skip to content
This repository was archived by the owner on Jul 1, 2021. It is now read-only.

Commit 31691c0

Browse files
authored
Persist peer info between runs (#270)
Temporarily blacklist peers when specific errors occur. For example, when a peer has the wrong genesis, don't attempt to reconnect to it for at least a day. This improves the speed at which peers are found because less time is wasted reconnecting to peers we already know don't work. - Create SQLite-backed SQLitePeerInfo - Create memory-backed MemoryPeerInfo - FullNode stores peer info in a file called 'nodedb' in the data dir
1 parent c2d3ca6 commit 31691c0

File tree

10 files changed

+491
-5
lines changed

10 files changed

+491
-5
lines changed

docs/release_notes/trinity.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ Trinity
44
Unreleased (latest source)
55
--------------------------
66

7+
- `#270 <https://github.com/ethereum/trinity/pull/270>`_: Performance: Persist information on peers between runs
78
- `#268 <https://github.com/ethereum/trinity/pull/268>`_: Maintenance: Add more bootnodes, use all the Geth and Parity bootnodes
89
- `#227 <https://github.com/ethereum/trinity/pull/227>`_: Bugfix: Do not accidentally create many processes that sit idle
910
- `#227 <https://github.com/ethereum/trinity/pull/227>`_: Tests: Cover APIs that also hit the database in `trinity attach` tests

p2p/exceptions.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,27 @@ class HandshakeFailure(BaseP2PError):
3131
pass
3232

3333

34+
class WrongNetworkFailure(HandshakeFailure):
    """
    Handshake failure: we disconnected from the peer because it is on a
    different network than the one we are on.
    """
39+
40+
41+
class WrongGenesisFailure(HandshakeFailure):
    """
    Handshake failure: we disconnected from the peer because its genesis
    differs from ours.
    """
46+
47+
48+
class TooManyPeersFailure(HandshakeFailure):
    """
    Handshake failure: the remote disconnected from us because it already
    has too many peers.
    """
53+
54+
3455
class MalformedMessage(BaseP2PError):
3556
"""
3657
Raised when a p2p command is received with a malformed message
@@ -167,3 +188,10 @@ class UnableToGetDiscV5Ticket(BaseP2PError):
167188
Raised when we're unable to get a discv5 ticket from a remote peer.
168189
"""
169190
pass
191+
192+
193+
class BadDatabaseError(BaseP2PError):
    """
    Raised when the local database is not in quite the format we were
    expecting.
    """

p2p/peer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
NoMatchingPeerCapabilities,
5454
PeerConnectionLost,
5555
RemoteDisconnected,
56+
TooManyPeersFailure,
5657
UnexpectedMessage,
5758
UnknownProtocolCommand,
5859
UnreachablePeer,
@@ -293,6 +294,8 @@ async def do_sub_proto_handshake(self) -> None:
293294
if isinstance(cmd, Disconnect):
294295
msg = cast(Dict[str, Any], msg)
295296
# Peers sometimes send a disconnect msg before they send the sub-proto handshake.
297+
if msg['reason'] == DisconnectReason.too_many_peers.value:
298+
raise TooManyPeersFailure(f'{self} disconnected from us before handshake')
296299
raise HandshakeFailure(
297300
f"{self} disconnected before completing sub-proto handshake: {msg['reason_name']}"
298301
)
@@ -316,6 +319,8 @@ async def do_p2p_handshake(self) -> None:
316319
if isinstance(cmd, Disconnect):
317320
msg = cast(Dict[str, Any], msg)
318321
# Peers sometimes send a disconnect msg before they send the initial P2P handshake.
322+
if msg['reason'] == DisconnectReason.too_many_peers.value:
323+
raise TooManyPeersFailure(f'{self} disconnected from us before handshake')
319324
raise HandshakeFailure(
320325
f"{self} disconnected before completing sub-proto handshake: {msg['reason_name']}"
321326
)

p2p/peer_pool.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@
6060
PeerMessage,
6161
PeerSubscriber,
6262
)
63+
from p2p.persistence import (
64+
BasePeerInfo,
65+
NoopPeerInfo,
66+
)
6367
from p2p.p2p_proto import (
6468
DisconnectReason,
6569
)
@@ -79,11 +83,14 @@ def __init__(self,
7983
privkey: datatypes.PrivateKey,
8084
context: BasePeerContext,
8185
max_peers: int = DEFAULT_MAX_PEERS,
86+
peer_info: BasePeerInfo = NoopPeerInfo(),
8287
token: CancelToken = None,
8388
event_bus: Endpoint = None
8489
) -> None:
8590
super().__init__(token)
8691

92+
self.peer_info = peer_info
93+
8794
self.privkey = privkey
8895
self.max_peers = max_peers
8996
self.context = context
@@ -246,8 +253,9 @@ async def connect(self, remote: Node) -> BasePeer:
246253
if remote in self.connected_nodes:
247254
self.logger.debug("Skipping %s; already connected to it", remote)
248255
return None
256+
if not self.peer_info.should_connect_to(remote):
257+
return None
249258
expected_exceptions = (
250-
HandshakeFailure,
251259
PeerConnectionLost,
252260
TimeoutError,
253261
UnreachablePeer,
@@ -274,6 +282,9 @@ async def connect(self, remote: Node) -> BasePeer:
274282
self.logger.error('Got malformed response from %r during handshake', remote)
275283
# dump the full stacktrace in the debug logs
276284
self.logger.debug('Got malformed response from %r', remote, exc_info=True)
285+
except HandshakeFailure as e:
286+
self.logger.debug("Could not complete handshake with %r: %s", remote, repr(e))
287+
self.peer_info.record_failure(remote, e)
277288
except expected_exceptions as e:
278289
self.logger.debug("Could not complete handshake with %r: %s", remote, repr(e))
279290
except Exception:

p2p/persistence.py

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
from abc import ABC, abstractmethod
2+
from collections import namedtuple
3+
import datetime
4+
import functools
5+
from pathlib import Path
6+
import sqlite3
7+
from typing import Any, Callable, TypeVar, cast, Dict, Type, Optional
8+
9+
from trinity._utils.logging import HasExtendedDebugLogger
10+
11+
from p2p.kademlia import Node
12+
from p2p.exceptions import (
13+
BadDatabaseError,
14+
BaseP2PError,
15+
HandshakeFailure,
16+
TooManyPeersFailure,
17+
WrongNetworkFailure,
18+
WrongGenesisFailure,
19+
)
20+
21+
22+
# One record of the bad_nodes table: the node's enode URI, the time (stored as
# an ISO string) until which it should not be retried, the name of the failure
# that caused it, and how many failures have been recorded so far.
BadNode = namedtuple('BadNode', 'enode until reason error_count')
23+
24+
25+
# Number of seconds in a day.
ONE_DAY = 24 * 60 * 60

# Base blacklist duration, in seconds, for each kind of failure. The lookup in
# timeout_for_failure() walks the failure's MRO, so HandshakeFailure acts as
# the fallback for any of its subclasses that has no entry of its own.
FAILURE_TIMEOUTS: Dict[Type[Exception], int] = {
    HandshakeFailure: 10,  # 10 seconds
    WrongNetworkFailure: ONE_DAY,
    WrongGenesisFailure: ONE_DAY,
    TooManyPeersFailure: 60,  # one minute
}
32+
33+
34+
def timeout_for_failure(failure: BaseP2PError) -> int:
    """
    Return the base blacklist timeout (in seconds) for ``failure``.

    The failure's class hierarchy is searched in MRO order, so the most
    specific class with an entry in FAILURE_TIMEOUTS wins.

    Raises a plain Exception when no class in the hierarchy has an entry.
    """
    failure_type = type(failure)
    known_classes = (klass for klass in failure_type.__mro__ if klass in FAILURE_TIMEOUTS)
    closest = next(known_classes, None)
    if closest is None:
        failure_name = failure_type.__name__
        raise Exception(f'Unknown failure type: {failure_name}')
    return FAILURE_TIMEOUTS[closest]
40+
41+
42+
def time_to_str(time: datetime.datetime) -> str:
    """Serialize ``time`` as an ISO-8601 string truncated to whole seconds."""
    as_str = time.isoformat(timespec='seconds')
    return as_str
44+
45+
46+
def str_to_time(as_str: str) -> datetime.datetime:
    """Parse a string produced by time_to_str back into a naive datetime."""
    # use datetime.datetime.fromisoformat once support for 3.6 is dropped
    iso_seconds_format = "%Y-%m-%dT%H:%M:%S"
    parsed = datetime.datetime.strptime(as_str, iso_seconds_format)
    return parsed
49+
50+
51+
def utc_to_local(utc: datetime.datetime) -> datetime.datetime:
    """
    Convert a naive datetime expressed in UTC into a naive datetime expressed
    in the machine's local timezone.

    The UTC offset used is the one in effect at ``utc`` itself rather than the
    one in effect right now, so the result stays correct when the two fall on
    opposite sides of a daylight-saving transition.
    """
    aware_utc = utc.replace(tzinfo=datetime.timezone.utc)
    # astimezone() without arguments converts to the system local timezone
    local = aware_utc.astimezone()
    # callers expect a naive datetime back, as before
    return local.replace(tzinfo=None)
54+
55+
56+
class BasePeerInfo(ABC, HasExtendedDebugLogger):
    """
    Interface for objects which remember connection failures and advise
    whether a given node is worth connecting to.
    """

    @abstractmethod
    def record_failure(self, remote: Node, failure: BaseP2PError) -> None:
        """Take note that an attempt to connect to ``remote`` ended in ``failure``."""
        ...

    @abstractmethod
    def should_connect_to(self, remote: Node) -> bool:
        """Return whether a connection attempt to ``remote`` should be made."""
        ...
64+
65+
66+
class NoopPeerInfo(BasePeerInfo):
    """A peer info which records nothing and never rejects a node."""

    def record_failure(self, remote: Node, failure: BaseP2PError) -> None:
        # nothing is tracked, so there is nothing to do
        return None

    def should_connect_to(self, remote: Node) -> bool:
        # without any recorded failures every node is acceptable
        return True
72+
73+
74+
class ClosedException(Exception):
    """
    This should never happen, this represents a logic error somewhere in the code
    """
77+
78+
79+
T = TypeVar('T', bound=Callable[..., Any])


def must_be_open(func: T) -> T:
    """
    Method decorator which refuses to run the wrapped method once the
    instance's ``closed`` attribute is truthy, raising ClosedException instead.
    """
    @functools.wraps(func)
    def guarded(self: 'SQLitePeerInfo', *args: Any, **kwargs: Any) -> Any:
        if not self.closed:
            return func(self, *args, **kwargs)
        raise ClosedException("SQLitePeerInfo cannot be used after it's been closed")
    return cast(T, guarded)
90+
91+
92+
class SQLitePeerInfo(BasePeerInfo):
    """
    Peer info backed by a SQLite database.

    Failures are stored in a ``bad_nodes`` table keyed by the node's enode
    URI. Each row remembers until when the node should be left alone, why it
    failed and how many failures have been recorded for it.
    """

    def __init__(self, path: Path) -> None:
        """Open (creating it when necessary) the database at ``path``."""
        self.path = path
        self.closed = False

        # python 3.6 does not support sqlite3.connect(Path)
        self.db = sqlite3.connect(str(self.path))
        self.db.row_factory = sqlite3.Row
        self.setup_schema()

    def __str__(self) -> str:
        return f'<SQLitePeerInfo({self.path})>'

    @must_be_open
    def record_failure(self, remote: Node, failure: BaseP2PError) -> None:
        """
        Blacklist ``remote`` for a period which depends on the type of
        ``failure`` (see timeout_for_failure).
        """
        self._record_bad_node(
            remote,
            timeout=timeout_for_failure(failure),
            reason=type(failure).__name__,
        )

    @must_be_open
    def _record_bad_node(self, remote: Node, timeout: int, reason: str) -> None:
        """
        Insert or update the blacklist row for ``remote``.

        The blacklist period grows linearly with the number of recorded
        failures: ``timeout`` seconds multiplied by the new error count.
        """
        enode = remote.uri()
        bad_node = self._fetch_bad_node(remote)
        error_count = bad_node.error_count + 1 if bad_node else 1

        # timestamps are stored in UTC and only converted for display
        now = datetime.datetime.utcnow()
        usable_time = now + datetime.timedelta(seconds=timeout * error_count)
        self.logger.debug(
            '%s will not be retried until %s because %s',
            remote, utc_to_local(usable_time), reason,
        )

        if bad_node:
            self._update_bad_node(enode, usable_time, reason, error_count)
        else:
            self._insert_bad_node(enode, usable_time, reason, error_count)

    @must_be_open
    def should_connect_to(self, remote: Node) -> bool:
        """
        Return False while ``remote`` is still inside its blacklist period,
        True otherwise (including when it has never failed).
        """
        bad_node = self._fetch_bad_node(remote)

        if not bad_node:
            return True

        until = str_to_time(bad_node.until)
        if datetime.datetime.utcnow() < until:
            local_time = utc_to_local(until)
            self.logger.debug(
                'skipping %s, it failed because "%s" and is not usable until %s',
                remote, bad_node.reason, local_time
            )
            return False

        return True

    def _fetch_bad_node(self, remote: Node) -> Optional[BadNode]:
        """Return the blacklist row for ``remote``, or None if there is none."""
        enode = remote.uri()
        cursor = self.db.execute('SELECT * from bad_nodes WHERE enode = ?', (enode,))
        row = cursor.fetchone()
        if not row:
            return None
        return BadNode(row['enode'], row['until'], row['reason'], row['error_count'])

    def _insert_bad_node(self,
                         enode: str,
                         until: datetime.datetime,
                         reason: str,
                         error_count: int) -> None:
        """Add a new blacklist row; the connection context manager commits it."""
        with self.db:
            self.db.execute(
                '''
                INSERT INTO bad_nodes (enode, until, reason, error_count)
                VALUES (?, ?, ?, ?)
                ''',
                (enode, time_to_str(until), reason, error_count),
            )

    def _update_bad_node(self,
                         enode: str,
                         until: datetime.datetime,
                         reason: str,
                         error_count: int) -> None:
        """Overwrite the existing blacklist row for ``enode``."""
        with self.db:
            self.db.execute(
                '''
                UPDATE bad_nodes
                SET until = ?, reason = ?, error_count = ?
                WHERE enode = ?
                ''',
                (time_to_str(until), reason, error_count, enode),
            )

    def close(self) -> None:
        """
        Close the database connection. Calling this more than once is safe.
        """
        # setup_schema() closes the connection itself when it fails, so a
        # caller's own cleanup may well end up here a second time.
        if self.closed:
            return
        self.db.close()
        self.db = None
        self.closed = True

    @must_be_open
    def setup_schema(self) -> None:
        """
        Create the tables unless the database has already been initialized.

        If inspecting the existing schema fails the connection is closed and
        the error is re-raised.
        """
        try:
            if self._schema_already_created():
                return
        except Exception:
            self.close()
            raise

        with self.db:
            self.db.execute('create table bad_nodes (enode, until, reason, error_count)')
            self.db.execute('create table schema_version (version)')
            self.db.execute('insert into schema_version VALUES (1)')

    def _schema_already_created(self) -> bool:
        """
        Inspects the database to see if the expected tables already exist

        Raises BadDatabaseError when a schema_version table exists but does
        not hold exactly one row, or holds a version other than 1.
        """
        count = self.db.execute("""
            SELECT count() FROM sqlite_master
            WHERE type='table' AND name='schema_version'
        """).fetchone()['count()']
        if count == 0:
            return False

        # a schema_version table already exists, get the version
        cur = self.db.execute("SELECT version FROM schema_version")
        rows = cur.fetchall()
        if len(rows) != 1:
            self.logger.error(
                "malformed nodedb. try deleting %s. (got rows: %s)",
                self.path, rows,
            )
            # exceptions do not interpolate %-style arguments the way the
            # logger does, so the message is built explicitly
            raise BadDatabaseError(
                f"malformed nodedb: Expected one row in schema_version and got {len(rows)}"
            )
        version = rows[0]['version']
        if version != 1:
            # in the future this block might kick off a schema migration
            self.logger.error("malformed. try deleting %s", self.path)
            raise BadDatabaseError(
                f"cannot read nodedb: version {version} is unsupported"
            )

        # schema_version exists and is 1, this database has already been initialized!
        return True
245+
246+
247+
class MemoryPeerInfo(SQLitePeerInfo):
    """A SQLitePeerInfo which is opened on the special ``:memory:`` path."""

    def __init__(self) -> None:
        in_memory = Path(":memory:")
        super().__init__(in_memory)

    def __str__(self) -> str:
        description = '<MemoryPeerInfo()>'
        return description

0 commit comments

Comments
 (0)