@@ -24,3 +24,51 @@ def postgresql_sync(db_ip, db_port, db_name, db_username, db_password, table_nam
     print("Reading data from PostgreSQL table...")
     df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=properties)
     df.show()
+
+    # Task 3: Query and save data
+    metadata_path = os.path.join(os.path.dirname(path), "metadata")
+    os.makedirs(metadata_path, exist_ok=True)
+    offset_file = os.path.join(metadata_path, "offset")
+    schema_file = os.path.join(metadata_path, "schema")
+
+    if os.path.exists(offset_file):
+        # Read the last processed ID from the offset file
+        with open(offset_file, "r") as f:
+            max_id = int(f.read().strip())
+        print(f"Found offset: {max_id}. Querying rows with {unique_id} > {max_id}")
+        filtered_df = df.filter(f"{unique_id} > {max_id}")
+        write_mode = "append"
+
+        # Check schema consistency against previously written data
+        if os.path.exists(path):
+            if os.path.exists(schema_file):
+                with open(schema_file, "r") as f:
+                    previous_schema = json.loads(f.read())
+                current_schema = json.loads(filtered_df.schema.json())
+                if previous_schema != current_schema:
+                    print("ERROR: Schema mismatch detected. Exiting without writing data.")
+                    spark.stop()
+                    sys.exit(1)
+                else:
+                    print("Schema matches previously written data. Proceeding with write.")
+    else:
+        print("No offset file found. Querying all data.")
+        filtered_df = df
+        write_mode = "overwrite"
+
+    # Save data to path
+    filtered_df.write.mode(write_mode).parquet(path)
+    print(f"Data saved to {path} in {write_mode} mode.")
+
+    # Update offset and schema
+    max_processed_id = filtered_df.agg({unique_id: "max"}).collect()[0][0]
+    if max_processed_id is not None:
+        with open(offset_file, "w") as f:
+            f.write(str(max_processed_id))
+        print(f"Updated offset file with max ID: {max_processed_id}")
+
+    with open(schema_file, "w") as f:
+        f.write(filtered_df.schema.json())
+    print(f"Schema saved to {schema_file}.")
+
+    spark.stop()
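
For context, a minimal sketch of how the function might be invoked, assuming `path` and `unique_id` are also parameters of `postgresql_sync` (the signature is truncated in the hunk header) and that the earlier part of the function builds `jdbc_url` as f"jdbc:postgresql://{db_ip}:{db_port}/{db_name}" with matching connection properties. Every argument value below is an illustrative placeholder, not part of the commit:

    # Hypothetical invocation; all values are placeholders.
    postgresql_sync(
        db_ip="localhost",
        db_port="5432",
        db_name="sales",
        db_username="etl_user",
        db_password="secret",
        table_name="orders",
        path="/data/orders/parquet",  # parquet output; "metadata/" is created beside it
        unique_id="order_id",         # monotonically increasing key used as the offset
    )

On the first run no offset file exists, so the whole table is written in "overwrite" mode; each later run appends only the rows whose `unique_id` exceeds the stored offset.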
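A side note on the schema file: it stores the output of `df.schema.json()`, the JSON form of a Spark StructType. Beyond the dict comparison done above, a downstream job could rebuild the schema object from it; a sketch, reusing the illustrative metadata path from the example above:

    import json
    from pyspark.sql.types import StructType

    # Rebuild the StructType persisted by the sync job.
    with open("/data/orders/metadata/schema", "r") as f:
        saved_schema = StructType.fromJson(json.load(f))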