10
10
//! Logic related to applying updates from a [`mz_catalog::durable::DurableCatalogState`] to a
11
11
//! [`CatalogState`].
12
12
13
- use std:: collections:: { BTreeMap , BTreeSet } ;
13
+ use std:: collections:: { BTreeMap , BTreeSet , VecDeque } ;
14
14
use std:: fmt:: Debug ;
15
+ use std:: str:: FromStr ;
15
16
16
17
use mz_catalog:: builtin:: { Builtin , BUILTIN_LOG_LOOKUP , BUILTIN_LOOKUP } ;
17
18
use mz_catalog:: memory:: error:: { Error , ErrorKind } ;
@@ -32,9 +33,9 @@ use mz_sql::names::{
32
33
ItemQualifiers , QualifiedItemName , QualifiedSchemaName , ResolvedDatabaseSpecifier , ResolvedIds ,
33
34
SchemaSpecifier ,
34
35
} ;
35
- use mz_sql:: rbac;
36
36
use mz_sql:: session:: user:: MZ_SYSTEM_ROLE_ID ;
37
37
use mz_sql:: session:: vars:: { VarError , VarInput } ;
38
+ use mz_sql:: { plan, rbac} ;
38
39
use mz_sql_parser:: ast:: Expr ;
39
40
use mz_storage_types:: sources:: Timeline ;
40
41
use once_cell:: sync:: Lazy ;
@@ -46,6 +47,8 @@ use crate::AdapterError;
46
47
47
48
enum ApplyUpdateError {
48
49
Error ( Error ) ,
50
+ AwaitingIdDependency ( ( GlobalId , mz_catalog:: durable:: Item , Diff ) ) ,
51
+ AwaitingNameDependency ( ( String , mz_catalog:: durable:: Item , Diff ) ) ,
49
52
}
50
53
51
54
impl CatalogState {
@@ -66,13 +69,126 @@ impl CatalogState {
66
69
& mut self ,
67
70
updates : Vec < StateUpdate > ,
68
71
) -> Result < ( ) , Error > {
69
- for StateUpdate { kind, diff } in updates {
72
+ // TODO(v100): we can refactor this to assume that items are
73
+ // topologically sorted by their `GlobalId`.
74
+ let mut awaiting_id_dependencies: BTreeMap < GlobalId , Vec < _ > > = BTreeMap :: new ( ) ;
75
+ let mut awaiting_name_dependencies: BTreeMap < String , Vec < _ > > = BTreeMap :: new ( ) ;
76
+ let mut updates: VecDeque < _ > = updates. into_iter ( ) . collect ( ) ;
77
+ while let Some ( StateUpdate { kind, diff } ) = updates. pop_front ( ) {
70
78
assert_eq ! (
71
79
diff, 1 ,
72
80
"initial catalog updates should be consolidated: ({kind:?}, {diff:?})"
73
81
) ;
74
- self . apply_update ( kind, diff)
75
- . map_err ( |ApplyUpdateError :: Error ( err) | err) ?;
82
+ match self . apply_update ( kind, diff) {
83
+ Ok ( None ) => { }
84
+ Ok ( Some ( id) ) => {
85
+ // Enqueue any items waiting on this dependency.
86
+ let mut resolved_dependent_items = Vec :: new ( ) ;
87
+ if let Some ( dependent_items) = awaiting_id_dependencies. remove ( & id) {
88
+ resolved_dependent_items. extend ( dependent_items) ;
89
+ }
90
+ let entry = self . get_entry ( & id) ;
91
+ let full_name = self . resolve_full_name ( entry. name ( ) , None ) ;
92
+ if let Some ( dependent_items) =
93
+ awaiting_name_dependencies. remove ( & full_name. to_string ( ) )
94
+ {
95
+ resolved_dependent_items. extend ( dependent_items) ;
96
+ }
97
+ let resolved_dependent_items =
98
+ resolved_dependent_items
99
+ . into_iter ( )
100
+ . map ( |( item, diff) | StateUpdate {
101
+ kind : StateUpdateKind :: Item ( item) ,
102
+ diff,
103
+ } ) ;
104
+ updates. extend ( resolved_dependent_items) ;
105
+ }
106
+ Err ( ApplyUpdateError :: Error ( err) ) => return Err ( err) ,
107
+ Err ( ApplyUpdateError :: AwaitingIdDependency ( ( id, item, diff) ) ) => {
108
+ awaiting_id_dependencies
109
+ . entry ( id)
110
+ . or_default ( )
111
+ . push ( ( item, diff) ) ;
112
+ }
113
+ Err ( ApplyUpdateError :: AwaitingNameDependency ( ( name, item, diff) ) ) => {
114
+ awaiting_name_dependencies
115
+ . entry ( name)
116
+ . or_default ( )
117
+ . push ( ( item, diff) ) ;
118
+ }
119
+ }
120
+ }
121
+
122
+ // Error on any unsatisfied dependencies.
123
+ if let Some ( ( missing_dep, mut dependents) ) = awaiting_id_dependencies. into_iter ( ) . next ( ) {
124
+ let (
125
+ mz_catalog:: durable:: Item {
126
+ id,
127
+ oid : _,
128
+ schema_id,
129
+ name,
130
+ create_sql : _,
131
+ owner_id : _,
132
+ privileges : _,
133
+ } ,
134
+ diff,
135
+ ) = dependents. remove ( 0 ) ;
136
+ let schema = self . find_non_temp_schema ( & schema_id) ;
137
+ let name = QualifiedItemName {
138
+ qualifiers : ItemQualifiers {
139
+ database_spec : schema. database ( ) . clone ( ) ,
140
+ schema_spec : schema. id ( ) . clone ( ) ,
141
+ } ,
142
+ item : name,
143
+ } ;
144
+ let name = self . resolve_full_name ( & name, None ) ;
145
+ let action = if diff == 1 { "deserialize" } else { "remove" } ;
146
+
147
+ return Err ( Error :: new ( ErrorKind :: Corruption {
148
+ detail : format ! (
149
+ "failed to {} item {} ({}): {}" ,
150
+ action,
151
+ id,
152
+ name,
153
+ plan:: PlanError :: InvalidId ( missing_dep)
154
+ ) ,
155
+ } ) ) ;
156
+ }
157
+
158
+ if let Some ( ( missing_dep, mut dependents) ) = awaiting_name_dependencies. into_iter ( ) . next ( ) {
159
+ let (
160
+ mz_catalog:: durable:: Item {
161
+ id,
162
+ oid : _,
163
+ schema_id,
164
+ name,
165
+ create_sql : _,
166
+ owner_id : _,
167
+ privileges : _,
168
+ } ,
169
+ diff,
170
+ ) = dependents. remove ( 0 ) ;
171
+ let schema = self . find_non_temp_schema ( & schema_id) ;
172
+ let name = QualifiedItemName {
173
+ qualifiers : ItemQualifiers {
174
+ database_spec : schema. database ( ) . clone ( ) ,
175
+ schema_spec : schema. id ( ) . clone ( ) ,
176
+ } ,
177
+ item : name,
178
+ } ;
179
+ let name = self . resolve_full_name ( & name, None ) ;
180
+ let action = if diff == 1 { "deserialize" } else { "remove" } ;
181
+ return Err ( Error :: new ( ErrorKind :: Corruption {
182
+ detail : format ! (
183
+ "failed to {} item {} ({}): {}" ,
184
+ action,
185
+ id,
186
+ name,
187
+ Error {
188
+ kind: ErrorKind :: Sql ( SqlCatalogError :: UnknownItem ( missing_dep) )
189
+ }
190
+ ) ,
191
+ } ) ) ;
76
192
}
77
193
78
194
Ok ( ( ) )
@@ -615,12 +731,18 @@ impl CatalogState {
615
731
/// Applies an item update to `self`.
616
732
///
617
733
/// Returns a `GlobalId` on success, if the applied update added a new `GlobalID` to `self`.
734
+ /// Returns a dependency on failure, if the update could not be applied due to a missing
735
+ /// dependency.
618
736
#[ instrument( level = "debug" ) ]
619
737
fn apply_item_update (
620
738
& mut self ,
621
739
item : mz_catalog:: durable:: Item ,
622
740
diff : Diff ,
623
741
) -> Result < Option < GlobalId > , ApplyUpdateError > {
742
+ // If we knew beforehand that the items were being applied in dependency
743
+ // order, then we could fully delegate to `self.insert_item(...)` and`self.drop_item(...)`.
744
+ // However, we don't know that items are applied in dependency order, so we must handle the
745
+ // case that the item is valid, but we haven't applied all of its dependencies yet.
624
746
match diff {
625
747
1 => {
626
748
// TODO(benesch): a better way of detecting when a view has depended
@@ -641,6 +763,27 @@ impl CatalogState {
641
763
} ,
642
764
) ) ) ;
643
765
}
766
+ // If we were missing a dependency, wait for it to be added.
767
+ Err ( AdapterError :: PlanError ( plan:: PlanError :: InvalidId ( missing_dep) ) ) => {
768
+ return Err ( ApplyUpdateError :: AwaitingIdDependency ( (
769
+ missing_dep,
770
+ item,
771
+ diff,
772
+ ) ) ) ;
773
+ }
774
+ // If we were missing a dependency, wait for it to be added.
775
+ Err ( AdapterError :: PlanError ( plan:: PlanError :: Catalog (
776
+ SqlCatalogError :: UnknownItem ( missing_dep) ,
777
+ ) ) ) => {
778
+ return match GlobalId :: from_str ( & missing_dep) {
779
+ Ok ( id) => Err ( ApplyUpdateError :: AwaitingIdDependency ( ( id, item, diff) ) ) ,
780
+ Err ( _) => Err ( ApplyUpdateError :: AwaitingNameDependency ( (
781
+ missing_dep,
782
+ item,
783
+ diff,
784
+ ) ) ) ,
785
+ }
786
+ }
644
787
Err ( e) => {
645
788
let schema = self . find_non_temp_schema ( & item. schema_id ) ;
646
789
let name = QualifiedItemName {
@@ -678,6 +821,13 @@ impl CatalogState {
678
821
Ok ( Some ( item. id ) )
679
822
}
680
823
-1 => {
824
+ let entry = self . get_entry ( & item. id ) ;
825
+ if let Some ( id) = entry. referenced_by ( ) . first ( ) {
826
+ return Err ( ApplyUpdateError :: AwaitingIdDependency ( ( * id, item, diff) ) ) ;
827
+ }
828
+ if let Some ( id) = entry. used_by ( ) . first ( ) {
829
+ return Err ( ApplyUpdateError :: AwaitingIdDependency ( ( * id, item, diff) ) ) ;
830
+ }
681
831
self . drop_item ( item. id ) ;
682
832
Ok ( None )
683
833
}
0 commit comments