Skip to content

Commit

Permalink
feat(pubky): add get()
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuhvi committed Jul 23, 2024
1 parent 62cc13b commit 8cf18a3
Show file tree
Hide file tree
Showing 13 changed files with 395 additions and 90 deletions.
4 changes: 1 addition & 3 deletions pubky-homeserver/src/database/tables/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use heed::{
BoxedError, BytesDecode, BytesEncode, Database,
};

use pubky_common::crypto::Hash;

/// hash of the blob => bytes.
pub type BlobsTable = Database<Hash, Bytes>;
pub type BlobsTable = Database<Bytes, Bytes>;

pub const BLOBS_TABLE: &str = "blobs";
23 changes: 20 additions & 3 deletions pubky-homeserver/src/database/tables/entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use heed::{
BoxedError, BytesDecode, BytesEncode, Database,
};

use pubky_common::crypto::Hash;
use pubky_common::{crypto::Hash, timestamp::Timestamp};

/// full_path(pubky/*path) => Entry.
pub type EntriesTable = Database<Hash, Entry>;
pub type EntriesTable = Database<Bytes, Bytes>;

pub const ENTRIES_TABLE: &str = "entries";

Expand All @@ -30,7 +30,10 @@ pub struct Entry {

impl Entry {
pub fn new() -> Self {
Default::default()
Self {
timestamp: Timestamp::now().into_inner(),
..Default::default()
}
}

// === Setters ===
Expand Down Expand Up @@ -63,4 +66,18 @@ impl Entry {
pub fn content_type(&self) -> &str {
&self.content_type
}

// === Public Method ===

pub fn serialize(&self) -> Vec<u8> {
to_allocvec(self).expect("Session::serialize")
}

pub fn deserialize(bytes: &[u8]) -> core::result::Result<Self, postcard::Error> {
if bytes[0] > 0 {
panic!("Unknown Entry version");
}

Ok(from_bytes(bytes)?)
}
}
40 changes: 28 additions & 12 deletions pubky-homeserver/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,49 @@ impl IntoResponse for Error {

impl From<QueryRejection> for Error {
fn from(error: QueryRejection) -> Self {
Self::new(StatusCode::BAD_REQUEST, Some(error))
Self::new(StatusCode::BAD_REQUEST, error.into())
}
}

impl From<ExtensionRejection> for Error {
fn from(error: ExtensionRejection) -> Self {
Self::new(StatusCode::BAD_REQUEST, Some(error))
Self::new(StatusCode::BAD_REQUEST, error.into())
}
}

impl From<PathRejection> for Error {
fn from(error: PathRejection) -> Self {
Self::new(StatusCode::BAD_REQUEST, Some(error))
Self::new(StatusCode::BAD_REQUEST, error.into())
}
}

impl From<std::io::Error> for Error {
fn from(error: std::io::Error) -> Self {
Self::new(StatusCode::INTERNAL_SERVER_ERROR, Some(error))
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}

impl From<heed::Error> for Error {
fn from(error: heed::Error) -> Self {
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}

impl From<anyhow::Error> for Error {
fn from(error: anyhow::Error) -> Self {
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}

impl From<postcard::Error> for Error {
fn from(error: postcard::Error) -> Self {
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}

impl From<axum::Error> for Error {
fn from(error: axum::Error) -> Self {
Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
}
}

Expand All @@ -89,11 +113,3 @@ impl From<pkarr::Error> for Error {
Self::new(StatusCode::BAD_REQUEST, Some(error))
}
}

impl From<heed::Error> for Error {
fn from(error: heed::Error) -> Self {
debug!(?error);

Self::with_status(StatusCode::INTERNAL_SERVER_ERROR)
}
}
29 changes: 29 additions & 0 deletions pubky-homeserver/src/extractors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,32 @@ where
Ok(Pubky(public_key))
}
}

pub struct EntryPath(pub(crate) String);

impl EntryPath {
pub fn as_bytes(&self) -> &[u8] {
self.0.as_bytes()
}
}

#[async_trait]
impl<S> FromRequestParts<S> for EntryPath
where
S: Send + Sync,
{
type Rejection = Response;

async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
let params: Path<HashMap<String, String>> =
parts.extract().await.map_err(IntoResponse::into_response)?;

// TODO: enforce path limits like no trailing '/'

let path = params
.get("path")
.ok_or_else(|| (StatusCode::NOT_FOUND, "entry path missing").into_response())?;

Ok(EntryPath(path.to_string()))
}
}
5 changes: 3 additions & 2 deletions pubky-homeserver/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tower_http::trace::TraceLayer;
use crate::server::AppState;

mod auth;
mod drive;
mod public;
mod root;

pub fn create_app(state: AppState) -> Router {
Expand All @@ -19,7 +19,8 @@ pub fn create_app(state: AppState) -> Router {
.route("/:pubky/session", get(auth::session))
.route("/:pubky/session", post(auth::signin))
.route("/:pubky/session", delete(auth::signout))
.route("/:pubky/*key", put(drive::put))
.route("/:pubky/*path", put(public::put))
.route("/:pubky/*path", get(public::get))
.layer(TraceLayer::new_for_http())
.layer(CookieManagerLayer::new())
// TODO: revisit if we enable streaming big payloads
Expand Down
63 changes: 0 additions & 63 deletions pubky-homeserver/src/routes/drive.rs

This file was deleted.

149 changes: 149 additions & 0 deletions pubky-homeserver/src/routes/public.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use axum::{
body::{Body, Bytes},
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
RequestExt, Router,
};
use axum_extra::body::AsyncReadBody;
use futures_util::stream::StreamExt;

use tracing::debug;

use pubky_common::crypto::Hasher;

use crate::{
database::tables::{
blobs::{BlobsTable, BLOBS_TABLE},
entries::{EntriesTable, Entry, ENTRIES_TABLE},
},
error::{Error, Result},
extractors::{EntryPath, Pubky},
server::AppState,
};

pub async fn put(
State(state): State<AppState>,
pubky: Pubky,
path: EntryPath,
mut body: Body,
) -> Result<impl IntoResponse> {
// TODO: return an error if path does not start with '/pub/'

let mut stream = body.into_data_stream();

let (tx, rx) = flume::bounded::<Bytes>(1);

// TODO: refactor Database to clean up this scope.
let done = tokio::task::spawn_blocking(move || -> Result<()> {
// TODO: this is a blocking operation, which is ok for small
// payloads (we have 16 kb limit for now) but later we need
// to stream this to filesystem, and keep track of any failed
// writes to GC these files later.

let public_key = pubky.public_key();

// TODO: Authorize

let mut wtxn = state.db.env.write_txn()?;
let blobs: BlobsTable = state
.db
.env
.open_database(&wtxn, Some(BLOBS_TABLE))?
.expect("Blobs table already created");

let entries: EntriesTable = state
.db
.env
.open_database(&wtxn, Some(ENTRIES_TABLE))?
.expect("Entries table already created");

let mut hasher = Hasher::new();
let mut bytes = vec![];
let mut length = 0;

while let Ok(chunk) = rx.recv() {
hasher.update(&chunk);
bytes.extend_from_slice(&chunk);
length += chunk.len();
}

let hash = hasher.finalize();

blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?;

let mut entry = Entry::new();

entry.set_content_hash(hash);
entry.set_content_length(length);

let mut key = vec![];
key.extend_from_slice(public_key.as_bytes());
key.extend_from_slice(path.as_bytes());

entries.put(&mut wtxn, &key, &entry.serialize());

Ok(())
});

while let Some(next) = stream.next().await {
let chunk = next?;

tx.send(chunk);
}

drop(tx);
done.await.expect("join error")?;

// TODO: return relevant headers, like Etag?

Ok(())
}

pub async fn get(
State(state): State<AppState>,
pubky: Pubky,
path: EntryPath,
) -> Result<impl IntoResponse> {
// TODO: check the path, return an error if doesn't start with `/pub/`

// TODO: Enable streaming

let public_key = pubky.public_key();

let mut rtxn = state.db.env.read_txn()?;

let entries: EntriesTable = state
.db
.env
.open_database(&rtxn, Some(ENTRIES_TABLE))?
.expect("Entries table already created");

let blobs: BlobsTable = state
.db
.env
.open_database(&rtxn, Some(BLOBS_TABLE))?
.expect("Blobs table already created");

let mut count = 0;

for x in entries.iter(&rtxn)? {
count += 1
}

return Err(Error::new(StatusCode::NOT_FOUND, count.to_string().into()));

let mut key = vec![];
key.extend_from_slice(public_key.as_bytes());
key.extend_from_slice(path.as_bytes());

if let Some(bytes) = entries.get(&rtxn, &key)? {
let entry = Entry::deserialize(bytes)?;

if let Some(blob) = blobs.get(&rtxn, entry.content_hash())? {
return Ok(blob.to_vec());
};
};

Err(Error::new(StatusCode::NOT_FOUND, path.0.into()))
}
1 change: 1 addition & 0 deletions pubky/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ ureq = { version = "2.10.0", features = ["cookies"] }
thiserror = "1.0.62"
url = "2.5.2"
flume = { version = "0.11.0", features = ["select", "eventual-fairness"], default-features = false }
bytes = "1.6.1"

[dev-dependencies]
pubky_homeserver = { path = "../pubky-homeserver" }
Expand Down
Loading

0 comments on commit 8cf18a3

Please sign in to comment.