Skip to content

WIP: Discovery API #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
project(${TARGET_NAME})
include_directories(src/include duckdb/third_party/httplib duckdb/parquet/include)

set(EXTENSION_SOURCES src/httpserver_extension.cpp)
set(EXTENSION_SOURCES src/httpserver_extension.cpp src/discovery.cpp)

if(MINGW)
set(OPENSSL_USE_STATIC_LIBS TRUE)
Expand Down
71 changes: 71 additions & 0 deletions docs/DISCOVERY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
## Discovery API

- [x] POST `/subscribe/{secretHash}` - Register a peer
- [x] GET `/discovery/{secretHash}` - Get list of active peers
- [x] DELETE `/unsubscribe/{secretHash}/{peerId}` - Remove peer

### Start Server
```sql
D SELECT httpserve_enable_discovery('true');
D SELECT httpserve_start('0.0.0.0',9999, '');
┌──────────────────────────────────────┐
│ httpserve_start('0.0.0.0', 9999, '') │
│ varchar │
├──────────────────────────────────────┤
│ HTTP server started on 0.0.0.0:9999 │
└──────────────────────────────────────┘
```

### Register a Peer under a secret hash
#### CURL
```bash
curl -X POST "https://localhost:9999/subscribe/secretHash" \
-H "Content-Type: application/json" \
-d '{ "name": "service1", "endpoint": "http://192.168.1.100:8080", "ttl": 300 }
```
#### SQL
```sql
INSTALL http_client FROM community; LOAD http_client; LOAD json;
WITH __input AS (
SELECT
http_post(
'http://localhost:9999/subscribe/secretHash',
headers => MAP {
},
params => MAP {
'name': 'quackpipe1',
'endpoint': 'https://1.1.1.1',
}
) AS res
) SELECT res->>'reason' as res, res->>'status' as status FROM __input;
```

### Check `peers` table
```sql
D SELECT name, endpoint, source_address as sourceAddress, peer_id as peerId, metadata, ttl, strftime(registered_at, '%Y-%m-%d %H:%M:%S') as registered_at FROM peers WHERE hash = 'secretHash';
┌──────────┬───────────────────────────┬───────────────┬──────────────────────────────────┬──────────┬───────┬─────────────────────┐
│ name │ endpoint │ sourceAddress │ peerId │ metadata │ ttl │ registered_at │
│ varchar │ varchar │ varchar │ varchar │ varchar │ int64 │ varchar │
├──────────┼───────────────────────────┼───────────────┼──────────────────────────────────┼──────────┼───────┼─────────────────────┤
│ service1 │ http://192.168.1.100:8080 │ xxx.xx.xx.xxx │ 0872c98634ce7e608e19aa1a1e6cf784 │ {} │ 300 │ 2024-11-14 19:44:23 │
└──────────┴───────────────────────────┴───────────────┴──────────────────────────────────┴──────────┴───────┴─────────────────────┘
```

### Discover Peers
#### CURL
```bash
curl "http://localhost:9999/discovery/secretHash"
```
#### SQL
```sql
D SELECT * FROM read_ndjson_auto('http://localhost:9999/discovery/secretHash');
┌──────────┬──────────────────────┬────────────────┬──────────────────────────────┬──────────┬─────────┬─────────────────────┐
│ name │ endpoint │ source_address │ peer_id │ metadata │ ttl │ registered_at │
│ varchar │ varchar │ varchar │ uuid │ varchar │ varchar │ timestamp │
├──────────┼──────────────────────┼────────────────┼──────────────────────────────┼──────────┼─────────┼─────────────────────┤
│ service1 │ http://192.168.1.1… │ 127.0.0.1 │ 0872c986-34ce-7e60-8e19-aa… │ │ 3600 │ 2024-11-15 14:13:50 │
└──────────┴──────────────────────┴────────────────┴──────────────────────────────┴──────────┴─────────┴─────────────────────┘
D
```

⚠️ minor issue with peer_id being a UUID and clients hating it
102 changes: 102 additions & 0 deletions src/discovery.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#include "discovery.hpp"
#include "mbedtls_wrapper.hpp"
#include "duckdb/common/common.hpp"
#include <chrono>
#include <iostream>

namespace duckdb {

// Static member definition
std::unique_ptr<PeerDiscovery> PeerDiscovery::instance = nullptr;

// Constructor implementation
PeerDiscovery::PeerDiscovery(DatabaseInstance& database) : db(database) {
initTables();
}

void PeerDiscovery::Initialize(DatabaseInstance& db) {
instance = std::unique_ptr<PeerDiscovery>(new PeerDiscovery(db));
}

void PeerDiscovery::initTables() {
Connection conn(db);
conn.Query("CREATE TABLE IF NOT EXISTS peers ("
"hash VARCHAR, peer_id VARCHAR, name VARCHAR, endpoint VARCHAR,"
"source_address VARCHAR, ttl BIGINT, metadata VARCHAR,"
"registered_at TIMESTAMP, PRIMARY KEY (hash, peer_id))");

// conn.Query("CREATE INDEX IF NOT EXISTS idx_peers_ttl ON peers(registered_at, ttl)");
}

std::string PeerDiscovery::generateDeterministicId(const std::string& name, const std::string& endpoint) {
std::string combined = name + ":" + endpoint;
hash_bytes hash;
duckdb_mbedtls::MbedTlsWrapper::ComputeSha256Hash(combined.c_str(), combined.length(), (char*)hash);

std::string result;
for (int i = 0; i < 16; i++) {
char buf[3];
snprintf(buf, sizeof(buf), "%02x", hash[i]);
result += buf;
}
return result;
}


void PeerDiscovery::registerPeer(const std::string& hash, const PeerData& data) {
Connection conn(db);
std::string peerId = generateDeterministicId(data.name, data.endpoint);

auto stmt = conn.Prepare(
"INSERT OR REPLACE INTO peers (hash, peer_id, name, endpoint, source_address, ttl, metadata, registered_at) "
"VALUES ($1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP)");

vector<Value> params;
params.push_back(Value(hash));
params.push_back(Value(peerId));
params.push_back(Value(data.name));
params.push_back(Value(data.endpoint));
params.push_back(Value(data.sourceAddress));
params.push_back(Value::BIGINT(data.ttl)); /* needs stoi(data.ttl) for dumb clients? */
params.push_back(Value(data.metadata));

stmt->Execute(params);
}

std::unique_ptr<MaterializedQueryResult> PeerDiscovery::getPeers(const std::string& hash, bool ndjson) {
Connection conn(db);

// Prepare the statement
auto stmt = conn.Prepare(
"SELECT name, endpoint, source_address, "
"peer_id, metadata, ttl, "
"strftime(registered_at, '%Y-%m-%d %H:%M:%S') AS registered_at "
"FROM peers WHERE hash = $1 AND EXTRACT(EPOCH FROM age(now()::TIMESTAMP, registered_at)) <= ttl");

// Execute the statement with the parameter
vector<Value> params = {Value(hash)};
auto raw_result = stmt->Execute(params);
auto result = raw_result->Cast<StreamQueryResult>().Materialize();

return std::move(result);
}

void PeerDiscovery::removePeer(const std::string& hash, const std::string& peerId) {
Connection conn(db);
auto stmt = conn.Prepare("DELETE FROM peers WHERE hash = $1 AND peer_id = $2");
stmt->Execute(hash, peerId);
}

void PeerDiscovery::updateHeartbeat(const std::string& hash, const std::string& peerId) {
Connection conn(db);
auto stmt = conn.Prepare("UPDATE peers SET registered_at = CURRENT_TIMESTAMP "
"WHERE hash = $1 AND peer_id = $2");
stmt->Execute(hash, peerId);
}

void PeerDiscovery::cleanupExpired() {
Connection conn(db);
conn.Query("DELETE FROM peers WHERE EXTRACT(EPOCH FROM age(now()::TIMESTAMP, registered_at)) <= ttl");
}

} // namespace duckdb
Loading
Loading