5 changes: 5 additions & 0 deletions lib/cron.ex
@@ -33,6 +33,11 @@ defmodule Cron do
interval: :timer.hours(24),
fun: &Model.KademliaSql.clear_invalid_objects/0
},
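# Hourly garbage collection of stale ticket objects from the p2p store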
%Job{
name: "Ticket GC",
interval: :timer.hours(1),
fun: &Model.KademliaSql.prune_stale_objects/0
},
%Job{
name: "Cleanup LRU cache",
interval: :timer.hours(1),
60 changes: 52 additions & 8 deletions lib/kademlia_light.ex
@@ -166,10 +166,56 @@ defmodule KademliaLight do
end

def network() do
call(fn _from, state -> {:reply, state.network, state} end)
network = GenServer.call(__MODULE__, :network)

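# Asynchronously re-check that every routing-table entry still has a backing
# object in KademliaSql; the 60_000 ms Debouncer window keeps this from running
# on every network/0 call.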
Debouncer.immediate(
{__MODULE__, :ensure_network_integrity},
fn -> ensure_network_integrity(GenServer.call(__MODULE__, :network)) end,
60_000
)

network
end

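# Drops routing-table entries whose node objects are no longer present in the
# local KademliaSql store (for example after the ticket GC removed them).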
defp ensure_network_integrity(network) do
missing =
KBuckets.to_list(network)
|> Enum.reject(&KBuckets.is_self/1)
|> Enum.filter(fn item ->
KademliaSql.object(KBuckets.key(item)) == nil
end)

if not Enum.empty?(missing) do
missing_ids = Enum.map(missing, &KBuckets.key/1)

"KademliaLight network missing #{length(missing)} objects; clearing cached table: #{inspect(missing_ids)}"
|> Logger.error()

drop_nodes(missing_ids)
end
end

@impl true
def handle_call(:reset, _from, _state) do
{:reply, :ok, %KademliaLight{network: KBuckets.new()}}
end

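# Removes the given keys from the in-memory KBuckets table; keys without a
# matching item are ignored.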
def handle_call({:drop_nodes, keys}, _from, state = %KademliaLight{network: network}) do
network =
Enum.reduce(keys, network, fn key, acc ->
case KBuckets.item(acc, key) do
nil -> acc
item -> KBuckets.delete_item(acc, item)
end
end)

{:reply, :ok, %{state | network: network}}
end

def handle_call(:network, _from, state) do
{:reply, state.network, state}
end

def handle_call({:call, fun}, from, state) do
fun.(from, state)
end
@@ -256,6 +302,10 @@ defmodule KademliaLight do
GenServer.cast(__MODULE__, {:register_node, node_id})
end

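# Synchronously removes the given node keys from the routing table; used by the
# ticket GC after it deletes the corresponding objects.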
def drop_nodes(keys) when is_list(keys) do
GenServer.call(__MODULE__, {:drop_nodes, keys}, 60_000)
end

# Private call used by PeerHandlerV2 when connections are established
@impl true
def handle_cast({:register_node, node_id}, state) do
@@ -496,9 +546,7 @@ defmodule KademliaLight do

@doc "Method used for testing"
def reset() do
call(fn _from, _state ->
{:reply, :ok, %KademliaLight{network: KBuckets.new()}}
end)
GenServer.call(__MODULE__, :reset)
end

def clean() do
@@ -587,10 +635,6 @@ defmodule KademliaLight do
end
end

defp call(fun) do
GenServer.call(__MODULE__, {:call, fun})
end

def hash(binary) do
Diode.hash(binary)
end
85 changes: 76 additions & 9 deletions lib/model/kademliasql.ex
@@ -4,10 +4,13 @@
defmodule Model.KademliaSql do
alias RemoteChain.RPCCache
alias Model.Sql
alias DiodeClient.{Object}
import DiodeClient.Object.TicketV2
alias DiodeClient.Object
alias KBuckets
import DiodeClient.Object.TicketV2, only: :macros
require Logger

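# Ticket objects whose stored_at timestamp is older than this many seconds are
# considered stale and eligible for pruning.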
@stale_prune_seconds 246_060

def query!(sql, params \\ []) do
Sql.query!(__MODULE__, sql, params)
end
@@ -16,9 +19,24 @@ defmodule Model.KademliaSql do
query!("""
CREATE TABLE IF NOT EXISTS p2p_objects (
key BLOB PRIMARY KEY,
object BLOB
object BLOB,
stored_at INTEGER NOT NULL
)
""")

ensure_stored_at_column()
end

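# Lightweight migration for databases created before stored_at existed: add the
# column and backfill existing rows with the current time.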
defp ensure_stored_at_column() do
unless has_column?("stored_at") do
query!("ALTER TABLE p2p_objects ADD COLUMN stored_at INTEGER NOT NULL DEFAULT 0")
query!("UPDATE p2p_objects SET stored_at = strftime('%s','now') WHERE stored_at = 0")
end
end

defp has_column?(column) do
query!("PRAGMA table_info(p2p_objects)")
|> Enum.any?(fn [_, name, _type, _notnull, _default, _pk] -> name == column end)
end

def clear() do
@@ -65,23 +83,30 @@

def put_object(key, object) do
object = BertInt.encode!(object)
query!("REPLACE INTO p2p_objects (key, object) VALUES(?1, ?2)", [key, object])

query!(
"REPLACE INTO p2p_objects (key, object, stored_at) VALUES(?1, ?2, ?3)",
[key, object, now_seconds()]
)
end

def delete_object(key) do
query!("DELETE FROM p2p_objects WHERE key = ?1", [key])
end

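# Returns the BERT-decoded object stored under key, or nil when no row exists.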
def object(key) do
Sql.fetch!(__MODULE__, "SELECT object FROM p2p_objects WHERE key = ?1", key)
case Sql.query!(__MODULE__, "SELECT object FROM p2p_objects WHERE key = ?1", [key]) do
[[object_blob]] -> BertInt.decode!(object_blob)
[] -> nil
end
end

def scan() do
query!("SELECT key, object FROM p2p_objects")
|> Enum.map(fn [key, obj] ->
obj = BertInt.decode!(obj) |> Object.decode!()
{key, obj}
|> Enum.reduce([], fn [key, object_blob], acc ->
[{key, Object.decode!(BertInt.decode!(object_blob))} | acc]
end)
|> Enum.reverse()
end

@spec objects(integer, integer) :: any
@@ -100,7 +125,10 @@
[bstart, bend]
)
end
|> Enum.map(fn [key, obj] -> {key, BertInt.decode!(obj)} end)
|> Enum.reduce([], fn [key, object_blob], acc ->
[{key, BertInt.decode!(object_blob)} | acc]
end)
|> Enum.reverse()
end

defp await(chain) do
@@ -115,6 +143,45 @@
end
end

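# Deletes ticket objects older than @stale_prune_seconds, drops the matching
# routing-table entries and returns the number of removed rows.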
def prune_stale_objects() do
cutoff = now_seconds() - @stale_prune_seconds

removed = prune_stale_chunk(cutoff, 0)

if removed > 0 do
Logger.info("Removed #{removed} stale ticket objects")
end

removed
end

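# Works through stale keys in batches of 100, skipping this node's own key,
# until no stale rows remain.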
defp prune_stale_chunk(cutoff, acc) do
self = KBuckets.key(Diode.wallet())

stale_keys =
query!("SELECT key FROM p2p_objects WHERE stored_at < ?1 AND key != ?2 LIMIT 100", [
cutoff,
self
])
|> Enum.map(&hd/1)

case stale_keys do
[] ->
acc

keys ->
KademliaLight.drop_nodes(keys)

for key <- keys do
query!("DELETE FROM p2p_objects WHERE key = ?1", [key])
end

prune_stale_chunk(cutoff, acc + length(keys))
end
end

defp now_seconds(), do: System.os_time(:second)

def clear_invalid_objects() do
await(Chains.Diode)
await(Chains.Moonbeam)
1 change: 1 addition & 0 deletions lib/network/rpc.ex
@@ -403,6 +403,7 @@ defmodule Network.Rpc do
retries: encode16(item.retries)
}
end)
|> Enum.reverse()
|> Enum.concat(connected)
|> Enum.reduce(%{}, fn item, acc ->
Map.put_new(acc, item.node_id, item)
45 changes: 45 additions & 0 deletions test/object_test.exs
@@ -4,6 +4,7 @@
defmodule ObjectTest do
use ExUnit.Case, async: true
import TestHelper
import DiodeClient.Object.TicketV2, only: [ticketv2: 1]
alias Object.Server

# Testing forward compatibility of server tickets
@@ -63,4 +64,48 @@
assert Object.decode!(Map.get(map, key)) == object
end
end

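# The GC tests set stored_at directly so they do not have to wait for real time to pass.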
test "ticket gc keeps recent rows" do
key = <<1::256>>
persist_ticket(key)

ttl = 246_060
recent = System.os_time(:second) - ttl + 10

Model.KademliaSql.query!("UPDATE p2p_objects SET stored_at = ?1 WHERE key = ?2", [recent, key])

assert Model.KademliaSql.prune_stale_objects() == 0
assert Model.KademliaSql.object(key) != nil
end

test "ticket gc removes stale rows" do
key = <<2::256>>
persist_ticket(key)

ttl = 246_060
old = System.os_time(:second) - ttl - 10
Model.KademliaSql.query!("UPDATE p2p_objects SET stored_at = ?1 WHERE key = ?2", [old, key])

assert Model.KademliaSql.prune_stale_objects() == 1
assert Model.KademliaSql.object(key) == nil
end

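# Builds and stores a minimal TicketV2 object under the given key so the GC
# tests have a row to operate on.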
defp persist_ticket(key, opts \\ []) do
ticket =
ticketv2(
chain_id: chain().chain_id(),
server_id: Wallet.address!(Diode.wallet()),
total_connections: 1,
total_bytes: Keyword.get(opts, :total_bytes, 1),
local_address: "foo",
epoch: Keyword.get(opts, :epoch, epoch()),
fleet_contract: developer_fleet_address(),
device_signature: <<0::520>>,
server_signature: <<0::520>>
)
|> Object.encode!()

Model.KademliaSql.put_object(key, ticket)
key
end
end