Skip to content

Commit 19d6e22

Browse files
Merge pull request #723 from adorton-adobe/feature/start-end-sync
Start/end sync signals
2 parents 2adbdb9 + d996ae7 commit 19d6e22

File tree

4 files changed

+98
-48
lines changed

4 files changed

+98
-48
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
'ldap3',
5353
'PyYAML',
5454
'six',
55-
'umapi-client>=2.14',
55+
'umapi-client==2.18',
5656
'click',
5757
'click-default-group',
5858
'configparser==3.7.4',

user_sync/connector/umapi.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,14 @@ def send_commands(self, commands, callback=None):
180180
if action is not None:
181181
action_manager.add_action(action, callback)
182182

183+
def start_sync(self):
184+
"""Send the start sync signal to the connector"""
185+
self.connection.start_sync()
186+
187+
def end_sync(self):
188+
"""Send the end sync signal to the connector"""
189+
self.connection.end_sync()
190+
183191

184192
class Commands(object):
185193
def __init__(self, identity_type=None, email=None, username=None, domain=None):
@@ -195,6 +203,12 @@ def __init__(self, identity_type=None, email=None, username=None, domain=None):
195203
self.domain = domain
196204
self.do_list = []
197205

206+
def __str__(self):
207+
return "Command "+str(self.__dict__)
208+
209+
def __repr__(self):
210+
return "Command "+str(self.__dict__)
211+
198212
def update_user(self, attributes):
199213
"""
200214
:type attributes: dict

user_sync/rules.py

Lines changed: 82 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -190,14 +190,21 @@ def run(self, directory_groups, directory_connector, umapi_connectors):
190190

191191
umapi_stats = JobStats('Push to UMAPI' if self.push_umapi else 'Sync with UMAPI', divider="-")
192192
umapi_stats.log_start(logger)
193+
primary_commands = list()
194+
secondary_command_lists = defaultdict(list)
193195
if directory_connector is not None:
194196
# note: push mode is not supported because if it is, we won't have a list of groups
195197
# that exist in the console. we don't want to attempt to create groups that already exist
196198
if self.options.get('process_groups') and not self.push_umapi and self.options.get('auto_create'):
197199
self.create_umapi_groups(umapi_connectors)
198-
self.sync_umapi_users(umapi_connectors)
200+
primary_commands, secondary_command_lists = self.sync_umapi_users(umapi_connectors)
199201
if self.will_process_strays:
200-
self.process_strays(umapi_connectors)
202+
primary_commands, secondary_command_lists = self.process_strays(primary_commands,
203+
secondary_command_lists, umapi_connectors)
204+
# execute secondary commands first so we can safely handle user deletions (if applicable)
205+
for umapi_name, command_list in secondary_command_lists.items():
206+
self.execute_commands(command_list, umapi_connectors.get_secondary_connectors()[umapi_name])
207+
self.execute_commands(primary_commands, umapi_connectors.get_primary_connector())
201208
umapi_connectors.execute_actions()
202209
umapi_stats.log_end(logger)
203210
self.log_action_summary(umapi_connectors)
@@ -469,58 +476,86 @@ def sync_umapi_users(self, umapi_connectors):
469476
unmatched directory users which we then create on the Adobe side.
470477
:type umapi_connectors: UmapiConnectors
471478
"""
479+
primary_commands = list()
480+
secondary_command_lists = defaultdict(list)
472481
if self.push_umapi:
473482
verb = "Push"
474483
else:
475484
verb = "Sync"
476485
exclude_unmapped_users = self.will_exclude_unmapped_users()
477486
# first sync the primary connector, so the users get created in the primary
478487
if umapi_connectors.get_secondary_connectors():
479-
self.logger.debug('%sing users to primary umapi...', verb)
488+
self.logger.debug('Processing %s users for primary umapi...', verb)
480489
else:
481-
self.logger.debug('%sing users to umapi...', verb)
490+
self.logger.debug('Processing %s users for umapi...', verb)
482491
umapi_info, umapi_connector = self.get_umapi_info(PRIMARY_UMAPI_NAME), umapi_connectors.get_primary_connector()
483492
if self.push_umapi:
484493
primary_adds_by_user_key = umapi_info.get_desired_groups_by_user_key()
485494
else:
486-
primary_adds_by_user_key = self.update_umapi_users_for_connector(umapi_info, umapi_connector)
495+
primary_adds_by_user_key, update_commands = self.update_umapi_users_for_connector(umapi_info, umapi_connector)
496+
primary_commands.extend(update_commands)
487497
# save groups for new users
488498

489499
total_users = len(primary_adds_by_user_key)
490500

491501
user_count = 0
492-
for user_key, groups_to_add in six.iteritems(primary_adds_by_user_key):
502+
for user_key, groups_to_add in primary_adds_by_user_key.items():
493503
user_count += 1
494504
if exclude_unmapped_users and not groups_to_add:
495505
# If user is not part of any group and ignore outcast is enabled. Do not create user.
496506
continue
497507
# We always create every user in the primary umapi, because it's believed to own the directories.
498-
if user_count % 10 == 0:
499-
self.logger.progress(user_count, total_users, 'actions completed')
500508
self.primary_users_created.add(user_key)
501-
self.create_umapi_user(user_key, groups_to_add, umapi_info, umapi_connector)
509+
primary_commands.append(self.create_umapi_user(user_key, groups_to_add, umapi_info, umapi_connector.trusted))
502510

503511
# then sync the secondary connectors
504-
for umapi_name, umapi_connector in six.iteritems(umapi_connectors.get_secondary_connectors()):
512+
for umapi_name, umapi_connector in umapi_connectors.get_secondary_connectors().items():
505513
umapi_info = self.get_umapi_info(umapi_name)
506514
if len(umapi_info.get_mapped_groups()) == 0:
507515
continue
508-
self.logger.debug('%sing users to secondary umapi %s...', verb, umapi_name)
516+
self.logger.debug('Processing %s users for secondary umapi %s...', verb, umapi_name)
509517
if self.push_umapi:
510518
secondary_adds_by_user_key = umapi_info.get_desired_groups_by_user_key()
511519
else:
512-
secondary_adds_by_user_key = self.update_umapi_users_for_connector(umapi_info, umapi_connector)
520+
secondary_adds_by_user_key, update_commands = self.update_umapi_users_for_connector(umapi_info, umapi_connector)
521+
secondary_command_lists[umapi_name].extend(update_commands)
513522
total_users = len(secondary_adds_by_user_key)
514523
for user_key, groups_to_add in six.iteritems(secondary_adds_by_user_key):
515524
# We only create users who have group mappings in the secondary umapi
516525
if groups_to_add:
517-
self.logger.progress(user_count, total_users,
518-
'Adding user to umapi {0} with user key: {1}'.format(umapi_name, user_key))
519526
self.secondary_users_created.add(user_key)
520527
if user_key not in self.primary_users_created:
521528
# We pushed an existing user to a secondary in order to update his groups
522529
self.updated_user_keys.add(user_key)
523-
self.create_umapi_user(user_key, groups_to_add, umapi_info, umapi_connector)
530+
secondary_command_lists[umapi_name].append(self.create_umapi_user(user_key, groups_to_add, umapi_info, umapi_connector.trusted))
531+
return primary_commands, secondary_command_lists
532+
533+
def execute_commands(self, command_list, connector):
534+
# do nothing if we have no commands for this connector
535+
if not command_list:
536+
return
537+
538+
total_users = len(command_list)
539+
540+
if len(command_list) > 10:
541+
command_list, last_command = command_list[0:-1], command_list[-1]
542+
connector.start_sync()
543+
else:
544+
last_command = None
545+
546+
count = 0
547+
for commands in command_list:
548+
connector.send_commands(commands)
549+
count += 1
550+
if count % 10 == 0:
551+
self.logger.progress(count, total_users, 'actions completed')
552+
553+
if last_command is not None:
554+
connector.end_sync()
555+
connector.send_commands(last_command)
556+
count += 1
557+
558+
self.logger.progress(count, total_users, 'actions completed')
524559

525560
def create_umapi_groups(self, umapi_connectors):
526561
"""
@@ -585,7 +620,7 @@ def add_stray(self, umapi_name, user_key, removed_groups=None):
585620
else:
586621
self.stray_key_map[umapi_name][user_key] = removed_groups
587622

588-
def process_strays(self, umapi_connectors):
623+
def process_strays(self, primary_commands, secondary_command_lists, umapi_connectors):
589624
"""
590625
Do the top-level logic for stray processing (output to file or clean them up), enforce limits, etc.
591626
The actual work is done in sub-functions that we call.
@@ -608,9 +643,9 @@ def process_strays(self, umapi_connectors):
608643
self.action_summary['primary_strays_processed'] = 0
609644
return
610645
self.logger.debug("Processing Adobe-only users...")
611-
self.manage_strays(umapi_connectors)
646+
return self.manage_strays(primary_commands, secondary_command_lists, umapi_connectors)
612647

613-
def manage_strays(self, umapi_connectors):
648+
def manage_strays(self, primary_commands, secondary_command_lists, umapi_connectors):
614649
"""
615650
Manage strays. This doesn't require having loaded users from the umapi.
616651
Management of groups, removal of entitlements and removal from umapi are
@@ -629,7 +664,7 @@ def manage_strays(self, umapi_connectors):
629664

630665
# all our processing is controlled by the strays in the primary organization
631666
primary_strays = self.get_stray_keys()
632-
self.action_summary['primary_strays_processed'] = total_strays = len(primary_strays)
667+
self.action_summary['primary_strays_processed'] = len(primary_strays)
633668

634669
# convenience function to get umapi Commands given a user key
635670
def get_commands(key):
@@ -640,7 +675,7 @@ def get_commands(key):
640675
return user_sync.connector.umapi.Commands(identity_type=id_type, username=username, domain=domain)
641676

642677
# do the secondary umapis first, in case we are deleting user accounts from the primary umapi at the end
643-
for umapi_name, umapi_connector in six.iteritems(umapi_connectors.get_secondary_connectors()):
678+
for umapi_name, umapi_connector in umapi_connectors.get_secondary_connectors().items():
644679
secondary_strays = self.get_stray_keys(umapi_name)
645680
for user_key in primary_strays:
646681
if user_key in secondary_strays:
@@ -650,55 +685,55 @@ def get_commands(key):
650685
umapi_name, user_key)
651686
self.post_sync_data.remove_umapi_user_groups(umapi_name, user_key)
652687
commands.remove_all_groups()
688+
secondary_command_lists[umapi_name].append(commands)
653689
elif remove_strays or delete_strays:
654690
self.logger.info('Removing Adobe-only user from %s: %s',
655691
umapi_name, user_key)
656692
self.post_sync_data.remove_umapi_user(umapi_name, user_key)
657693
commands.remove_from_org(False)
694+
secondary_command_lists[umapi_name].append(commands)
658695
elif manage_stray_groups:
659696
groups_to_remove = secondary_strays[user_key]
660697
if groups_to_remove:
661698
self.logger.info('Removing mapped groups in %s from Adobe-only user: %s',
662699
umapi_name, user_key)
663700
self.post_sync_data.update_umapi_data(umapi_name, user_key, [], groups_to_remove)
664701
commands.remove_groups(groups_to_remove)
702+
secondary_command_lists[umapi_name].append(commands)
665703
else:
666704
continue
667705
else:
668706
# haven't done anything, don't send commands
669707
continue
670-
umapi_connector.send_commands(commands)
671-
# make sure the commands for each umapi are executed before moving to the next
672-
umapi_connector.get_action_manager().flush()
673708

674709
# finish with the primary umapi
675710
primary_connector = umapi_connectors.get_primary_connector()
676-
for i, user_key in enumerate(primary_strays):
677-
per = round(100*(float(i)/float(total_strays)),3)
711+
for user_key in primary_strays:
678712
commands = get_commands(user_key)
679713
if disentitle_strays:
680714
self.logger.info('Removing all adobe groups for Adobe-only user: %s', user_key)
681715
self.post_sync_data.remove_umapi_user_groups(None, user_key)
682716
commands.remove_all_groups()
717+
primary_commands.append(commands)
683718
elif remove_strays or delete_strays:
684719
action = "Deleting" if delete_strays else "Removing"
685-
self.logger.info('(%s/%s)(%s%%) %s Adobe-only user: %s', i, total_strays, per, action, user_key)
720+
self.logger.info('%s Adobe-only user: %s', action, user_key)
686721
self.post_sync_data.remove_umapi_user(None, user_key)
687722
commands.remove_from_org(True if delete_strays else False)
723+
primary_commands.append(commands)
688724
elif manage_stray_groups:
689725
groups_to_remove = primary_strays[user_key]
690726
if groups_to_remove:
691727
self.logger.info('Removing mapped groups from Adobe-only user: %s', user_key)
692728
self.post_sync_data.update_umapi_data(None, user_key, [], groups_to_remove)
693729
commands.remove_groups(groups_to_remove)
730+
primary_commands.append(commands)
694731
else:
695732
continue
696733
else:
697734
# haven't done anything, don't send commands
698735
continue
699-
primary_connector.send_commands(commands)
700-
# make sure the actions get sent
701-
primary_connector.get_action_manager().flush()
736+
return primary_commands, secondary_command_lists
702737

703738
@staticmethod
704739
def get_user_attributes(directory_user):
@@ -772,7 +807,7 @@ def create_umapi_commands_for_directory_user(self, directory_user, do_update=Fal
772807
commands.update_user({"email": directory_user['email'], "username": update_username})
773808
return commands
774809

775-
def create_umapi_user(self, user_key, groups_to_add, umapi_info, umapi_connector):
810+
def create_umapi_user(self, user_key, groups_to_add, umapi_info, trusted):
776811
"""
777812
Add the user to the org on the receiving end of the given umapi connector.
778813
If the connector is the primary connector, we ask to update the user's attributes because
@@ -784,23 +819,22 @@ def create_umapi_user(self, user_key, groups_to_add, umapi_info, umapi_connector
784819
:type user_key: str
785820
:type groups_to_add: set
786821
:type umapi_info: UmapiTargetInfo
787-
:type umapi_connector: user_sync.connector.umapi.UmapiConnector
822+
:type trusted: bool
788823
"""
789824
directory_user = self.directory_user_by_user_key[user_key]
790-
commands = self.create_umapi_commands_for_directory_user(directory_user, self.will_update_user_info(umapi_info),
791-
umapi_connector.trusted)
825+
commands = self.create_umapi_commands_for_directory_user(directory_user, self.will_update_user_info(umapi_info), trusted)
792826
if not commands:
793827
return
794828
if self.will_process_groups():
795829
if self.push_umapi:
796830
groups_to_remove = umapi_info.get_mapped_groups() - groups_to_add
797831
commands.remove_groups(groups_to_remove)
798832
commands.add_groups(groups_to_add)
799-
if umapi_connector.trusted:
800-
self.logger.info('Adding user to umapi %s with user key: %s', umapi_connector.name, user_key)
833+
if trusted:
834+
self.logger.info('Queuing new user for umapi %s with user key: %s', umapi_info.name, user_key)
801835
self.secondary_users_created.add(user_key)
802836
else:
803-
self.logger.info('Creating user with user key: %s', user_key)
837+
self.logger.info('Queuing new user with user key: %s', user_key)
804838
self.primary_users_created.add(user_key)
805839
post_sync_user = {
806840
'type': directory_user['identity_type'],
@@ -813,19 +847,17 @@ def create_umapi_user(self, user_key, groups_to_add, umapi_info, umapi_connector
813847
}
814848
self.post_sync_data.update_umapi_data(umapi_info.name, user_key,
815849
groups_to_add if self.will_process_groups() else [], [], **post_sync_user)
816-
umapi_connector.send_commands(commands)
850+
return commands
817851

818-
def update_umapi_user(self, umapi_info, user_key, umapi_connector,
819-
attributes_to_update=None, groups_to_add=None, groups_to_remove=None,
820-
umapi_user=None):
852+
def update_umapi_user(self, umapi_info, user_key, attributes_to_update=None, groups_to_add=None,
853+
groups_to_remove=None, umapi_user=None):
821854
# Note that the user may exist only in the directory, only in the umapi, or both at this point.
822855
# When we are updating an Adobe user who has been removed from the directory, we have to be careful to use
823856
# data from the umapi_user parameter and not try to get information from the directory.
824857
"""
825858
Send the action to update aspects of an adobe user, like info and groups
826859
:type umapi_info: UmapiTargetInfo
827860
:type user_key: str
828-
:type umapi_connector: user_sync.connector.umapi.UmapiConnector
829861
:type attributes_to_update: dict
830862
:type groups_to_add: set(str)
831863
:type groups_to_remove: set(str)
@@ -874,7 +906,7 @@ def update_umapi_user(self, umapi_info, user_key, umapi_connector,
874906
commands.update_user(attributes_to_update)
875907
commands.remove_groups(groups_to_remove)
876908
commands.add_groups(groups_to_add)
877-
umapi_connector.send_commands(commands)
909+
return commands
878910

879911
def update_umapi_users_for_connector(self, umapi_info, umapi_connector):
880912
"""
@@ -888,6 +920,8 @@ def update_umapi_users_for_connector(self, umapi_info, umapi_connector):
888920
:type umapi_connector: user_sync.connector.umapi.UmapiConnector
889921
:rtype: map(string, set)
890922
"""
923+
command_list = []
924+
891925
filtered_directory_user_by_user_key = self.filtered_directory_user_by_user_key
892926

893927
# the way we construct the return value is to start with a map from all directory users
@@ -966,12 +1000,14 @@ def update_umapi_users_for_connector(self, umapi_info, umapi_connector):
9661000
groups_to_remove = (current_groups - desired_groups) & umapi_info.get_mapped_groups()
9671001

9681002
# Finally, execute the attribute and group adjustments
969-
self.update_umapi_user(umapi_info, user_key, umapi_connector,
970-
attribute_differences, groups_to_add, groups_to_remove, umapi_user)
971-
1003+
# if we have nothing to update, omit this user
1004+
if not attribute_differences and not groups_to_add and not groups_to_remove:
1005+
continue
1006+
command_list.append(self.update_umapi_user(umapi_info, user_key, attribute_differences,
1007+
groups_to_add, groups_to_remove, umapi_user))
9721008
# mark the umapi's adobe users as processed and return the remaining ones in the map
9731009
umapi_info.set_umapi_users_loaded()
974-
return user_to_group_map
1010+
return (user_to_group_map, command_list)
9751011

9761012
def map_email_override(self, umapi_user):
9771013
"""

user_sync/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@
1818
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
1919
# SOFTWARE.
2020

21-
__version__ = '2.6.3'
21+
__version__ = '2.6.4'

0 commit comments

Comments
 (0)