Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions google/auth/compute_engine/_credentials_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Google Compute Engine credentials.

This module provides authentication for an application running on Google
Compute Engine using the Compute Engine metadata server.

"""

import google.auth._credentials_async as credentials_async
from google.auth import exceptions
from google.auth.compute_engine import _metadata_async as _metadata
from google.auth.compute_engine import credentials as credentials_sync


class Credentials(
credentials_sync.Credentials,
credentials_async.Credentials,
credentials_async.Scoped,
credentials_async.CredentialsWithQuotaProject
):
"""Async Compute Engine Credentials.

These credentials use the Google Compute Engine metadata server to obtain
OAuth 2.0 access tokens associated with the instance's service account,
and are also used for Cloud Run, Flex and App Engine (except for the Python
2.7 runtime, which is supported only on older versions of this library).

For more information about Compute Engine authentication, including how
to configure scopes, see the `Compute Engine authentication
documentation`_.

.. note:: On Compute Engine the metadata server ignores requested scopes.
On Cloud Run, Flex and App Engine the server honours requested scopes.

.. _Compute Engine authentication documentation:
https://cloud.google.com/compute/docs/authentication#using
"""

def __init__(
self,
service_account_email="default",
quota_project_id=None,
scopes=None,
default_scopes=None,
):
"""
Args:
service_account_email (str): The service account email to use, or
'default'. A Compute Engine instance may have multiple service
accounts.
quota_project_id (Optional[str]): The project ID used for quota and
billing.
scopes (Optional[Sequence[str]]): The list of scopes for the credentials.
default_scopes (Optional[Sequence[str]]): Default scopes passed by a
Google client library. Use 'scopes' for user-defined scopes.
"""
super(Credentials, self).__init__()
self._service_account_email = service_account_email
self._quota_project_id = quota_project_id
self._scopes = scopes
self._default_scopes = default_scopes

async def _retrieve_info(self, request):
"""Retrieve information about the service account.

Updates the scopes and retrieves the full service account email.

Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.
"""
info = await _metadata.get_service_account_info(
request, service_account=self._service_account_email
)

self._service_account_email = info["email"]

# Don't override scopes requested by the user.
if self._scopes is None:
self._scopes = info["scopes"]

async def refresh(self, request):
"""Refresh the access token and scopes.

Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.

Raises:
google.auth.exceptions.RefreshError: If the Compute Engine metadata
service can't be reached or if the instance has no
credentials.
"""
scopes = self._scopes if self._scopes is not None else self._default_scopes
try:
await self._retrieve_info(request)
self.token, self.expiry = await _metadata.get_service_account_token(
request, service_account=self._service_account_email, scopes=scopes
)
except exceptions.TransportError as caught_exc:
new_exc = exceptions.RefreshError(caught_exc)
raise new_exc from caught_exc
266 changes: 266 additions & 0 deletions google/auth/compute_engine/_metadata_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Provides helper methods for talking to the Compute Engine metadata server.

See https://cloud.google.com/compute/docs/metadata for more details.
"""

import datetime
import json
import logging
import os
from http import HTTPStatus
from urllib.parse import urljoin

from google.auth import _helpers
from google.auth import environment_vars
from google.auth import exceptions
from google.auth.transport import _httpx_requests as httpx_requests

_LOGGER = logging.getLogger(__name__)

# Environment variable GCE_METADATA_HOST is originally named
# GCE_METADATA_ROOT. For compatiblity reasons, here it checks
# the new variable first; if not set, the system falls back
# to the old variable.
_GCE_METADATA_HOST = os.getenv(environment_vars.GCE_METADATA_HOST, None)
if not _GCE_METADATA_HOST:
_GCE_METADATA_HOST = os.getenv(
environment_vars.GCE_METADATA_ROOT, "metadata.google.internal"
)
_METADATA_ROOT = "http://{}/computeMetadata/v1/".format(_GCE_METADATA_HOST)

# This is used to ping the metadata server, it avoids the cost of a DNS
# lookup.
_METADATA_IP_ROOT = "http://{}".format(
os.getenv(environment_vars.GCE_METADATA_IP, "169.254.169.254")
)
_METADATA_FLAVOR_HEADER = "metadata-flavor"
_METADATA_FLAVOR_VALUE = "Google"
_METADATA_HEADERS = {_METADATA_FLAVOR_HEADER: _METADATA_FLAVOR_VALUE}

# Timeout in seconds to wait for the GCE metadata server when detecting the
# GCE environment.
try:
_METADATA_DEFAULT_TIMEOUT = int(os.getenv("GCE_METADATA_TIMEOUT", 3))
except ValueError: # pragma: NO COVER
_METADATA_DEFAULT_TIMEOUT = 3


async def ping(request: httpx_requests.Request, timeout=_METADATA_DEFAULT_TIMEOUT, retry_count=3):
"""Checks to see if the metadata server is available.

Args:
request (httpx_requests.Request): A callable used to make
HTTP requests.
timeout (int): How long to wait for the metadata server to respond.
retry_count (int): How many times to attempt connecting to metadata
server using above timeout.

Returns:
bool: True if the metadata server is reachable, False otherwise.
"""
# NOTE: The explicit ``timeout`` is a workaround. The underlying
# issue is that resolving an unknown host on some networks will take
# 20-30 seconds; making this timeout short fixes the issue, but
# could lead to false negatives in the event that we are on GCE, but
# the metadata resolution was particularly slow. The latter case is
# "unlikely".
retries = 0
while retries < retry_count:
try:
response = await request(
url=_METADATA_IP_ROOT,
method="GET",
headers=_METADATA_HEADERS,
timeout=timeout,
)

metadata_flavor = response.headers.get(_METADATA_FLAVOR_HEADER)
return (
response.status == HTTPStatus.OK
and metadata_flavor == _METADATA_FLAVOR_VALUE
)

except exceptions.TransportError as e:
_LOGGER.warning(
"Compute Engine Metadata server unavailable on "
"attempt %s of %s. Reason: %s",
retries + 1,
retry_count,
e,
)
retries += 1

return False


async def get(
request: httpx_requests.Request, path, root=_METADATA_ROOT, params=None, recursive=False, retry_count=5
):
"""Fetch a resource from the metadata server.

Args:
request (httpx_requests.Request): A callable used to make
HTTP requests.
path (str): The resource to retrieve. For example,
``'instance/service-accounts/default'``.
root (str): The full path to the metadata server root.
params (Optional[Mapping[str, str]]): A mapping of query parameter
keys to values.
recursive (bool): Whether to do a recursive query of metadata. See
https://cloud.google.com/compute/docs/metadata#aggcontents for more
details.
retry_count (int): How many times to attempt connecting to metadata
server using above timeout.

Returns:
Union[Mapping, str]: If the metadata server returns JSON, a mapping of
the decoded JSON is return. Otherwise, the response content is
returned as a string.

Raises:
google.auth.exceptions.TransportError: if an error occurred while
retrieving metadata.
"""
base_url = urljoin(root, path)
query_params = {} if params is None else params

if recursive:
query_params["recursive"] = "true"

url = _helpers.update_query(base_url, query_params)

retries = 0
while retries < retry_count:
try:
response = await request(url=url, method="GET", headers=_METADATA_HEADERS)
break

except exceptions.TransportError as e:
_LOGGER.warning(
"Compute Engine Metadata server unavailable on "
"attempt %s of %s. Reason: %s",
retries + 1,
retry_count,
e,
)
retries += 1
else:
raise exceptions.TransportError(
"Failed to retrieve {} from the Google Compute Engine "
"metadata service. Compute Engine Metadata server unavailable".format(url)
)

if response.status == HTTPStatus.OK:
content = _helpers.from_bytes(response.data)
if response.headers["content-type"] == "application/json":
try:
return json.loads(content)
except ValueError as caught_exc:
new_exc = exceptions.TransportError(
"Received invalid JSON from the Google Compute Engine "
"metadata service: {:.20}".format(content)
)
raise new_exc from caught_exc
else:
return content
else:
raise exceptions.TransportError(
"Failed to retrieve {} from the Google Compute Engine "
"metadata service. Status: {} Response:\n{}".format(
url, response.status, response.data
),
response,
)


async def get_project_id(request: httpx_requests.Request):
"""Get the Google Cloud Project ID from the metadata server.

Args:
request (httpx_requests.Request): A callable used to make
HTTP requests.

Returns:
str: The project ID

Raises:
google.auth.exceptions.TransportError: if an error occurred while
retrieving metadata.
"""
return await get(request, "project/project-id")


async def get_service_account_info(request: httpx_requests.Request, service_account="default"):
"""Get information about a service account from the metadata server.

Args:
request (httpx_requests.Request): A callable used to make
HTTP requests.
service_account (str): The string 'default' or a service account email
address. The determines which service account for which to acquire
information.

Returns:
Mapping: The service account's information, for example::

{
'email': '...',
'scopes': ['scope', ...],
'aliases': ['default', '...']
}

Raises:
google.auth.exceptions.TransportError: if an error occurred while
retrieving metadata.
"""
path = "instance/service-accounts/{0}/".format(service_account)
# See https://cloud.google.com/compute/docs/metadata#aggcontents
# for more on the use of 'recursive'.
return await get(request, path, params={"recursive": "true"})


async def get_service_account_token(request: httpx_requests.Request, service_account="default", scopes=None):
"""Get the OAuth 2.0 access token for a service account.

Args:
request (httpx_requests.Request): A callable used to make
HTTP requests.
service_account (str): The string 'default' or a service account email
address. The determines which service account for which to acquire
an access token.
scopes (Optional[Union[str, List[str]]]): Optional string or list of
strings with auth scopes.
Returns:
Tuple[str, datetime]: The access token and its expiration.

Raises:
google.auth.exceptions.TransportError: if an error occurred while
retrieving metadata.
"""
if scopes:
if not isinstance(scopes, str):
scopes = ",".join(scopes)
params = {"scopes": scopes}
else:
params = None

path = "instance/service-accounts/{0}/token".format(service_account)
token_json = await get(request, path, params=params)
token_expiry = datetime.datetime.utcnow() + datetime.timedelta(
seconds=token_json["expires_in"]
)
return token_json["access_token"], token_expiry
Loading