Skip to content

messages::serialize: disable compression for tx updates #2904

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use spacetimedb::db::relational_db::RelationalDB;
use spacetimedb::error::DBError;
use spacetimedb::execution_context::Workload;
use spacetimedb::host::module_host::DatabaseTableUpdate;
Expand All @@ -9,7 +10,6 @@ use spacetimedb::subscription::query::compile_read_only_queryset;
use spacetimedb::subscription::subscription::ExecutionSet;
use spacetimedb::subscription::tx::DeltaTx;
use spacetimedb::subscription::{collect_table_update, TableUpdateType};
use spacetimedb::{db::relational_db::RelationalDB, messages::websocket::Compression};
use spacetimedb_bench::database::BenchDatabase as _;
use spacetimedb_bench::spacetime_raw::SpacetimeRaw;
use spacetimedb_execution::pipelined::PipelinedProject;
Expand Down Expand Up @@ -151,14 +151,7 @@ fn eval(c: &mut Criterion) {
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &tx, sql).unwrap();
let query: ExecutionSet = query.into();

b.iter(|| {
drop(black_box(query.eval::<BsatnFormat>(
&raw.db,
&tx,
None,
Compression::None,
)))
})
b.iter(|| drop(black_box(query.eval::<BsatnFormat>(&raw.db, &tx, None))))
});
};

Expand Down
40 changes: 12 additions & 28 deletions crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@ pub trait WebsocketFormat: Sized {
type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;

/// Convert a `QueryUpdate` into `Self::QueryUpdate`.
/// This allows some formats to e.g., compress the update.
fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate;
///
/// We don't compress individual table updates anymore for any format.
/// Previously we did, but the benefits, if any, were unclear.
/// Note that each message is still compressed before being sent to clients,
/// but we no longer have to hold a tx lock when doing so.
fn into_query_update(qu: QueryUpdate<Self>) -> Self::QueryUpdate;
}

/// Messages sent from the client to the server.
Expand Down Expand Up @@ -770,7 +774,7 @@ impl WebsocketFormat for JsonFormat {

type QueryUpdate = QueryUpdate<Self>;

fn into_query_update(qu: QueryUpdate<Self>, _: Compression) -> Self::QueryUpdate {
fn into_query_update(qu: QueryUpdate<Self>) -> Self::QueryUpdate {
qu
}
}
Expand Down Expand Up @@ -813,24 +817,8 @@ impl WebsocketFormat for BsatnFormat {

type QueryUpdate = CompressableQueryUpdate<Self>;

fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate {
let qu_len_would_have_been = bsatn::to_len(&qu).unwrap();

match decide_compression(qu_len_would_have_been, compression) {
Compression::None => CompressableQueryUpdate::Uncompressed(qu),
Compression::Brotli => {
let bytes = bsatn::to_vec(&qu).unwrap();
let mut out = Vec::new();
brotli_compress(&bytes, &mut out);
CompressableQueryUpdate::Brotli(out.into())
}
Compression::Gzip => {
let bytes = bsatn::to_vec(&qu).unwrap();
let mut out = Vec::new();
gzip_compress(&bytes, &mut out);
CompressableQueryUpdate::Gzip(out.into())
}
}
fn into_query_update(qu: QueryUpdate<Self>) -> Self::QueryUpdate {
CompressableQueryUpdate::Uncompressed(qu)
}
}

Expand All @@ -846,13 +834,9 @@ pub enum Compression {
Gzip,
}

pub fn decide_compression(len: usize, compression: Compression) -> Compression {
/// The threshold beyond which we start to compress messages.
/// 1KiB was chosen without measurement.
/// TODO(perf): measure!
const COMPRESS_THRESHOLD: usize = 1024;

if len > COMPRESS_THRESHOLD {
/// Based on the `len` of a message and a `threshold`, potentially clamp `compression` to `None`.
pub fn decide_compression(len: usize, threshold: usize, compression: Compression) -> Compression {
if len > threshold {
compression
} else {
Compression::None
Expand Down
26 changes: 24 additions & 2 deletions crates/core/src/client/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use bytes::{BufMut, Bytes, BytesMut};
use bytestring::ByteString;
use derive_more::From;
use spacetimedb_client_api_messages::websocket::{
BsatnFormat, Compression, FormatSwitch, JsonFormat, OneOffTable, RowListLen, WebsocketFormat,
BsatnFormat, Compression, FormatSwitch, JsonFormat, OneOffTable, RowListLen, ServerMessage, WebsocketFormat,
SERVER_MSG_COMPRESSION_TAG_BROTLI, SERVER_MSG_COMPRESSION_TAG_GZIP, SERVER_MSG_COMPRESSION_TAG_NONE,
};
use spacetimedb_lib::identity::RequestId;
Expand Down Expand Up @@ -147,8 +147,30 @@ pub fn serialize(
bsatn::to_writer(w.into_inner(), &msg).unwrap()
});

/// The threshold beyond which we start to compress messages for non-update messages.
/// This is 1 KiB currently.
const COMPRESS_THRESHOLD_OTHER: usize = 1024;
/// The threshold beyond which we start to compress messages for update messages.
/// This is 4 KiB currently.
const COMPRESS_THRESHOLD_UPDATE: usize = usize::MAX;
// Route to the correct compression threshold.
let threshold = match msg {
ServerMessage::TransactionUpdate(_) | ServerMessage::TransactionUpdateLight(_) => {
COMPRESS_THRESHOLD_UPDATE
}

ServerMessage::InitialSubscription(_)
| ServerMessage::IdentityToken(_)
| ServerMessage::OneOffQueryResponse(_)
| ServerMessage::SubscribeApplied(_)
| ServerMessage::UnsubscribeApplied(_)
| ServerMessage::SubscriptionError(_)
| ServerMessage::SubscribeMultiApplied(_)
| ServerMessage::UnsubscribeMultiApplied(_) => COMPRESS_THRESHOLD_OTHER,
};

// Conditionally compress the message.
let (in_use, msg_bytes) = match ws::decide_compression(srv_msg.len(), config.compression) {
let (in_use, msg_bytes) = match ws::decide_compression(srv_msg.len(), threshold, config.compression) {
Compression::None => buffer.uncompressed(),
Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, ws::brotli_compress),
Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, ws::gzip_compress),
Expand Down
8 changes: 2 additions & 6 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use derive_more::From;
use indexmap::IndexSet;
use itertools::Itertools;
use prometheus::{Histogram, IntGauge};
use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat};
use spacetimedb_client_api_messages::websocket::{ByteListLen, OneOffTable, QueryUpdate, WebsocketFormat};
use spacetimedb_data_structures::error_stream::ErrorStream;
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
use spacetimedb_execution::pipelined::PipelinedProject;
Expand Down Expand Up @@ -139,11 +139,7 @@ impl UpdatesRelValue<'_> {
let num_rows = nr_del + nr_ins;
let num_bytes = deletes.num_bytes() + inserts.num_bytes();
let qu = QueryUpdate { deletes, inserts };
// We don't compress individual table updates.
// Previously we were, but the benefits, if any, were unclear.
// Note, each message is still compressed before being sent to clients,
// but we no longer have to hold a tx lock when doing so.
let cqu = F::into_query_update(qu, Compression::None);
let cqu = F::into_query_update(qu);
(cqu, num_rows, num_bytes)
}
}
Expand Down
7 changes: 2 additions & 5 deletions crates/core/src/subscription/execution_unit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateRelValue,
use crate::messages::websocket::TableUpdate;
use crate::util::slow::SlowQueryLogger;
use crate::vm::{build_query, TxMode};
use spacetimedb_client_api_messages::websocket::{
Compression, QueryUpdate, RowListLen as _, SingleQueryUpdate, WebsocketFormat,
};
use spacetimedb_client_api_messages::websocket::{QueryUpdate, RowListLen as _, SingleQueryUpdate, WebsocketFormat};
use spacetimedb_lib::db::error::AuthError;
use spacetimedb_lib::relation::DbTable;
use spacetimedb_lib::{Identity, ProductValue};
Expand Down Expand Up @@ -242,7 +240,6 @@ impl ExecutionUnit {
tx: &Tx,
sql: &str,
slow_query_threshold: Option<Duration>,
compression: Compression,
) -> Option<TableUpdate<F>> {
let _slow_query = SlowQueryLogger::new(sql, slow_query_threshold, tx.ctx.workload()).log_guard();

Expand All @@ -255,7 +252,7 @@ impl ExecutionUnit {
(!inserts.is_empty()).then(|| {
let deletes = F::List::default();
let qu = QueryUpdate { deletes, inserts };
let update = F::into_query_update(qu, compression);
let update = F::into_query_update(qu);
TableUpdate::new(
self.return_table(),
self.return_name(),
Expand Down
7 changes: 2 additions & 5 deletions crates/core/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use module_subscription_manager::Plan;
use prometheus::IntCounter;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use spacetimedb_client_api_messages::websocket::{
ByteListLen, Compression, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, WebsocketFormat,
ByteListLen, DatabaseUpdate, QueryUpdate, SingleQueryUpdate, TableUpdate, WebsocketFormat,
};
use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore};
use spacetimedb_lib::{metrics::ExecutionMetrics, Identity};
Expand Down Expand Up @@ -147,10 +147,7 @@ where
inserts: empty,
},
};
// We will compress the outer server message,
// after we release the tx lock.
// There's no need to compress the inner table update too.
let update = F::into_query_update(qu, Compression::None);
let update = F::into_query_update(qu);
(
TableUpdate::new(table_id, table_name, SingleQueryUpdate { update, num_rows }),
metrics,
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/subscription/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ mod tests {
use crate::vm::tests::create_table_with_rows;
use crate::vm::DbProgram;
use itertools::Itertools;
use spacetimedb_client_api_messages::websocket::{BsatnFormat, Compression};
use spacetimedb_client_api_messages::websocket::BsatnFormat;
use spacetimedb_lib::bsatn;
use spacetimedb_lib::db::auth::{StAccess, StTableType};
use spacetimedb_lib::error::ResultTest;
Expand Down Expand Up @@ -353,7 +353,7 @@ mod tests {
total_tables: usize,
rows: &[ProductValue],
) -> ResultTest<()> {
let result = s.eval::<BsatnFormat>(db, tx, None, Compression::Brotli).tables;
let result = s.eval::<BsatnFormat>(db, tx, None).tables;
assert_eq!(
result.len(),
total_tables,
Expand Down
5 changes: 2 additions & 3 deletions crates/core/src/subscription/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::sql::ast::SchemaViewer;
use crate::vm::{build_query, TxMode};
use anyhow::Context;
use itertools::Either;
use spacetimedb_client_api_messages::websocket::{Compression, WebsocketFormat};
use spacetimedb_client_api_messages::websocket::WebsocketFormat;
use spacetimedb_data_structures::map::HashSet;
use spacetimedb_lib::db::auth::{StAccess, StTableType};
use spacetimedb_lib::db::error::AuthError;
Expand Down Expand Up @@ -516,14 +516,13 @@ impl ExecutionSet {
db: &RelationalDB,
tx: &Tx,
slow_query_threshold: Option<Duration>,
compression: Compression,
) -> ws::DatabaseUpdate<F> {
// evaluate each of the execution units in this ExecutionSet in parallel
let tables = self
.exec_units
// if you need eval to run single-threaded for debugging, change this to .iter()
.iter()
.filter_map(|unit| unit.eval(db, tx, &unit.sql, slow_query_threshold, compression))
.filter_map(|unit| unit.eval(db, tx, &unit.sql, slow_query_threshold))
.collect();
ws::DatabaseUpdate { tables }
}
Expand Down
Loading