1
+ import os
2
+ import sqlite3
3
+ from datetime import datetime
4
+ from google .oauth2 import service_account
5
+ from google .cloud import bigquery
6
+ from backend .procs .sqlite3 import sources
7
+ from backend .procs .sqlite3 import generate_erd
8
+ from backend .procs .sqlite3 import generate_selected_entities
9
+ from backend .procs .sqlite3 import properties
10
+
11
class BigQuery:
    """Generate Datavault 4dbt models from metadata stored in BigQuery.

    The metadata tables are downloaded once into an in-memory SQLite
    database, cached to ``dump.db`` between the ``read`` and ``run``
    phases, and the generator procs in ``backend.procs.sqlite3`` are run
    against that local copy.

    Expected ``__init__`` kwargs:
        turboVaultconfigs: dict with keys ``model_path``, ``project_id``,
            ``credential_path``, ``metadata_dataset``, ``rdv_schema``,
            ``stage_schema`` and ``hashdiff_naming``.
        print2FeedbackConsole: callable accepting ``message=`` for
            user-facing progress output.
    """

    # Metadata tables mirrored from BigQuery into SQLite.  The SQLite
    # table name equals the BigQuery table name inside metadata_dataset.
    _METADATA_TABLES = (
        'source_data',
        'standard_hub',
        'standard_link',
        'standard_satellite',
        'pit',
        'non_historized_satellite',
        'non_historized_link',
        'multiactive_satellite',
        'ref_table',
        'ref_hub',
        'ref_sat',
    )

    def __init__(self, **kwargs):
        self.todo = []
        self.config = kwargs.get('turboVaultconfigs')
        # NOTE(review): the path juggling below is Windows-specific
        # ('\\' separators) — confirm this tool only targets Windows.
        root = os.path.join(os.path.dirname(os.path.abspath(__file__)).split('\\procs\\sqlite3')[0])
        root = '\\'.join(root.split('\\')[0:-1])  # one step up from the root folder
        self.model_path = self.config.get('model_path')
        self.model_path = os.path.join(root, self.model_path.replace('../', '').replace('/', '\\'))
        self.project_id = self.config.get('project_id')
        self.credential_path = self.config.get('credential_path')
        self.metadata_dataset = self.config.get('metadata_dataset')
        # Shared mutable state handed to every generator proc.
        self.data_structure = {
            'print2FeedbackConsole': kwargs.get('print2FeedbackConsole'),
            'console_outputs': True,
            'cursor': None,
            'source': None,
            'generated_timestamp': datetime.now().strftime("%Y%m%d%H%M%S"),
            'rdv_default_schema': self.config.get("rdv_schema"),
            'model_path': self.model_path,
            'hashdiff_naming': self.config.get('hashdiff_naming'),
            'stage_default_schema': self.config.get("stage_schema"),
            'source_list': None,
            'generateSources': False,
            'source_name': None,    # "Source" field splits into this field
            'source_object': None,  # "Source" field splits into this field
        }

    def setTODO(self, **kwargs):
        """Store the task selection for the next run() call."""
        self.SourceYML = kwargs.pop('SourceYML')
        self.todo = kwargs.pop('Tasks')
        self.DBDocs = kwargs.pop('DBDocs')
        self.Properties = kwargs.pop('Properties')
        self.selectedSources = kwargs.pop('Sources')

    def __initializeInMemoryDatabase(self):
        """Download every metadata table from BigQuery into a fresh
        in-memory SQLite database and return a cursor on it.

        Returns:
            sqlite3.Cursor: cursor on the populated in-memory database.
        """
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.credential_path
        credentials = service_account.Credentials.from_service_account_file(self.credential_path)
        bigquery_client = bigquery.Client(project=self.project_id, credentials=credentials)

        db = sqlite3.connect(':memory:')
        for table in self._METADATA_TABLES:
            # Backtick-quote the fully qualified name so datasets that
            # require quoting work for every table (previously only
            # source_data was quoted).
            sql = f"SELECT * FROM `{self.metadata_dataset}.{table}`"
            df = bigquery_client.query(sql).to_dataframe()
            df.to_sql(table, db)
        return db.cursor()

    def read(self):
        """Load the metadata, derive the selectable source list
        (``SOURCE_SYSTEM_SOURCE_OBJECT``) and cache the database to disk
        for the subsequent run() call."""
        self.data_structure['cursor'] = self.__initializeInMemoryDatabase()
        self.data_structure['cursor'].execute("SELECT DISTINCT SOURCE_SYSTEM || '_' || SOURCE_OBJECT FROM source_data")
        results = self.data_structure['cursor'].fetchall()
        self.data_structure['source_list'] = [row[0] for row in results]
        self.catchDatabase()

    def catchDatabase(self):
        """Persist the in-memory database to ``dump.db`` and close the
        cursor (an in-memory SQLite DB dies with its connection)."""
        if os.path.exists('dump.db'):
            os.remove('dump.db')
        self.data_structure['cursor'].execute("vacuum main into 'dump.db'")
        self.data_structure['cursor'].close()

    def reloadDatabase(self):
        """Restore ``dump.db`` into a new in-memory database, remove the
        dump file and return a cursor on the restored database."""
        db = sqlite3.connect('dump.db')
        dest = sqlite3.connect(':memory:')
        db.backup(dest)
        db.close()
        os.remove('dump.db')
        return dest.cursor()

    def run(self):
        """Generate the selected dbt artifacts from the cached metadata."""
        self.data_structure['cursor'] = self.reloadDatabase()
        if self.SourceYML:
            sources.gen_sources(self.data_structure)
        try:
            for selected in self.selectedSources:
                # Entries are built as SOURCE_SYSTEM || '_' || SOURCE_OBJECT;
                # the marker makes the split points explicit.
                self.data_structure['source'] = selected.replace('_', '_.._')
                separated_name = self.data_structure['source'].split('_.._')
                self.data_structure['source_name'] = separated_name[0]
                # NOTE(review): joining with '' drops any underscores that
                # were part of the object name — confirm this is intended.
                self.data_structure['source_object'] = ''.join(separated_name[1:])
                generate_selected_entities.generate_selected_entities(self.todo, self.data_structure)
                if self.Properties:
                    properties.gen_properties(self.data_structure)
            self.data_structure['print2FeedbackConsole'](message='Process successfully executed and models are ready to be used in Datavault 4dbt.')
        except Exception as e:
            # Keep the original best-effort message, but also surface the
            # actual failure instead of silently discarding it.
            self.data_structure['print2FeedbackConsole'](message='No sources selected!')
            self.data_structure['print2FeedbackConsole'](message=f'Error: {e}')

        if self.DBDocs:
            generate_erd.generate_erd(
                self.data_structure['cursor'],
                self.selectedSources,
                self.data_structure['generated_timestamp'],
                self.data_structure['model_path'],
                self.data_structure['hashdiff_naming'],
            )
        self.data_structure['cursor'].close()
0 commit comments