1
- import pymongo
2
- import pymysql
3
1
import re
2
+ import time
4
3
import json
5
- from bson .objectid import ObjectId
4
+ import pymongo
5
+ import pymysql
6
+ import traceback
6
7
from datetime import datetime
7
-
8
- # Database credentials
9
- mongo_db_url = "mongodb://localhost:27017/"
10
- mongo_db_name = "stock-market"
11
- mysql_db_host = 'localhost'
12
- mysql_db_user = 'root'
13
- mysql_db_password = 'mysql'
14
- mysql_db_name = 'Stock-Market' # It will be converted to stock_market
8
+ from bson .objectid import ObjectId
9
+ from config import mongo_config , mysql_config , mysql_db_name
15
10
16
11
DEFAULT_VARCHAR_SIZE = 25
17
12
MAX_VARCHAR_LENGTH = 1000
18
13
19
14
def camel_to_snake(name):
    """Convert a camelCase / PascalCase / space-separated name to snake_case.

    Spaces are replaced by underscores first, then an underscore is inserted
    at every lower-to-upper word boundary and the result is lower-cased.

    Args:
        name: The MongoDB collection or field name to convert.

    Returns:
        The snake_case form of ``name``.
    """
    name = name.replace(' ', '_')
    # Skip boundaries that already follow an underscore; otherwise
    # "My Field" -> "My_Field" would become "my__field".
    name = re.sub(r'([^_])([A-Z][a-z]+)', r'\1_\2', name)
    return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name).lower()
22
18
23
- def isChildKeyPresentInValue (childKey , valueValues ):
24
- for value in valueValues :
25
- if childKey == value :
26
- return True
27
- return False
19
def is_child_key_present_in_value(child_key, value_values):
    """Return True when ``child_key`` equals one of ``value_values``.

    Args:
        child_key: The key under which a nested dict is stored.
        value_values: An iterable of that nested dict's values.

    Returns:
        bool: Whether any value compares equal to ``child_key``.
    """
    return child_key in value_values
21
+
22
def convert_document(document, prefix=''):
    """Convert a MongoDB document into a flat dict of MySQL-friendly values.

    Args:
        document: Mapping of field name to value.
        prefix: Optional prefix joined to every key with an underscore.

    Returns:
        A new dict. Nested dicts are flattened through
        ``convert_nested_document``; every other value is passed through
        ``convert_value``.
    """
    converted = {}
    for key, value in document.items():
        new_key = f"{prefix}_{key}" if prefix else key
        if isinstance(value, dict):
            # convert_value does not flatten dicts, so sub-documents must be
            # expanded here, as the pre-refactor implementation did.
            converted.update(convert_nested_document(value, new_key))
        else:
            converted[new_key] = convert_value(value)
    return converted
24
+
25
def enquote(identifier):
    """Quote ``identifier`` as a MySQL identifier using backticks.

    Args:
        identifier: Table or column name.

    Returns:
        The name wrapped in backticks, with any embedded backtick doubled so
        it cannot terminate the quoting early.
    """
    escaped = str(identifier).replace('`', '``')
    return f"`{escaped}`"
28
27
29
28
# Python type name -> MySQL column type for every type whose mapping does not
# depend on the observed value length.
_STATIC_MYSQL_TYPES = {
    'int': 'INT',
    'float': 'FLOAT',
    'bool': 'BOOLEAN',
    'ObjectId': 'VARCHAR(24)',
    'datetime': 'DATETIME',
    'Int64': 'BIGINT',
    'Decimal128': 'DECIMAL(38, 3)',
    'list': 'TEXT',      # lists are serialized as JSON strings
    'bytes': 'BLOB',
    'NoneType': 'TEXT',
    're.Pattern': 'TEXT',
    # type(re.compile('')).__name__ is 'Pattern', not 're.Pattern'.
    'Pattern': 'TEXT',
}


def type_to_mysql(column_name, py_type, max_length):
    """Map a Python type name to a MySQL column type.

    Args:
        column_name: Name of the target column (currently unused).
        py_type: ``type(value).__name__`` of a sample value.
        max_length: Length of the stringified sample value. Only consulted
            for ``'str'``; ``None`` is treated as 0.

    Returns:
        A MySQL type string. Strings get a ``VARCHAR`` rounded up to the next
        multiple of 25 (capped at 1000), or ``TEXT`` when longer than 1000.
        Unknown type names fall back to ``VARCHAR(255)``.
    """
    # Kept local, as in the original, so the function does not depend on the
    # module-level constants of the same name.
    DEFAULT_VARCHAR_SIZE = 25
    MAX_VARCHAR_LENGTH = 1000

    if py_type != 'str':
        return _STATIC_MYSQL_TYPES.get(py_type, 'VARCHAR(255)')

    # The size arithmetic runs only for strings, so a missing length can no
    # longer raise for unrelated types.
    length = max_length or 0
    if length > MAX_VARCHAR_LENGTH:
        return 'TEXT'
    rounded = (length // DEFAULT_VARCHAR_SIZE + 1) * DEFAULT_VARCHAR_SIZE
    varchar_size = min(max(DEFAULT_VARCHAR_SIZE, rounded), MAX_VARCHAR_LENGTH)
    return f'VARCHAR({varchar_size})'
46
+
57
47
def process_nested_document(doc, prefix=''):
    """Derive a flat ``{column_name: mysql_type}`` map from a nested document.

    Args:
        doc: Mapping of field name to sample value (may contain dicts).
        prefix: Column-name prefix accumulated from the enclosing keys.

    Returns:
        A dict mapping snake_case column names to MySQL type strings.
    """
    def qualify(key):
        # Snake-case the key and attach the accumulated prefix, if any.
        snake = camel_to_snake(key)
        return f"{prefix}_{snake}" if prefix else snake

    structure = {}
    for child_key, value in doc.items():
        if not isinstance(value, dict):
            column = qualify(child_key)
            structure[column] = type_to_mysql(column, type(value).__name__, len(str(value)))
            continue

        if not is_child_key_present_in_value(child_key, value.values()):
            # Ordinary sub-document: recurse with the child key in the prefix.
            structure.update(process_nested_document(value, qualify(child_key)))
            continue

        # The sub-document has a value equal to its own key: its fields are
        # attached directly to the current prefix (the child key is not added).
        for inner_key, inner_value in value.items():
            column = qualify(inner_key)
            structure[column] = type_to_mysql(column, type(inner_value).__name__, len(str(inner_value)))
            if isinstance(inner_value, dict):
                # Recurse into the inner value itself, not the enclosing dict.
                structure.update(process_nested_document(inner_value, column))
    return structure
80
69
81
70
def convert_nested_document(doc, prefix=''):
    """Flatten a nested document into ``{column_name: converted_value}``.

    Uses the same key-naming rules as ``process_nested_document`` so the
    produced keys line up with the columns derived there.

    Args:
        doc: Mapping of field name to value (may contain dicts).
        prefix: Column-name prefix accumulated from the enclosing keys.

    Returns:
        A flat dict whose values have been passed through ``convert_value``.
    """
    def qualify(key):
        # Snake-case the key and attach the accumulated prefix, if any.
        snake = camel_to_snake(key)
        return f"{prefix}_{snake}" if prefix else snake

    new_document = {}
    for child_key, doc_value in doc.items():
        if not isinstance(doc_value, dict):
            new_document[qualify(child_key)] = convert_value(doc_value)
            continue

        if not is_child_key_present_in_value(child_key, doc_value.values()):
            # Ordinary sub-document: recurse with the child key in the prefix.
            new_document.update(convert_nested_document(doc_value, qualify(child_key)))
            continue

        # The sub-document has a value equal to its own key: its fields are
        # attached directly to the current prefix (the child key is not added).
        for inner_key, inner_value in doc_value.items():
            column = qualify(inner_key)
            new_document[column] = convert_value(inner_value)
            if isinstance(inner_value, dict):
                new_document.update(convert_nested_document(inner_value, column))
    return new_document
125
90
126
- # Convert the document to a MySQL-friendly format
127
- def convert_document (document , prefix = '' ):
128
- new_document = {}
129
- for key , value in document .items ():
130
- new_key = f"{ prefix } _{ key } " if prefix else key
131
- # Handle ObjectId
132
- if isinstance (value , ObjectId ):
133
- new_document [new_key ] = str (value )
134
- # Handle Date
135
- elif isinstance (value , datetime ):
136
- new_document [new_key ] = value .strftime ('%Y-%m-%d %H:%M:%S' )
137
- # Handle Array
138
- elif isinstance (value , list ):
139
- # Serialize the list as a JSON string
140
- new_document [new_key ] = json .dumps (value , default = str )
141
- # Handle Nested Document
142
- elif isinstance (value , dict ):
143
- new_document .update (convert_nested_document (value , new_key ))
144
- # Handle Null
145
- elif value is None :
146
- new_document [new_key ] = 'NULL'
147
- # Handle Boolean
148
- elif isinstance (value , bool ):
149
- new_document [new_key ] = 1 if value else 0
150
- # Handle all other types
151
- else :
152
- new_document [new_key ] = value
153
- return new_document
154
-
155
- def enquote (identifier ):
156
- return f"`{ identifier } `"
91
def convert_value(value):
    """Convert a single MongoDB value into a MySQL-friendly Python value.

    Args:
        value: Any value read from a MongoDB document.

    Returns:
        * ``None``      -> the string ``'NULL'``
        * ``bool``      -> ``1`` or ``0``
        * ``datetime``  -> ``'YYYY-MM-DD HH:MM:SS'`` string
        * ``list``/``dict`` -> JSON string (non-JSON members via ``str``)
        * ``ObjectId``  -> its string form
        * anything else -> returned unchanged
    """
    if value is None:
        return 'NULL'
    if isinstance(value, bool):
        return 1 if value else 0
    if isinstance(value, datetime):
        return value.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(value, (list, dict)):
        # Dicts previously fell through and ended up stored as a Python repr;
        # serialize them the same way lists are serialized.
        return json.dumps(value, default=str)
    if isinstance(value, ObjectId):
        return str(value)
    return value
157
104
158
105
def create_mysql_table (mysql_cursor , collection_name , document ):
159
106
collection_name = camel_to_snake (collection_name )
160
- # Determine the maximum length of each field in the document
161
107
max_lengths = {camel_to_snake (key ): len (str (value )) for key , value in document .items () if key not in ['_id' , '_class' ]}
162
-
163
- # Adjust the structure based on the maximum lengths
164
108
structure = {}
165
109
for key , value in document .items ():
166
110
if key not in ['_id' , '_class' ]:
@@ -170,51 +114,48 @@ def create_mysql_table(mysql_cursor, collection_name, document):
170
114
structure [camel_to_snake (key )] = type_to_mysql (camel_to_snake (key ), type (value ).__name__ , max_lengths .get (camel_to_snake (key )))
171
115
172
116
column_definitions = []
173
- # Create the MySQL table based on the adjusted structure
174
117
if "id" not in structure :
175
118
column_definitions .append ("id INT AUTO_INCREMENT PRIMARY KEY" )
176
119
column_definitions .extend ([f'{ enquote (key )} { structure [key ]} ' for key in structure .keys ()])
177
- sql = f"CREATE TABLE { enquote (collection_name )} ({ ', ' .join (column_definitions )} )"
178
- print (sql ,'\n ' ) # Print the SQL statement
120
+ sql = f"CREATE TABLE IF NOT EXISTS { enquote (collection_name )} ({ ', ' .join (column_definitions )} );"
179
121
mysql_cursor .execute (sql )
180
122
123
+
181
124
def insert_into_mysql (mysql_cursor , collection_name , document ):
182
125
collection_name = camel_to_snake (collection_name )
183
- # Remove _id and _class from the document and convert keys to snake_case
184
126
document = {camel_to_snake (key ): value for key , value in document .items () if key not in ['_id' , '_class' ]}
185
- # Convert the document to a MySQL-friendly format
186
127
document = convert_document (document )
187
128
keys = ', ' .join (enquote (key ) for key in document .keys ())
188
129
values = ', ' .join (['%s' for _ in document .values ()])
189
130
sql = f"INSERT INTO { enquote (collection_name )} ({ keys } ) VALUES ({ values } )"
190
- # Convert values to a tuple to use with execute
191
131
values_tuple = tuple (str (value ) for value in document .values ())
192
132
quoted_values_tuple = tuple (f"'{ value } '" if isinstance (value , str ) else value for value in values_tuple )
193
- # Print the SQL statement with actual values
194
133
print (sql % quoted_values_tuple )
195
134
while True :
196
135
try :
197
136
mysql_cursor .execute (sql , values_tuple )
198
- break # Success, exit the loop
137
+ break
199
138
except pymysql .err .OperationalError as e :
200
139
if 'Unknown column' in str (e ):
201
- # Handle multiple missing fields
202
140
missing_fields = re .findall (r"Unknown column '([^']+)'" , str (e ))
203
141
for field in missing_fields :
204
142
field_length = len (str (document [field ]))
205
- mongo_type = type (document [field ]).__name__
143
+ mongo_type = type (document [field ]).__name__
206
144
field_type = type_to_mysql (field , mongo_type , field_length )
207
145
mysql_cursor .execute (f"ALTER TABLE { collection_name } ADD COLUMN { field } { field_type } " )
146
+ elif 'Incorrect datetime value' in str (e ):
147
+ field = re .search (r"column '([^']+)'" , str (e )).group (1 )
148
+ value = re .search (r"'([^']+)'" , str (e )).group (1 )
149
+ value_length = len (value )
150
+ varchar_size = min (max (DEFAULT_VARCHAR_SIZE , (value_length // DEFAULT_VARCHAR_SIZE + 1 ) * DEFAULT_VARCHAR_SIZE ), MAX_VARCHAR_LENGTH )
151
+ mysql_cursor .execute (f"ALTER TABLE { collection_name } MODIFY { field } VARCHAR({ varchar_size } )" )
208
152
else :
209
153
raise
210
154
except pymysql .err .DataError as e :
211
- # If a Data Too Long error occurs, increase the length of the affected field
212
155
if 'Data too long' in str (e ):
213
156
field = re .search (r"'(.+)'" , str (e )).group (1 )
214
- # Get the current data type and length of the field
215
157
mysql_cursor .execute (f"SELECT DATA_TYPE, CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{ collection_name } ' AND COLUMN_NAME = '{ field } '" )
216
158
current_type , current_length = mysql_cursor .fetchone ()
217
- # Decide the new type and length based on the current type and length
218
159
if current_type == 'varchar' :
219
160
new_length = current_length * 2
220
161
if new_length > MAX_VARCHAR_LENGTH :
@@ -227,40 +168,48 @@ def insert_into_mysql(mysql_cursor, collection_name, document):
227
168
new_type = 'LONGTEXT'
228
169
else :
229
170
raise ValueError (f"Cannot increase size of field { field } of type { current_type } " )
230
- # Alter the table to change the type of the field
231
171
mysql_cursor .execute (f"ALTER TABLE { collection_name } MODIFY { field } { new_type } " )
172
+ elif 'Data truncated' in str (e ) or 'Incorrect integer value' in str (e ):
173
+ field = re .search (r"column '([^']+)'" , str (e )).group (1 )
174
+ mysql_cursor .execute (f"ALTER TABLE { collection_name } MODIFY { field } VARCHAR({ DEFAULT_VARCHAR_SIZE } )" )
232
175
else :
233
176
raise
234
177
235
- # Connect to MongoDB
236
- mongo_client = pymongo .MongoClient (mongo_db_url )
237
- mongo_db = mongo_client [mongo_db_name ]
238
-
239
- # Use a context manager to handle the MySQL database connection
240
- with pymysql .connect (host = mysql_db_host , user = mysql_db_user , password = mysql_db_password ) as mysql_conn :
241
- mysql_cursor = mysql_conn .cursor ()
242
- mysql_db_name = mysql_db_name .replace ('-' , '_' )
243
- # Drop the database if it exists
244
- mysql_cursor .execute (f"DROP DATABASE IF EXISTS { mysql_db_name } " )
245
- # Create the database
246
- mysql_cursor .execute (f"CREATE DATABASE { mysql_db_name } " )
247
- # Use the database
248
- mysql_cursor .execute (f"USE { mysql_db_name } " )
249
-
250
- # Iterate over all collections in MongoDB
251
- for collection_name in mongo_db .list_collection_names ():
252
- print ('\n collection_name=' , collection_name )
253
- collection = mongo_db [collection_name ]
254
- # Get the structure of the collection
255
- document = collection .find_one ()
256
- # Create a table in MySQL based on the collection's structure
257
- create_mysql_table (mysql_cursor , collection_name , document )
258
- # Insert data from MongoDB to MySQL
259
- for document in collection .find ():
260
- insert_into_mysql (mysql_cursor , collection_name , document )
261
-
262
- # Commit the transaction
263
- mysql_conn .commit ()
264
-
265
- # Close the MongoDB connection
266
- mongo_client .close ()
178
def main():
    """Migrate every MongoDB collection into a freshly created MySQL database.

    Connects to MongoDB and MySQL using ``mongo_config`` / ``mysql_config``,
    drops and recreates the MySQL database named by ``mysql_db_name`` (with
    ``-`` replaced by ``_``), creates one table per collection from its first
    document, inserts every document, commits, and prints the elapsed time.
    Any exception is reported as a traceback rather than propagated.
    """
    try:
        start_time = time.time()
        with pymongo.MongoClient(mongo_config['url']) as mongo_client:
            mongo_db = mongo_client[mongo_config['db_name']]

            with pymysql.connect(**mysql_config) as mysql_conn:
                mysql_cursor = mysql_conn.cursor()
                local_mysql_db_name = mysql_db_name.replace('-', '_')
                mysql_cursor.execute(f"DROP DATABASE IF EXISTS {local_mysql_db_name}")
                mysql_cursor.execute(f"CREATE DATABASE {local_mysql_db_name}")
                mysql_cursor.execute(f"USE {local_mysql_db_name}")

                for collection_name in mongo_db.list_collection_names():
                    print('\ncollection_name=', collection_name)
                    collection = mongo_db[collection_name]
                    # The first document serves as the schema sample.
                    document = collection.find_one()
                    if document is None:
                        # Empty collection: nothing to infer a schema from.
                        continue
                    create_mysql_table(mysql_cursor, collection_name, document)
                    for document in collection.find():
                        insert_into_mysql(mysql_cursor, collection_name, document)

                # Commit before the connection context closes.
                mysql_conn.commit()

        total_time = round(time.time() - start_time, 2)
        print(f"\n\n========= Total time taken to migrate: {total_time} seconds =========")
    except Exception:
        # print_exc() reads the active exception itself; its first positional
        # parameter is `limit`, so the exception must not be passed in.
        traceback.print_exc()


if __name__ == "__main__":
    main()
0 commit comments