@@ -180,20 +180,19 @@ def init_gb(self, do_init: bool=None):
180
180
initialize_space = self .initialize_space # True or False
181
181
if initialize_space and self .gb_config .gb_type == "NebulaHandler" :
182
182
self .gb .add_hosts ('storaged0' , 9779 )
183
- print ('增加NebulaGraph Storage主机中,等待20秒' )
184
- time .sleep (20 )
183
+ self .waiting_host_initialize ()
185
184
186
185
if self .clear_history_data :
187
186
# 初始化space
188
187
self .gb .drop_space ('client' )
189
188
190
189
self .gb .create_space ('client' )
190
+ self .waiting_space_initialize ()
191
191
192
192
# 创建node tags和edge types
193
193
self .create_gb_tags_and_edgetypes ()
194
+ self .waiting_tags_edgetypes_initialize ()
194
195
195
- print ('Node Tags和Edge Types初始化中,等待20秒......' )
196
- time .sleep (20 )
197
196
else :
198
197
self .gb = None
199
198
@@ -223,7 +222,54 @@ def init_sls(self, do_init: bool=None):
223
222
self .embed_config , vb_config = self .vb_config )
224
223
else :
225
224
self .sls = None
225
+
226
+ def wait_for_condition (self , condition , timeout = 40 , wait_time = 0.5 , log_prefix = '' ):
227
+ max_attempts = int (timeout / wait_time )
228
+
229
+ total_time = 0
230
+
231
+ for _ in range (max_attempts ):
232
+ if condition ():
233
+ logger .info (f'[{ log_prefix } ] initialized successfully, total waiting time: { total_time } s' )
234
+ return
235
+
236
+ logger .info (f'[{ log_prefix } ] have waited for { total_time } s...' )
237
+ total_time += wait_time
238
+ time .sleep (wait_time )
239
+
240
+ raise Exception (f'[{ log_prefix } ] not initialized within { timeout } seconds' )
226
241
242
+ def waiting_host_initialize (self ):
243
+ self .wait_for_condition (
244
+ condition = lambda : any (
245
+ host .get ('Host' ) == 'storaged0' and host .get ('Status' ) == 'ONLINE'
246
+ for host in self .gb .show_hosts ()
247
+ ),
248
+ log_prefix = 'host_initializing'
249
+ )
250
+
251
+ def waiting_tags_edgetypes_initialize (self ):
252
+ '''
253
+ Make sure that tags and edgetypes are created properly
254
+ Refer to create_gb_tags_and_edgetypes for more details
255
+ '''
256
+ self .wait_for_condition (
257
+ condition = lambda : (
258
+ len (self .gb .show_tags ()) == len (TYPE2SCHEMA .items ()) - 1 and
259
+ len (self .gb .show_edge_type ()) == (len (TYPE2SCHEMA .items ()) - 1 ) ** 2 * 3
260
+ ),
261
+ log_prefix = 'tags_edgetypes_initializing'
262
+ )
263
+
264
+ def waiting_space_initialize (self ):
265
+ self .wait_for_condition (
266
+ condition = lambda :any (
267
+ space .get ('Name' ) == 'client'
268
+ for space in self .gb .show_space ()
269
+ ),
270
+ log_prefix = 'space_initializing'
271
+ )
272
+
227
273
def _get_local_graph (
228
274
self , nodes : List [GNode ], edges : List [GEdge ], rootid
229
275
) -> Tuple [List [str ], Graph ]:
0 commit comments