Skip to content

kv: iterator support #629

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 153 additions & 2 deletions kinode/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use lib::types::core::{
PackageId, PrintSender, Printout, ProcessId, Request, Response, FD_MANAGER_PROCESS_ID,
KV_PROCESS_ID,
};
use rand::random;
use rocksdb::OptimisticTransactionDB;
use std::{
collections::{HashMap, VecDeque},
Expand All @@ -24,6 +25,8 @@ struct KvState {
/// access order of dbs, used to cull if we hit the fds limit
access_order: Arc<Mutex<UniqueQueue<(PackageId, String)>>>,
txs: Arc<DashMap<u64, Vec<(KvAction, Option<Vec<u8>>)>>>,
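/// Active iterators: (package_id, db_name, iterator_id) -> (prefix, current_key).
/// An empty `prefix` means "no prefix filter"; `current_key` is the last key handed
/// out to the caller and stays empty until the first batch has been served.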
iterators: Arc<DashMap<(PackageId, String, u64), (Vec<u8>, Vec<u8>)>>,
fds_limit: u64,
}

Expand All @@ -42,6 +45,7 @@ impl KvState {
open_kvs: Arc::new(DashMap::new()),
access_order: Arc::new(Mutex::new(UniqueQueue::new())),
txs: Arc::new(DashMap::new()),
iterators: Arc::new(DashMap::new()),
fds_limit: 10,
}
}
Expand Down Expand Up @@ -98,6 +102,117 @@ impl KvState {
self.remove_db(key.0, key.1).await;
}
}

/// Registers a new iterator over `db` for `package_id` and returns its id.
///
/// The iterator is pure bookkeeping: only the optional key `prefix` and an
/// empty "current key" are recorded; no RocksDB cursor is opened here.
///
/// # Errors
/// Returns [`KvError::NoDb`] when the database is not currently open.
async fn handle_iter_start(
    &mut self,
    package_id: PackageId,
    db: String,
    prefix: Option<Vec<u8>>,
) -> Result<u64, KvError> {
    // An iterator may only be created against a database that is open.
    if !self
        .open_kvs
        .contains_key(&(package_id.clone(), db.clone()))
    {
        return Err(KvError::NoDb);
    }

    // Draw random ids until one is unused for this (package, db) pair.
    let state_key = loop {
        let candidate = (package_id.clone(), db.clone(), random::<u64>());
        if !self.iterators.contains_key(&candidate) {
            break candidate;
        }
    };
    let iterator_id = state_key.2;

    // Initial state: the requested prefix (empty when absent) and no key served yet.
    let initial_state = (prefix.unwrap_or_default(), Vec::new());
    self.iterators.insert(state_key, initial_state);

    Ok(iterator_id)
}

/// Returns the next batch of at most `count` entries for an iterator.
///
/// The result is `(entries, done)`. When `done` is `true` the iterator has
/// been exhausted (or has left its prefix range) and its state has been
/// removed; when `false` the last returned key is stored so the next call
/// resumes strictly after it.
///
/// A `count` of zero returns an empty batch and leaves the iterator untouched.
///
/// # Errors
/// * [`KvError::NoDb`] - the database is not currently open.
/// * [`KvError::NoIterator`] - no iterator with this id exists for the database.
/// * [`KvError::RocksDBError`] - RocksDB failed while reading; the iterator
///   state is kept so the caller can retry or close it.
///
/// NOTE(review): an empty stored key doubles as the "not started" marker, so a
/// batch that ends exactly on an empty key would restart from the beginning —
/// confirm whether empty keys can occur in practice.
async fn handle_iter_next(
    &mut self,
    package_id: PackageId,
    db: String,
    iterator_id: u64,
    count: u64,
) -> Result<(Vec<(Vec<u8>, Vec<u8>)>, bool), KvError> {
    let iter_key = (package_id.clone(), db.clone(), iterator_id);
    let db_key = (package_id, db);

    // Look up the DB handle and a copy of the iterator state.
    let db = self.open_kvs.get(&db_key).ok_or(KvError::NoDb)?;
    let (prefix, current_key) = self
        .iterators
        .get(&iter_key)
        .ok_or(KvError::NoIterator)?
        .clone();

    // Nothing was asked for: hand back an empty batch without advancing.
    if count == 0 {
        return Ok((Vec::new(), false));
    }

    // A non-empty current key means a previous batch was already served.
    let resuming = !current_key.is_empty();
    let iter = if resuming {
        db.iterator(rocksdb::IteratorMode::From(
            &current_key,
            rocksdb::Direction::Forward,
        ))
    } else if prefix.is_empty() {
        db.iterator(rocksdb::IteratorMode::Start)
    } else {
        db.prefix_iterator(&prefix)
    };

    let mut entries = Vec::new();
    let mut collected: u64 = 0;
    // `IteratorMode::From` is inclusive, so when resuming the first item may be
    // the key that closed the previous batch; it must not be returned twice.
    let mut may_repeat_last_key = resuming;

    for item in iter {
        // Surface read failures instead of mistaking them for end-of-data.
        let (key, value) = item.map_err(|e| KvError::RocksDBError {
            action: "IterNext".to_string(),
            error: e.to_string(),
        })?;

        if may_repeat_last_key {
            may_repeat_last_key = false;
            if key[..] == current_key[..] {
                continue;
            }
        }

        // With a prefix, the first key that no longer matches ends the range.
        if !prefix.is_empty() && !key.starts_with(&prefix) {
            self.iterators.remove(&iter_key);
            return Ok((entries, true));
        }

        let key = key.into_vec();
        collected += 1;

        if collected >= count {
            // Batch is full: remember where we stopped for the next call.
            entries.push((key.clone(), value.into_vec()));
            self.iterators.insert(iter_key, (prefix, key));
            return Ok((entries, false));
        }

        entries.push((key, value.into_vec()));
    }

    // Ran off the end of the database: the iterator is finished.
    self.iterators.remove(&iter_key);
    Ok((entries, true))
}

/// Discards the stored state of an iterator.
///
/// Closing is unconditional: the result of the removal is ignored, so an id
/// that is unknown (or already finished) still yields `Ok(())`.
async fn handle_iter_close(
    &mut self,
    package_id: PackageId,
    db: String,
    iterator_id: u64,
) -> Result<(), KvError> {
    // Whether or not an entry existed makes no difference to the caller.
    let _ = self.iterators.remove(&(package_id, db, iterator_id));
    Ok(())
}
}

pub async fn kv(
Expand Down Expand Up @@ -222,8 +337,8 @@ async fn handle_request(

let request: KvRequest = match serde_json::from_slice(&body) {
Ok(r) => r,
Err(e) => {
println!("kv: got invalid Request: {}", e);
Err(_e) => {
// println!("kv: got invalid Request: {}", e);
return Err(KvError::InputError {
error: "didn't serialize to KvAction.".into(),
});
Expand Down Expand Up @@ -379,6 +494,39 @@ async fn handle_request(
}
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
}
KvAction::IterStart { prefix } => {
let iterator_id = state
.handle_iter_start(
request.package_id.clone(),
request.db.clone(),
prefix.clone(),
)
.await?;
(
serde_json::to_vec(&KvResponse::IterStart { iterator_id }).unwrap(),
None,
)
}
KvAction::IterNext { iterator_id, count } => {
let (entries, done) = state
.handle_iter_next(
request.package_id.clone(),
request.db.clone(),
*iterator_id,
*count,
)
.await?;
(
serde_json::to_vec(&KvResponse::IterNext { done }).unwrap(),
Some(serde_json::to_vec(&entries).unwrap()),
)
}
KvAction::IterClose { iterator_id } => {
state
.handle_iter_close(request.package_id.clone(), request.db.clone(), *iterator_id)
.await?;
(serde_json::to_vec(&KvResponse::Ok).unwrap(), None)
}
};

if let Some(target) = km.rsvp.or_else(|| expects_response.map(|_| source)) {
Expand Down Expand Up @@ -534,6 +682,9 @@ async fn check_caps(
Ok(())
}
KvAction::Backup { .. } => Ok(()),
KvAction::IterStart { .. } => Ok(()),
KvAction::IterNext { .. } => Ok(()),
KvAction::IterClose { .. } => Ok(()),
}
}

Expand Down
120 changes: 112 additions & 8 deletions lib/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,153 @@ use crate::types::core::{CapMessage, PackageId};
use serde::{Deserialize, Serialize};
use thiserror::Error;

/// IPC Request format for the kv:distro:sys runtime module.
/// Actions are sent to a specific key value database, `db` is the name,
/// `package_id` is the [`PackageId`]. Capabilities are checked, you can access another process's
/// database if it has given you the [`crate::Capability`].
#[derive(Debug, Serialize, Deserialize)]
pub struct KvRequest {
/// The [`PackageId`] that, together with `db`, identifies the target database.
pub package_id: PackageId,
/// Name of the target database.
pub db: String,
/// The operation to perform on that database.
pub action: KvAction,
}

/// IPC Action format, representing operations that can be performed on the key-value runtime module.
/// These actions are included in a KvRequest sent to the kv:distro:sys runtime module.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum KvAction {
/// Opens an existing key-value database or creates a new one if it doesn't exist.
Open,
/// Permanently deletes the entire key-value database.
RemoveDb,
/// Sets a value for the specified key in the database.
///
/// # Parameters
/// * `key` - The key as a byte vector
/// * `tx_id` - Optional transaction ID if this operation is part of a transaction
///
/// NOTE(review): the value is not a field of this variant; presumably it
/// travels separately from the JSON body — confirm against the handler.
Set { key: Vec<u8>, tx_id: Option<u64> },
/// Deletes a key-value pair from the database.
///
/// # Parameters
/// * `key` - The key to delete as a byte vector
/// * `tx_id` - Optional transaction ID if this operation is part of a transaction
Delete { key: Vec<u8>, tx_id: Option<u64> },
/// Retrieves the value associated with the specified key.
///
/// # Parameters
/// * `key` - The key to look up as a byte vector
Get { key: Vec<u8> },
/// Begins a new transaction for atomic operations.
BeginTx,
/// Commits all operations in the specified transaction.
///
/// # Parameters
/// * `tx_id` - The ID of the transaction to commit
Commit { tx_id: u64 },
/// Creates a backup of the database.
Backup,
/// Starts an iterator over the database contents.
///
/// Answered with `KvResponse::IterStart`, which carries the new iterator's ID.
///
/// # Parameters
/// * `prefix` - Optional byte vector to filter keys by prefix
IterStart { prefix: Option<Vec<u8>> },
/// Advances the iterator and returns the next batch of items.
///
/// Answered with `KvResponse::IterNext`; the batch itself is a JSON-serialized
/// list of `(key, value)` byte-vector pairs returned alongside the response.
///
/// # Parameters
/// * `iterator_id` - The ID of the iterator to advance
/// * `count` - Maximum number of items to return
IterNext { iterator_id: u64, count: u64 },
/// Closes an active iterator.
///
/// # Parameters
/// * `iterator_id` - The ID of the iterator to close
IterClose { iterator_id: u64 },
}

/// Response types for key-value store operations.
/// These responses are returned after processing a KvAction request.
#[derive(Debug, Serialize, Deserialize)]
pub enum KvResponse {
/// Indicates successful completion of an operation.
Ok,
/// Returns the transaction ID for a newly created transaction.
///
/// # Fields
/// * `tx_id` - The ID of the newly created transaction
BeginTx { tx_id: u64 },
/// Returns the key that was retrieved from the database.
///
/// # Fields
/// * `key` - The retrieved key as a byte vector
Get { key: Vec<u8> },
/// Indicates an error occurred during the operation.
///
/// # Fields
/// * `error` - The specific error that occurred
Err { error: KvError },
/// Returns the ID of a newly created iterator.
///
/// # Fields
/// * `iterator_id` - The ID of the created iterator
IterStart { iterator_id: u64 },
/// Indicates whether the iterator has more items.
///
/// The batch of entries is not part of this variant; it is returned
/// alongside the response as a JSON-serialized list of `(key, value)` pairs.
///
/// # Fields
/// * `done` - True if there are no more items to iterate over
IterNext { done: bool },
/// Confirms the closure of an iterator.
///
/// NOTE(review): the visible `IterClose` handler in the kv runtime module
/// replies with `KvResponse::Ok` rather than this variant — confirm whether
/// this variant is emitted anywhere.
///
/// # Fields
/// * `iterator_id` - The ID of the closed iterator
IterClose { iterator_id: u64 },
}

/// Errors that can occur during key-value store operations.
/// These errors are returned as part of `KvResponse::Err` when an operation fails.
///
/// Each variant carries exactly one `#[error(...)]` attribute, which defines
/// its `Display` message.
#[derive(Debug, Serialize, Deserialize, Error)]
pub enum KvError {
    /// The requested database does not exist.
    #[error("Database does not exist")]
    NoDb,

    /// The requested key was not found in the database.
    #[error("Key not found in database")]
    KeyNotFound,

    /// No active transaction found for the given transaction ID.
    #[error("Transaction not found")]
    NoTx,

    /// The specified iterator was not found.
    #[error("Iterator not found")]
    NoIterator,

    /// The operation requires capabilities that the caller doesn't have.
    ///
    /// # Fields
    /// * `error` - Description of the missing capability or permission
    #[error("Missing required capability: {error}")]
    NoCap { error: String },

    /// An internal RocksDB error occurred during the operation.
    ///
    /// # Fields
    /// * `action` - The operation that was being performed
    /// * `error` - The specific error message from RocksDB
    #[error("RocksDB error during {action}: {error}")]
    RocksDBError { action: String, error: String },

    /// Error parsing or processing input data.
    ///
    /// # Fields
    /// * `error` - Description of what was invalid about the input
    #[error("Invalid input: {error}")]
    InputError { error: String },

    /// An I/O error occurred during the operation.
    ///
    /// # Fields
    /// * `error` - Description of the I/O error
    #[error("I/O error: {error}")]
    IOError { error: String },
}

Expand Down
Loading