Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
EncumbranceNotificationTemplateVariables,
JurisdictionNotificationMethod,
)
from cc_common.event_batch_writer import EventBatchWriter
from cc_common.event_bus_client import EventBusClient
from cc_common.event_state_client import EventType, NotificationTracker, RecipientType
from cc_common.exceptions import CCInternalException
from cc_common.license_util import LicenseUtility
from cc_common.utils import sqs_handler, sqs_handler_with_notification_tracking
from cc_common.utils import sqs_handler_with_notification_tracking


def _get_license_type_name(compact: str, license_type_abbreviation: str) -> str:
Expand Down Expand Up @@ -213,135 +211,6 @@ def _send_additional_state_notifications(
)


@sqs_handler
def license_encumbrance_listener(message: dict):
"""
Handle license encumbrance events by encumbering associated privileges.

This handler processes 'license.encumbrance' events and automatically encumbers
all privileges associated with the encumbered license, then publishes privilege
encumbrance events for each affected privilege.
"""
detail_schema = EncumbranceEventDetailSchema()
detail = detail_schema.load(message['detail'])

compact = detail['compact']
provider_id = detail['providerId']
license_jurisdiction = detail['jurisdiction']
adverse_action_id = detail['adverseActionId']
license_type_abbreviation = detail['licenseTypeAbbreviation']
effective_date = detail['effectiveDate']

with logger.append_context_keys(
compact=compact,
provider_id=provider_id,
jurisdiction=license_jurisdiction,
license_type_abbreviation=license_type_abbreviation,
effective_date=effective_date,
adverse_action_id=adverse_action_id,
):
logger.info('Processing license encumbrance event')

# When a home state license is encumbered, we notify all the other live states in the compact
# Since privileges in those states are automatically granted as an extension of the home state license
# We publish privilege encumbrance events for each live jurisdiction so they are notified
live_jurisdictions = config.live_compact_jurisdictions.get(compact, [])
event_bus_client = EventBusClient()
published_count = 0
with EventBatchWriter(config.events_client) as event_batch_writer:
for live_jurisdiction in live_jurisdictions:
if live_jurisdiction == license_jurisdiction:
continue
logger.info(
'Publishing privilege encumbrance event for affected jurisdiction',
privilege_jurisdiction=live_jurisdiction,
)
event_bus_client.publish_privilege_encumbrance_event(
source='org.compactconnect.data-events',
compact=compact,
provider_id=provider_id,
jurisdiction=live_jurisdiction, # The privilege jurisdiction, not the license jurisdiction
license_type_abbreviation=license_type_abbreviation,
effective_date=effective_date,
event_batch_writer=event_batch_writer,
)
published_count += 1

logger.info(
'Successfully processed license encumbrance event',
privilege_encumbrance_events_published=published_count,
)


@sqs_handler
def license_encumbrance_lifted_listener(message: dict):
"""
Handle license encumbrance lifting events by publishing privilege encumbrance lifting events.

This handler processes 'license.encumbranceLifted' events and publishes privilege encumbrance
lifting events for each live jurisdiction in the compact (excluding the home state license
jurisdiction) so those states can be notified.
"""
detail_schema = EncumbranceEventDetailSchema()
detail = detail_schema.load(message['detail'])

compact = detail['compact']
provider_id = detail['providerId']
license_jurisdiction = detail['jurisdiction']
license_type_abbreviation = detail['licenseTypeAbbreviation']
effective_date = detail['effectiveDate']

with logger.append_context_keys(
compact=compact,
provider_id=provider_id,
jurisdiction=license_jurisdiction,
license_type_abbreviation=license_type_abbreviation,
effective_date=effective_date,
):
logger.info('Processing license encumbrance lifting event')

# Get all provider records
provider_user_records = config.data_client.get_provider_user_records(
compact=compact, provider_id=provider_id, consistent_read=True
)
# Verify the license itself is unencumbered before lifting privilege encumbrances
# A license may still be encumbered by another adverse action that has not been lifted yet.
license_record = provider_user_records.get_specific_license_record(
license_jurisdiction, license_type_abbreviation
)
if not license_record:
logger.warning('No license record found for the specified jurisdiction and license type')
raise CCInternalException('No license record found for the specified jurisdiction and license type')

if license_record.encumberedStatus == LicenseEncumberedStatusEnum.ENCUMBERED:
logger.info('License is still encumbered. Not sending privilege encumbrance lift notifications.')
return

# When a home state license encumbrance is lifted, we notify all the other live states in the compact
# We publish privilege encumbrance lifting events for each live jurisdiction (excluding home state)
live_jurisdictions = config.live_compact_jurisdictions.get(compact, [])
event_bus_client = EventBusClient()
with EventBatchWriter(config.events_client) as event_batch_writer:
for live_jurisdiction in live_jurisdictions:
if live_jurisdiction == license_jurisdiction:
continue
logger.info(
'Publishing privilege encumbrance lifting event for affected jurisdiction',
privilege_jurisdiction=live_jurisdiction,
)
event_bus_client.publish_privilege_encumbrance_lifting_event(
source='org.compactconnect.data-events',
compact=compact,
provider_id=provider_id,
jurisdiction=live_jurisdiction,
license_type_abbreviation=license_type_abbreviation,
effective_date=effective_date,
event_batch_writer=event_batch_writer,
)

logger.info('Successfully processed license encumbrance lifting event')


@sqs_handler_with_notification_tracking
def privilege_encumbrance_notification_listener(message: dict, tracker: NotificationTracker):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,131 +52,6 @@ def _create_sqs_event(self, message):
"""Create a proper SQS event structure with the message in the body."""
return {'Records': [{'messageId': str(uuid.uuid4()), 'body': json.dumps(message)}]}

def _when_testing_live_jurisdictions(self):
from cc_common.config import config
from handlers.encumbrance_events import license_encumbrance_listener

# Mock live jurisdictions to home state + one other so we can assert the non-home one is notified
config.__dict__['live_compact_jurisdictions'] = {
DEFAULT_COMPACT: [DEFAULT_LICENSE_JURISDICTION, 'ky'],
}
try:
message = self._generate_license_encumbrance_message()
event = self._create_sqs_event(message)

license_encumbrance_listener(event, self.mock_context)

finally:
config.__dict__.pop('live_compact_jurisdictions', None)

@patch('cc_common.event_bus_client.EventBusClient._publish_event')
def test_license_encumbrance_listener_publishes_privilege_encumbrance_for_non_home_live_jurisdictions(
self, mock_publish_event
):
"""When a license is encumbered, privilege encumbrance events are published for each live jurisdiction.

Verifies that the privilege encumbrance notification is sent for the jurisdiction that is not
the home state license jurisdiction (i.e. the other live compact jurisdictions).
"""
self._when_testing_live_jurisdictions()

# Should have published for each live jurisdiction that isn't the home state license jurisdiction (ky)
self.assertEqual(1, mock_publish_event.call_count)
mock_publish_event.assert_called_once_with(
source='org.compactconnect.data-events',
detail_type='privilege.encumbrance',
detail={
'compact': 'cosm',
'jurisdiction': 'ky',
'eventTime': '2024-11-08T23:59:59+00:00',
'providerId': '89a6377e-c3a5-40e5-bca5-317ec854c570',
'licenseTypeAbbreviation': 'cos',
'effectiveDate': '2024-11-08',
},
event_batch_writer=ANY,
)

def _run_license_encumbrance_lifted_listener_with_live_jurisdictions(
self, live_jurisdictions, message_overrides=None
):
"""Set live_compact_jurisdictions, run license encumbrance lifted listener, then restore config."""
from cc_common.config import config
from handlers.encumbrance_events import license_encumbrance_lifted_listener

config.__dict__['live_compact_jurisdictions'] = live_jurisdictions
try:
message = self._generate_license_encumbrance_lifting_message(message_overrides)
event = self._create_sqs_event(message)
license_encumbrance_lifted_listener(event, self.mock_context)
finally:
config.__dict__.pop('live_compact_jurisdictions', None)

def _when_testing_live_jurisdictions_license_lifted(self):
"""Run license encumbrance lifted listener with live jurisdictions = [home, ky]; call under patch."""
self._run_license_encumbrance_lifted_listener_with_live_jurisdictions(
{DEFAULT_COMPACT: [DEFAULT_LICENSE_JURISDICTION, 'ky']}
)

@patch('cc_common.event_bus_client.EventBusClient._publish_event')
def test_license_encumbrance_lifted_listener_publishes_privilege_encumbrance_lifting_for_non_home_jurisdictions(
self, mock_publish_event
):
"""When a license encumbrance is lifted, privilege encumbrance lifting events are published
for non-home live jurisdictions.

Verifies that the privilege encumbrance lifting notification is sent for the jurisdiction that is not
the home state license jurisdiction.
"""
# Handler fetches provider + license to verify license is unencumbered before publishing
self.test_data_generator.put_default_provider_record_in_provider_table()
self.test_data_generator.put_default_license_record_in_provider_table(
value_overrides={
'jurisdiction': DEFAULT_LICENSE_JURISDICTION,
'licenseType': DEFAULT_LICENSE_TYPE,
'encumberedStatus': 'unencumbered',
}
)
self._when_testing_live_jurisdictions_license_lifted()

# Should have published for each live jurisdiction that isn't the home state license jurisdiction (ky only)
self.assertEqual(1, mock_publish_event.call_count)
mock_publish_event.assert_called_once_with(
source='org.compactconnect.data-events',
detail_type='privilege.encumbranceLifted',
detail={
'compact': DEFAULT_COMPACT,
'jurisdiction': 'ky',
'eventTime': DEFAULT_DATE_OF_UPDATE_TIMESTAMP,
'providerId': DEFAULT_PROVIDER_ID,
'licenseTypeAbbreviation': DEFAULT_LICENSE_TYPE_ABBREVIATION,
'effectiveDate': DEFAULT_EFFECTIVE_DATE,
},
event_batch_writer=ANY,
)

@patch('cc_common.event_bus_client.EventBusClient._publish_event')
def test_license_encumbrance_lifted_listener_does_not_publish_when_license_still_encumbered(
self, mock_publish_event
):
"""When the license is still encumbered (e.g. another adverse action not yet lifted),
no events are published."""
# Set up provider and license that is still encumbered
self.test_data_generator.put_default_provider_record_in_provider_table()
self.test_data_generator.put_default_license_record_in_provider_table(
value_overrides={
'jurisdiction': DEFAULT_LICENSE_JURISDICTION,
'licenseType': DEFAULT_LICENSE_TYPE,
'encumberedStatus': 'encumbered', # License still encumbered
}
)

# Live jurisdictions would normally cause a publish for 'ky'; we verify we skip all publishes
self._run_license_encumbrance_lifted_listener_with_live_jurisdictions(
{DEFAULT_COMPACT: [DEFAULT_LICENSE_JURISDICTION, 'ky']}
)

mock_publish_event.assert_not_called()

def _generate_privilege_encumbrance_message(self, message_overrides=None):
"""Generate a test SQS message for privilege encumbrance events."""
message = {
Expand Down Expand Up @@ -1098,29 +973,6 @@ def test_license_encumbrance_notification_listeners_handle_no_additional_jurisdi
finally:
config.__dict__.pop('live_compact_jurisdictions', None)

@patch('cc_common.event_bus_client.EventBusClient._publish_event')
def test_license_encumbrance_lifted_listener_uses_effective_date_from_message(self, mock_publish_event):
"""Test that published privilege encumbrance lifting events use the effectiveDate from the message."""
# Handler fetches provider + license to verify license is unencumbered before publishing
self.test_data_generator.put_default_provider_record_in_provider_table()
self.test_data_generator.put_default_license_record_in_provider_table(
value_overrides={
'jurisdiction': DEFAULT_LICENSE_JURISDICTION,
'licenseType': DEFAULT_LICENSE_TYPE,
'encumberedStatus': 'unencumbered',
}
)
self._run_license_encumbrance_lifted_listener_with_live_jurisdictions(
{DEFAULT_COMPACT: [DEFAULT_LICENSE_JURISDICTION, 'ky']},
message_overrides={'effectiveDate': '2024-03-15'},
)

mock_publish_event.assert_called_once()
call_args = mock_publish_event.call_args[1]
self.assertEqual('privilege.encumbranceLifted', call_args['detail_type'])
self.assertEqual('2024-03-15', call_args['detail']['effectiveDate'])
self.assertEqual('ky', call_args['detail']['jurisdiction'])

def _when_testing_privilege_lift_handler_with_encumbered_privilege(self, encumbered_status, mock_state_email):
from cc_common.data_model.schema.common import PrivilegeEncumberedStatusEnum
from handlers.encumbrance_events import privilege_encumbrance_lifting_notification_listener
Expand Down
11 changes: 0 additions & 11 deletions backend/cosmetology-app/pipeline/backend_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from stacks.api_lambda_stack import ApiLambdaStack
from stacks.api_stack import ApiStack
from stacks.disaster_recovery_stack import DisasterRecoveryStack
from stacks.event_listener_stack import EventListenerStack
from stacks.event_state_stack import EventStateStack
from stacks.feature_flag_stack import FeatureFlagStack
from stacks.ingest_stack import IngestStack
Expand Down Expand Up @@ -149,16 +148,6 @@ def __init__(
state_auth_stack=self.state_auth_stack,
)

self.event_listener_stack = EventListenerStack(
self,
'EventListenerStack',
env=environment,
environment_context=environment_context,
standard_tags=standard_tags,
environment_name=environment_name,
persistent_stack=self.persistent_stack,
)

# Reporting and notifications depend on emails, which depend on having a domain name. If we don't configure
# a HostedZone we won't bother with these whole stacks.
if self.persistent_stack.hosted_zone:
Expand Down
Loading
Loading