Skip to content

Commit c3fc1cb

Browse files
authored
storage: add some extra logging (#32379)
<!-- Describe the contents of the PR briefly but completely. If you write detailed commit messages, it is acceptable to copy/paste them here, or write "see commit messages for details." If there is only one commit in the PR, GitHub will have already added its commit message above. --> ### Motivation <!-- Which of the following best describes the motivation behind this PR? * This PR fixes a recognized bug. [Ensure issue is linked somewhere.] * This PR adds a known-desirable feature. [Ensure issue is linked somewhere.] * This PR fixes a previously unreported bug. [Describe the bug in detail, as if you were filing a bug report.] * This PR adds a feature that has not yet been specified. [Write a brief specification for the feature, including justification for its inclusion in Materialize, as if you were writing the original feature specification.] * This PR refactors existing code. [Describe what was wrong with the existing code, if it is not obvious.] --> ### Tips for reviewer <!-- Leave some tips for your reviewer, like: * The diff is much smaller if viewed with whitespace hidden. * [Some function/module/file] deserves extra attention. * [Some function/module/file] is pure code movement and only needs a skim. Delete this section if no tips. --> ### Checklist - [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/)) - [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. <!-- Reference the design in the description. 
--> - [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label. - [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. --> - [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.
1 parent f7d75a8 commit c3fc1cb

File tree

3 files changed

+29
-23
lines changed

3 files changed

+29
-23
lines changed

src/storage/src/upsert.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,7 @@ async fn drain_staged_input<S, G, T, FromTime, E>(
561561
drain_style: DrainStyle<'_, T>,
562562
error_emitter: &mut E,
563563
state: &mut UpsertState<'_, S, T, Option<FromTime>>,
564+
source_config: &crate::source::SourceExportCreationConfig,
564565
) where
565566
S: UpsertStateBackend<T, Option<FromTime>>,
566567
G: Scope,
@@ -632,7 +633,7 @@ async fn drain_staged_input<S, G, T, FromTime, E>(
632633
let existing_value = &mut command_state.get_mut().value;
633634

634635
if let Some(cs) = existing_value.as_mut() {
635-
cs.ensure_decoded(bincode_opts);
636+
cs.ensure_decoded(bincode_opts, source_config.id);
636637
}
637638

638639
// Skip this command if its order key is below the one in the upsert state.
@@ -775,7 +776,7 @@ where
775776
state().await,
776777
upsert_shared_metrics,
777778
&upsert_metrics,
778-
source_config.source_statistics,
779+
source_config.source_statistics.clone(),
779780
upsert_config.shrink_upsert_unused_buffers_by_ratio,
780781
);
781782
let mut events = vec![];
@@ -931,6 +932,7 @@ where
931932
DrainStyle::ToUpper(&upper),
932933
&mut error_emitter,
933934
&mut state,
935+
&source_config,
934936
)
935937
.await;
936938

@@ -967,6 +969,7 @@ where
967969
DrainStyle::AtTime(partial_drain_time),
968970
&mut error_emitter,
969971
&mut state,
972+
&source_config,
970973
)
971974
.await;
972975

src/storage/src/upsert/types.rs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ use bincode::Options;
9292
use itertools::Itertools;
9393
use mz_ore::cast::CastFrom;
9494
use mz_ore::error::ErrorExt;
95-
use mz_repr::Diff;
95+
use mz_repr::{Diff, GlobalId};
9696
use serde::{Serialize, de::DeserializeOwned};
9797

9898
use crate::metrics::upsert::{UpsertMetrics, UpsertSharedMetrics};
@@ -683,16 +683,16 @@ impl<T: Eq, O: Default> StateValue<T, O> {
683683
/// Afterwards, if we need to retract one of these values, we need to assert that it's in this correct state,
684684
/// then mutate it to its `Value` state, so the `upsert` operator can use it.
685685
#[allow(clippy::as_conversions)]
686-
pub fn ensure_decoded(&mut self, bincode_opts: BincodeOpts) {
686+
pub fn ensure_decoded(&mut self, bincode_opts: BincodeOpts, source_id: GlobalId) {
687687
match self {
688688
StateValue::Consolidating(consolidating) => {
689689
match consolidating.diff_sum.0 {
690690
1 => {
691691
let len = usize::try_from(consolidating.len_sum.0)
692692
.map_err(|_| {
693693
format!(
694-
"len_sum can't be made into a usize, state: {}",
695-
consolidating
694+
"len_sum can't be made into a usize, state: {}, {}",
695+
consolidating, source_id,
696696
)
697697
})
698698
.expect("invalid upsert state");
@@ -701,10 +701,11 @@ impl<T: Eq, O: Default> StateValue<T, O> {
701701
.get(..len)
702702
.ok_or_else(|| {
703703
format!(
704-
"value_xor is not the same length ({}) as len ({}), state: {}",
704+
"value_xor is not the same length ({}) as len ({}), state: {}, {}",
705705
consolidating.value_xor.len(),
706706
len,
707-
consolidating
707+
consolidating,
708+
source_id,
708709
)
709710
})
710711
.expect("invalid upsert state");
@@ -713,8 +714,9 @@ impl<T: Eq, O: Default> StateValue<T, O> {
713714
consolidating.checksum_sum.0,
714715
// Hash the value, not the full buffer, which may have extra 0's
715716
seahash::hash(value) as i64,
716-
"invalid upsert state: checksum_sum does not match, state: {}",
717-
consolidating
717+
"invalid upsert state: checksum_sum does not match, state: {}, {}",
718+
consolidating,
719+
source_id,
718720
);
719721
*self = Self::Value(Value::FinalizedValue(
720722
bincode_opts.deserialize(value).unwrap(),
@@ -724,30 +726,31 @@ impl<T: Eq, O: Default> StateValue<T, O> {
724726
0 => {
725727
assert_eq!(
726728
consolidating.len_sum.0, 0,
727-
"invalid upsert state: len_sum is non-0, state: {}",
728-
consolidating
729+
"invalid upsert state: len_sum is non-0, state: {}, {}",
730+
consolidating, source_id,
729731
);
730732
assert_eq!(
731733
consolidating.checksum_sum.0, 0,
732-
"invalid upsert state: checksum_sum is non-0, state: {}",
733-
consolidating
734+
"invalid upsert state: checksum_sum is non-0, state: {}, {}",
735+
consolidating, source_id,
734736
);
735737
assert!(
736738
consolidating.value_xor.iter().all(|&x| x == 0),
737739
"invalid upsert state: value_xor not all 0s with 0 diff. \
738-
Non-zero positions: {:?}, state: {}",
740+
Non-zero positions: {:?}, state: {}, {}",
739741
consolidating
740742
.value_xor
741743
.iter()
742744
.positions(|&x| x != 0)
743745
.collect::<Vec<_>>(),
744-
consolidating
746+
consolidating,
747+
source_id,
745748
);
746749
*self = Self::Value(Value::Tombstone(Default::default()));
747750
}
748751
other => panic!(
749-
"invalid upsert state: non 0/1 diff_sum: {}, state: {}",
750-
other, consolidating
752+
"invalid upsert state: non 0/1 diff_sum: {}, state: {}, {}",
753+
other, consolidating, source_id
751754
),
752755
}
753756
}
@@ -1470,7 +1473,7 @@ mod tests {
14701473
s.merge_update(longer_row, Diff::ONE, opts, &mut buf);
14711474

14721475
// Assert that the `Consolidating` value is fully merged.
1473-
s.ensure_decoded(opts);
1476+
s.ensure_decoded(opts, GlobalId::User(1));
14741477
}
14751478

14761479
// We guard some of our assumptions. Increasing in-memory size of StateValue
@@ -1530,7 +1533,7 @@ mod tests {
15301533
s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
15311534
s.merge_update(small_row.clone(), Diff::MINUS_ONE, opts, &mut buf);
15321535

1533-
s.ensure_decoded(opts);
1536+
s.ensure_decoded(opts, GlobalId::User(1));
15341537
}
15351538

15361539
#[mz_ore::test]
@@ -1549,7 +1552,7 @@ mod tests {
15491552
s.merge_update(small_row.clone(), Diff::MINUS_ONE, opts, &mut buf);
15501553
s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
15511554

1552-
s.ensure_decoded(opts);
1555+
s.ensure_decoded(opts, GlobalId::User(1));
15531556
}
15541557

15551558
#[mz_ore::test]
@@ -1566,6 +1569,6 @@ mod tests {
15661569
s.merge_update(small_row.clone(), Diff::MINUS_ONE, opts, &mut buf);
15671570
s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
15681571

1569-
s.ensure_decoded(opts);
1572+
s.ensure_decoded(opts, GlobalId::User(1));
15701573
}
15711574
}

src/storage/src/upsert_continual_feedback.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -755,7 +755,7 @@ where
755755
let existing_state_cell = &mut command_state.get_mut().value;
756756

757757
if let Some(cs) = existing_state_cell.as_mut() {
758-
cs.ensure_decoded(bincode_opts);
758+
cs.ensure_decoded(bincode_opts, source_config.id);
759759
}
760760

761761
// Skip this command if its order key is below the one in the upsert state.

0 commit comments

Comments
 (0)