Skip to content

Commit 40c9b26

Browse files
fix for converting arrows to parquet
1 parent ca4b25a commit 40c9b26

File tree

2 files changed

+33
-34
lines changed

2 files changed

+33
-34
lines changed

src/event/mod.rs

+1-12
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,7 @@ pub struct Event {
4747
// Events holds the schema related to a each event for a single log stream
4848
impl Event {
4949
pub async fn process(self) -> Result<(), EventError> {
50-
let mut key = get_schema_key(&self.rb.schema().fields);
51-
if self.time_partition.is_some() {
52-
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
53-
key.push_str(&parsed_timestamp_to_min);
54-
}
55-
56-
if !self.custom_partition_values.is_empty() {
57-
for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) {
58-
key.push_str(&format!("&{k}={v}"));
59-
}
60-
}
61-
50+
let key = get_schema_key(&self.rb.schema().fields);
6251
if self.is_first_event {
6352
commit_schema(&self.stream_name, self.rb.schema())?;
6453
}

src/staging/streams.rs

+32-22
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::{
2828
use arrow_array::RecordBatch;
2929
use arrow_ipc::writer::StreamWriter;
3030
use arrow_schema::Schema;
31-
use chrono::{NaiveDateTime, Timelike, Utc};
31+
use chrono::{DateTime, Datelike, NaiveDateTime, Timelike, Utc};
3232
use derive_more::{Deref, DerefMut};
3333
use itertools::Itertools;
3434
use parquet::{
@@ -136,8 +136,7 @@ impl<'a> Stream<'a> {
136136
hostname.push_str(&INGESTOR_META.get_ingestor_id());
137137
}
138138
let filename = format!(
139-
"{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
140-
Utc::now().format("%Y%m%dT%H%M"),
139+
"{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
141140
parsed_timestamp.date(),
142141
parsed_timestamp.hour(),
143142
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -155,13 +154,13 @@ impl<'a> Stream<'a> {
155154
return vec![];
156155
};
157156

158-
let paths = dir
157+
let paths: Vec<PathBuf> = dir
159158
.flatten()
160159
.map(|file| file.path())
161160
.filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows")))
162-
.sorted_by_key(|f| f.metadata().unwrap().modified().unwrap())
161+
.sorted_by_key(|f| f.metadata().unwrap().created().unwrap())
163162
.collect();
164-
163+
165164
paths
166165
}
167166

@@ -172,24 +171,36 @@ impl<'a> Stream<'a> {
172171
/// Only includes ones starting from the previous minute
173172
pub fn arrow_files_grouped_exclude_time(
174173
&self,
175-
exclude: NaiveDateTime,
176174
shutdown_signal: bool,
177175
) -> HashMap<PathBuf, Vec<PathBuf>> {
176+
let now = Utc::now();
177+
178+
// Extract date and time components of current time
179+
let now_date = (now.year(), now.month(), now.day());
180+
let now_time = (now.hour(), now.minute());
181+
178182
let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
179183
let mut arrow_files = self.arrow_files();
180-
181-
// if the shutdown signal is false i.e. normal condition
182-
// don't keep the ones for the current minute
183-
if !shutdown_signal {
184-
arrow_files.retain(|path| {
185-
!path
186-
.file_name()
187-
.unwrap()
188-
.to_str()
189-
.unwrap()
190-
.starts_with(&exclude.format("%Y%m%dT%H%M").to_string())
191-
});
192-
}
184+
arrow_files = arrow_files
185+
.into_iter()
186+
.filter(|path| {
187+
let created_at = path.metadata().unwrap().created().unwrap();
188+
let created_at: DateTime<Utc> = created_at.into();
189+
let created_date = (created_at.year(), created_at.month(), created_at.day());
190+
let created_time = (created_at.hour(), created_at.minute());
191+
192+
let same_date = now_date == created_date;
193+
let same_time = now_time == created_time;
194+
195+
// if the shutdown signal is false i.e. normal condition
196+
// don't keep the ones for the current minute
197+
if !shutdown_signal {
198+
!same_date || !same_time
199+
} else {
200+
true
201+
}
202+
})
203+
.collect();
193204

194205
let random_string =
195206
rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15);
@@ -306,8 +317,7 @@ impl<'a> Stream<'a> {
306317
) -> Result<Option<Schema>, StagingError> {
307318
let mut schemas = Vec::new();
308319

309-
let time = chrono::Utc::now().naive_utc();
310-
let staging_files = self.arrow_files_grouped_exclude_time(time, shutdown_signal);
320+
let staging_files = self.arrow_files_grouped_exclude_time(shutdown_signal);
311321
if staging_files.is_empty() {
312322
metrics::STAGING_FILES
313323
.with_label_values(&[&self.stream_name])

0 commit comments

Comments
 (0)