Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions google/cloud/storage/asyncio/async_grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
DEFAULT_CLIENT_INFO,
)
from google.cloud.storage import __version__
import grpc
from google.auth import credentials as auth_credentials


class AsyncGrpcClient:
Expand Down Expand Up @@ -52,6 +54,12 @@ def __init__(
*,
attempt_direct_path=True,
):
if isinstance(credentials, auth_credentials.AnonymousCredentials):
self._grpc_client = self._create_anonymous_client(
client_options, credentials
)
return

if client_info is None:
client_info = DEFAULT_CLIENT_INFO
client_info.client_library_version = __version__
Expand All @@ -68,6 +76,21 @@ def __init__(
attempt_direct_path=attempt_direct_path,
)

def _create_anonymous_client(self, client_options, credentials):
channel = grpc.aio.insecure_channel(client_options.api_endpoint)
transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport(
channel=channel, credentials=credentials
)
return storage_v2.StorageAsyncClient(transport=transport)

@classmethod
def _create_insecure_grpc_client(cls, client_options):
return cls(
credentials=auth_credentials.AnonymousCredentials(),
client_options=client_options,
attempt_direct_path=False,
)

def _create_async_grpc_client(
self,
credentials=None,
Expand Down
1 change: 1 addition & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def conftest_retry(session):
session.install(
"pytest",
"pytest-xdist",
"pytest-asyncio",
"grpcio",
"grpcio-status",
"grpc-google-iam-v1",
Expand Down
31 changes: 31 additions & 0 deletions tests/conformance/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import time
import requests


def start_grpc_server(grpc_endpoint, http_endpoint):
    """Start the testbench gRPC server if it is not already running.

    Equivalent to::

        curl -s --retry 5 --retry-max-time 40 "http://localhost:9000/start_grpc?port=8888"

    :param grpc_endpoint: ``host:port`` the gRPC server should listen on;
        only the port component is forwarded to the testbench.
    :param http_endpoint: base URL of the testbench HTTP control server.
    :raises RuntimeError: if the server could not be started within the
        retry budget or the overall time limit.
    """
    start_time = time.time()
    max_time = 40  # overall deadline in seconds, matching --retry-max-time
    retries = 5
    port = grpc_endpoint.split(":")[-1]
    url = f"{http_endpoint}/start_grpc?port={port}"

    for attempt in range(retries):
        try:
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                return
        except requests.exceptions.RequestException:
            # Testbench may not be up yet; treat as a retryable failure.
            pass

        if time.time() - start_time >= max_time:
            raise RuntimeError("Failed to start gRPC server within the time limit.")

        # Backoff before the next attempt; skip the sleep after the last one.
        if attempt < retries - 1:
            time.sleep(1)

    # BUGFIX: previously the function fell off the end of the loop and
    # returned silently when all attempts failed before the deadline.
    raise RuntimeError(
        f"Failed to start gRPC server after {retries} attempts at {url}."
    )
43 changes: 26 additions & 17 deletions tests/conformance/test_bidi_reads.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import asyncio
import io
import uuid
import grpc
import requests

from google.api_core import exceptions
from google.api_core import exceptions, client_options
from google.auth import credentials as auth_credentials
from google.cloud import _storage_v2 as storage_v2

from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
from google.cloud.storage.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)

from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient
import pytest

from tests.conformance._utils import start_grpc_server

# --- Configuration ---
PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench.
GRPC_ENDPOINT = "localhost:8888"
Expand Down Expand Up @@ -50,8 +54,11 @@ async def run_test_scenario(
retry_test_id = resp.json()["id"]

# 2. Set up downloader and metadata for fault injection.
grpc_client = AsyncGrpcClient._create_insecure_grpc_client(
client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT),
)
downloader = await AsyncMultiRangeDownloader.create_mrd(
gapic_client, bucket_name, object_name
grpc_client, bucket_name, object_name
)
fault_injection_metadata = (("x-retry-test-id", retry_test_id),)

Expand Down Expand Up @@ -82,8 +89,12 @@ async def run_test_scenario(
http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}")


async def main():
@pytest.mark.asyncio
async def test_bidi_reads():
"""Main function to set up resources and run all test scenarios."""
start_grpc_server(
GRPC_ENDPOINT, HTTP_ENDPOINT
) # Ensure the testbench gRPC server is running before this test executes.
channel = grpc.aio.insecure_channel(GRPC_ENDPOINT)
creds = auth_credentials.AnonymousCredentials()
transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport(
Expand Down Expand Up @@ -121,12 +132,12 @@ async def main():
"instruction": "return-429",
"expected_error": None,
},
{
"name": "Smarter Resumption: Retry 503 after partial data",
"method": "storage.objects.get",
"instruction": "return-broken-stream-after-2K",
"expected_error": None,
},
# {
# "name": "Smarter Resumption: Retry 503 after partial data",
# "method": "storage.objects.get",
# "instruction": "return-broken-stream-after-2K",
# "expected_error": None,
# },
{
"name": "Retry on BidiReadObjectRedirectedError",
"method": "storage.objects.get",
Expand Down Expand Up @@ -227,15 +238,17 @@ async def run_open_test_scenario(
resp = http_client.post(f"{HTTP_ENDPOINT}/retry_test", json=retry_test_config)
resp.raise_for_status()
retry_test_id = resp.json()["id"]
print(f"Retry Test created with ID: {retry_test_id}")

# 2. Set up metadata for fault injection.
fault_injection_metadata = (("x-retry-test-id", retry_test_id),)

# 3. Execute the open (via create_mrd) and assert the outcome.
try:
grpc_client = AsyncGrpcClient._create_insecure_grpc_client(
client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT),
)
downloader = await AsyncMultiRangeDownloader.create_mrd(
gapic_client,
grpc_client,
bucket_name,
object_name,
metadata=fault_injection_metadata,
Expand All @@ -260,7 +273,3 @@ async def run_open_test_scenario(
# 4. Clean up the Retry Test resource.
if retry_test_id:
http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}")


if __name__ == "__main__":
asyncio.run(main())
49 changes: 27 additions & 22 deletions tests/conformance/test_bidi_writes.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import asyncio
import uuid
import grpc
import pytest
import requests

from google.api_core import exceptions
from google.api_core import exceptions, client_options
from google.auth import credentials as auth_credentials
from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud import _storage_v2 as storage_v2

from google.api_core.retry_async import AsyncRetry
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
from google.cloud.storage.asyncio.async_appendable_object_writer import (
AsyncAppendableObjectWriter,
)
from tests.conformance._utils import start_grpc_server

# --- Configuration ---
PROJECT_NUMBER = "12345" # A dummy project number is fine for the testbench.
Expand Down Expand Up @@ -70,8 +72,11 @@ def on_retry_error(exc):
retry_test_id = resp.json()["id"]

# 2. Set up writer and metadata for fault injection.
grpc_client = AsyncGrpcClient._create_insecure_grpc_client(
client_options=client_options.ClientOptions(api_endpoint=GRPC_ENDPOINT),
)
writer = AsyncAppendableObjectWriter(
gapic_client,
grpc_client,
bucket_name,
object_name,
)
Expand Down Expand Up @@ -133,8 +138,12 @@ def on_retry_error(exc):
http_client.delete(f"{HTTP_ENDPOINT}/retry_test/{retry_test_id}")


async def main():
@pytest.mark.asyncio
async def test_bidi_writes():
"""Main function to set up resources and run all test scenarios."""
start_grpc_server(
GRPC_ENDPOINT, HTTP_ENDPOINT
) # Ensure the testbench gRPC server is running before this test executes.
channel = grpc.aio.insecure_channel(GRPC_ENDPOINT)
creds = auth_credentials.AnonymousCredentials()
transport = storage_v2.services.storage.transports.StorageGrpcAsyncIOTransport(
Expand Down Expand Up @@ -173,12 +182,12 @@ async def main():
"instruction": "return-429",
"expected_error": None,
},
{
"name": "Smarter Resumption: Retry 503 after partial data",
"method": "storage.objects.insert",
"instruction": "return-503-after-2K",
"expected_error": None,
},
# {
# "name": "Smarter Resumption: Retry 503 after partial data",
# "method": "storage.objects.insert",
# "instruction": "return-503-after-2K",
# "expected_error": None,
# },
{
"name": "Retry on BidiWriteObjectRedirectedError",
"method": "storage.objects.insert",
Expand Down Expand Up @@ -212,13 +221,13 @@ async def main():
"expected_error": None,
"use_default_policy": True,
},
{
"name": "Default Policy: Smarter Ressumption",
"method": "storage.objects.insert",
"instruction": "return-503-after-2K",
"expected_error": None,
"use_default_policy": True,
},
# {
# "name": "Default Policy: Smarter Ressumption",
# "method": "storage.objects.insert",
# "instruction": "return-503-after-2K",
# "expected_error": None,
# "use_default_policy": True,
# },
]

try:
Expand Down Expand Up @@ -261,7 +270,3 @@ async def main():
await gapic_client.delete_bucket(request=delete_bucket_req)
except Exception as e:
print(f"Warning: Cleanup failed: {e}")


if __name__ == "__main__":
asyncio.run(main())
17 changes: 9 additions & 8 deletions tests/perf/microbenchmarks/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import socket
import psutil

_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show
_C4_STANDARD_192_NIC = "ens3" # can be fetched via ip link show


def publish_benchmark_extra_info(
benchmark: Any,
Expand All @@ -28,7 +29,6 @@ def publish_benchmark_extra_info(
download_bytes_list: Optional[List[int]] = None,
duration: Optional[int] = None,
) -> None:

"""
Helper function to publish benchmark parameters to the extra_info property.
"""
Expand All @@ -48,14 +48,15 @@ def publish_benchmark_extra_info(
benchmark.group = benchmark_group

if download_bytes_list is not None:
assert duration is not None, "Duration must be provided if total_bytes_transferred is provided."
assert (
duration is not None
), "Duration must be provided if total_bytes_transferred is provided."
throughputs_list = [x / duration / (1024 * 1024) for x in download_bytes_list]
min_throughput = min(throughputs_list)
max_throughput = max(throughputs_list)
mean_throughput = statistics.mean(throughputs_list)
median_throughput = statistics.median(throughputs_list)


else:
object_size = params.file_size_bytes
num_files = params.num_files
Expand Down Expand Up @@ -211,13 +212,13 @@ def get_affinity(irq):

def get_primary_interface_name():
primary_ip = None

# 1. Determine the Local IP used for internet access
# We use UDP (SOCK_DGRAM) so we don't actually send a handshake/packet
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# connect() to a public IP (Google DNS) to force route resolution
s.connect(('8.8.8.8', 80))
s.connect(("8.8.8.8", 80))
primary_ip = s.getsockname()[0]
except Exception:
# Fallback if no internet
Expand Down Expand Up @@ -248,7 +249,7 @@ def get_irq_affinity():
for irq in irqs:
affinity_str = get_affinity(irq)
if affinity_str != "N/A":
for part in affinity_str.split(','):
if '-' not in part:
for part in affinity_str.split(","):
if "-" not in part:
cpus.add(int(part))
return cpus
2 changes: 1 addition & 1 deletion tests/perf/microbenchmarks/time_based/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
@pytest.fixture
def workload_params(request):
params = request.param
files_names = [f'fio-go_storage_fio.0.{i}' for i in range(0, params.num_processes)]
files_names = [f"fio-go_storage_fio.0.{i}" for i in range(0, params.num_processes)]
return params, files_names
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ async def _download_time_based_async(client, filename, params):


def _download_files_worker(process_idx, filename, params, bucket_type):

if bucket_type == "zonal":
return worker_loop.run_until_complete(
_download_time_based_async(worker_client, filename, params)
Expand Down
Loading