
Commit 88c9f77

distributed writes 1
1 parent 74d9491 commit 88c9f77

File tree

8 files changed: +111 -78 lines changed

.gitignore

+3 -1

```diff
@@ -11,4 +11,6 @@ wheels/
 
 # data
 trie_checkpoints
-trie_wal
+trie_wal
+
+tmp/
```

README.md

+11

````diff
@@ -40,6 +40,17 @@ A node in the ring owns some portion of the trie. The size of these portions is
 
 Increasing k-factor decreases memory usage of tries (since trie can be split into smaller shards) at a cost of slightly increasing memory usage of coordinator (rangemap). Main issue is functional since you can't currently search for a trie with a length less than the k-factor.
 
+## Components
+
+* API
+* Coordinator
+* ShardedTrie
+* LocalDB
+* Gossip
+* ~Registration
+
+Request flow: API -> Coordinator -> ShardedTrie -> LocalDB
+
 ## Developing locally
 
 ```sh
````
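
The k-factor routing the README describes can be illustrated with a minimal, self-contained sketch. The `shard_prefix` helper and the k-factor value here are hypothetical, not part of the repo:

```python
# Hypothetical sketch of k-factor routing: a word's first k characters pick
# its shard trie, and words at or below the k-factor cannot be routed at all.
K_FACTOR = 2  # assumed value; the real one comes from config

def shard_prefix(word: str, k_factor: int = K_FACTOR):
    prefix = word[:k_factor]
    if len(prefix) != k_factor:
        # Mirrors the limitation noted in the README.
        return None
    return prefix

print(shard_prefix("twitter"))  # "tw" -> every "tw*" word shares one shard trie
print(shard_prefix("a"))        # None -> too short to route
```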

checkpoint.py

+6 -7

```diff
@@ -6,9 +6,9 @@
 import glob
 
 class Checkpoint:
-    def __init__(self, config, distributed_trie):
+    def __init__(self, config, sharded_trie):
         self.config = config
-        self.distributed_trie = distributed_trie
+        self.sharded_trie = sharded_trie
         self.dir = config.current()["checkpoint_dir"]
         self._ensure_dir()
 
@@ -20,7 +20,7 @@ def load(self):
             for line in f.readlines():
                 prefix, object = line.rstrip().split(" ", maxsplit=1)
                 trie_dict = json.loads(object)
-                self.distributed_trie.load(prefix, trie_dict)
+                self.sharded_trie.load(prefix, trie_dict)
         return True
 
 
@@ -29,7 +29,7 @@ def save(self):
         timestamp = str(int(time.time() * 100))
         tmp = open(os.path.join(os.getcwd(), self.dir, timestamp), "a+")
         try:
-            for prefix, trie in self.distributed_trie.range_tries.items():
+            for prefix, trie in self.sharded_trie.range_tries.items():
                 print(f"{prefix} {json.dumps(trie.dict())}", file=tmp)
             checkpointed = True
         finally:
@@ -68,13 +68,12 @@ def _ensure_dir(self):
 
 
 # from config import Config
-# from distributed_trie import DistributedTrie
+# from sharded_trie import ShardedTrie
 # from wal import Wal
 
 # config = Config("config.yaml")
 # wal = Wal(config)
-# distributed_trie = DistributedTrie(config, wal)
-# trie = distributed_trie
+# trie = ShardedTrie(config, wal)
 
 # trie.add("twitter")
 # trie.add("twitch")
```

coordinator.py

+59 -15

```diff
@@ -2,23 +2,45 @@
 import random
 
 class Coordinator:
-    def __init__(self, config, cluster, rangemap):
+    def __init__(self, config, cluster, rangemap, local_trie):
         self.config = config
         self.cluster = cluster
         self.rangemap = rangemap
+        self.local_trie = local_trie
 
-    def matches(self, query, nodes):
-        node = self.random_healthy_node(nodes)
+    def matches(self, query):
+        preference_list = self.preference_list(query)
+        if self.config.node_id in preference_list:
+            return self.local_trie.matches(query)
+        node = self.random_healthy_node(preference_list)
         if not node: return []
         try:
             req = httpx.get(f"{node}/search", params={"q": query})
             if req.status_code == 200:
-                # todo decode
                 return req.json()
         except Exception as err:
             print(f"Unexpected {err=}, when proxying to {node} with query {query} {type(err)=}")
         return []
 
+    def add(self, word):
+        return self._write_word("POST", word)
+
+    def remove(self, word):
+        return self._write_word("DELETE", word)
+
+    def preference_list(self, word):
+        k_factor = self.config.current()["k_factor"]
+        prefix, rem = word[:k_factor], word[k_factor+1:]
+        if len(prefix) != k_factor:
+            # We don't support word lengths at or below the k-factor
+            return []
+        return self.healthy_nodes(self.rangemap.nodes_for_key(prefix))
+
+    def healthy_nodes(self, nodes):
+        if len(nodes) == 0: return set()
+        pnodes = self.cluster.nodes
+        return set([node["host"] for node in pnodes if node["host"] in nodes and node["status"]["healthy"]])
+
     def random_healthy_node(self, node_hosts):
         nodes = []
         # todo change cluster.nodes to hash
@@ -27,17 +49,39 @@ def random_healthy_node(self, node_hosts):
         if len(nodes) == 0: return
         return random.choice(nodes)
 
-    def add(self, word, nodes):
-        return self._write_word("POST", nodes, word)
-
-    def remove(self, word, nodes):
-        return self._write_word("DELETE", nodes, word)
-
-    def _write_word(self, method, nodes, word):
-        # TODO: proxy in parallel to <replication_factor> healthy nodes, not just one
-        node = self.random_healthy_node(nodes)
-        if not node: return False
-        return self._proxy_request(method, f"{node}/api/v1/words", { "word": word })
+    def _write_word(self, method, word):
+        # TODO: keep tombstone (and check at coordinator if in local tombstone before sending unnecessary reqs)
+        # TODO: this whole thing is gross
+        preference_list = self.preference_list(word.word)
+        if len(preference_list) == 0: return False
+        no_failures = True
+        quorum = int((self.config.replication_factor / 2) + 1) # todo: allow configuring quorum separately
+        is_member = self.config.node_id in preference_list
+        if self.config.node_id in preference_list:
+            quorum -= 1
+            no_failures = no_failures and self.local_write(method, word.word)
+
+        if quorum > 0 and no_failures and word.gossip:
+            visited = set()
+            if is_member:
+                visited.add(self.config.node_id)
+            # TODO: send in parallel, handle failures (try another node if one fails, sloppy quorum etc)
+            # TODO: rollbacks
+            for node in preference_list:
+                if node in visited: continue
+                if quorum <= 0: return True
+                quorum -= 1
+                self._proxy_request(method, f"{node}/api/v1/words", { "word": word.word, "gossip": False })
+        return quorum <= 0 and no_failures
+
+    def local_write(self, method, word):
+        match method:
+            case "POST":
+                return self.local_trie.add(word)
+            case "DELETE":
+                return self.local_trie.remove(word)
+            case _:
+                return False
 
     def _proxy_request(self, method, path, word, data=None):
         try:
```
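
The quorum arithmetic in `_write_word` is a plain majority of the `replication_factor`, reduced by one when the coordinator itself owns the key and writes locally first. A worked example with assumed values:

```python
# Majority quorum as computed in _write_word above.
def required_quorum(replication_factor: int) -> int:
    return int((replication_factor / 2) + 1)

for n in (1, 2, 3, 4, 5):
    print(n, "->", required_quorum(n))  # 1->1, 2->2, 3->2, 4->3, 5->3

# If this node is in the preference list it writes locally first,
# so only the remainder must be acknowledged by proxied writes.
replication_factor, is_member = 3, True  # assumed values
remaining = required_quorum(replication_factor) - (1 if is_member else 0)
print(remaining)  # 1 proxied write left to reach quorum
```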

main.py

+19 -19

```diff
@@ -1,5 +1,5 @@
 from typing import Union
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 from fastapi import FastAPI, APIRouter
 from config import Config
 from cluster import Cluster
@@ -9,7 +9,7 @@
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.triggers.interval import IntervalTrigger
 import base64
-from distributed_trie import DistributedTrie
+from sharded_trie import ShardedTrie
 from wal import Wal
 from checkpoint import Checkpoint
 from coordinator import Coordinator
@@ -18,19 +18,11 @@
 cluster = Cluster(config)
 rangemap = RangeMap(cluster, config)
 wal = Wal(config)
-coordinator = Coordinator(config, cluster, rangemap)
-distributed_trie = DistributedTrie(config, wal, rangemap, coordinator)
-trie = distributed_trie
+trie = ShardedTrie(config, wal) # TODO: rename to trieDB
+coordinator = Coordinator(config, cluster, rangemap, trie)
 checkpointer = Checkpoint(config, trie)
-if not checkpointer.load() and distributed_trie.replay_wal():
-    trie.add("twitter")
-    trie.add("twitch")
-    trie.add("twilight")
-    trie.add("twigs")
-    trie.add("twig")
-    trie.add("tough")
-    trie.add("thought")
-
+checkpointer.load()
+trie.replay_wal()
 
 def node_healthchecks():
     print(f"Checking health of nodes at {datetime.now()}")
@@ -87,7 +79,7 @@ def prefix_to_nodes(q: str):
 
 @app.get("/search")
 def search(q: str):
-    return distributed_trie.matches(q)
+    return coordinator.matches(q)
 
 # API
 
@@ -99,17 +91,25 @@ def search(q: str):
 
 class Word(BaseModel):
     word: str
+    gossip: bool = Field(default=True)
 
 @api_router.post("/words")
 async def create_word(word: Word):
     # check added strings are in lexicon
-    trie.add(word.word)
-    return { "message": f"Added '{word.word}' to Trifecta db" }
+    ok = coordinator.add(word)
+    if ok:
+        return { "message": f"Added '{word.word}' to Trifecta db" }
+    else:
+        return { "error": f"Did not add '{word.word}' to Trifecta db" }
 
 @api_router.delete("/words")
 async def delete_word(word: Word):
-    trie.remove(word.word)
-    return { "message": f"Deleted '{word.word}' from Trifecta db" }
+    ok = coordinator.remove(word)
+    if ok:
+        return { "message": f"Removed '{word.word}' from Trifecta db" }
+    else:
+        return { "error": f"Did not remove '{word.word}' from Trifecta db" }
+
 
 app.include_router(api_router)
```
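
A client-side view of the new write path, as a sketch. The base URL and port are assumptions; only the request and response shapes come from the handlers above:

```python
import httpx

BASE = "http://localhost:8000"  # assumed local node address

# Normal client write: gossip defaults to True, so the receiving node
# coordinates the write and proxies it to the rest of the preference list.
r = httpx.post(f"{BASE}/api/v1/words", json={"word": "twitter"})
print(r.json())  # {"message": "Added 'twitter' to Trifecta db"} on success

# Coordinator-to-replica write: gossip=False stops the receiving node
# from fanning the same write out again.
r = httpx.post(f"{BASE}/api/v1/words", json={"word": "twitch", "gossip": False})
print(r.json())
```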

distributed_trie.py → sharded_trie.py

+6 -28

```diff
@@ -5,27 +5,19 @@
 from datetime import datetime
 import threading
 
-class DistributedTrie:
-    def __init__(self, config, wal, rangemap, coordinator):
+# TODO: Extract KV store from this, provide it as API for in-mem component
+class ShardedTrie:
+    def __init__(self, config, wal):
         self.range_tries = {}
         self.config = config
         self.checkpoint_dir = config.current()["checkpoint_dir"]
         self.wal = wal
-        self.rangemap = rangemap
-        self.coordinator = coordinator
         # TODO: Lock per trie
         self.sem = threading.Semaphore()
 
     def matches(self, query):
         k_factor = self.config.current()["k_factor"]
         prefix, rem = query[:k_factor], query[k_factor+1:]
-        if len(prefix) != k_factor:
-            # We don't support prefix search at or below the k-factor
-            return []
-        preference_list = self.rangemap.nodes_for_key(prefix)
-        if self.config.node_id not in preference_list:
-            print("Forwarding")
-            return self.coordinator.matches(query, preference_list)
         if prefix not in self.range_tries:
             return []
         return self.range_tries[prefix].matches(query)
@@ -37,15 +29,8 @@ def add(self, word, commit_to_wal=True):
         if len(prefix) != k_factor:
             # we dont support words less than k factor
             return
-        preference_list = self.rangemap.nodes_for_key(prefix)
-        # TODO: Forward to other nodes if not already forwarding and replication_factor > 1
-        # right now we *only* forward if we hit the wrong node (same for remove)
-        if self.config.node_id not in preference_list and commit_to_wal:
-            print("I'm not coordinator, forwarding")
-            return self.coordinator.add(word, preference_list)
         self.sem.acquire()
-        self.wal.commit("add", word)
-
+        if commit_to_wal: self.wal.commit("add", word)
         if prefix not in self.range_tries:
             self.range_tries[prefix] = Trie()
         trie = self.range_tries[prefix]
@@ -59,20 +44,13 @@ def remove(self, word, commit_to_wal=True):
         if len(prefix) != k_factor or prefix not in self.range_tries:
             # we dont support words less than k factor
             return
-        preference_list = self.rangemap.nodes_for_key(prefix)
-        if self.config.node_id not in preference_list and commit_to_wal:
-            print("I'm not coordinator, forwarding")
-            return self.coordinator.remove(word, preference_list)
         self.sem.acquire()
-        self.wal.commit("remove", word)
+        if commit_to_wal: self.wal.commit("remove", word)
         trie = self.range_tries[prefix]
         trie.remove(word)
         self.sem.release()
         return True
-
-    def is_coordinator_for_key(self, key):
-        return self.config.node_id in self.rangemap.nodes_for_key(key)
-
+
 
     def load(self, prefix, trie_dict):
         trie = Trie()
         trie.load(trie_dict)
```
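
Mirroring the commented-out example at the bottom of checkpoint.py, the slimmed-down `ShardedTrie` can now be driven directly, with no rangemap or coordinator wiring. A sketch, assuming config.yaml provides the k-factor, checkpoint, and WAL settings:

```python
from config import Config
from sharded_trie import ShardedTrie
from wal import Wal

config = Config("config.yaml")
wal = Wal(config)
trie = ShardedTrie(config, wal)

trie.add("twitter")  # routed into the shard trie for its k-factor prefix
trie.add("twitch")
print(trie.matches("twi"))  # both words, assuming k_factor <= 3
```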

todo.md

+6 -7

```diff
@@ -1,13 +1,9 @@
 # TODO
 
 * Distributed writes
-  * Writes should be forwarded to a shard owner
-  * If no node is healthy, return an error
-  * Get writes accepted by configured quorum before returning a result
-* Version tries (keep a timestamp of last write on keys and share timestamp)
+* Version tries (keep a timestamp of last write on keys and share timestamp)
   * Last write wins
-  * Send whole subtries in write forwards
-* Expected that client will handle locking etc. and user will configure trifecta for use case
+  * Treat a subtrie as an object. Send whole trie in write forwards
 
 ---
 
@@ -29,6 +25,8 @@
 * Address todos
 * Logging + metrics + stats page
 * Limits and pagination
+* Parallel writes, aborts, hedging
+* Rollback writes if quorum fails
 
 ---------------------------
 
@@ -43,4 +41,5 @@ Ideas
 1. A shard-aware client that skips one potential hop
 1. Namespacing and/or word tagging
 1. Sorting (popularity:desc, alphabetical:desc)
-1. Concurrent healthchecks / gossips
+1. Concurrent healthchecks / gossips
+1. Expected that client will handle locking etc. and user will configure trifecta for use case (i.e. writes to triedb should be done in sequence by clients), but maybe we can handle this for them (by versioning/tombstoning words, not tries)
```
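
The "last write wins" item above boils down to comparing per-key timestamps at merge time. A minimal sketch of that reconciliation rule (the names and data shape are hypothetical):

```python
# Hypothetical last-write-wins merge for a versioned key.
def lww_merge(local, remote):
    """Each entry is a (value, timestamp) pair; the newer write wins."""
    return local if local[1] >= remote[1] else remote

a = ("twitch", 1700000100)      # local copy, written later
b = ("twitch-old", 1700000000)  # remote copy, written earlier
print(lww_merge(a, b))          # ('twitch', 1700000100)
```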

wal.py

+1 -1

```diff
@@ -28,7 +28,7 @@ def readlines(self):
         for walfile in globbed:
             with open(walfile, 'r') as f:
                 for line in f.readlines():
-                    action, object = line.rstrip().split(" ")
+                    action, object = line.rstrip().split(" ", maxsplit=1)
                     yield (action, object)
 
     def rotate(self):
```
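
The `maxsplit=1` fix matters whenever the payload after the action itself contains a space; without it, tuple unpacking fails. A quick illustration (the payload is hypothetical):

```python
line = "add twin peaks"  # hypothetical WAL entry whose payload has a space

# Fixed behavior: split only on the first space.
action, payload = line.rstrip().split(" ", maxsplit=1)
print(action, "|", payload)  # add | twin peaks

# Old behavior: three fields, so unpacking into two names raises.
try:
    action, payload = line.rstrip().split(" ")
except ValueError as err:
    print(err)  # too many values to unpack (expected 2)
```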

0 commit comments
