Skip to content

Commit d37da0d

Browse files
authored
Merge pull request #92 from camaze/nebula_optimize_cjn
【CodeFuse】optimize nebula startup time
2 parents fc4817a + 076d208 commit d37da0d

File tree

3 files changed

+82
-8
lines changed

3 files changed

+82
-8
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ zdatafront*
1919
*antgroup*
2020
*ipynb
2121
*log
22-
22+
examples/mysql/db
2323

2424
# frontend
2525
frontend/node_modules

muagent/db_handler/graph_db_handler/nebula_handler.py

+31-3
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,36 @@ def __init__(self,gb_config : GBConfig = None):
6363
self.nb_pw = '' or 'nebula'
6464
self.space_name = "client"
6565
else:
66-
logger.info('NebulaGraph容器启动中,等待20秒')
67-
time.sleep(20)
68-
self.connection_pool.init([(gb_config.extra_kwargs.get("host"), gb_config.extra_kwargs.get("port"))], config)
66+
logger.info('NebulaGraph容器启动中,请等待')
67+
68+
if self.nebula_started(gb_config):
69+
self.connection_pool.init([(gb_config.extra_kwargs.get("host"), gb_config.extra_kwargs.get("port"))], config)
70+
6971
self.username = gb_config.extra_kwargs.get("username")
7072
self.nb_pw = gb_config.extra_kwargs.get("password")
7173
self.space_name = gb_config.extra_kwargs.get("space")
7274

75+
def nebula_started(self, gb_config: GBConfig):
76+
'''
77+
Continuously ping the nebula server until it has started
78+
If it does not start within 40 seconds, raise an exception
79+
'''
80+
# 80 attempts at 0.5 seconds each = 40 seconds
81+
max_attempts = 80
82+
83+
host = gb_config.extra_kwargs.get('host')
84+
port = gb_config.extra_kwargs.get('port')
85+
86+
for attempt in range(max_attempts):
87+
if self.connection_pool.ping([host, port]):
88+
logger.info(f"nebula server is ok, total waiting time: {attempt * 0.5}s")
89+
return True
90+
91+
logger.info(f"ping nebula server.. waiting time: {attempt * 0.5}s")
92+
time.sleep(0.5)
93+
94+
raise Exception('The nebula server did not start within 40 seconds.')
95+
7396
def execute_cypher(self, cypher: str, space_name: str = '',ignore_log: bool = False, format_res: str = 'as_primitive', use_space_name: bool = True):
7497
'''
7598
@param space_name: space_name, if provided, will execute use space_name first
@@ -172,6 +195,11 @@ def show_space(self):
172195
def drop_space(self, space_name):
173196
cypher = f'DROP SPACE {space_name}'
174197
return self.execute_cypher(cypher)
198+
199+
def show_hosts(self):
200+
cypher = 'SHOW HOSTS'
201+
resp = self.execute_cypher(cypher, use_space_name=False)
202+
return resp
175203

176204
def create_tag(self, tag_name: str, prop_dict: dict = {}):
177205
'''

muagent/service/ekg_construct/ekg_construct_base.py

+50-4
Original file line numberDiff line numberDiff line change
@@ -180,20 +180,19 @@ def init_gb(self, do_init: bool=None):
180180
initialize_space = self.initialize_space # True or False
181181
if initialize_space and self.gb_config.gb_type=="NebulaHandler":
182182
self.gb.add_hosts('storaged0', 9779)
183-
print('增加NebulaGraph Storage主机中,等待20秒')
184-
time.sleep(20)
183+
self.waiting_host_initialize()
185184

186185
if self.clear_history_data:
187186
# 初始化space
188187
self.gb.drop_space('client')
189188

190189
self.gb.create_space('client')
190+
self.waiting_space_initialize()
191191

192192
# 创建node tags和edge types
193193
self.create_gb_tags_and_edgetypes()
194+
self.waiting_tags_edgetypes_initialize()
194195

195-
print('Node Tags和Edge Types初始化中,等待20秒......')
196-
time.sleep(20)
197196
else:
198197
self.gb = None
199198

@@ -223,7 +222,54 @@ def init_sls(self, do_init: bool=None):
223222
self.embed_config, vb_config=self.vb_config)
224223
else:
225224
self.sls = None
225+
226+
def wait_for_condition(self, condition, timeout=40, wait_time=0.5, log_prefix=''):
227+
max_attempts = int(timeout / wait_time)
228+
229+
total_time = 0
230+
231+
for _ in range(max_attempts):
232+
if condition():
233+
logger.info(f'[{log_prefix}] initialized successfully, total waiting time: {total_time}s')
234+
return
235+
236+
logger.info(f'[{log_prefix}] have waited for {total_time}s...')
237+
total_time += wait_time
238+
time.sleep(wait_time)
239+
240+
raise Exception(f'[{log_prefix}] not initialized within {timeout} seconds')
226241

242+
def waiting_host_initialize(self):
243+
self.wait_for_condition(
244+
condition=lambda: any(
245+
host.get('Host') == 'storaged0' and host.get('Status') == 'ONLINE'
246+
for host in self.gb.show_hosts()
247+
),
248+
log_prefix='host_initializing'
249+
)
250+
251+
def waiting_tags_edgetypes_initialize(self):
252+
'''
253+
Make sure that tags and edgetypes are created properly
254+
Refer to create_gb_tags_and_edgetypes for more details
255+
'''
256+
self.wait_for_condition(
257+
condition=lambda: (
258+
len(self.gb.show_tags()) == len(TYPE2SCHEMA.items()) - 1 and
259+
len(self.gb.show_edge_type()) == (len(TYPE2SCHEMA.items()) - 1) ** 2 * 3
260+
),
261+
log_prefix='tags_edgetypes_initializing'
262+
)
263+
264+
def waiting_space_initialize(self):
265+
self.wait_for_condition(
266+
condition=lambda:any(
267+
space.get('Name') == 'client'
268+
for space in self.gb.show_space()
269+
),
270+
log_prefix='space_initializing'
271+
)
272+
227273
def _get_local_graph(
228274
self, nodes: List[GNode], edges: List[GEdge], rootid
229275
) -> Tuple[List[str], Graph]:

0 commit comments

Comments
 (0)