diff --git a/google/cloud/storage/asyncio/async_grpc_client.py b/google/cloud/storage/asyncio/async_grpc_client.py index 640e7fe38..88566b246 100644 --- a/google/cloud/storage/asyncio/async_grpc_client.py +++ b/google/cloud/storage/asyncio/async_grpc_client.py @@ -19,6 +19,8 @@ DEFAULT_CLIENT_INFO, ) from google.cloud.storage import __version__ +import grpc +from google.auth import credentials as auth_credentials class AsyncGrpcClient: @@ -52,6 +54,12 @@ def __init__( *, attempt_direct_path=True, ): + if isinstance(credentials, auth_credentials.AnonymousCredentials): + self._grpc_client = self._create_anonymous_client( + client_options, credentials + ) + return + if client_info is None: client_info = DEFAULT_CLIENT_INFO client_info.client_library_version = __version__ @@ -68,6 +76,21 @@ def __init__( attempt_direct_path=attempt_direct_path, ) + def _create_anonymous_client(self, client_options, credentials): + channel = grpc.aio.insecure_channel(client_options.api_endpoint) + transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( + channel=channel, credentials=credentials + ) + return storage_v2.StorageAsyncClient(transport=transport) + + @classmethod + def _create_insecure_grpc_client(cls, client_options): + return cls( + credentials=auth_credentials.AnonymousCredentials(), + client_options=client_options, + attempt_direct_path=False, + ) + def _create_async_grpc_client( self, credentials=None, diff --git a/noxfile.py b/noxfile.py index d7ca4dd88..2aabad17e 100644 --- a/noxfile.py +++ b/noxfile.py @@ -236,6 +236,7 @@ def conftest_retry(session): session.install( "pytest", "pytest-xdist", + "pytest-asyncio", "grpcio", "grpcio-status", "grpc-google-iam-v1", diff --git a/tests/conformance/_utils.py b/tests/conformance/_utils.py new file mode 100644 index 000000000..105faed2a --- /dev/null +++ b/tests/conformance/_utils.py @@ -0,0 +1,31 @@ +import time +import requests + + +def start_grpc_server(grpc_endpoint, http_endpoint): + """Starts the testbench gRPC server if it's not already running. + + this essentially makes - + + `curl -s --retry 5 --retry-max-time 40 "http://localhost:9000/start_grpc?port=8888"` + """ + start_time = time.time() + max_time = 40 + retries = 5 + port = grpc_endpoint.split(":")[-1] + url = f"{http_endpoint}/start_grpc?port={port}" + + for i in range(retries): + try: + response = requests.get(url, timeout=10) + if response.status_code == 200: + return + except requests.exceptions.RequestException: + pass + + elapsed_time = time.time() - start_time + if elapsed_time >= max_time: + raise RuntimeError("Failed to start gRPC server within the time limit.") + + # backoff + time.sleep(1) diff --git a/tests/conformance/test_bidi_reads.py b/tests/conformance/test_bidi_reads.py index 4157182cb..384de6e09 100644 --- a/tests/conformance/test_bidi_reads.py +++ b/tests/conformance/test_bidi_reads.py @@ -1,17 +1,21 @@ -import asyncio import io import uuid import grpc import requests -from google.api_core import exceptions +from google.api_core import exceptions, client_options from google.auth import credentials as auth_credentials from google.cloud import _storage_v2 as storage_v2 -from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( +from google.cloud.storage.asyncio.async_multi_range_downloader import ( AsyncMultiRangeDownloader, ) +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient +import pytest + +from tests.conformance._utils import start_grpc_server + # --- Configuration --- PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench. GRPC_ENDPOINT = "localhost:8888" @@ -50,8 +54,11 @@ async def run_test_scenario( retry_test_id = resp.json()["id"] # 2. Set up downloader and metadata for fault injection. + grpc_client = AsyncGrpcClient._create_insecure_grpc_client( + client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), + ) downloader = await AsyncMultiRangeDownloader.create_mrd( - gapic_client, bucket_name, object_name + grpc_client, bucket_name, object_name ) fault_injection_metadata = (("x-retry-test-id", retry_test_id),) @@ -82,8 +89,12 @@ async def run_test_scenario( http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") -async def main(): +@pytest.mark.asyncio +async def test_bidi_reads(): """Main function to set up resources and run all test scenarios.""" + start_grpc_server( + GRPC_ENDPOINT, HTTP_ENDPOINT + ) # Ensure the testbench gRPC server is running before this test executes. channel = grpc.aio.insecure_channel(GRPC_ENDPOINT) creds = auth_credentials.AnonymousCredentials() transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( @@ -121,12 +132,12 @@ async def main(): "instruction": "return-429", "expected_error": None, }, - { - "name": "Smarter Resumption: Retry 503 after partial data", - "method": "storage.objects.get", - "instruction": "return-broken-stream-after-2K", - "expected_error": None, - }, + # { + # "name": "Smarter Resumption: Retry 503 after partial data", + # "method": "storage.objects.get", + # "instruction": "return-broken-stream-after-2K", + # "expected_error": None, + # }, { "name": "Retry on BidiReadObjectRedirectedError", "method": "storage.objects.get", @@ -227,15 +238,17 @@ async def run_open_test_scenario( resp = http_client.post(f"{HTTP_ENDPOINT}/retry_test", json=retry_test_config) resp.raise_for_status() retry_test_id = resp.json()["id"] - print(f"Retry Test created with ID: {retry_test_id}") # 2. Set up metadata for fault injection. fault_injection_metadata = (("x-retry-test-id", retry_test_id),) # 3. Execute the open (via create_mrd) and assert the outcome. try: + grpc_client = AsyncGrpcClient._create_insecure_grpc_client( + client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), + ) downloader = await AsyncMultiRangeDownloader.create_mrd( - gapic_client, + grpc_client, bucket_name, object_name, metadata=fault_injection_metadata, @@ -260,7 +273,3 @@ async def run_open_test_scenario( # 4. Clean up the Retry Test resource. if retry_test_id: http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/tests/conformance/test_bidi_writes.py b/tests/conformance/test_bidi_writes.py index 90dfaf5f8..81f079f3e 100644 --- a/tests/conformance/test_bidi_writes.py +++ b/tests/conformance/test_bidi_writes.py @@ -1,16 +1,18 @@ -import asyncio import uuid import grpc +import pytest import requests -from google.api_core import exceptions +from google.api_core import exceptions, client_options from google.auth import credentials as auth_credentials +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient from google.cloud import _storage_v2 as storage_v2 from google.api_core.retry_async import AsyncRetry -from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( +from google.cloud.storage.asyncio.async_appendable_object_writer import ( AsyncAppendableObjectWriter, ) +from tests.conformance._utils import start_grpc_server # --- Configuration --- PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench. @@ -70,8 +72,11 @@ def on_retry_error(exc): retry_test_id = resp.json()["id"] # 2. Set up writer and metadata for fault injection. + grpc_client = AsyncGrpcClient._create_insecure_grpc_client( + client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT), + ) writer = AsyncAppendableObjectWriter( - gapic_client, + grpc_client, bucket_name, object_name, ) @@ -133,8 +138,12 @@ def on_retry_error(exc): http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}") -async def main(): +@pytest.mark.asyncio +async def test_bidi_writes(): """Main function to set up resources and run all test scenarios.""" + start_grpc_server( + GRPC_ENDPOINT, HTTP_ENDPOINT + ) # Ensure the testbench gRPC server is running before this test executes. channel = grpc.aio.insecure_channel(GRPC_ENDPOINT) creds = auth_credentials.AnonymousCredentials() transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport( @@ -173,12 +182,12 @@ async def main(): "instruction": "return-429", "expected_error": None, }, - { - "name": "Smarter Resumption: Retry 503 after partial data", - "method": "storage.objects.insert", - "instruction": "return-503-after-2K", - "expected_error": None, - }, + # { + # "name": "Smarter Resumption: Retry 503 after partial data", + # "method": "storage.objects.insert", + # "instruction": "return-503-after-2K", + # "expected_error": None, + # }, { "name": "Retry on BidiWriteObjectRedirectedError", "method": "storage.objects.insert", @@ -212,13 +221,13 @@ async def main(): "expected_error": None, "use_default_policy": True, }, - { - "name": "Default Policy: Smarter Ressumption", - "method": "storage.objects.insert", - "instruction": "return-503-after-2K", - "expected_error": None, - "use_default_policy": True, - }, + # { + # "name": "Default Policy: Smarter Ressumption", + # "method": "storage.objects.insert", + # "instruction": "return-503-after-2K", + # "expected_error": None, + # "use_default_policy": True, + # }, ] try: @@ -261,7 +270,3 @@ async def main(): await gapic_client.delete_bucket(request=delete_bucket_req) except Exception as e: print(f"Warning: Cleanup failed: {e}") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/tests/perf/microbenchmarks/_utils.py b/tests/perf/microbenchmarks/_utils.py index edf398fe9..9e5609500 100644 --- a/tests/perf/microbenchmarks/_utils.py +++ b/tests/perf/microbenchmarks/_utils.py @@ -18,7 +18,8 @@ import socket import psutil -_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show +_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show + def publish_benchmark_extra_info( benchmark: Any, @@ -28,7 +29,6 @@ def publish_benchmark_extra_info( download_bytes_list: Optional[List[int]] = None, duration: Optional[int] = None, ) -> None: - """ Helper function to publish benchmark parameters to the extra_info property. """ @@ -48,14 +48,15 @@ def publish_benchmark_extra_info( benchmark.group = benchmark_group if download_bytes_list is not None: - assert duration is not None, "Duration must be provided if total_bytes_transferred is provided." + assert ( + duration is not None + ), "Duration must be provided if total_bytes_transferred is provided." throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list] min_throughput = min(throughputs_list) max_throughput = max(throughputs_list) mean_throughput = statistics.mean(throughputs_list) median_throughput = statistics.median(throughputs_list) - else: object_size = params.file_size_bytes num_files = params.num_files @@ -211,13 +212,13 @@ def get_affinity(irq): def get_primary_interface_name(): primary_ip = None - + # 1. Determine the Local IP used for internet access # We use UDP (SOCK_DGRAM) so we don't actually send a handshake/packet s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: # connect() to a public IP (Google DNS) to force route resolution - s.connect(('8.8.8.8', 80)) + s.connect(("8.8.8.8", 80)) primary_ip = s.getsockname()[0] except Exception: # Fallback if no internet @@ -248,7 +249,7 @@ def get_irq_affinity(): for irq in irqs: affinity_str = get_affinity(irq) if affinity_str != "N/A": - for part in affinity_str.split(','): - if '-' not in part: + for part in affinity_str.split(","): + if "-" not in part: cpus.add(int(part)) return cpus diff --git a/tests/perf/microbenchmarks/time_based/conftest.py b/tests/perf/microbenchmarks/time_based/conftest.py index bcd186d7b..5c0c787f0 100644 --- a/tests/perf/microbenchmarks/time_based/conftest.py +++ b/tests/perf/microbenchmarks/time_based/conftest.py @@ -17,5 +17,5 @@ @pytest.fixture def workload_params(request): params = request.param - files_names = [f'fio-go_storage_fio.0.{i}' for i in range(0, params.num_processes)] + files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_processes)] return params, files_names diff --git a/tests/perf/microbenchmarks/time_based/reads/test_reads.py b/tests/perf/microbenchmarks/time_based/reads/test_reads.py index f2b84158b..17e6d48fd 100644 --- a/tests/perf/microbenchmarks/time_based/reads/test_reads.py +++ b/tests/perf/microbenchmarks/time_based/reads/test_reads.py @@ -159,7 +159,6 @@ async def _download_time_based_async(client, filename, params): def _download_files_worker(process_idx, filename, params, bucket_type): - if bucket_type == "zonal": return worker_loop.run_until_complete( _download_time_based_async(worker_client, filename, params) diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py index 0c8fe4375..9eb701ed9 100644 --- a/tests/unit/asyncio/test_async_appendable_object_writer.py +++ b/tests/unit/asyncio/test_async_appendable_object_writer.py @@ -175,9 +175,9 @@ async def test_state_lookup(self, mock_appendable_writer): writer._is_stream_open = True writer.write_obj_stream = mock_appendable_writer["mock_stream"] - mock_appendable_writer["mock_stream"].recv.return_value = ( - storage_type.BidiWriteObjectResponse(persisted_size=100) - ) + mock_appendable_writer[ + "mock_stream" + ].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=100) size = await writer.state_lookup() @@ -246,9 +246,7 @@ async def test_append_data_less_than_flush_interval(self, mock_appendable_writer ], ) @pytest.mark.asyncio - async def test_append( - self, data_len, mock_appendable_writer - ): + async def test_append(self, data_len, mock_appendable_writer): """Verify append orchestrates manager and drives the internal generator.""" # Arrange writer = self._make_one(mock_appendable_writer["mock_client"]) @@ -272,10 +270,19 @@ async def test_append( # Assert expected_recv_count = data_len // _DEFAULT_FLUSH_INTERVAL_BYTES assert writer.offset == data_len - assert writer.bytes_appended_since_last_flush == data_len % _DEFAULT_FLUSH_INTERVAL_BYTES - assert writer.persisted_size == expected_recv_count*_DEFAULT_FLUSH_INTERVAL_BYTES - assert writer.write_obj_stream.send.await_count == -(-data_len // _MAX_CHUNK_SIZE_BYTES) # Ceiling division for number of chunks - assert writer.write_obj_stream.recv.await_count == expected_recv_count # Expect 1 recv per flush interval + assert ( + writer.bytes_appended_since_last_flush + == data_len % _DEFAULT_FLUSH_INTERVAL_BYTES + ) + assert ( + writer.persisted_size == expected_recv_count * _DEFAULT_FLUSH_INTERVAL_BYTES + ) + assert writer.write_obj_stream.send.await_count == -( + -data_len // _MAX_CHUNK_SIZE_BYTES + ) # Ceiling division for number of chunks + assert ( + writer.write_obj_stream.recv.await_count == expected_recv_count + ) # Expect 1 recv per flush interval @pytest.mark.asyncio async def test_append_recovery_reopens_stream(self, mock_appendable_writer): @@ -339,9 +346,9 @@ async def test_flush_resets_counters(self, mock_appendable_writer): writer.write_obj_stream = mock_appendable_writer["mock_stream"] writer.bytes_appended_since_last_flush = 100 - mock_appendable_writer["mock_stream"].recv.return_value = ( - storage_type.BidiWriteObjectResponse(persisted_size=200) - ) + mock_appendable_writer[ + "mock_stream" + ].recv.return_value = storage_type.BidiWriteObjectResponse(persisted_size=200) await writer.flush() @@ -382,9 +389,9 @@ async def test_finalize_lifecycle(self, mock_appendable_writer): writer.write_obj_stream = mock_appendable_writer["mock_stream"] resource = storage_type.Object(size=999) - mock_appendable_writer["mock_stream"].recv.return_value = ( - storage_type.BidiWriteObjectResponse(resource=resource) - ) + mock_appendable_writer[ + "mock_stream" + ].recv.return_value = storage_type.BidiWriteObjectResponse(resource=resource) res = await writer.finalize() diff --git a/tests/unit/asyncio/test_async_grpc_client.py b/tests/unit/asyncio/test_async_grpc_client.py index f193acb60..06cb232d5 100644 --- a/tests/unit/asyncio/test_async_grpc_client.py +++ b/tests/unit/asyncio/test_async_grpc_client.py @@ -19,6 +19,7 @@ from google.api_core import client_info as client_info_lib from google.cloud.storage.asyncio import async_grpc_client from google.cloud.storage import __version__ +from google.api_core import client_options def _make_credentials(spec=None): @@ -157,36 +158,31 @@ def test_grpc_client_property(self, mock_grpc_gapic_client): assert retrieved_client is mock_grpc_gapic_client.return_value @mock.patch("google.cloud._storage_v2.StorageAsyncClient") - def test_grpc_client_with_anon_creds(self, mock_grpc_gapic_client): + @mock.patch( + "google.cloud.storage.asyncio.async_grpc_client.grpc.aio.insecure_channel" + ) + def test_grpc_client_with_anon_creds( + self, mock_insecure_channel, mock_async_storage_client + ): # Arrange - mock_transport_cls = mock.MagicMock() - mock_grpc_gapic_client.get_transport_class.return_value = mock_transport_cls - channel_sentinel = mock.sentinel.channel - - mock_transport_cls.create_channel.return_value = channel_sentinel - mock_transport_cls.return_value = mock.sentinel.transport + mock_channel = mock.MagicMock() + mock_insecure_channel.return_value = mock_channel # Act - anonymous_creds = AnonymousCredentials() - client = async_grpc_client.AsyncGrpcClient(credentials=anonymous_creds) - retrieved_client = client.grpc_client + client = async_grpc_client.AsyncGrpcClient( + client_options=client_options.ClientOptions( + api_endpoint="my-grpc-endpoint" + ), + credentials=AnonymousCredentials(), + ) # Assert - assert retrieved_client is mock_grpc_gapic_client.return_value - - kwargs = mock_grpc_gapic_client.call_args.kwargs - client_info = kwargs["client_info"] - agent_version = f"gcloud-python/{__version__}" - assert agent_version in client_info.user_agent - primary_user_agent = client_info.to_user_agent() - expected_options = (("grpc.primary_user_agent", primary_user_agent),) + assert client.grpc_client is mock_async_storage_client.return_value + mock_insecure_channel.assert_called_once_with("my-grpc-endpoint") - mock_transport_cls.create_channel.assert_called_once_with( - attempt_direct_path=True, - credentials=anonymous_creds, - options=expected_options, - ) - mock_transport_cls.assert_called_once_with(channel=channel_sentinel) + kwargs = mock_async_storage_client.call_args.kwargs + transport = kwargs["transport"] + assert isinstance(transport._credentials, AnonymousCredentials) @mock.patch("google.cloud._storage_v2.StorageAsyncClient") def test_user_agent_with_custom_client_info(self, mock_async_storage_client): @@ -221,9 +217,7 @@ async def test_delete_object(self, mock_async_storage_client): mock_gapic_client = mock.AsyncMock() mock_async_storage_client.return_value = mock_gapic_client - client = async_grpc_client.AsyncGrpcClient( - credentials=_make_credentials(spec=AnonymousCredentials) - ) + client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials()) bucket_name = "bucket" object_name = "object" @@ -264,9 +258,7 @@ async def test_get_object(self, mock_async_storage_client): mock_gapic_client = mock.AsyncMock() mock_async_storage_client.return_value = mock_gapic_client - client = async_grpc_client.AsyncGrpcClient( - credentials=_make_credentials(spec=AnonymousCredentials) - ) + client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials()) bucket_name = "bucket" object_name = "object" @@ -293,9 +285,7 @@ async def test_get_object_with_all_parameters(self, mock_async_storage_client): mock_gapic_client = mock.AsyncMock() mock_async_storage_client.return_value = mock_gapic_client - client = async_grpc_client.AsyncGrpcClient( - credentials=_make_credentials(spec=AnonymousCredentials) - ) + client = async_grpc_client.AsyncGrpcClient(credentials=_make_credentials()) bucket_name = "bucket" object_name = "object"