17
17
)
18
18
from typing import (
19
19
Any ,
20
- MutableSet ,
21
- Optional ,
22
20
TYPE_CHECKING ,
23
- Type ,
24
- Union ,
25
21
cast ,
26
22
)
27
23
@@ -267,7 +263,7 @@ def transform(self,
267
263
partition : BundlePartition = BundlePartition .root ,
268
264
* ,
269
265
delete : bool ,
270
- ) -> Union [ list [BundlePartition ], tuple [list [Contribution ], list [Replica ] ]]:
266
+ ) -> list [BundlePartition ] | tuple [list [Contribution ], list [Replica ]]:
271
267
"""
272
268
Return a list of contributions and a list of replicas for the entities
273
269
in the given partition of the specified bundle, or a set of divisions of
@@ -353,7 +349,7 @@ def stringify(value: AnyJSON) -> AnyJSON:
353
349
)
354
350
355
351
def setify (value : CompositeJSON
356
- ) -> Union [ set [tuple [str , AnyJSON ]], set [AnyJSON ] ]:
352
+ ) -> set [tuple [str , AnyJSON ]] | set [AnyJSON ]:
357
353
value = freeze (value )
358
354
return set (
359
355
value .items ()
@@ -578,7 +574,7 @@ def _read_contributions(self,
578
574
) -> list [CataloguedContribution ]:
579
575
es_client = ESClientFactory .get ()
580
576
581
- entity_ids_by_index : dict [str , MutableSet [str ]] = defaultdict (set )
577
+ entity_ids_by_index : dict [str , set [str ]] = defaultdict (set )
582
578
for entity in tallies .keys ():
583
579
index = str (IndexName .create (catalog = entity .catalog ,
584
580
qualifier = entity .entity_type ,
@@ -696,7 +692,7 @@ def _aggregate(self,
696
692
)
697
693
698
694
# Create lookup for transformer by entity type
699
- transformers : dict [tuple [CatalogName , str ], Type [Transformer ]] = {
695
+ transformers : dict [tuple [CatalogName , str ], type [Transformer ]] = {
700
696
(catalog , transformer_cls .entity_type ()): transformer_cls
701
697
for catalog in config .catalogs
702
698
for transformer_cls in self .transformer_types (catalog )
@@ -734,7 +730,7 @@ def _aggregate(self,
734
730
return aggregates
735
731
736
732
def _aggregate_entity (self ,
737
- transformer : Type [Transformer ],
733
+ transformer : type [Transformer ],
738
734
contributions : list [Contribution ]
739
735
) -> JSON :
740
736
contents = self ._reconcile (transformer , contributions )
@@ -756,7 +752,7 @@ def _aggregate_entity(self,
756
752
return aggregate_contents
757
753
758
754
def _reconcile (self ,
759
- transformer : Type [Transformer ],
755
+ transformer : type [Transformer ],
760
756
contributions : Sequence [Contribution ],
761
757
) -> Mapping [EntityType , Entities ]:
762
758
"""
@@ -791,7 +787,7 @@ def _reconcile(self,
791
787
792
788
def _create_writer (self ,
793
789
doc_type : DocumentType ,
794
- catalog : Optional [ CatalogName ]
790
+ catalog : CatalogName | None
795
791
) -> 'IndexWriter' :
796
792
# We allow one conflict retry in the case of duplicate notifications and
797
793
# switch from 'add' to 'update'. After that, there should be no
@@ -814,9 +810,9 @@ def _create_writer(self,
814
810
class IndexWriter :
815
811
816
812
def __init__ (self ,
817
- catalog : Optional [ CatalogName ] ,
813
+ catalog : CatalogName | None ,
818
814
field_types : CataloguedFieldTypes ,
819
- refresh : Union [ bool , str ] ,
815
+ refresh : bool | str ,
820
816
conflict_retry_limit : int ,
821
817
error_retry_limit : int ) -> None :
822
818
"""
@@ -843,7 +839,7 @@ def __init__(self,
843
839
self .es_client = ESClientFactory .get ()
844
840
self .errors : dict [DocumentCoordinates , int ] = defaultdict (int )
845
841
self .conflicts : dict [DocumentCoordinates , int ] = defaultdict (int )
846
- self .retries : Optional [ MutableSet [ DocumentCoordinates ]] = None
842
+ self .retries : set [ DocumentCoordinates ] | None = None
847
843
848
844
bulk_threshold = 32
849
845
@@ -945,7 +941,7 @@ def _on_success(self, doc: Document):
945
941
else :
946
942
log .debug ('Successfully wrote %s.' , coordinates )
947
943
948
- def _on_error (self , doc : Document , e : Union [ Exception , JSON ] ):
944
+ def _on_error (self , doc : Document , e : Exception | JSON ):
949
945
self .errors [doc .coordinates ] += 1
950
946
if self .error_retry_limit is None or self .errors [doc .coordinates ] <= self .error_retry_limit :
951
947
action = 'retrying'
@@ -956,7 +952,7 @@ def _on_error(self, doc: Document, e: Union[Exception, JSON]):
956
952
doc .coordinates , e , self .errors [doc .coordinates ], action ,
957
953
exc_info = isinstance (e , Exception ))
958
954
959
- def _on_conflict (self , doc : Document , e : Union [ Exception , JSON ] ):
955
+ def _on_conflict (self , doc : Document , e : Exception | JSON ):
960
956
self .conflicts [doc .coordinates ] += 1
961
957
self .errors .pop (doc .coordinates , None ) # a conflict resets the error count
962
958
if self .conflict_retry_limit is None or self .conflicts [doc .coordinates ] <= self .conflict_retry_limit :
0 commit comments