Skip to content

Commit 7ef5169

Browse files
chore: simplify schema creation from storage (#1368)
remove functions that creates schema from ingestors and queriers separately reused function `fetch_schema` that fetches all schema files and merges the schemas into one this ensures the schema is always the latest
1 parent 085519a commit 7ef5169

File tree

4 files changed

+27
-65
lines changed

4 files changed

+27
-65
lines changed

src/handlers/http/query.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -439,12 +439,22 @@ pub async fn create_streams_for_querier() -> Result<(), QueryError> {
439439
let querier_streams = PARSEABLE.streams.list();
440440

441441
let querier_streams_set: HashSet<_> = querier_streams.into_iter().collect();
442-
443-
let storage_streams = store.list_streams().await?;
444-
445-
let missing_streams: Vec<_> = storage_streams
442+
// fetch querier streams which have field list blank
443+
// now missing streams should be list of streams which are in storage but not in querier
444+
// and also have no fields in the schema
445+
// this is to ensure that we do not create streams for querier which already exist in querier
446+
447+
let missing_streams: Vec<_> = store
448+
.list_streams()
449+
.await?
446450
.into_iter()
447-
.filter(|stream_name| !querier_streams_set.contains(stream_name))
451+
.filter(|stream_name| {
452+
!querier_streams_set.contains(stream_name)
453+
|| PARSEABLE
454+
.get_stream(stream_name)
455+
.map(|s| s.get_schema().fields().is_empty())
456+
.unwrap_or(false)
457+
})
448458
.collect();
449459

450460
if missing_streams.is_empty() {

src/migration/mod.rs

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ async fn migration_stream(
206206
) -> anyhow::Result<Option<LogStreamMetadata>> {
207207
let mut arrow_schema: Schema = Schema::empty();
208208

209-
let schema = fetch_or_create_schema(stream, storage).await?;
209+
let schema = storage.create_schema_from_storage(stream).await?;
210210
let stream_metadata = fetch_or_create_stream_metadata(stream, storage).await?;
211211

212212
let mut stream_meta_found = true;
@@ -234,29 +234,6 @@ async fn migration_stream(
234234
Ok(Some(metadata))
235235
}
236236

237-
async fn fetch_or_create_schema(
238-
stream: &str,
239-
storage: &dyn ObjectStorage,
240-
) -> anyhow::Result<Bytes> {
241-
let schema_path = schema_path(stream);
242-
if let Ok(schema) = storage.get_object(&schema_path).await {
243-
Ok(schema)
244-
} else {
245-
let querier_schema = storage
246-
.create_schema_from_querier(stream)
247-
.await
248-
.unwrap_or_default();
249-
if !querier_schema.is_empty() {
250-
Ok(querier_schema)
251-
} else {
252-
Ok(storage
253-
.create_schema_from_ingestor(stream)
254-
.await
255-
.unwrap_or_default())
256-
}
257-
}
258-
}
259-
260237
async fn fetch_or_create_stream_metadata(
261238
stream: &str,
262239
storage: &dyn ObjectStorage,

src/parseable/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ impl Parseable {
281281

282282
let (stream_metadata_bytes, schema_bytes) = try_join!(
283283
storage.create_stream_from_ingestor(stream_name),
284-
storage.create_schema_from_ingestor(stream_name)
284+
storage.create_schema_from_storage(stream_name)
285285
)?;
286286

287287
let stream_metadata = if stream_metadata_bytes.is_empty() {

src/storage/object_storage.rs

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot};
4949
use crate::correlation::{CorrelationConfig, CorrelationError};
5050
use crate::event::format::LogSource;
5151
use crate::event::format::LogSourceEntry;
52+
use crate::handlers::http::fetch_schema;
5253
use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT;
5354
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
5455
use crate::handlers::http::users::CORRELATION_DIR;
@@ -652,44 +653,18 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
652653
Ok(Bytes::new())
653654
}
654655

655-
///create schema from querier schema from storage
656-
async fn create_schema_from_querier(
656+
///create schema from storage
657+
async fn create_schema_from_storage(
657658
&self,
658659
stream_name: &str,
659660
) -> Result<Bytes, ObjectStorageError> {
660-
let path =
661-
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
662-
if let Ok(querier_schema_bytes) = self.get_object(&path).await {
663-
self.put_object(&schema_path(stream_name), querier_schema_bytes.clone())
664-
.await?;
665-
return Ok(querier_schema_bytes);
666-
}
667-
Ok(Bytes::new())
668-
}
669-
670-
///create schema from ingestor schema from storage
671-
async fn create_schema_from_ingestor(
672-
&self,
673-
stream_name: &str,
674-
) -> Result<Bytes, ObjectStorageError> {
675-
let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
676-
if let Some(schema_obs) = self
677-
.get_objects(
678-
Some(&path),
679-
Box::new(|file_name| {
680-
file_name.starts_with(".ingestor") && file_name.ends_with("schema")
681-
}),
682-
)
683-
.await
684-
.into_iter()
685-
.next()
686-
{
687-
let schema_ob = &schema_obs[0];
688-
self.put_object(&schema_path(stream_name), schema_ob.clone())
689-
.await?;
690-
return Ok(schema_ob.clone());
691-
}
692-
Ok(Bytes::new())
661+
let schema = fetch_schema(stream_name).await?;
662+
// convert to bytes
663+
let schema = serde_json::to_vec(&schema)?;
664+
let schema_bytes = Bytes::from(schema);
665+
self.put_object(&schema_path(stream_name), schema_bytes.clone())
666+
.await?;
667+
Ok(schema_bytes)
693668
}
694669

695670
async fn get_stream_meta_from_storage(

0 commit comments

Comments
 (0)