diff --git a/backend/cosmetology-app/lambdas/python/data-events/handlers/encumbrance_events.py b/backend/cosmetology-app/lambdas/python/data-events/handlers/encumbrance_events.py index 966af44fb..e81dd8ed8 100644 --- a/backend/cosmetology-app/lambdas/python/data-events/handlers/encumbrance_events.py +++ b/backend/cosmetology-app/lambdas/python/data-events/handlers/encumbrance_events.py @@ -10,12 +10,10 @@ EncumbranceNotificationTemplateVariables, JurisdictionNotificationMethod, ) -from cc_common.event_batch_writer import EventBatchWriter -from cc_common.event_bus_client import EventBusClient from cc_common.event_state_client import EventType, NotificationTracker, RecipientType from cc_common.exceptions import CCInternalException from cc_common.license_util import LicenseUtility -from cc_common.utils import sqs_handler, sqs_handler_with_notification_tracking +from cc_common.utils import sqs_handler_with_notification_tracking def _get_license_type_name(compact: str, license_type_abbreviation: str) -> str: @@ -213,135 +211,6 @@ def _send_additional_state_notifications( ) -@sqs_handler -def license_encumbrance_listener(message: dict): - """ - Handle license encumbrance events by encumbering associated privileges. - - This handler processes 'license.encumbrance' events and automatically encumbers - all privileges associated with the encumbered license, then publishes privilege - encumbrance events for each affected privilege. - """ - detail_schema = EncumbranceEventDetailSchema() - detail = detail_schema.load(message['detail']) - - compact = detail['compact'] - provider_id = detail['providerId'] - license_jurisdiction = detail['jurisdiction'] - adverse_action_id = detail['adverseActionId'] - license_type_abbreviation = detail['licenseTypeAbbreviation'] - effective_date = detail['effectiveDate'] - - with logger.append_context_keys( - compact=compact, - provider_id=provider_id, - jurisdiction=license_jurisdiction, - license_type_abbreviation=license_type_abbreviation, - effective_date=effective_date, - adverse_action_id=adverse_action_id, - ): - logger.info('Processing license encumbrance event') - - # When a home state license is encumbered, we notify all the other live states in the compact - # Since privileges in those states are automatically granted as an extension of the home state license - # We publish privilege encumbrance events for each live jurisdiction so they are notified - live_jurisdictions = config.live_compact_jurisdictions.get(compact, []) - event_bus_client = EventBusClient() - published_count = 0 - with EventBatchWriter(config.events_client) as event_batch_writer: - for live_jurisdiction in live_jurisdictions: - if live_jurisdiction == license_jurisdiction: - continue - logger.info( - 'Publishing privilege encumbrance event for affected jurisdiction', - privilege_jurisdiction=live_jurisdiction, - ) - event_bus_client.publish_privilege_encumbrance_event( - source='org.compactconnect.data-events', - compact=compact, - provider_id=provider_id, - jurisdiction=live_jurisdiction, # The privilege jurisdiction, not the license jurisdiction - license_type_abbreviation=license_type_abbreviation, - effective_date=effective_date, - event_batch_writer=event_batch_writer, - ) - published_count += 1 - - logger.info( - 'Successfully processed license encumbrance event', - privilege_encumbrance_events_published=published_count, - ) - - -@sqs_handler -def license_encumbrance_lifted_listener(message: dict): - """ - Handle license encumbrance lifting events by publishing privilege encumbrance lifting events. - - This handler processes 'license.encumbranceLifted' events and publishes privilege encumbrance - lifting events for each live jurisdiction in the compact (excluding the home state license - jurisdiction) so those states can be notified. - """ - detail_schema = EncumbranceEventDetailSchema() - detail = detail_schema.load(message['detail']) - - compact = detail['compact'] - provider_id = detail['providerId'] - license_jurisdiction = detail['jurisdiction'] - license_type_abbreviation = detail['licenseTypeAbbreviation'] - effective_date = detail['effectiveDate'] - - with logger.append_context_keys( - compact=compact, - provider_id=provider_id, - jurisdiction=license_jurisdiction, - license_type_abbreviation=license_type_abbreviation, - effective_date=effective_date, - ): - logger.info('Processing license encumbrance lifting event') - - # Get all provider records - provider_user_records = config.data_client.get_provider_user_records( - compact=compact, provider_id=provider_id, consistent_read=True - ) - # Verify the license itself is unencumbered before lifting privilege encumbrances - # A license may still be encumbered by another adverse action that has not been lifted yet. - license_record = provider_user_records.get_specific_license_record( - license_jurisdiction, license_type_abbreviation - ) - if not license_record: - logger.warning('No license record found for the specified jurisdiction and license type') - raise CCInternalException('No license record found for the specified jurisdiction and license type') - - if license_record.encumberedStatus == LicenseEncumberedStatusEnum.ENCUMBERED: - logger.info('License is still encumbered. Not sending privilege encumbrance lift notifications.') - return - - # When a home state license encumbrance is lifted, we notify all the other live states in the compact - # We publish privilege encumbrance lifting events for each live jurisdiction (excluding home state) - live_jurisdictions = config.live_compact_jurisdictions.get(compact, []) - event_bus_client = EventBusClient() - with EventBatchWriter(config.events_client) as event_batch_writer: - for live_jurisdiction in live_jurisdictions: - if live_jurisdiction == license_jurisdiction: - continue - logger.info( - 'Publishing privilege encumbrance lifting event for affected jurisdiction', - privilege_jurisdiction=live_jurisdiction, - ) - event_bus_client.publish_privilege_encumbrance_lifting_event( - source='org.compactconnect.data-events', - compact=compact, - provider_id=provider_id, - jurisdiction=live_jurisdiction, - license_type_abbreviation=license_type_abbreviation, - effective_date=effective_date, - event_batch_writer=event_batch_writer, - ) - - logger.info('Successfully processed license encumbrance lifting event') - - @sqs_handler_with_notification_tracking def privilege_encumbrance_notification_listener(message: dict, tracker: NotificationTracker): """ diff --git a/backend/cosmetology-app/lambdas/python/data-events/tests/function/test_encumbrance_events.py b/backend/cosmetology-app/lambdas/python/data-events/tests/function/test_encumbrance_events.py index 7d30c9c78..f1db85b83 100644 --- a/backend/cosmetology-app/lambdas/python/data-events/tests/function/test_encumbrance_events.py +++ b/backend/cosmetology-app/lambdas/python/data-events/tests/function/test_encumbrance_events.py @@ -52,131 +52,6 @@ def _create_sqs_event(self, message): """Create a proper SQS event structure with the message in the body.""" return {'Records': [{'messageId': str(uuid.uuid4()), 'body': json.dumps(message)}]} - def _when_testing_live_jurisdictions(self): - from cc_common.config import config - from handlers.encumbrance_events import license_encumbrance_listener - - # Mock live jurisdictions to home state + one other so we can assert the non-home one is notified - config.__dict__['live_compact_jurisdictions'] = { - DEFAULT_COMPACT: [DEFAULT_LICENSE_JURISDICTION, 'ky'], - } - try: - message = self._generate_license_encumbrance_message() - event = self._create_sqs_event(message) - - license_encumbrance_listener(event, self.mock_context) - - finally: - config.__dict__.pop('live_compact_jurisdictions', None) - - @patch('cc_common.event_bus_client.EventBusClient._publish_event') - def test_license_encumbrance_listener_publishes_privilege_encumbrance_for_non_home_live_jurisdictions( - self, mock_publish_event - ): - """When a license is encumbered, privilege encumbrance events are published for each live jurisdiction. - - Verifies that the privilege encumbrance notification is sent for the jurisdiction that is not - the home state license jurisdiction (i.e. the other live compact jurisdictions). - """ - self._when_testing_live_jurisdictions() - - # Should have published for each live jurisdiction that isn't the home state license jurisdiction (ky) - self.assertEqual(1, mock_publish_event.call_count) - mock_publish_event.assert_called_once_with( - source='org.compactconnect.data-events', - detail_type='privilege.encumbrance', - detail={ - 'compact': 'cosm', - 'jurisdiction': 'ky', - 'eventTime': '2024-11-08T23:59:59+00:00', - 'providerId': '89a6377e-c3a5-40e5-bca5-317ec854c570', - 'licenseTypeAbbreviation': 'cos', - 'effectiveDate': '2024-11-08', - }, - event_batch_writer=ANY, - ) - - def _run_license_encumbrance_lifted_listener_with_live_jurisdictions( - self, live_jurisdictions, message_overrides=None - ): - """Set live_compact_jurisdictions, run license encumbrance lifted listener, then restore config.""" - from cc_common.config import config - from handlers.encumbrance_events import license_encumbrance_lifted_listener - - config.__dict__['live_compact_jurisdictions'] = live_jurisdictions - try: - message = self._generate_license_encumbrance_lifting_message(message_overrides) - event = self._create_sqs_event(message) - license_encumbrance_lifted_listener(event, self.mock_context) - finally: - config.__dict__.pop('live_compact_jurisdictions', None) - - def _when_testing_live_jurisdictions_license_lifted(self): - """Run license encumbrance lifted listener with live jurisdictions = [home, ky]; call under patch.""" - self._run_license_encumbrance_lifted_listener_with_live_jurisdictions( - {DEFAULT_COMPACT: [DEFAULT_LICENSE_JURISDICTION, 'ky']} - ) - - @patch('cc_common.event_bus_client.EventBusClient._publish_event') - def test_license_encumbrance_lifted_listener_publishes_privilege_encumbrance_lifting_for_non_home_jurisdictions( - self, mock_publish_event - ): - """When a license encumbrance is lifted, privilege encumbrance lifting events are published - for non-home live jurisdictions. - - Verifies that the privilege encumbrance lifting notification is sent for the jurisdiction that is not - the home state license jurisdiction. - """ - # Handler fetches provider + license to verify license is unencumbered before publishing - self.test_data_generator.put_default_provider_record_in_provider_table() - self.test_data_generator.put_default_license_record_in_provider_table( - value_overrides={ - 'jurisdiction': DEFAULT_LICENSE_JURISDICTION, - 'licenseType': DEFAULT_LICENSE_TYPE, - 'encumberedStatus': 'unencumbered', - } - ) - self._when_testing_live_jurisdictions_license_lifted() - - # Should have published for each live jurisdiction that isn't the home state license jurisdiction (ky only) - self.assertEqual(1, mock_publish_event.call_count) - mock_publish_event.assert_called_once_with( - source='org.compactconnect.data-events', - detail_type='privilege.encumbranceLifted', - detail={ - 'compact': DEFAULT_COMPACT, - 'jurisdiction': 'ky', - 'eventTime': DEFAULT_DATE_OF_UPDATE_TIMESTAMP, - 'providerId': DEFAULT_PROVIDER_ID, - 'licenseTypeAbbreviation': DEFAULT_LICENSE_TYPE_ABBREVIATION, - 'effectiveDate': DEFAULT_EFFECTIVE_DATE, - }, - event_batch_writer=ANY, - ) - - @patch('cc_common.event_bus_client.EventBusClient._publish_event') - def test_license_encumbrance_lifted_listener_does_not_publish_when_license_still_encumbered( - self, mock_publish_event - ): - """When the license is still encumbered (e.g. another adverse action not yet lifted), - no events are published.""" - # Set up provider and license that is still encumbered - self.test_data_generator.put_default_provider_record_in_provider_table() - self.test_data_generator.put_default_license_record_in_provider_table( - value_overrides={ - 'jurisdiction': DEFAULT_LICENSE_JURISDICTION, - 'licenseType': DEFAULT_LICENSE_TYPE, - 'encumberedStatus': 'encumbered', # License still encumbered - } - ) - - # Live jurisdictions would normally cause a publish for 'ky'; we verify we skip all publishes - self._run_license_encumbrance_lifted_listener_with_live_jurisdictions( - {DEFAULT_COMPACT: [DEFAULT_LICENSE_JURISDICTION, 'ky']} - ) - - mock_publish_event.assert_not_called() - def _generate_privilege_encumbrance_message(self, message_overrides=None): """Generate a test SQS message for privilege encumbrance events.""" message = { @@ -1098,29 +973,6 @@ def test_license_encumbrance_notification_listeners_handle_no_additional_jurisdi finally: config.__dict__.pop('live_compact_jurisdictions', None) - @patch('cc_common.event_bus_client.EventBusClient._publish_event') - def test_license_encumbrance_lifted_listener_uses_effective_date_from_message(self, mock_publish_event): - """Test that published privilege encumbrance lifting events use the effectiveDate from the message.""" - # Handler fetches provider + license to verify license is unencumbered before publishing - self.test_data_generator.put_default_provider_record_in_provider_table() - self.test_data_generator.put_default_license_record_in_provider_table( - value_overrides={ - 'jurisdiction': DEFAULT_LICENSE_JURISDICTION, - 'licenseType': DEFAULT_LICENSE_TYPE, - 'encumberedStatus': 'unencumbered', - } - ) - self._run_license_encumbrance_lifted_listener_with_live_jurisdictions( - {DEFAULT_COMPACT: [DEFAULT_LICENSE_JURISDICTION, 'ky']}, - message_overrides={'effectiveDate': '2024-03-15'}, - ) - - mock_publish_event.assert_called_once() - call_args = mock_publish_event.call_args[1] - self.assertEqual('privilege.encumbranceLifted', call_args['detail_type']) - self.assertEqual('2024-03-15', call_args['detail']['effectiveDate']) - self.assertEqual('ky', call_args['detail']['jurisdiction']) - def _when_testing_privilege_lift_handler_with_encumbered_privilege(self, encumbered_status, mock_state_email): from cc_common.data_model.schema.common import PrivilegeEncumberedStatusEnum from handlers.encumbrance_events import privilege_encumbrance_lifting_notification_listener diff --git a/backend/cosmetology-app/pipeline/backend_stage.py b/backend/cosmetology-app/pipeline/backend_stage.py index 982cd1941..044bc6f44 100644 --- a/backend/cosmetology-app/pipeline/backend_stage.py +++ b/backend/cosmetology-app/pipeline/backend_stage.py @@ -5,7 +5,6 @@ from stacks.api_lambda_stack import ApiLambdaStack from stacks.api_stack import ApiStack from stacks.disaster_recovery_stack import DisasterRecoveryStack -from stacks.event_listener_stack import EventListenerStack from stacks.event_state_stack import EventStateStack from stacks.feature_flag_stack import FeatureFlagStack from stacks.ingest_stack import IngestStack @@ -149,16 +148,6 @@ def __init__( state_auth_stack=self.state_auth_stack, ) - self.event_listener_stack = EventListenerStack( - self, - 'EventListenerStack', - env=environment, - environment_context=environment_context, - standard_tags=standard_tags, - environment_name=environment_name, - persistent_stack=self.persistent_stack, - ) - # Reporting and notifications depend on emails, which depend on having a domain name. If we don't configure # a HostedZone we won't bother with these whole stacks. if self.persistent_stack.hosted_zone: diff --git a/backend/cosmetology-app/stacks/event_listener_stack/__init__.py b/backend/cosmetology-app/stacks/event_listener_stack/__init__.py deleted file mode 100644 index 3219c1a04..000000000 --- a/backend/cosmetology-app/stacks/event_listener_stack/__init__.py +++ /dev/null @@ -1,149 +0,0 @@ -from __future__ import annotations - -import os - -from aws_cdk import Duration -from aws_cdk.aws_events import EventBus -from cdk_nag import NagSuppressions -from common_constructs.stack import AppStack -from constructs import Construct - -from common_constructs.python_function import PythonFunction -from common_constructs.queue_event_listener import QueueEventListener -from common_constructs.ssm_parameter_utility import SSMParameterUtility -from stacks import persistent_stack as ps - - -class EventListenerStack(AppStack): - """ - This stack defines resources that listen for events from the data event bus and perform downstream processing. - - Note: Unlike the NotificationStack, the resources in this stack are _not_ dependent on the presence of a domain - name. This is because the resources in this stack are responsible for listening for events from the data event bus - and performing downstream processing, such as encumbering privileges associated with an encumbered license. The - resources in this stack cannot use SES, since this stack may be present when there is no domain name configured. - """ - - def __init__( - self, - scope: Construct, - construct_id: str, - *, - environment_name: str, - persistent_stack: ps.PersistentStack, - **kwargs, - ): - super().__init__(scope, construct_id, environment_name=environment_name, **kwargs) - data_event_bus = SSMParameterUtility.load_data_event_bus_from_ssm_parameter(self) - # we only pass the API_BASE_URL env var if the API_DOMAIN_NAME is set - # this is because the API_BASE_URL is used by the feature flag client to call the flag check endpoint - if persistent_stack.api_domain_name: - self.common_env_vars.update({'API_BASE_URL': f'https://{persistent_stack.api_domain_name}'}) - - self.event_processors = {} - self._add_license_encumbrance_listener(persistent_stack, data_event_bus) - self._add_lifting_license_encumbrance_listener(persistent_stack, data_event_bus) - - def _add_license_encumbrance_listener(self, persistent_stack: ps.PersistentStack, data_event_bus: EventBus): - """Add the license encumbrance listener lambda, queues, and event rules.""" - # Create the Lambda function handler that listens for license encumbrance events - construct_id_prefix = 'LicenseEncumbranceListener' - license_encumbrance_listener_handler = PythonFunction( - self, - f'{construct_id_prefix}Handler', - description='License Encumbrance Listener Handler', - lambda_dir='data-events', - index=os.path.join('handlers', 'encumbrance_events.py'), - handler='license_encumbrance_listener', - timeout=Duration.minutes(2), - environment={ - 'PROVIDER_TABLE_NAME': persistent_stack.provider_table.table_name, - 'EMAIL_NOTIFICATION_SERVICE_LAMBDA_NAME': persistent_stack.email_notification_service_lambda.function_name, # noqa: E501 line-too-long - 'EVENT_BUS_NAME': data_event_bus.event_bus_name, - 'COMPACT_CONFIGURATION_TABLE_NAME': persistent_stack.compact_configuration_table.table_name, - **self.common_env_vars, - }, - alarm_topic=persistent_stack.alarm_topic, - ) - - # Grant necessary permissions - persistent_stack.provider_table.grant_read_write_data(license_encumbrance_listener_handler) - persistent_stack.compact_configuration_table.grant_read_data(license_encumbrance_listener_handler) - persistent_stack.email_notification_service_lambda.grant_invoke(license_encumbrance_listener_handler) - data_event_bus.grant_put_events_to(license_encumbrance_listener_handler) - - NagSuppressions.add_resource_suppressions_by_path( - self, - f'{license_encumbrance_listener_handler.role.node.path}/DefaultPolicy/Resource', - suppressions=[ - { - 'id': 'AwsSolutions-IAM5', - 'reason': """ - This policy contains wild-carded actions and resources but they are scoped to the - specific actions, KMS key, Table, and Email Service Lambda that this lambda specifically - needs access to. - """, - }, - ], - ) - - self.event_processors[construct_id_prefix] = QueueEventListener( - self, - construct_id=construct_id_prefix, - data_event_bus=data_event_bus, - listener_function=license_encumbrance_listener_handler, - listener_detail_type='license.encumbrance', - encryption_key=persistent_stack.shared_encryption_key, - alarm_topic=persistent_stack.alarm_topic, - ) - - def _add_lifting_license_encumbrance_listener(self, persistent_stack: ps.PersistentStack, data_event_bus: EventBus): - """Add the lifting license encumbrance listener lambda, queues, and event rules.""" - # Create the Lambda function handler that listens for license encumbrance lifting events - construct_id_prefix = 'LiftedLicenseEncumbranceListener' - lifting_license_encumbrance_listener_handler = PythonFunction( - self, - f'{construct_id_prefix}Handler', - description='License Encumbrance Lifted Listener Handler', - lambda_dir='data-events', - index=os.path.join('handlers', 'encumbrance_events.py'), - handler='license_encumbrance_lifted_listener', - timeout=Duration.minutes(2), - environment={ - 'PROVIDER_TABLE_NAME': persistent_stack.provider_table.table_name, - 'EVENT_BUS_NAME': data_event_bus.event_bus_name, - 'COMPACT_CONFIGURATION_TABLE_NAME': persistent_stack.compact_configuration_table.table_name, - **self.common_env_vars, - }, - alarm_topic=persistent_stack.alarm_topic, - ) - - # Grant necessary permissions - persistent_stack.provider_table.grant_read_write_data(lifting_license_encumbrance_listener_handler) - persistent_stack.compact_configuration_table.grant_read_data(lifting_license_encumbrance_listener_handler) - data_event_bus.grant_put_events_to(lifting_license_encumbrance_listener_handler) - - NagSuppressions.add_resource_suppressions_by_path( - self, - f'{lifting_license_encumbrance_listener_handler.role.node.path}/DefaultPolicy/Resource', - suppressions=[ - { - 'id': 'AwsSolutions-IAM5', - 'reason': """ - This policy contains wild-carded actions and resources but they are scoped to the - specific actions, KMS key, Table, and Email Service Lambda that this lambda specifically - needs access to. - """, - }, - ], - ) - - self.event_processors[construct_id_prefix] = QueueEventListener( - self, - construct_id=construct_id_prefix, - data_event_bus=data_event_bus, - listener_function=lifting_license_encumbrance_listener_handler, - listener_detail_type='license.encumbranceLifted', - encryption_key=persistent_stack.shared_encryption_key, - alarm_topic=persistent_stack.alarm_topic, - ) diff --git a/backend/cosmetology-app/tests/app/base.py b/backend/cosmetology-app/tests/app/base.py index 60f9be1e3..505e834c9 100644 --- a/backend/cosmetology-app/tests/app/base.py +++ b/backend/cosmetology-app/tests/app/base.py @@ -516,7 +516,6 @@ def _check_no_backend_stage_annotations(self, stage: BackendStage): self._check_no_stack_annotations(stage.api_lambda_stack) self._check_no_stack_annotations(stage.api_stack) self._check_no_stack_annotations(stage.disaster_recovery_stack) - self._check_no_stack_annotations(stage.event_listener_stack) self._check_no_stack_annotations(stage.feature_flag_stack) self._check_no_stack_annotations(stage.ingest_stack) self._check_no_stack_annotations(stage.managed_login_stack) @@ -556,7 +555,6 @@ def _check_backend_stage_resource_counts(self, stage: BackendStage): ('api_stack', stage.api_stack), ('backup_infrastructure_stack', stage.backup_infrastructure_stack), ('disaster_recovery_stack', stage.disaster_recovery_stack), - ('event_listener_stack', stage.event_listener_stack), ('feature_flag_stack', stage.feature_flag_stack), ('ingest_stack', stage.ingest_stack), ('managed_login_stack', stage.managed_login_stack), diff --git a/backend/cosmetology-app/tests/app/test_event_listener.py b/backend/cosmetology-app/tests/app/test_event_listener.py deleted file mode 100644 index 02ba4c217..000000000 --- a/backend/cosmetology-app/tests/app/test_event_listener.py +++ /dev/null @@ -1,234 +0,0 @@ -import json -from unittest import TestCase - -from aws_cdk.assertions import Template -from aws_cdk.aws_events import CfnRule -from aws_cdk.aws_lambda import CfnEventSourceMapping, CfnFunction -from aws_cdk.aws_sqs import CfnQueue - -from tests.app.base import TstAppABC - - -class TestEventListenerStack(TstAppABC, TestCase): - """ - Test cases for the EventListenerStack to ensure proper resource configuration - for handling license encumbrance events. - """ - - @classmethod - def get_context(cls): - with open('cdk.json') as f: - context = json.load(f)['context'] - with open('cdk.context.sandbox-example.json') as f: - context.update(json.load(f)) - - # Suppresses lambda bundling for tests - context['aws:cdk:bundling-stacks'] = [] - return context - - def test_license_encumbrance_listener_resources_created(self): - """ - Test that the license encumbrance listener lambda is added with a SQS queue - and an event bridge event rule that listens for 'license.encumbrance' detail types. - """ - event_listener_stack = self.app.sandbox_backend_stage.event_listener_stack - event_listener_template = Template.from_stack(event_listener_stack) - - # Verify the lambda function is created - license_encumbrance_handler_logical_id = event_listener_stack.get_logical_id( - event_listener_stack.event_processors[ - 'LicenseEncumbranceListener' - ].queue_processor.process_function.node.default_child - ) - license_encumbrance_handler = TestEventListenerStack.get_resource_properties_by_logical_id( - license_encumbrance_handler_logical_id, - resources=event_listener_template.find_resources(CfnFunction.CFN_RESOURCE_TYPE_NAME), - ) - - self.assertEqual( - 'handlers.encumbrance_events.license_encumbrance_listener', license_encumbrance_handler['Handler'] - ) - - # Verify SQS queue is created for the license encumbrance listener - listener_queue_logical_id = event_listener_stack.get_logical_id( - event_listener_stack.event_processors['LicenseEncumbranceListener'].queue_processor.queue.node.default_child - ) - license_encumbrance_listener_queue = TestEventListenerStack.get_resource_properties_by_logical_id( - listener_queue_logical_id, resources=event_listener_template.find_resources(CfnQueue.CFN_RESOURCE_TYPE_NAME) - ) - - dlq_logical_id = event_listener_stack.get_logical_id( - event_listener_stack.event_processors['LicenseEncumbranceListener'].queue_processor.dlq.node.default_child - ) - - # remove dynamic field - del license_encumbrance_listener_queue['KmsMasterKeyId'] - - self.assertEqual( - { - 'MessageRetentionPeriod': 43200, - 'RedrivePolicy': {'deadLetterTargetArn': {'Fn::GetAtt': [dlq_logical_id, 'Arn']}, 'maxReceiveCount': 3}, - 'VisibilityTimeout': 300, - }, - license_encumbrance_listener_queue, - ) - - # Verify EventBridge rule is created with correct detail type - license_encumbrance_listener_event_bridge_rule = TestEventListenerStack.get_resource_properties_by_logical_id( - event_listener_stack.get_logical_id( - event_listener_stack.event_processors['LicenseEncumbranceListener'].event_rule.node.default_child - ), - resources=event_listener_template.find_resources(CfnRule.CFN_RESOURCE_TYPE_NAME), - ) - - self.assertEqual( - { - 'EventBusName': { - 'Fn::Select': [ - 1, - { - 'Fn::Split': [ - '/', - {'Fn::Select': [5, {'Fn::Split': [':', {'Ref': 'DataEventBusArnParameterParameter'}]}]}, - ] - }, - ] - }, - 'EventPattern': {'detail-type': ['license.encumbrance']}, - 'State': 'ENABLED', - 'Targets': [ - { - 'Arn': {'Fn::GetAtt': [listener_queue_logical_id, 'Arn']}, - 'DeadLetterConfig': {'Arn': {'Fn::GetAtt': [dlq_logical_id, 'Arn']}}, - 'Id': 'Target0', - } - ], - }, - license_encumbrance_listener_event_bridge_rule, - ) - - # Verify event source mapping between SQS queue and Lambda function - event_source_mapping = TestEventListenerStack.get_resource_properties_by_logical_id( - event_listener_stack.get_logical_id( - event_listener_stack.event_processors[ - 'LicenseEncumbranceListener' - ].queue_processor.event_source_mapping.node.default_child - ), - resources=event_listener_template.find_resources(CfnEventSourceMapping.CFN_RESOURCE_TYPE_NAME), - ) - self.assertEqual( - { - 'BatchSize': 10, - 'EventSourceArn': {'Fn::GetAtt': [listener_queue_logical_id, 'Arn']}, - 'FunctionName': {'Ref': license_encumbrance_handler_logical_id}, - 'FunctionResponseTypes': ['ReportBatchItemFailures'], - 'MaximumBatchingWindowInSeconds': 15, - }, - event_source_mapping, - ) - - def test_license_encumbrance_lifting_listener_resources_created(self): - """ - Test that the license encumbrance lifting listener lambda is added with a SQS queue - and an event bridge event rule that listens for 'license.encumbranceLifted' detail types. - """ - event_listener_stack = self.app.sandbox_backend_stage.event_listener_stack - event_listener_template = Template.from_stack(event_listener_stack) - - # Verify the lambda function is created - lifting_encumbrance_handler_logical_id = event_listener_stack.get_logical_id( - event_listener_stack.event_processors[ - 'LiftedLicenseEncumbranceListener' - ].queue_processor.process_function.node.default_child - ) - lifting_encumbrance_handler = TestEventListenerStack.get_resource_properties_by_logical_id( - lifting_encumbrance_handler_logical_id, - resources=event_listener_template.find_resources(CfnFunction.CFN_RESOURCE_TYPE_NAME), - ) - - self.assertEqual( - 'handlers.encumbrance_events.license_encumbrance_lifted_listener', lifting_encumbrance_handler['Handler'] - ) - - # Verify SQS queue is created for the license encumbrance lifting listener - lifting_listener_queue_logical_id = event_listener_stack.get_logical_id( - event_listener_stack.event_processors[ - 'LiftedLicenseEncumbranceListener' - ].queue_processor.queue.node.default_child - ) - lifting_encumbrance_listener_queue = TestEventListenerStack.get_resource_properties_by_logical_id( - lifting_listener_queue_logical_id, - resources=event_listener_template.find_resources(CfnQueue.CFN_RESOURCE_TYPE_NAME), - ) - - dlq_logical_id = event_listener_stack.get_logical_id( - event_listener_stack.event_processors[ - 'LiftedLicenseEncumbranceListener' - ].queue_processor.dlq.node.default_child - ) - - # remove dynamic field - del lifting_encumbrance_listener_queue['KmsMasterKeyId'] - - self.assertEqual( - { - 'MessageRetentionPeriod': 43200, - 'RedrivePolicy': {'deadLetterTargetArn': {'Fn::GetAtt': [dlq_logical_id, 'Arn']}, 'maxReceiveCount': 3}, - 'VisibilityTimeout': 300, - }, - lifting_encumbrance_listener_queue, - ) - - # Verify EventBridge rule is created with correct detail type - lifting_encumbrance_listener_event_bridge_rule = TestEventListenerStack.get_resource_properties_by_logical_id( - event_listener_stack.get_logical_id( - event_listener_stack.event_processors['LiftedLicenseEncumbranceListener'].event_rule.node.default_child - ), - resources=event_listener_template.find_resources(CfnRule.CFN_RESOURCE_TYPE_NAME), - ) - - self.assertEqual( - { - 'EventBusName': { - 'Fn::Select': [ - 1, - { - 'Fn::Split': [ - '/', - {'Fn::Select': [5, {'Fn::Split': [':', {'Ref': 'DataEventBusArnParameterParameter'}]}]}, - ] - }, - ] - }, - 'EventPattern': {'detail-type': ['license.encumbranceLifted']}, - 'State': 'ENABLED', - 'Targets': [ - { - 'Arn': {'Fn::GetAtt': [lifting_listener_queue_logical_id, 'Arn']}, - 'DeadLetterConfig': {'Arn': {'Fn::GetAtt': [dlq_logical_id, 'Arn']}}, - 'Id': 'Target0', - } - ], - }, - lifting_encumbrance_listener_event_bridge_rule, - ) - - # Verify event source mapping between SQS queue and Lambda function - event_source_mapping = TestEventListenerStack.get_resource_properties_by_logical_id( - event_listener_stack.get_logical_id( - event_listener_stack.event_processors[ - 'LiftedLicenseEncumbranceListener' - ].queue_processor.event_source_mapping.node.default_child - ), - resources=event_listener_template.find_resources(CfnEventSourceMapping.CFN_RESOURCE_TYPE_NAME), - ) - self.assertEqual( - { - 'BatchSize': 10, - 'EventSourceArn': {'Fn::GetAtt': [lifting_listener_queue_logical_id, 'Arn']}, - 'FunctionName': {'Ref': lifting_encumbrance_handler_logical_id}, - 'FunctionResponseTypes': ['ReportBatchItemFailures'], - 'MaximumBatchingWindowInSeconds': 15, - }, - event_source_mapping, - )