Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 126 additions & 45 deletions modules/fortigate/fgt_asg/fgt-asg-lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,38 +55,51 @@ def main(self, detail_type):
self.logger.info(f"Do interface config")

if detail_type == "EC2 Instance-launch Lifecycle Action":
self.do_launch()
success = self.do_launch()
return success
elif detail_type == "EC2 Instance-terminate Lifecycle Action":
self.do_terminate()
return False #do nothing
if detail_type == "EC2 Instance Launch Successful":
self.save_intf()
return
return False #do nothing
else:
self.logger.info(f"Can not identify detail-type: {detail_type}")
return
self.logger.info(f"Can not identify detail-type Network stage: {detail_type}")
return False #do nothing

def do_launch(self):
self.logger.info(f"Do launch fgt vm instance: {self.fgt_vm_id}")
intf_setting = json.loads(os.getenv('network_interfaces'))
fgt_az = self.instance_detail['Reservations'][0]['Instances'][0]['Placement']['AvailabilityZone']
for intf_name, intf_conf in intf_setting.items():
# Ignore if the interface already exist
# 1. Ignore if the interface already exist
if str(intf_conf["device_index"]) in self.fgt_vm_intfs:
self.logger.info("Ignoring the interface already exist") # #this is where we go
continue
# Create interface
# 2. Create interface
cur_intf_id = self.create_interface(intf_name, intf_conf, fgt_az)
if cur_intf_id == None:
continue
# Attach the interface to FortiGate VM instance
self.logger.info("Create interface")
if cur_intf_id == None: #None means it has an error during create_interface function
return False
# 3. Attach the interface to FortiGate VM instance
attach_id = self.attach_intf(cur_intf_id, intf_conf["device_index"])
if attach_id == None:
self.delete_interface(cur_intf_id)
continue
self.logger.info("Attach the interface to FortiGate VM instance")
if attach_id == None:
self.delete_interface(cur_intf_id) #Leave the result for delete_interface for now, Xing said we will work on this in the future.
return False
continue
# If 1, 2 and 3 is False - set delete on termination
self.set_delete_on_termination(cur_intf_id, attach_id)
# Create and associate Public IP if needed
self.logger.info("Create and associate Public IP if needed")
if "enable_public_ip" in intf_conf and intf_conf["enable_public_ip"] :
self.associate_pub_ip(cur_intf_id, intf_conf)



self.logger.info (f"Loop finished without error succesfully configured network interface for {self.fgt_vm_id}")
return True

def do_terminate(self):
self.logger.info(f"Do terminate fgt vm instance: {self.fgt_vm_id}")
intf_track_dict = self.get_intf()
Expand Down Expand Up @@ -482,13 +495,15 @@ def set_vm_id(self, vm_id):
self.fgt_vm_id = vm_id

def main(self, detail_type):
if detail_type == "EC2 Instance Launch Successful":
self.do_launch()
if detail_type == "EC2 Instance-launch Lifecycle Action":
b_succ = self.do_launch()
return b_succ
elif detail_type == "EC2 Instance-terminate Lifecycle Action":
self.do_terminate()
return False #do nothing
else:
self.logger.debug(f"Can not identify detail-type: {detail_type}")
return
self.logger.debug(f"Can not identify detail-type FGT conf stage: {detail_type}")
return False #do nothing

def do_launch(self):
self.logger.info("Do launch event.")
Expand All @@ -505,12 +520,12 @@ def do_launch(self):
fgt_private_ip = self.get_private_ip(instance_detail['Reservations'][0]['Instances'][0])
if not fgt_private_ip:
self.logger.error("Can not find private IP.")
return
return False
# Change password
b_succ = self.change_password(fgt_private_ip, self.fgt_vm_id)
if not b_succ:
self.logger.error(f"Change password failed.")
return
return False

# Update in Dynamo DB
self.add_asg_instance_dydb(self.fgt_vm_id)
Expand All @@ -521,7 +536,7 @@ def do_launch(self):
b_succ = self.update_all_sn_list()
b_succ = self.upload_license(fgt_private_ip, self.fgt_vm_id)
if not b_succ:
return
return False

# Configure the FortiGate instance
if self.fgt_lic_mgmt != "fmg":
Expand All @@ -531,7 +546,9 @@ def do_launch(self):
self.fgt_primary_ip, self.fgt_primary_port = self.get_primary_ip(instance_detail['Reservations'][0]['Instances'][0])

config_content = self.gen_config_content(self.fgt_vm_id)
b_succ = self.upload_config(config_content, fgt_private_ip)
b_succ = self.upload_config(config_content, fgt_private_ip) #b_succ is either True or False

return b_succ


def do_terminate(self):
Expand Down Expand Up @@ -2048,47 +2065,111 @@ def clean_terminated_vms(logger, intf_object, fgtconf_object):
fgtconf_object.set_vm_id(vm_id)
fgtconf_object.do_terminate()

def complete_lifecycle(logger, event_detail):
def complete_lifecycle(logger, event_detail, result="ABANDON"):
logger.info("Complete lifecycle action.")
asg_client = boto3.client('autoscaling')
try:
asg_client.complete_lifecycle_action(
LifecycleHookName=event_detail.get('LifecycleHookName', ""),
AutoScalingGroupName=event_detail.get('AutoScalingGroupName', ""),
LifecycleActionToken=event_detail.get('LifecycleActionToken', ""),
LifecycleActionResult='CONTINUE'
LifecycleActionResult=result
)
except ClientError as e:
logger.error(f"Error completing life cycle hook for instance: {e.response['Error']['Code']}")

def lambda_handler(event, context):
logger = logging.getLogger("fgt_asg_lambda")
logger.setLevel(logging.INFO)
event_detail = event["detail"]
fgt_vm_id = event["detail"]["EC2InstanceId"]
detail_type = event["detail-type"]

# Initiate objects
intf_object = NetworkInterface(logger)
fgtconf_object = FgtConf(logger)
try:
logger = logging.getLogger("fgt_asg_lambda")
logger.setLevel(logging.INFO)

# --- ADDED: Log incoming event for debugging lifecycle hook routing ---
# Key fields from EventBridge event:
# event["detail-type"] - Event type (e.g., "EC2 Instance-launch Lifecycle Action")
# event["detail"]["EC2InstanceId"] - Instance being acted upon
# event["detail"]["AutoScalingGroupName"] - ASG that triggered the event
# event["detail"]["LifecycleHookName"] - Name of the lifecycle hook (used for routing)
# event["detail"]["LifecycleActionToken"] - Token for completing the hook (only in lifecycle events)
logger.info("=" * 60)
logger.info("fgt-asg-lambda invoked")
logger.info(f"Event detail-type: {event.get('detail-type', 'N/A')}")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate log with the following logs.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello Lix, the logs were added by my colleague to see what is happening.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the detail-type was logged again in the following log.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see what you mean :) I have remove the duplicate one thank you!

logger.info(f"Full event: {json.dumps(event, default=str)}")
logger.info("=" * 60)
# --- END ADDED ---

event_detail = event["detail"]
fgt_vm_id = event["detail"]["EC2InstanceId"]
detail_type = event["detail-type"]

# Log key fields extracted from event
logger.info(f"Processing event:")
logger.info(f" EC2InstanceId: {fgt_vm_id}")
logger.info(f" AutoScalingGroupName: {event_detail.get('AutoScalingGroupName', 'N/A')}")
logger.info(f" LifecycleHookName: {event_detail.get('LifecycleHookName', 'N/A')}")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LifecycleHookName used multiple times. Could create a variable for it.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hello @lix-fortinet I have modified this part. changes:

move hook_name variable above managed_hooks variable

        # create variable for hook_name
        hook_name = event_detail.get('LifecycleHookName', '')
        # create variable for managed_hooks
        managed_hooks = ['fgt_asg_launch_hook', 'fgt_asg_terminate_hook']

remove the hook_name on the complete lifecycle action part as we already have defined it.

        if detail_type in ["EC2 Instance-launch Lifecycle Action", "EC2 Instance-terminate Lifecycle Action"]:
            logger.info(f"Completing lifecycle hook: {hook_name}")
            complete_lifecycle(logger, event_detail,result="CONTINUE" )

lifecycle_token = event_detail.get('LifecycleActionToken', '')
logger.info(f" LifecycleActionToken: {lifecycle_token[:20] + '...' if lifecycle_token else 'N/A'}")

# --- EARLY EXIT: Skip unmanaged lifecycle hooks ---
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this early exit? What other hooks does it have besides managed hooks? Also, the hook name may contain a prefix.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello Lix, On our environment we added some hooks that will check threat feeds. And for some reason, this hook also receives the event from threat feeds hook. We added this so that it will not process Life cycle action for threat feeds hook.

Copy link
Copy Markdown
Author

@cathyrox cathyrox May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for the suggestion I will check with my colleague as he is the one that modifies it :)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. You can check main.tf line 141.

# EventBridge routes ALL lifecycle events for this ASG to this Lambda
# because the rule filters by AutoScalingGroupName only, not LifecycleHookName.
# Exit early for hooks not managed by this Lambda to avoid unnecessary processing.

# create variable for hook_name
hook_name = event_detail.get('LifecycleHookName', '')
# create variable for managed_hooks
managed_hooks = ['fgt_asg_launch_hook', 'fgt_asg_terminate_hook']
if detail_type in ["EC2 Instance-launch Lifecycle Action", "EC2 Instance-terminate Lifecycle Action"]:
if hook_name not in managed_hooks:
logger.info(f"Skipping unmanaged lifecycle hook: {hook_name} - exiting early")
return {}
logger.info(f"Processing managed lifecycle hook: {hook_name}")
# --- END EARLY EXIT ---

# Initiate objects
intf_object = NetworkInterface(logger)
fgtconf_object = FgtConf(logger)

# If detail_type is launch related, check and clean the VMs before main operation
if detail_type in ["EC2 Instance-launch Lifecycle Action", "EC2 Instance Launch Successful"]:
clean_terminated_vms(logger, intf_object, fgtconf_object)

## Network Interface operations
intf_object.set_vm_id(fgt_vm_id)
success_network = intf_object.main(detail_type)
logger.info (f"Printing NetworkInterface operations success {success_network}")

# If detail_type is launch related, check and clean the VMs before main operation
if detail_type in ["EC2 Instance-launch Lifecycle Action", "EC2 Instance Launch Successful"]:
clean_terminated_vms(logger, intf_object, fgtconf_object)
if detail_type == "EC2 Instance-launch Lifecycle Action":
if not success_network:
logger.info(f"Configuring NetworkInterface {fgt_vm_id} failed - abandoning instance")
complete_lifecycle(logger, event_detail, result="ABANDON")
return {}

## Network Interface operations
intf_object.set_vm_id(fgt_vm_id)
intf_object.main(detail_type)
## FortiGate configuration operations
fgtconf_object.set_vm_id(fgt_vm_id)
success_fgtconf = fgtconf_object.main(detail_type)
logger.info (f"Printing FGTConf operations success {success_fgtconf}")

## FortiGate configuration operations
fgtconf_object.set_vm_id(fgt_vm_id)
fgtconf_object.main(detail_type)
if detail_type == "EC2 Instance-launch Lifecycle Action":
if not success_fgtconf:
logger.info(f"Configuring FortiGate {fgt_vm_id} failed - abandoning instance")
complete_lifecycle(logger, event_detail, result="ABANDON")
return {}


# If detail_type is terminate related, check and clean the VMs after main operation
if detail_type == "EC2 Instance-terminate Lifecycle Action":
clean_terminated_vms(logger, intf_object, fgtconf_object)
if detail_type in ["EC2 Instance-launch Lifecycle Action", "EC2 Instance-terminate Lifecycle Action"]:
complete_lifecycle(logger, event_detail)
return {}
# If detail_type is terminate related, check and clean the VMs after main operation
if detail_type == "EC2 Instance-terminate Lifecycle Action":
clean_terminated_vms(logger, intf_object, fgtconf_object)

# Complete lifecycle hook for managed hooks
# (We already verified this is a managed hook via the early exit check above)
if detail_type in ["EC2 Instance-launch Lifecycle Action", "EC2 Instance-terminate Lifecycle Action"]:
logger.info(f"Completing lifecycle hook: {hook_name}")
complete_lifecycle(logger, event_detail,result="CONTINUE" )

return {}

except Exception as e:
logger.error(f"Encountered an error cannot complete fgt_asg_lambda lifecycle hook {e}")
complete_lifecycle(logger, event_detail,result="ABANDON" )
return {}