Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 845026f

Browse files
authored
Merge pull request #70 from pageuppeople-opensource/feature/SP-389-add-retry-policy-when-invoking-lambda
add retry policy when invoking lambda
2 parents d26b318 + e6160fe commit 845026f

File tree

2 files changed

+41
-33
lines changed

2 files changed

+41
-33
lines changed

rdl/data_sources/AWSLambdaDataSource.py

Lines changed: 40 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from rdl.data_sources.ChangeTrackingInfo import ChangeTrackingInfo
77
from rdl.data_sources.SourceTableInfo import SourceTableInfo
8-
from rdl.shared import Providers
8+
from rdl.shared import Providers, Constants
99
from rdl.shared.Utils import prevent_senstive_data_logging
1010

1111

@@ -148,38 +148,45 @@ def __get_data_frame(self, data: [[]], column_names: []):
148148
return pandas.DataFrame(data=data, columns=column_names)
149149

150150
def __invoke_lambda(self, pay_load):
151-
self.logger.debug("\nRequest being sent to Lambda:")
152-
self.logger.debug(pay_load)
153-
154-
lambda_response = self.aws_lambda_client.invoke(
155-
FunctionName=self.connection_data["function"],
156-
InvocationType="RequestResponse",
157-
LogType="None", # |'Tail', Set to Tail to include the execution log in the response
158-
Payload=json.dumps(pay_load).encode(),
159-
)
160-
161-
response_status_code = int(lambda_response["StatusCode"])
162-
response_function_error = lambda_response.get("FunctionError")
163-
self.logger.debug("\nResponse received from Lambda:")
164-
self.logger.debug(f'Response - StatusCode = "{response_status_code}"')
165-
self.logger.debug(f'Response - FunctionError = "{response_function_error}"')
166-
167-
response_payload = json.loads(lambda_response["Payload"].read())
168-
169-
if response_status_code != 200 or response_function_error:
170-
self.logger.error(
171-
f'Error in response from aws lambda {self.connection_data["function"]}'
172-
)
173-
self.logger.error(f"Response - Status Code = {response_status_code}")
174-
self.logger.error(f"Response - Error Function = {response_function_error}")
175-
self.logger.error(f"Response - Error Details:")
176-
# the below is risky as it may contain actual data if this line is reached in case of a successful result
177-
# however, the same Payload field is used to return actual error details in case of real errors
178-
# i.e. StatusCode is 200 (since AWS could invoke the lambda)
179-
# BUT the lambda barfed with an error and therefore the FunctionError would not be None
180-
self.logger.error(response_payload)
181-
raise Exception(
182-
"Error received when invoking AWS Lambda. See logs for further details."
151+
max_attempts = Constants.MAX_AWS_LAMBDA_INVOKATION_ATTEMPTS
152+
response_payload = None
153+
154+
for current_attempt in list(range(1, max_attempts+1, 1)):
155+
self.logger.debug(f"\nRequest being sent to Lambda, attempt {current_attempt} of {max_attempts}:")
156+
self.logger.debug(pay_load)
157+
158+
lambda_response = self.aws_lambda_client.invoke(
159+
FunctionName=self.connection_data["function"],
160+
InvocationType="RequestResponse",
161+
LogType="None", # |'Tail', Set to Tail to include the execution log in the response
162+
Payload=json.dumps(pay_load).encode(),
183163
)
184164

165+
response_status_code = int(lambda_response["StatusCode"])
166+
response_function_error = lambda_response.get("FunctionError")
167+
self.logger.debug(f"\nResponse received from Lambda, attempt {current_attempt} of {max_attempts}:")
168+
self.logger.debug(f'Response - StatusCode = "{response_status_code}"')
169+
self.logger.debug(f'Response - FunctionError = "{response_function_error}"')
170+
171+
response_payload = json.loads(lambda_response["Payload"].read())
172+
173+
if response_status_code != 200 or response_function_error:
174+
self.logger.error(
175+
f'Error in response from aws lambda \'{self.connection_data["function"]}\', '
176+
f'attempt {current_attempt} of {max_attempts}'
177+
)
178+
self.logger.error(f"Response - Status Code = {response_status_code}")
179+
self.logger.error(f"Response - Error Function = {response_function_error}")
180+
self.logger.error(f"Response - Error Details:")
181+
# the below is risky as it may contain actual data if this line is reached in case of success
182+
# however, the same Payload field is used to return actual error details in case of failure
183+
# i.e. StatusCode is 200 (since AWS could invoke the lambda)
184+
# BUT the lambda barfed with an error and therefore the FunctionError would not be None
185+
self.logger.error(response_payload)
186+
187+
if current_attempt >= max_attempts:
188+
raise Exception(
189+
"Error received when invoking AWS Lambda. See logs for further details."
190+
)
191+
185192
return response_payload

rdl/shared/Constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
APP_NAME = "Relational Data Loader"
22
DATA_PIPELINE_EXECUTION_SCHEMA_NAME = "rdl"
3+
MAX_AWS_LAMBDA_INVOKATION_ATTEMPTS = 3
34

45

56
class FullRefreshReason:

0 commit comments

Comments
 (0)