From 558d13da7547e2073ba884a5e9c135c06a263863 Mon Sep 17 00:00:00 2001 From: Chandra Date: Mon, 26 Jan 2026 19:17:43 +0000 Subject: [PATCH 1/3] fix: instantiate grpc client once per process in benchmarks --- .../perf/microbenchmarks/reads/test_reads.py | 69 ++++++++++--------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/tests/perf/microbenchmarks/reads/test_reads.py b/tests/perf/microbenchmarks/reads/test_reads.py index 81f180d75..eee1b9499 100644 --- a/tests/perf/microbenchmarks/reads/test_reads.py +++ b/tests/perf/microbenchmarks/reads/test_reads.py @@ -180,6 +180,7 @@ def download_files_using_mrd_multi_coro(loop, client, files, other_params, chunk Returns: float: The maximum latency (in seconds) among all coroutines. """ + async def main(): if len(files) == 1: result = await download_chunks_using_mrd_async( @@ -298,45 +299,47 @@ def target_wrapper(*args, **kwargs): storage_client.bucket(params.bucket_name).blob(f) for f in files_names ) +# --- Global Variables for Worker Process --- +worker_loop = None +worker_client = None +worker_json_client = None -def _download_files_worker(files_to_download, other_params, chunks, bucket_type): - # For regional buckets, a new client must be created for each process. - # For zonal, the same is done for consistency. 
+ +def _worker_init(bucket_type): + """Initializes a persistent event loop and client for each worker process.""" + global worker_loop, worker_client, worker_json_client if bucket_type == "zonal": - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - client = loop.run_until_complete(create_client()) - try: - # download_files_using_mrd_multi_coro returns max latency of coros - result = download_files_using_mrd_multi_coro( - loop, client, files_to_download, other_params, chunks - ) - finally: - tasks = asyncio.all_tasks(loop=loop) - for task in tasks: - task.cancel() - loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) - loop.close() - return result + worker_loop = asyncio.new_event_loop() + asyncio.set_event_loop(worker_loop) + worker_client = worker_loop.run_until_complete(create_client()) else: # regional from google.cloud import storage - json_client = storage.Client() + worker_json_client = storage.Client() + + +def _download_files_worker(files_to_download, other_params, chunks, bucket_type): + global worker_loop, worker_client, worker_json_client + if bucket_type == "zonal": + # The loop and client are already initialized in _worker_init. 
+ # download_files_using_mrd_multi_coro returns max latency of coros + return download_files_using_mrd_multi_coro( + worker_loop, worker_client, files_to_download, other_params, chunks + ) + else: # regional # download_files_using_json_multi_threaded returns max latency of threads return download_files_using_json_multi_threaded( - None, json_client, files_to_download, other_params, chunks + None, worker_json_client, files_to_download, other_params, chunks ) -def download_files_mp_mc_wrapper(files_names, params, chunks, bucket_type): - num_processes = params.num_processes +def download_files_mp_mc_wrapper(pool, files_names, params, chunks, bucket_type): num_coros = params.num_coros # This is n, number of files per process # Distribute filenames to processes filenames_per_process = [ files_names[i : i + num_coros] for i in range(0, len(files_names), num_coros) ] - args = [ ( filenames, @@ -347,17 +350,13 @@ def download_files_mp_mc_wrapper(files_names, params, chunks, bucket_type): for filenames in filenames_per_process ] - ctx = multiprocessing.get_context("spawn") - with ctx.Pool(processes=num_processes) as pool: - results = pool.starmap(_download_files_worker, args) - + results = pool.starmap(_download_files_worker, args) return max(results) @pytest.mark.parametrize( "workload_params", - all_params["read_seq_multi_process"] + - all_params["read_rand_multi_process"], + all_params["read_seq_multi_process"] + all_params["read_rand_multi_process"], indirect=True, ids=lambda p: p.name, ) @@ -386,10 +385,16 @@ def test_downloads_multi_proc_multi_coro( logging.info("randomizing chunks") random.shuffle(chunks) + ctx = multiprocessing.get_context("spawn") + pool = ctx.Pool( + processes=params.num_processes, + initializer=_worker_init, + initargs=(params.bucket_type,), + ) output_times = [] def target_wrapper(*args, **kwargs): - result = download_files_mp_mc_wrapper(*args, **kwargs) + result = download_files_mp_mc_wrapper(pool, *args, **kwargs) output_times.append(result) 
return output_times @@ -407,9 +412,11 @@ def target_wrapper(*args, **kwargs): ), ) finally: + pool.close() + pool.join() publish_benchmark_extra_info(benchmark, params, true_times=output_times) publish_resource_metrics(benchmark, m) blobs_to_delete.extend( storage_client.bucket(params.bucket_name).blob(f) for f in files_names - ) \ No newline at end of file + ) From dcc7932307319acd92e42ad724da4e175a9b9b50 Mon Sep 17 00:00:00 2001 From: Chandra Date: Wed, 28 Jan 2026 10:42:43 +0000 Subject: [PATCH 2/3] fix lint issues: remove global declaration in `_download_files_worker` --- tests/perf/microbenchmarks/reads/test_reads.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/perf/microbenchmarks/reads/test_reads.py b/tests/perf/microbenchmarks/reads/test_reads.py index eee1b9499..324938e94 100644 --- a/tests/perf/microbenchmarks/reads/test_reads.py +++ b/tests/perf/microbenchmarks/reads/test_reads.py @@ -299,6 +299,7 @@ def target_wrapper(*args, **kwargs): storage_client.bucket(params.bucket_name).blob(f) for f in files_names ) + # --- Global Variables for Worker Process --- worker_loop = None worker_client = None worker_json_client = None @@ -319,7 +320,6 @@ def _worker_init(bucket_type): def _download_files_worker(files_to_download, other_params, chunks, bucket_type): - global worker_loop, worker_client, worker_json_client if bucket_type == "zonal": # The loop and client are already initialized in _worker_init. 
# download_files_using_mrd_multi_coro returns max latency of coros From 98f85edf566b6a81a2b7aae77151447fc76ac0ed Mon Sep 17 00:00:00 2001 From: Chandra Date: Wed, 28 Jan 2026 10:44:22 +0000 Subject: [PATCH 3/3] skip flaky system test --- tests/system/test_zonal.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py index 8019156dd..eb9df582c 100644 --- a/tests/system/test_zonal.py +++ b/tests/system/test_zonal.py @@ -264,7 +264,7 @@ async def _run(): event_loop.run_until_complete(_run()) - +@pytest.mark.skip(reason='Flaky test b/478129078') def test_mrd_open_with_read_handle(event_loop, grpc_client_direct): object_name = f"test_read_handl-{str(uuid.uuid4())[:4]}"