Skip to content

Commit 29315f2

Browse files
chore: simplify schema creation from storage
Remove the functions that create the schema from ingestors and queriers separately. Reuse the function `fetch_schema`, which fetches all schema files and merges the schemas into one; this ensures the schema is always the latest.
1 parent d4a22e9 commit 29315f2

File tree

3 files changed

+12
-60
lines changed

3 files changed

+12
-60
lines changed

src/migration/mod.rs

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ async fn migration_stream(
184184
) -> anyhow::Result<Option<LogStreamMetadata>> {
185185
let mut arrow_schema: Schema = Schema::empty();
186186

187-
let schema = fetch_or_create_schema(stream, storage).await?;
187+
let schema = storage.create_schema_from_storage(stream).await?;
188188
let stream_metadata = fetch_or_create_stream_metadata(stream, storage).await?;
189189

190190
let mut stream_meta_found = true;
@@ -212,29 +212,6 @@ async fn migration_stream(
212212
Ok(Some(metadata))
213213
}
214214

215-
async fn fetch_or_create_schema(
216-
stream: &str,
217-
storage: &dyn ObjectStorage,
218-
) -> anyhow::Result<Bytes> {
219-
let schema_path = schema_path(stream);
220-
if let Ok(schema) = storage.get_object(&schema_path).await {
221-
Ok(schema)
222-
} else {
223-
let querier_schema = storage
224-
.create_schema_from_querier(stream)
225-
.await
226-
.unwrap_or_default();
227-
if !querier_schema.is_empty() {
228-
Ok(querier_schema)
229-
} else {
230-
Ok(storage
231-
.create_schema_from_ingestor(stream)
232-
.await
233-
.unwrap_or_default())
234-
}
235-
}
236-
}
237-
238215
async fn fetch_or_create_stream_metadata(
239216
stream: &str,
240217
storage: &dyn ObjectStorage,

src/parseable/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ impl Parseable {
273273

274274
let (stream_metadata_bytes, schema_bytes) = try_join!(
275275
storage.create_stream_from_ingestor(stream_name),
276-
storage.create_schema_from_ingestor(stream_name)
276+
storage.create_schema_from_storage(stream_name)
277277
)?;
278278

279279
let stream_metadata = if stream_metadata_bytes.is_empty() {

src/storage/object_storage.rs

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot};
4949
use crate::correlation::{CorrelationConfig, CorrelationError};
5050
use crate::event::format::LogSource;
5151
use crate::event::format::LogSourceEntry;
52+
use crate::handlers::http::fetch_schema;
5253
use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT;
5354
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
5455
use crate::handlers::http::users::CORRELATION_DIR;
@@ -652,44 +653,18 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
652653
Ok(Bytes::new())
653654
}
654655

655-
///create schema from querier schema from storage
656-
async fn create_schema_from_querier(
656+
///create schema from storage
657+
async fn create_schema_from_storage(
657658
&self,
658659
stream_name: &str,
659660
) -> Result<Bytes, ObjectStorageError> {
660-
let path =
661-
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
662-
if let Ok(querier_schema_bytes) = self.get_object(&path).await {
663-
self.put_object(&schema_path(stream_name), querier_schema_bytes.clone())
664-
.await?;
665-
return Ok(querier_schema_bytes);
666-
}
667-
Ok(Bytes::new())
668-
}
669-
670-
///create schema from ingestor schema from storage
671-
async fn create_schema_from_ingestor(
672-
&self,
673-
stream_name: &str,
674-
) -> Result<Bytes, ObjectStorageError> {
675-
let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
676-
if let Some(schema_obs) = self
677-
.get_objects(
678-
Some(&path),
679-
Box::new(|file_name| {
680-
file_name.starts_with(".ingestor") && file_name.ends_with("schema")
681-
}),
682-
)
683-
.await
684-
.into_iter()
685-
.next()
686-
{
687-
let schema_ob = &schema_obs[0];
688-
self.put_object(&schema_path(stream_name), schema_ob.clone())
689-
.await?;
690-
return Ok(schema_ob.clone());
691-
}
692-
Ok(Bytes::new())
661+
let schema = fetch_schema(stream_name).await?;
662+
// convert to bytes
663+
let schema = serde_json::to_vec(&schema)?;
664+
let schema_bytes = Bytes::from(schema);
665+
self.put_object(&schema_path(stream_name), schema_bytes.clone())
666+
.await?;
667+
Ok(schema_bytes)
693668
}
694669

695670
async fn get_stream_meta_from_storage(

0 commit comments

Comments
 (0)