-
Notifications
You must be signed in to change notification settings - Fork 4
Feature/accept mtls keys instead of paths #68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/move-mtls-settings-to-presets
Are you sure you want to change the base?
Changes from all commits
38af12d
6252b88
734dd6c
987c597
581a267
1cfed44
59597ce
b807d85
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,7 @@ | ||
| class DKUConstants(object): | ||
| API_RESPONSE_KEY = "api_response" | ||
| FORBIDDEN_KEYS = ["token", "password", "api_key_value", "secure_token"] | ||
| FORBIDDEN_KEYS = ["token", "password", "api_key_value", "secure_token", "mtls_key_path", "mtls_certificate_path"] | ||
| FORM_DATA_BODY_FORMAT = "FORM_DATA" | ||
| PLUGIN_VERSION = "1.2.7" | ||
| PLUGIN_VERSION = "1.2.7-beta.5" | ||
| RAW_BODY_FORMAT = "RAW" | ||
| REPONSE_ERROR_KEY = "dku_error" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| import requests | ||
| import time | ||
| import copy | ||
| import tempfile | ||
| from pagination import Pagination | ||
| from safe_logger import SafeLogger | ||
| from loop_detector import LoopDetector | ||
|
|
@@ -184,14 +185,35 @@ def request(self, method, url, can_raise_exeption=True, **kwargs): | |
| def request_with_redirect_retry(self, method, url, **kwargs): | ||
| # In case of redirection to another domain, the authorization header is not kept | ||
| # If redirect_auth_header is true, another attempt is made with initial headers to the redirected url | ||
| response = self.session.request(method, url, **kwargs) | ||
| response = self.request_with_cert(method, url, **kwargs) | ||
| if self.redirect_auth_header and not response.url.startswith(url): | ||
| redirection_kwargs = copy.deepcopy(kwargs) | ||
| redirection_kwargs.pop("params", None) # params are contained in the redirected url | ||
| logger.warning("Redirection ! Accessing endpoint {} with initial authorization headers".format(response.url)) | ||
| response = self.session.request(method, response.url, **redirection_kwargs) | ||
| response = self.request_with_cert(method, response.url, **redirection_kwargs) | ||
| return response | ||
|
|
||
| def request_with_cert(self, method, url, **kwargs): | ||
| cert = kwargs.get("cert", None) | ||
| if cert and len(cert) == 2: | ||
| if cert[0].startswith("-----BEGIN CERTIFICATE") and cert[1].startswith("-----BEGIN "): | ||
| logger.info("mTLS certificate and key are strings") | ||
| response = None | ||
| with tempfile.NamedTemporaryFile(mode="w", suffix=".crt") as tmp_certificate: | ||
| with tempfile.NamedTemporaryFile(mode="w", suffix=".key") as tmp_key: | ||
| tmp_certificate.write( | ||
| normalize_key(cert[0]) | ||
| ) | ||
| tmp_certificate.seek(0) | ||
| tmp_key.write( | ||
| normalize_key(cert[1]) | ||
| ) | ||
| tmp_key.seek(0) | ||
| kwargs["cert"] = (tmp_certificate.name, tmp_key.name) | ||
| response = self.session.request(method, url, **kwargs) | ||
| return response | ||
| return self.session.request(method, url, **kwargs) | ||
|
|
||
| def paginated_api_call(self, can_raise_exeption=True): | ||
| if self.pagination.params_must_be_blanked: | ||
| self.requests_kwargs["params"] = {} | ||
|
|
@@ -278,3 +300,20 @@ def get_headers(response): | |
| if isinstance(response, requests.Response): | ||
| return response.headers | ||
| return None | ||
|
|
||
|
|
||
| def normalize_key(key): | ||
| PROTECTED_EXPRESSIONS = [ | ||
| "BEGIN CERTIFICATE", "END CERTIFICATE", | ||
| "BEGIN PRIVATE KEY", "END PRIVATE KEY", | ||
| "BEGIN RSA PRIVATE KEY", "END RSA PRIVATE KEY" | ||
| ] | ||
| tempo_text = str(key) | ||
| for expression_to_protect in PROTECTED_EXPRESSIONS: | ||
| protected_form = expression_to_protect.replace(" ", "") | ||
| tempo_text = tempo_text.replace(expression_to_protect, protected_form) | ||
| tempo_text = tempo_text.replace(" ", "\n") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this be avoided if the input to the cert was a textarea instead of a input field? If I understand correctly we're trying to rebuild the certificate content because it was flattened when the user pasted into the input field
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have to use the password type here so that the certificate and key are encrypted on the instance... Using Textarea would be less safe in that regard. |
||
| for expression_to_protect in PROTECTED_EXPRESSIONS: | ||
| protected_form = expression_to_protect.replace(" ", "") | ||
| tempo_text = tempo_text.replace(protected_form, expression_to_protect) | ||
| return tempo_text | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it should no longer be named
pathThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure about that one: