
Commit ae1b8f4

Implement redis event storage (#100)
* Implement brute redis event storage
* Use redis sorted set
* Clean-up
* Fix
* Fix integration tests
* Update redis connection
* Refactoring
* Refactoring
* Remove max data checkers
* Refactoring
* flake8 happy
* Fix pytest config
1 parent c28fe45 commit ae1b8f4

File tree

10 files changed: +119 -4 lines


Diff for: Dockerfile (+2)

@@ -11,6 +11,8 @@ ENV SUBSCRIPTIONS_PER_NEWSFEED="1024"
 ENV NEWSFEED_ID_LENGTH="128"
 ENV PROCESSOR_CONCURRENCY="4"

+ENV EVENT_STORAGE_DSN="NOTSET"
+
 ENV PYTHONUNBUFFERED=1
 ENV PYTHONPATH="${PYTHONPATH}:/code/src/"

Diff for: Makefile (+2 -2)

@@ -1,6 +1,6 @@
 clean:
-	find . -type f -name "*.py[co]" -delete
-	find . -type d -name "__pycache__" -delete
+	find src -type f -name "*.py[co]" -delete
+	find src -type d -name "__pycache__" -delete

 test: clean
 	py.test tests/unit --cov=src/

Diff for: docker-compose.yml (+4)

@@ -7,9 +7,13 @@ services:
     environment:
       LOG_LEVEL: "DEBUG"
       API_BASE_PATH: "/api/"
+      EVENT_STORAGE_DSN: "redis://redis:6379?db=0&connection_timeout=5&minsize=1&maxsize=4"
     volumes:
       - "./:/code"

+  redis:
+    image: "redis:alpine"
+
   nginx:
     image: nginx
     ports:

Diff for: requirements.txt (+1)

@@ -1,5 +1,6 @@
 dependency-injector>=3.14
 uvloop>=0.14
 aiohttp[speedups]>=3.6
+aioredis>=1.3.0

 -r requirements-dev.txt

Diff for: scripts/integration_check.py (+4)

@@ -91,6 +91,7 @@ async def test(self):
             newsfeed_id=newsfeed_123,
             event_id=event_123_1['id'],
         )
+        await asyncio.sleep(0.1)
         assert event_123_1_deleted is True

         newsfeed_123_events = await self._api_client.get_events(newsfeed_id=newsfeed_123)
@@ -116,9 +117,11 @@ async def test(self):
             subscriber_124=subscriber_124,
             subscriber_125=subscriber_125,
         )
+        await asyncio.sleep(0.1)

         # Add event
         event_123_1 = await self._publish_event(newsfeed_id=newsfeed_123)
+        await asyncio.sleep(0.1)

         # Assert event publishing
         await self._assert_event_published_to_all_newsfeeds(
@@ -155,6 +158,7 @@ async def test(self):
             newsfeed_id=newsfeed_123,
             event=event_123_1,
         )
+        await asyncio.sleep(0.1)

         # Assert that all events are deleted
         await self._assert_no_events(newsfeed_123, subscriber_124, subscriber_125)

Diff for: setup.cfg (+4)

@@ -17,9 +17,13 @@ warn_redundant_casts = True
 warn_unused_ignores = True
 warn_return_any = True
 no_implicit_reexport = True
+ignore_missing_imports = True

 [mypy-newsfeed.containers]
 ignore_errors = True

 [mypy-newsfeed.core.loop]
 ignore_errors = True
+
+[tool:pytest]
+testpaths = tests

Diff for: src/newsfeed/configuration.py (+1)

@@ -18,6 +18,7 @@ def get_config() -> Dict[str, Any]:
         'event_storage': {
             'max_newsfeeds': os.getenv('MAX_NEWSFEEDS'),
             'max_events_per_newsfeed': os.getenv('EVENTS_PER_NEWSFEED'),
+            'dsn': os.getenv('EVENT_STORAGE_DSN'),
         },
         'subscription_storage': {
             'max_newsfeeds': os.getenv('MAX_NEWSFEEDS'),

Diff for: src/newsfeed/containers.py (+1 -1)

@@ -32,7 +32,7 @@ class Infrastructure(containers.DeclarativeContainer):
     )

     event_storage = providers.Singleton(
-        infrastructure.event_storages.InMemoryEventStorage,
+        infrastructure.event_storages.RedisEventStorage,
         config=config.event_storage,
     )
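
The only wiring change is the provider target: event_storage now builds RedisEventStorage instead of the in-memory implementation. Below is a minimal sketch of the Singleton semantics involved, using a stand-alone provider rather than the project's container; the localhost DSN and the config values are illustrative assumptions, not taken from this repository.

from dependency_injector import providers

from newsfeed.infrastructure import event_storages

# Hypothetical stand-alone provider mirroring the container wiring above.
# DSN and limits are illustrative placeholders only.
event_storage = providers.Singleton(
    event_storages.RedisEventStorage,
    config={
        'dsn': 'redis://localhost:6379?db=0&connection_timeout=5&minsize=1&maxsize=4',
        'max_newsfeeds': '1024',
        'max_events_per_newsfeed': '1024',
    },
)

storage_a = event_storage()
storage_b = event_storage()
assert storage_a is storage_b  # Singleton: every call returns the same storage instance.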

Diff for: src/newsfeed/infrastructure/event_storages.py (+86 -1)

@@ -1,8 +1,15 @@
 """Infrastructure event storages module."""

+import json
+from contextlib import asynccontextmanager
 from collections import defaultdict, deque
 from typing import Dict, Deque, Iterable, Union

+import aioredis
+
+from .utils import parse_redis_dsn
+
+
 EventData = Dict[str, Union[str, int]]


@@ -34,7 +41,7 @@ class InMemoryEventStorage(EventStorage):
     """Event storage that stores events in memory."""

     def __init__(self, config: Dict[str, str]):
-        """Initialize queue."""
+        """Initialize storage."""
         super().__init__(config)
         self._storage: Dict[str, Deque[EventData]] = defaultdict(deque)

@@ -84,6 +91,84 @@ async def delete_by_fqid(self, newsfeed_id: str, event_id: str) -> None:
         del newsfeed_storage[event_index]


+class RedisEventStorage(EventStorage):
+    """Event storage that stores events in redis."""
+
+    def __init__(self, config: Dict[str, str]):
+        """Initialize storage."""
+        super().__init__(config)
+
+        redis_config = parse_redis_dsn(config['dsn'])
+        self._pool = aioredis.pool.ConnectionsPool(
+            address=redis_config['address'],
+            db=int(redis_config['db']),
+            create_connection_timeout=int(redis_config['connection_timeout']),
+            minsize=int(redis_config['minsize']),
+            maxsize=int(redis_config['maxsize']),
+            encoding='utf-8',
+        )
+
+    async def get_by_newsfeed_id(self, newsfeed_id: str) -> Iterable[EventData]:
+        """Get events data from storage."""
+        async with self._get_connection() as redis:
+            newsfeed_storage = []
+            # TODO:
+            # - Check async
+            for event in await redis.lrange(key=f'newsfeed_id:{newsfeed_id}',
+                                            start=0,
+                                            stop=-1):
+                newsfeed_storage.append(json.loads(event))
+            return newsfeed_storage
+
+    async def get_by_fqid(self, newsfeed_id: str, event_id: str) -> EventData:
+        """Return data of specified event."""
+        async with self._get_connection() as redis:
+            event = await redis.get(f'event:{event_id}')
+
+            if not event:
+                raise EventNotFound(
+                    newsfeed_id=newsfeed_id,
+                    event_id=event_id,
+                )
+            else:
+                return json.loads(event)
+
+    async def add(self, event_data: EventData) -> None:
+        """Add event data to the storage."""
+        newsfeed_id = str(event_data['newsfeed_id'])
+
+        async with self._get_connection() as redis:
+            await redis.lpush(
+                key=f'newsfeed_id:{newsfeed_id}',
+                value=json.dumps(event_data),
+            )
+            await redis.append(
+                key=f"event:{event_data['id']}",
+                value=json.dumps(event_data),
+            )
+
+    async def delete_by_fqid(self, newsfeed_id: str, event_id: str) -> None:
+        """Delete data of specified event."""
+        async with self._get_connection() as redis:
+            event_key = f'event:{event_id}'
+            newsfeed = f'newsfeed_id:{newsfeed_id}'
+            event = await redis.get(event_key)
+            if not event:
+                raise EventNotFound(
+                    newsfeed_id=newsfeed_id,
+                    event_id=event_id,
+                )
+            else:
+                await redis.lrem(newsfeed, 1, event)
+                await redis.delete(event_key)
+
+    @asynccontextmanager
+    async def _get_connection(self) -> aioredis.commands.Redis:
+        async with self._pool.get() as connection:
+            redis = aioredis.commands.Redis(connection)
+            yield redis
+
+
 class EventStorageError(Exception):
     """Event-storage-related error."""
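
For orientation, here is a hedged sketch of how the new storage could be exercised end to end. The localhost DSN, the config values, and the event payload are illustrative assumptions rather than the project's test fixtures, and a Redis instance is assumed to be reachable.

import asyncio

from newsfeed.infrastructure.event_storages import RedisEventStorage


async def main() -> None:
    # Illustrative config; mirrors the keys produced by configuration.get_config().
    storage = RedisEventStorage(config={
        'dsn': 'redis://localhost:6379?db=0&connection_timeout=5&minsize=1&maxsize=4',
        'max_newsfeeds': '1024',
        'max_events_per_newsfeed': '1024',
    })

    # Hypothetical event payload; real events come from the domain layer.
    event = {'id': 'event-1', 'newsfeed_id': '123', 'data': 'hello'}

    await storage.add(event)
    print(await storage.get_by_newsfeed_id('123'))    # roughly: [{'id': 'event-1', ...}]
    print(await storage.get_by_fqid('123', 'event-1'))

    await storage.delete_by_fqid('123', 'event-1')


asyncio.run(main())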

Diff for: src/newsfeed/infrastructure/utils.py (+14)

@@ -0,0 +1,14 @@
+"""Utils module for infrastructure."""
+
+from typing import Any, Dict
+from urllib.parse import urlparse, parse_qsl
+
+
+def parse_redis_dsn(dsn: str) -> Dict[str, Any]:
+    """Redis dsn parser."""
+    parsed_dsn = urlparse(dsn)
+    assert parsed_dsn.scheme == 'redis', ('Unsupported URI scheme', parsed_dsn.scheme)
+    return {
+        'address': (parsed_dsn.hostname, int(parsed_dsn.port)),
+        **dict(parse_qsl(parsed_dsn.query))
+    }
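
As a usage note, parse_redis_dsn keeps the host/port pair under 'address' and passes every query parameter through as a string, which is why RedisEventStorage casts db, connection_timeout, minsize, and maxsize to int. A quick sketch with the DSN from docker-compose.yml:

from newsfeed.infrastructure.utils import parse_redis_dsn

# Query parameters stay as strings; only the port is converted to int.
config = parse_redis_dsn('redis://redis:6379?db=0&connection_timeout=5&minsize=1&maxsize=4')
assert config == {
    'address': ('redis', 6379),
    'db': '0',
    'connection_timeout': '5',
    'minsize': '1',
    'maxsize': '4',
}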
