Skip to content

Commit f53b1bd

Browse files
zxqfd555-pw authored and Manul from Pathway committed
fix endless stream of parsing errors when the schema is incorrect (#8455)
GitOrigin-RevId: 3579f473828ce1d7161d5e8fc9a2c8ad3f8602ca
1 parent 84e51a6 commit f53b1bd

File tree

2 files changed

+27
-3
lines changed

2 files changed

+27
-3
lines changed

CHANGELOG.md

+3
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file.
55
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66
## [Unreleased]
77

8+
### Changed
9+
- Input connectors now throttle parsing error messages: after the first 128 parsing errors, further ones are logged only while parsing errors make up at most 10% of the parsing attempts.
10+
811
## [0.21.0] - 2025-03-19
912

1013
### Added

src/connectors/mod.rs

+24-3
Original file line number | Diff line number | Diff line change
@@ -88,13 +88,17 @@ impl StartedConnectorState {
8888
}
8989
}
9090

91+
const MAX_PARSE_ERRORS_IN_LOG: usize = 128;
92+
9193
pub struct Connector {
9294
commit_duration: Option<Duration>,
9395
current_timestamp: Timestamp,
9496
num_columns: usize,
9597
current_frontier: OffsetAntichain,
9698
skip_all_errors: bool,
9799
error_logger: Rc<dyn LogError>,
100+
n_parse_attempts: usize,
101+
n_parse_errors_in_log: usize,
98102
}
99103

100104
#[derive(Debug)]
@@ -201,6 +205,8 @@ impl Connector {
201205
current_frontier: OffsetAntichain::new(),
202206
skip_all_errors,
203207
error_logger,
208+
n_parse_attempts: 0,
209+
n_parse_errors_in_log: 0,
204210
}
205211
}
206212

@@ -624,7 +630,10 @@ impl Connector {
624630
}
625631
ReadResult::Data(reader_context, offset) => {
626632
let mut parsed_entries = match parser.parse(&reader_context) {
627-
Ok(entries) => entries,
633+
Ok(entries) => {
634+
self.log_parse_success();
635+
entries
636+
}
628637
Err(e) => {
629638
self.log_parse_error(e);
630639
return;
@@ -815,13 +824,25 @@ impl Connector {
815824
}
816825
}
817826

818-
fn log_parse_error(&self, error: DynError) {
827+
fn log_parse_error(&mut self, error: DynError) {
828+
self.n_parse_attempts += 1;
819829
if self.skip_all_errors {
820-
error!("Parse error: {error}");
830+
self.n_parse_errors_in_log += 1;
831+
let needs_error_log = self.n_parse_errors_in_log <= MAX_PARSE_ERRORS_IN_LOG
832+
|| self.n_parse_errors_in_log * 10 <= self.n_parse_attempts;
833+
if needs_error_log {
834+
error!("Parse error: {error}");
835+
} else if self.n_parse_errors_in_log == MAX_PARSE_ERRORS_IN_LOG + 1 {
836+
error!("Too many parse errors, some of them will be omitted...");
837+
}
821838
} else {
822839
self.error_logger.log_error(error.into());
823840
}
824841
}
842+
843+
fn log_parse_success(&mut self) {
844+
self.n_parse_attempts += 1;
845+
}
825846
}
826847

827848
pub struct SnapshotReaderState {

0 commit comments

Comments
 (0)