@@ -42,13 +42,12 @@ def main(self):
42
42
43
43
date_format = "%Y-%m-%dT%H:%M:%S.%fZ"
44
44
45
- # removing total_retries as a config for now
46
45
def __init__ (
47
46
self ,
48
47
staging_area : str ,
49
48
ignore_dangling_inputs : bool ,
50
49
validate_json : bool ,
51
- # total_retries,
50
+ total_retries ,
52
51
) -> None :
53
52
super ().__init__ ()
54
53
self .staging_area = staging_area
@@ -58,7 +57,7 @@ def __init__(
58
57
self .gcs = gcs .Client ()
59
58
60
59
# Number of retries for validation
61
- # self.total_retries = total_retries
60
+ self .total_retries = total_retries
62
61
# A boolean to tell us if this is a delta or non-delta staging area
63
62
self .is_delta = None
64
63
# A mapping of data file name to metadata id
@@ -299,8 +298,7 @@ def validate_file_json(
299
298
if self .validate_json :
300
299
print (f"Validating JSON of { file_name } " )
301
300
try :
302
- # self.validator.validate_json(file_json, self.total_retries, schema)
303
- self .validator .validate_json (file_json , schema )
301
+ self .validator .validate_json (file_json , self .total_retries , schema )
304
302
except Exception as e :
305
303
log .error ("File %s failed json validation." , file_name )
306
304
self .file_errors [file_name ] = e
@@ -371,31 +369,27 @@ class SchemaValidator:
371
369
def validate_json (
372
370
cls ,
373
371
file_json : JSON ,
374
- # total_retries: int,
372
+ total_retries : int ,
375
373
schema : Optional [JSON ] = None ,
376
374
) -> None :
377
375
if schema is None :
378
376
try :
379
- # schema = cls._download_schema(file_json["describedBy"], total_retries)
380
- schema = cls ._download_schema (file_json ["describedBy" ])
377
+ schema = cls ._download_schema (file_json ["describedBy" ], total_retries )
381
378
except json .decoder .JSONDecodeError as e :
382
379
schema_url = file_json ["describedBy" ]
383
380
raise Exception ("Failed to parse schema JSON" , schema_url ) from e
384
381
validate (file_json , schema , format_checker = FormatChecker ())
385
382
386
383
@classmethod
387
384
# setting to maxsize=None so as not to evict old values, and maybe help avoid connectivity issues (DI-22)
388
- # Could also have used the dagster.RetryPolicy for this
389
385
@lru_cache (maxsize = None )
390
- # def _download_schema(cls, schema_url: str, total_retries: int) -> JSON:
391
- def _download_schema (cls , schema_url : str ) -> JSON :
386
+ def _download_schema (cls , schema_url : str , total_retries : int ) -> JSON :
392
387
log .debug ("Downloading schema %s" , schema_url )
393
388
394
389
s = requests .Session ()
395
- # log.debug(f"total_retries = {total_retries}")
390
+ log .debug (f"total_retries = { total_retries } " )
396
391
retries = Retry (
397
- # total=total_retries,
398
- total = 12 ,
392
+ total = total_retries ,
399
393
backoff_factor = 0.2 ,
400
394
status_forcelist = [500 , 502 , 503 , 504 ],
401
395
)
0 commit comments