16
16
import os
17
17
import socket
18
18
import getpass
19
+ import time
19
20
import tempfile
20
21
import warnings
21
22
import ipaddress
29
30
from paramiko .client import SSHClient , AutoAddPolicy
30
31
from subprocess import Popen , PIPE
31
32
33
+ from obshell import ClientSet , TaskExecuteFailedError
32
34
from obshell .log import logger
33
35
from obshell .pkg import load_rpm_pcakge , ExtractFile
36
+ from obshell .auth .password import PasswordAuth
37
+ from obshell .model .info import Agentidentity
34
38
35
39
36
40
class TempFileMananger :
@@ -283,41 +287,19 @@ def initialize_nodes(rpm_packages: List[str], force_clean: bool, configs: List[N
283
287
284
288
if force_clean :
285
289
for ssh_client in ssh_clients .values ():
286
- clean_server (ssh_client , ssh_client .config .work_dir )
290
+ _clean_node (ssh_client , ssh_client .config .work_dir )
287
291
else :
288
292
for ssh_client in ssh_clients .values ():
289
293
if not check_remote_dir_empty (ssh_client , ssh_client .config .work_dir ):
290
294
raise Exception (f"{ ssh_client .config .ip } :{ ssh_client .config .work_dir } is not empty, please clean it first" )
291
295
292
296
for rpm_package in rpm_packages :
293
- logger .debug ('load rpm package %s' % rpm_package )
294
- files , links = load_rpm_pcakge (rpm_package )
295
-
296
- write_files_to_servers (ssh_clients , configs , files )
297
-
298
- for link_path , target in links .items ():
299
- for config in configs :
300
- ssh_client = ssh_clients [config .ip ]
301
- dest_path = get_dest_path (config .work_dir , link_path )
302
-
303
- if target .startswith ('./' ):
304
- target_path = get_dest_path (config .work_dir , target )
305
- else :
306
- target_path = target
307
-
308
- dir_path = os .path .dirname (dest_path )
309
- cmd = 'cd %s; ln -sf %s %s' % (dir_path , target_path , dest_path )
310
- logger .debug ('create link %s -> %s' % (dest_path , target_path ))
311
- ret = ssh_client .execute (cmd )
312
-
313
- if not ret :
314
- raise Exception ('Failed to create link %s -> %s: %s' % (dest_path , target_path , ret .stderr ))
297
+ install_package (rpm_package , ssh_clients , configs )
315
298
316
299
for ssh_client in ssh_clients .values ():
317
300
ret = check_observer_version (ssh_client , ssh_client .config .work_dir )
318
301
if not ret :
319
302
raise Exception (f'Check { ssh_client .config .ip } :{ ssh_client .config .work_dir } observer version failed, maybe be oceanbase-ce-libs not installed. Reason: { ret .stderr } ' )
320
-
321
303
except Exception as e :
322
304
raise e
323
305
finally :
@@ -326,13 +308,62 @@ def initialize_nodes(rpm_packages: List[str], force_clean: bool, configs: List[N
326
308
return True
327
309
328
310
311
def install_obshell(rpm_package: str, configs: List[NodeConfig]):
    """
    Install one RPM package on the nodes described by ``configs``.

    Parameters:
    rpm_package (str): The RPM package handed to install_package.
    configs (List[NodeConfig]): Node configurations; an SshClient is built from each one
    and stored under its ``ip``.

    Returns:
    Whatever install_package returns.

    Raises:
    Any exception raised by install_package; it propagates after the SSH clients are closed.

    Process:
    1. Builds the ip -> SshClient mapping from the node configurations.
    2. Delegates the installation to install_package.
    3. Closes every SSH client, whether the installation succeeded or raised.
    """
    clients = {}
    for node in configs:
        # Configs sharing an IP share one slot; the client of the last such config is kept.
        clients[node.ip] = SshClient(node)

    try:
        outcome = install_package(rpm_package, clients, configs)
    finally:
        for opened in clients.values():
            opened.close()
    return outcome
333
+
334
+
335
def install_package(rpm_package: str, ssh_clients: Dict[str, SshClient], configs: List[NodeConfig]):
    """
    Load an RPM package, write its files to the nodes and recreate its symbolic links.

    Parameters:
    rpm_package (str): The package passed to load_rpm_pcakge.
    ssh_clients (Dict[str, SshClient]): SSH clients keyed by node IP.
    configs (List[NodeConfig]): Node configurations; ``ip`` and ``work_dir`` are read from each.

    Returns:
    bool: True when the files were written and every link was created.

    Raises:
    Exception: If the remote ``ln -sf`` command for any link reports failure.
    """
    logger.debug('load rpm package %s' % rpm_package)
    files, links = load_rpm_pcakge(rpm_package)

    write_files_to_servers(ssh_clients, configs, files)

    # Links are handled one at a time; each link is created on every node before the next one.
    for link_path, target in links.items():
        for node in configs:
            remote = ssh_clients[node.ip]
            link = get_dest_path(node.work_dir, link_path)
            # Targets starting with './' are mapped into the node's work dir;
            # any other target is used exactly as stored in the package.
            link_target = get_dest_path(node.work_dir, target) if target.startswith('./') else target

            parent = os.path.dirname(link)
            command = 'mkdir -p %s; cd %s; ln -sf %s %s' % (parent, parent, link_target, link)
            logger.debug('create link %s -> %s' % (link, link_target))
            result = remote.execute(command)
            if result:
                continue
            raise Exception('Failed to create link %s -> %s: %s' % (link, link_target, result.stderr))
    return True
358
+
359
+
329
360
class WriteFilesWorker (object ):
330
361
331
362
def __init__ (self , id , config : NodeConfig , temp_file_manager : TempFileMananger = None ):
332
363
self .id = id
333
364
self .config = config
334
365
self .temp_file_manager = temp_file_manager
335
- self .files = []
366
+ self .files : List [ ExtractFile ] = []
336
367
self .size = 0
337
368
338
369
def add_file (self , file : ExtractFile ):
@@ -379,15 +410,19 @@ def write_files_to_servers(ssh_clients: Dict[str, SshClient] , configs: List[Nod
379
410
for clients in ssh_clients .values ():
380
411
paraller_write_files (clients .config , files )
381
412
382
- # for followers, copy files from primary node
413
+ # Copy installed files from one node to other nodes on the same machine to improve installation efficiency.
383
414
for config in configs :
384
415
client = ssh_clients [config .ip ]
385
416
primary_config = client .config
386
417
if primary_config .work_dir == config .work_dir :
387
418
continue
388
- ret = client .execute ("cp -r %s/* %s" % (primary_config .work_dir , config .work_dir ))
389
- if not ret :
390
- raise Exception ('Failed to copy files from %s to %s: %s' % (primary_config .ip , config .ip , ret .stderr ))
419
+
420
+ for file in files :
421
+ remote_file_path = get_dest_path (primary_config .work_dir , file .path )
422
+ dest_path = get_dest_path (config .work_dir , file .path )
423
+ ret = client .execute ('mkdir -p %s; cp %s %s' % (os .path .dirname (dest_path ), remote_file_path , dest_path ))
424
+ if not ret :
425
+ raise Exception ('Failed to copy files from %s to %s: %s' % (primary_config .ip , config .ip , ret .stderr ))
391
426
return True
392
427
393
428
@@ -439,21 +474,30 @@ def paraller_write_files(config: NodeConfig, files: List[ExtractFile]):
439
474
return True
440
475
441
476
442
- def clean_server (client : SshClient , work_dir : str ):
443
- for file in ["daemon.pid" , "obshell.pid" , "observer.pid" ]:
444
- pid_file = os .path .join (work_dir , 'run' , file )
445
- if not client .execute ('[ -f %s ]' % pid_file ):
446
- continue
447
- ret = client .execute ('kill -9 `cat %s`' % pid_file )
448
- if not ret :
449
- logger .debug ('Failed to kill %s(%s): %s' % (client .config .ip , pid_file , ret .stderr ))
477
def _clean_node(client: SshClient, work_dir: str):
    """
    Stop the processes recorded under ``work_dir`` and then delete ``work_dir`` on the node.

    Parameters:
    client (SshClient): Client used to run the commands.
    work_dir (str): Directory whose processes are stopped and which is removed with ``rm -fr``.

    Returns:
    bool: True when the removal command succeeds.

    Raises:
    Exception: If the removal command reports failure.
    """
    # Same order as before: daemon, then obshell, then observer.
    for name in ("daemon", "obshell", "observer"):
        _stop_process(client, work_dir, name)

    removal = client.execute('rm -fr %s' % work_dir)
    if removal:
        return True
    raise Exception('Failed to clean %s work dir %s: %s' % (client.config.ip, work_dir, removal.stderr))
455
485
456
486
487
def _stop_obshell(client: SshClient, work_dir: str):
    """Stop the "daemon" process and then the "obshell" process recorded under ``work_dir``."""
    _stop_process(client, work_dir, "daemon")
    _stop_process(client, work_dir, "obshell")
490
+
491
+
492
def _stop_process(client: SshClient, work_dir: str, process_name: str):
    """
    Kill the process whose pid is stored in ``<work_dir>/run/<process_name>.pid``.

    Best effort: does nothing when the pid file is absent, and a failed ``kill`` is only
    logged at debug level, never raised.

    Parameters:
    client (SshClient): Client used to run the commands.
    work_dir (str): Directory containing the ``run`` sub-directory.
    process_name (str): Base name of the pid file, without the ``.pid`` suffix.
    """
    pid_file = os.path.join(work_dir, 'run', f'{process_name}.pid')
    has_pid_file = client.execute('[ -f %s ]' % pid_file)
    if has_pid_file:
        killed = client.execute('kill -9 `cat %s`' % pid_file)
        if not killed:
            logger.debug('Failed to kill %s(%s): %s' % (client.config.ip, pid_file, killed.stderr))
499
+
500
+
457
501
def get_dest_path (work_dir : str , file_path : str ) -> str :
458
502
if file_path .startswith ('./home/admin/oceanbase' ):
459
503
file_path = file_path [23 :]
@@ -492,3 +536,77 @@ def start_obshell(configs: List[NodeConfig]):
492
536
ssh_client .close ()
493
537
logger .debug ('start obshell servers success' )
494
538
return True
539
+
540
+
541
def _start_obshell(client: SshClient, work_dir: str, ip: str, obshell_port: int, password: str = None):
    """
    Run ``obshell admin start`` on a node, optionally exporting the root password first.

    Parameters:
    client (SshClient): Client used to run the command.
    work_dir (str): Directory that contains ``bin/obshell``.
    ip (str): Value passed to ``--ip``.
    obshell_port (int): Value passed to ``--port``.
    password (str): When not None, exported as ``OB_ROOT_PASSWORD`` before the start command.

    Returns:
    The result object of ``client.execute``.
    """
    # Function-scope import so this block does not depend on the module's import list.
    import shlex

    logger.debug('start obshell %s:%s' % (ip, obshell_port))
    cmd = '%s/bin/obshell admin start --ip %s --port %s' % (work_dir, ip, obshell_port)
    if password is not None:
        # Quote the password: unquoted, a value with spaces, ';', '$', '&' or quotes would be
        # truncated, expanded or executed by the shell. Plain alphanumeric passwords are
        # left unchanged by shlex.quote, so the command stays the same for them.
        cmd = "export OB_ROOT_PASSWORD=%s; %s" % (shlex.quote(str(password)), cmd)
    return client.execute(cmd)
547
+
548
+
549
def takeover(password, configs: List[NodeConfig]):
    """
    Restart OBShell on every node with the root password and wait for the takeover to finish.

    Parameters:
    password (str): Passed to _start_obshell and used for PasswordAuth when polling the agents.
    configs (List[NodeConfig]): Node configurations; ``ip``, ``work_dir`` and ``obshell_port``
    are read from each.

    Returns:
    bool: True once every polled agent reports the CLUSTER_AGENT identity, or once the last
    maintenance DAG of a TAKE_OVER_MASTER agent has been waited on successfully.

    Raises:
    TaskExecuteFailedError: Re-raised immediately, without retry, when raised while polling.
    Exception: If OBShell fails to start on a node, if the final polling attempt raises, or
    if the takeover has still not completed after the final polling attempt.

    Process:
    1. Connects every SSH client.
    2. Stops OBShell on every node, then starts it again with the password.
    3. Polls the agents up to 60 times, sleeping 10 seconds before each poll.
    4. Closes every SSH client, whether the takeover succeeded or not.
    """
    max_attempts = 60
    poll_interval = 10  # seconds slept before each poll

    logger.debug('takeover observer...')
    ssh_clients = {config.ip: SshClient(config) for config in configs}
    try:
        for ssh_client in ssh_clients.values():
            ssh_client.connect()

        # stop obshell
        for config in configs:
            _stop_obshell(ssh_clients[config.ip], config.work_dir)

        for config in configs:
            ret = _start_obshell(ssh_clients[config.ip], config.work_dir, config.ip, config.obshell_port, password)
            if not ret:
                raise Exception('Failed to takeover %s observer: %s' % (config.ip, ret.stderr))

        for attempts_left in range(max_attempts - 1, -1, -1):
            time.sleep(poll_interval)
            try:
                ready = 0
                for config in configs:
                    client = ClientSet(config.ip, config.obshell_port, auth=PasswordAuth(password))
                    info = client.v1.get_status()
                    if info.agent.identity == Agentidentity.TAKE_OVER_MASTER.value:
                        dag = client.v1.get_agent_last_maintenance_dag()
                        logger.debug('find takeover observer dag %s, wait...' % dag.generic_id)
                        client.v1.wait_dag_succeed(dag.generic_id)
                        # Once the master's DAG has succeeded the remaining agents are not polled.
                        ready = len(configs)
                        break
                    elif info.agent.identity == Agentidentity.CLUSTER_AGENT.value:
                        ready += 1
                if ready == len(configs):
                    logger.debug('takeover observer success')
                    return True
            except TaskExecuteFailedError as e:
                # A failed task is final: report it and propagate without retrying.
                logger.debug('takeover observer failed: %s' % e)
                raise
            except Exception as e:
                if not attempts_left:
                    raise
                logger.debug('takeover observer failed: %s, retry...' % e)

        # Every poll completed without error but the takeover never finished. Raise here
        # instead of silently returning None, so callers get either True or an exception.
        raise Exception('Failed to takeover observer: not finished after %s checks' % max_attempts)
    finally:
        for ssh_client in ssh_clients.values():
            ssh_client.close()
0 commit comments