From b102f34d7a6e243c17aaee50f020218eda6f4386 Mon Sep 17 00:00:00 2001
From: Ankita Luthra
Date: Sun, 25 Jan 2026 15:10:18 +0530
Subject: [PATCH 01/10] Adds test coverage check in repo (#68)

* Adds test coverage check in repo

* Adds pytest coverage dependency

* Update threshold for new code
---
 .github/.codecov.yml     | 13 +++++++++++++
 .github/workflows/ci.yml |  7 +++++++
 environment_gcsfs.yaml   |  1 +
 3 files changed, 21 insertions(+)
 create mode 100644 .github/.codecov.yml

diff --git a/.github/.codecov.yml b/.github/.codecov.yml
new file mode 100644
index 00000000..d3f91f70
--- /dev/null
+++ b/.github/.codecov.yml
@@ -0,0 +1,13 @@
+codecov:
+  require_ci_to_pass: yes
+
+coverage:
+  status:
+    project:
+      default:
+        target: auto
+        threshold: 0% # Allow 0% drop.
+    patch:
+      default:
+        target: 80%
+        threshold: 0%

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 8a56fa19..3c5f92dd 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -51,9 +51,16 @@ jobs:
         export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/gcsfs/tests/fake-secret.json
         export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT="true"
         pytest -vv -s \
+          --cov=gcsfs --cov-report=xml \
           --log-format="%(asctime)s %(levelname)s %(message)s" \
           --log-date-format="%H:%M:%S" \
           gcsfs/
+    - name: Upload coverage to Codecov
+      uses: codecov/codecov-action@v5
+      with:
+        token: ${{ secrets.CODECOV_TOKEN }}
+        files: ./coverage.xml
+        fail_ci_if_error: true

   lint:
     name: lint

diff --git a/environment_gcsfs.yaml b/environment_gcsfs.yaml
index 39db5120..cd761d8a 100644
--- a/environment_gcsfs.yaml
+++ b/environment_gcsfs.yaml
@@ -17,6 +17,7 @@ dependencies:
   - pytest
   - pytest-timeout
   - pytest-asyncio
+  - pytest-cov
   - pytest-subtests
   - requests
   - ujson

From f0d08090bba742d95fc3b30c7369961d18be9fa3 Mon Sep 17 00:00:00 2001
From: suni72
Date: Mon, 19 Jan 2026 09:35:01 +0000
Subject: [PATCH 02/10] implement _cp_file method for zonal buckets

---
 cloudbuild/e2e-tests-cloudbuild.yaml |  1 +
 gcsfs/extended_gcsfs.py              | 31 +++++++++++++++
 gcsfs/tests/test_extended_gcsfs.py   | 57 ++++++++++++++++++++++++++++
 3 files changed, 89 insertions(+)

diff --git a/cloudbuild/e2e-tests-cloudbuild.yaml b/cloudbuild/e2e-tests-cloudbuild.yaml
index 84e2fee3..c8c3bee8 100644
--- a/cloudbuild/e2e-tests-cloudbuild.yaml
+++ b/cloudbuild/e2e-tests-cloudbuild.yaml
@@ -141,6 +141,7 @@ steps:
       TEST_SCRIPT="
       source env/bin/activate && \
       echo '--- Preparing test environment for zonal tests ---' && \
+      export GCSFS_TEST_BUCKET='gcsfs-test-standard-${_SHORT_BUILD_ID}' && \
       export GCSFS_ZONAL_TEST_BUCKET='gcsfs-test-zonal-${_SHORT_BUILD_ID}' && \
       export STORAGE_EMULATOR_HOST=https://storage.googleapis.com && \
       export GCSFS_TEST_PROJECT=${PROJECT_ID} && \

diff --git a/gcsfs/extended_gcsfs.py b/gcsfs/extended_gcsfs.py
index b2ecb260..2efb14bf 100644
--- a/gcsfs/extended_gcsfs.py
+++ b/gcsfs/extended_gcsfs.py
@@ -1207,6 +1207,37 @@ async def _do_list_objects(
         **kwargs,
     )

+    async def _cp_file(self, path1, path2, acl=None, **kwargs):
+        """Duplicate remote file.
+
+        For Standard GCS buckets, falls back to the parent class's implementation.
+
+        Zonal Bucket Support:
+        Server-side copy is currently NOT supported for Zonal buckets because
+        the `RewriteObject` API is unavailable for them.
+
+        The following scenarios will raise a `NotImplementedError`:
+        * Intra-zonal: Copying within the same Zonal bucket.
+        * Inter-zonal: Copying between two different Zonal buckets.
+        * Mixed: Copying between a Zonal bucket and a Standard bucket.
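+
+        A client-side alternative (a sketch only, not part of this method,
+        where ``fs`` is this filesystem instance) is to stream the bytes
+        through the client, mirroring the `_pipe_file` call used in the
+        tests below::
+
+            data = await fs._cat_file(path1)
+            await fs._pipe_file(path2, data, finalize_on_close=True)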
+ + """ + b1, _, _ = self.split_path(path1) + b2, _, _ = self.split_path(path2) + + is_zonal_source = await self._is_zonal_bucket(b1) + is_zonal_dest = await self._is_zonal_bucket(b2) + + # 1. Standard -> Standard (Delegate to core implementation) + if not is_zonal_source and not is_zonal_dest: + return await super()._cp_file(path1, path2, acl=acl, **kwargs) + + # 2. Zonal Scenarios (Currently Unsupported) + raise NotImplementedError( + "Server-side copy involving Zonal buckets is not supported. " + "Zonal objects do not support rewrite." + ) + async def upload_chunk(fs, location, data, offset, size, content_type): """ diff --git a/gcsfs/tests/test_extended_gcsfs.py b/gcsfs/tests/test_extended_gcsfs.py index b357920c..9a9545f9 100644 --- a/gcsfs/tests/test_extended_gcsfs.py +++ b/gcsfs/tests/test_extended_gcsfs.py @@ -5,6 +5,7 @@ import os import random import threading +import uuid from concurrent.futures import ThreadPoolExecutor from itertools import chain from unittest import mock @@ -1597,3 +1598,59 @@ async def test_get_file_exception_cleanup( # The local file should not exist after the failed download assert not lpath.exists() + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "source_bucket, dest_bucket, should_fail", + [ + (TEST_ZONAL_BUCKET, TEST_ZONAL_BUCKET, True), + (TEST_ZONAL_BUCKET, TEST_BUCKET, True), + (TEST_BUCKET, TEST_ZONAL_BUCKET, True), + (TEST_BUCKET, TEST_BUCKET, False), + ], +) +async def test_cp_file_not_implemented_error( + async_gcs, source_bucket, dest_bucket, should_fail +): + """ + Tests _cp_file behavior for combinations of Zonal and Standard buckets. + """ + short_uuid = str(uuid.uuid4())[:8] + source_path = f"{source_bucket}/source_{short_uuid}" + dest_path = f"{dest_bucket}/dest_{short_uuid}" + is_real_gcs = os.getenv("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com" + + # Source file needs to exist for last case when super method is called for standard buckets + if is_real_gcs: + await async_gcs._pipe_file(source_path, b"test data", finalize_on_close=True) + + async def mock_is_zonal(bucket): + return bucket == TEST_ZONAL_BUCKET + + is_zonal_patch_cm = ( + mock.patch.object(async_gcs, "_is_zonal_bucket", side_effect=mock_is_zonal) + if not is_real_gcs + else contextlib.nullcontext() + ) + + with is_zonal_patch_cm: + if should_fail: + with pytest.raises( + NotImplementedError, + match=( + r"Server-side copy involving Zonal buckets is not supported. " + r"Zonal objects do not support rewrite." 
+ ), + ): + await async_gcs._cp_file(source_path, dest_path) + else: # Standard -> Standard + if is_real_gcs: + await async_gcs._cp_file(source_path, dest_path) + assert await async_gcs._cat(dest_path) == b"test data" + else: + with mock.patch( + "gcsfs.core.GCSFileSystem._cp_file", new_callable=mock.AsyncMock + ) as mock_super_cp: + await async_gcs._cp_file(source_path, dest_path) + mock_super_cp.assert_awaited_once() From 5dd316a5908a095d0d321f5f8af9042cfba0aa86 Mon Sep 17 00:00:00 2001 From: suni72 Date: Mon, 19 Jan 2026 10:29:10 +0000 Subject: [PATCH 03/10] update ulimit to avoid too many open file error --- cloudbuild/e2e-tests-cloudbuild.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudbuild/e2e-tests-cloudbuild.yaml b/cloudbuild/e2e-tests-cloudbuild.yaml index c8c3bee8..b871f2c0 100644 --- a/cloudbuild/e2e-tests-cloudbuild.yaml +++ b/cloudbuild/e2e-tests-cloudbuild.yaml @@ -147,7 +147,7 @@ steps: export GCSFS_TEST_PROJECT=${PROJECT_ID} && \ export GCSFS_TEST_KMS_KEY=projects/${PROJECT_ID}/locations/${_REGION}/keyRings/${_GCSFS_KEY_RING_NAME}/cryptoKeys/${_GCSFS_KEY_NAME} && \ echo '--- Running Zonal tests on VM ---' && \ - ulimit -n 2048 && export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT='true' && \ + ulimit -n 4096 && export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT='true' && \ pip install --upgrade --force-reinstall git+https://github.com/googleapis/python-storage.git@main && \ pytest -vv -s --log-format='%(asctime)s %(levelname)s %(message)s' --log-date-format='%H:%M:%S' --color=no gcsfs/tests/test_extended_gcsfs.py gcsfs/tests/test_zonal_file.py gcsfs/tests/test_async_gcsfs.py " From d3382a4628ea7bd6f1ebded37cbf7c6387dc4460 Mon Sep 17 00:00:00 2001 From: suni72 Date: Thu, 22 Jan 2026 09:52:58 +0000 Subject: [PATCH 04/10] use different regional bucket for zonal cp test --- cloudbuild/e2e-tests-cloudbuild.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cloudbuild/e2e-tests-cloudbuild.yaml b/cloudbuild/e2e-tests-cloudbuild.yaml index b871f2c0..34a4f970 100644 --- a/cloudbuild/e2e-tests-cloudbuild.yaml +++ b/cloudbuild/e2e-tests-cloudbuild.yaml @@ -29,6 +29,8 @@ steps: set -e echo "--- Creating standard bucket ---" gcloud storage buckets create gs://gcsfs-test-standard-${_SHORT_BUILD_ID} --project=${PROJECT_ID} --location=${_REGION} & + echo "--- Creating standard bucket for zonal tests ---" + gcloud storage buckets create gs://gcsfs-test-standard-for-zonal-${_SHORT_BUILD_ID} --project=${PROJECT_ID} --location=${_REGION} & echo "--- Creating versioned bucket ---" gcloud storage buckets create gs://gcsfs-test-versioned-${_SHORT_BUILD_ID} --project=${PROJECT_ID} --location=${_REGION} & echo "--- Creating HNS bucket ---" @@ -141,7 +143,7 @@ steps: TEST_SCRIPT=" source env/bin/activate && \ echo '--- Preparing test environment for zonal tests ---' && \ - export GCSFS_TEST_BUCKET='gcsfs-test-standard-${_SHORT_BUILD_ID}' && \ + export GCSFS_TEST_BUCKET='gcsfs-test-standard-for-zonal-${_SHORT_BUILD_ID}' && \ export GCSFS_ZONAL_TEST_BUCKET='gcsfs-test-zonal-${_SHORT_BUILD_ID}' && \ export STORAGE_EMULATOR_HOST=https://storage.googleapis.com && \ export GCSFS_TEST_PROJECT=${PROJECT_ID} && \ @@ -222,6 +224,7 @@ steps: set -e echo "--- Deleting test buckets in parallel ---" gcloud storage rm --recursive gs://gcsfs-test-standard-${_SHORT_BUILD_ID} & + gcloud storage rm --recursive gs://gcsfs-test-standard-for-zonal-${_SHORT_BUILD_ID} & gcloud storage rm --recursive gs://gcsfs-test-versioned-${_SHORT_BUILD_ID} & gcloud storage rm --recursive 
gs://gcsfs-test-hns-${_SHORT_BUILD_ID} & gcloud storage rm --recursive gs://gcsfs-test-zonal-${_SHORT_BUILD_ID} & From a75bbfb708ab62c4fb5cff854357b9866ea9c2be Mon Sep 17 00:00:00 2001 From: suni72 Date: Mon, 26 Jan 2026 17:09:38 +0000 Subject: [PATCH 05/10] support custom flush interval for zonal writes --- gcsfs/extended_gcsfs.py | 17 +++++++++++++-- gcsfs/tests/test_extended_gcsfs.py | 8 ++++++- gcsfs/tests/test_zb_hns_utils.py | 35 ++++++++++++++++++++++++++++-- gcsfs/tests/test_zonal_file.py | 32 ++++++++++++++++++++++++--- gcsfs/zb_hns_utils.py | 12 ++++++++-- gcsfs/zonal_file.py | 23 ++++++++++++++++---- 6 files changed, 113 insertions(+), 14 deletions(-) diff --git a/gcsfs/extended_gcsfs.py b/gcsfs/extended_gcsfs.py index 2efb14bf..f1570267 100644 --- a/gcsfs/extended_gcsfs.py +++ b/gcsfs/extended_gcsfs.py @@ -13,6 +13,7 @@ from google.auth.credentials import AnonymousCredentials from google.cloud import storage_control_v2 from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _DEFAULT_FLUSH_INTERVAL_BYTES, AsyncAppendableObjectWriter, ) from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient @@ -22,7 +23,7 @@ from gcsfs import __version__ as version from gcsfs import zb_hns_utils -from gcsfs.core import GCSFile, GCSFileSystem +from gcsfs.core import DEFAULT_BLOCK_SIZE, GCSFile, GCSFileSystem from gcsfs.zonal_file import ZonalFile logger = logging.getLogger("gcsfs") @@ -173,11 +174,23 @@ def _open( """ bucket, _, _ = self.split_path(path) bucket_type = self._sync_lookup_bucket_type(bucket) + + # Choose correct block_size if not explicitly provided + if block_size is None: + block_size = self.default_block_size + # If we are using the generic default (user didn't override it), + # switch to the Zonal-optimized default for Zonal buckets. 
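+        # _DEFAULT_FLUSH_INTERVAL_BYTES is the python-storage appendable-writer
+        # default; ZonalFile forwards block_size as the flush interval, which
+        # must stay a multiple of the writer's 2MiB chunk size.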
+ if ( + bucket_type == BucketType.ZONAL_HIERARCHICAL + and block_size == DEFAULT_BLOCK_SIZE + ): + block_size = _DEFAULT_FLUSH_INTERVAL_BYTES + return gcs_file_types[bucket_type]( self, path, mode, - block_size=block_size or self.default_block_size, + block_size=block_size, cache_type=cache_type, cache_options=cache_options, consistency=consistency or self.consistency, diff --git a/gcsfs/tests/test_extended_gcsfs.py b/gcsfs/tests/test_extended_gcsfs.py index 9a9545f9..5e13d1b6 100644 --- a/gcsfs/tests/test_extended_gcsfs.py +++ b/gcsfs/tests/test_extended_gcsfs.py @@ -13,6 +13,7 @@ import pytest from google.api_core.exceptions import NotFound from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _DEFAULT_FLUSH_INTERVAL_BYTES, AsyncAppendableObjectWriter, ) from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( @@ -202,13 +203,18 @@ def test_open_uses_correct_blocksize_and_consistency_for_all_bucket_types( def test_open_uses_default_blocksize_and_consistency_from_fs( extended_gcsfs, gcs_bucket_mocks, bucket_type_val ): + if extended_gcsfs.on_google: + pytest.skip("Cannot mock bucket_types on real GCS") csv_file = "2014-01-01.csv" csv_file_path = f"{TEST_ZONAL_BUCKET}/{csv_file}" csv_data = csv_files[csv_file] with gcs_bucket_mocks(csv_data, bucket_type_val=bucket_type_val): with extended_gcsfs.open(csv_file_path, "rb") as f: - assert f.blocksize == extended_gcsfs.default_block_size + if bucket_type_val == BucketType.ZONAL_HIERARCHICAL: + assert f.blocksize == _DEFAULT_FLUSH_INTERVAL_BYTES + else: + assert f.blocksize == extended_gcsfs.default_block_size assert type(f.checker) is ConsistencyChecker diff --git a/gcsfs/tests/test_zb_hns_utils.py b/gcsfs/tests/test_zb_hns_utils.py index 77c287d1..a1d3a675 100644 --- a/gcsfs/tests/test_zb_hns_utils.py +++ b/gcsfs/tests/test_zb_hns_utils.py @@ -42,8 +42,8 @@ async def test_init_aaow(): """ mock_writer_instance = mock.AsyncMock() with mock.patch( - "gcsfs.zb_hns_utils.AsyncAppendableObjectWriter", # The class to patch - new_callable=mock.Mock, # Use a regular Mock for the class + "gcsfs.zb_hns_utils.AsyncAppendableObjectWriter", + new_callable=mock.Mock, return_value=mock_writer_instance, ) as mock_writer_class: result = await zb_hns_utils.init_aaow( @@ -55,6 +55,37 @@ async def test_init_aaow(): bucket_name=bucket_name, object_name=object_name, generation=generation, + writer_options={}, + ) + mock_writer_instance.open.assert_awaited_once() + assert result is mock_writer_instance + + +@pytest.mark.asyncio +async def test_init_aaow_with_flush_interval_bytes(): + """ + Docstring for test_init_aaow_with_flush_interval_bytes + """ + mock_writer_instance = mock.AsyncMock() + with mock.patch( + "gcsfs.zb_hns_utils.AsyncAppendableObjectWriter", + new_callable=mock.Mock, + return_value=mock_writer_instance, + ) as mock_writer_class: + result = await zb_hns_utils.init_aaow( + mock_grpc_client, + bucket_name, + object_name, + generation, + flush_interval_bytes=1024, + ) + + mock_writer_class.assert_called_once_with( + client=mock_grpc_client, + bucket_name=bucket_name, + object_name=object_name, + generation=generation, + writer_options={"FLUSH_INTERVAL_BYTES": 1024}, ) mock_writer_instance.open.assert_awaited_once() assert result is mock_writer_instance diff --git a/gcsfs/tests/test_zonal_file.py b/gcsfs/tests/test_zonal_file.py index 76c0bfcf..e1e55c23 100644 --- a/gcsfs/tests/test_zonal_file.py +++ b/gcsfs/tests/test_zonal_file.py @@ -4,6 +4,9 @@ from unittest import mock import pytest 
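+
+# The library default flush interval (16MiB, per the ZonalFile docstring) is
+# imported so the tests below can assert the default passed to init_aaow.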
+from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _DEFAULT_FLUSH_INTERVAL_BYTES, +) from gcsfs.tests.settings import TEST_ZONAL_BUCKET from gcsfs.tests.utils import tempdir, tmpfile @@ -68,7 +71,26 @@ def test_zonal_file_open_write_mode(extended_gcsfs, zonal_write_mocks, file_path if zonal_write_mocks: zonal_write_mocks["init_aaow"].assert_called_once_with( - extended_gcsfs.grpc_client, bucket, key, None + extended_gcsfs.grpc_client, bucket, key, None, _DEFAULT_FLUSH_INTERVAL_BYTES + ) + else: + assert extended_gcsfs.exists(file_path) + + +def test_zonal_file_open_write_mode_with_custom_flush_interval_bytes( + extended_gcsfs, zonal_write_mocks, file_path +): + """Test that opening a ZonalFile in write mode initializes the writer.""" + bucket, key, _ = extended_gcsfs.split_path(file_path) + custom_flush_interval_bytes = 4 * 1024 * 1024 + with extended_gcsfs.open( + file_path, "wb", block_size=custom_flush_interval_bytes, finalize_on_close=True + ): + pass + + if zonal_write_mocks: + zonal_write_mocks["init_aaow"].assert_called_once_with( + extended_gcsfs.grpc_client, bucket, key, None, custom_flush_interval_bytes ) else: assert extended_gcsfs.exists(file_path) @@ -85,7 +107,11 @@ def test_zonal_file_open_append_mode(extended_gcsfs, zonal_write_mocks, file_pat # check _info is called to get the generation zonal_write_mocks["_gcsfs_info"].assert_awaited_once_with(file_path) zonal_write_mocks["init_aaow"].assert_called_once_with( - extended_gcsfs.grpc_client, bucket, key, "12345" + extended_gcsfs.grpc_client, + bucket, + key, + "12345", + _DEFAULT_FLUSH_INTERVAL_BYTES, ) else: assert extended_gcsfs.cat(file_path) == b"data" @@ -112,7 +138,7 @@ def test_zonal_file_open_append_mode_nonexistent_file( if zonal_write_mocks: # init_aaow should be called with generation=None zonal_write_mocks["init_aaow"].assert_called_once_with( - extended_gcsfs.grpc_client, bucket, key, None + extended_gcsfs.grpc_client, bucket, key, None, _DEFAULT_FLUSH_INTERVAL_BYTES ) # _info is called to get the generation, but it fails extended_gcsfs._info.assert_awaited_once() diff --git a/gcsfs/zb_hns_utils.py b/gcsfs/zb_hns_utils.py index 57d5d842..be1d1db9 100644 --- a/gcsfs/zb_hns_utils.py +++ b/gcsfs/zb_hns_utils.py @@ -1,6 +1,7 @@ from io import BytesIO from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _DEFAULT_FLUSH_INTERVAL_BYTES, AsyncAppendableObjectWriter, ) @@ -17,16 +18,23 @@ async def download_range(offset, length, mrd): return buffer.getvalue() -async def init_aaow(grpc_client, bucket_name, object_name, generation=None): +async def init_aaow( + grpc_client, bucket_name, object_name, generation=None, flush_interval_bytes=None +): """ Creates and opens the AsyncAppendableObjectWriter. """ - + writer_options = {} + # Only pass flush_interval_bytes if the user explicitly provided a + # non-default flush interval. 
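+    # A falsy value (None/0) or the library default leaves writer_options
+    # empty, so AsyncAppendableObjectWriter falls back to its built-in
+    # flush interval.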
+ if flush_interval_bytes and flush_interval_bytes != _DEFAULT_FLUSH_INTERVAL_BYTES: + writer_options["FLUSH_INTERVAL_BYTES"] = flush_interval_bytes writer = AsyncAppendableObjectWriter( client=grpc_client, bucket_name=bucket_name, object_name=object_name, generation=generation, + writer_options=writer_options, ) await writer.open() return writer diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py index f200a219..5e99d508 100644 --- a/gcsfs/zonal_file.py +++ b/gcsfs/zonal_file.py @@ -1,12 +1,15 @@ import logging from fsspec import asyn +from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _DEFAULT_FLUSH_INTERVAL_BYTES, +) from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( AsyncMultiRangeDownloader, ) from gcsfs import zb_hns_utils -from gcsfs.core import DEFAULT_BLOCK_SIZE, GCSFile +from gcsfs.core import GCSFile logger = logging.getLogger("gcsfs.zonal_file") @@ -22,7 +25,7 @@ def __init__( gcsfs, path, mode="rb", - block_size=DEFAULT_BLOCK_SIZE, + block_size=_DEFAULT_FLUSH_INTERVAL_BYTES, autocommit=True, cache_type="readahead", cache_options=None, @@ -45,6 +48,11 @@ def __init__( a `with` block or closing, the file will not be automatically finalized. To ensure the write is finalized, `.commit()` must be called explicitly or `finalize_on_close` must be set to `True` when opening the file. + + For Zonal buckets, `block_size` is set to `_DEFAULT_FLUSH_INTERVAL_BYTES` (16MiB) by default. + block_size is applicable for write mode and refers to the number of bytes to append before + "persisting" data in GCS. Must be a multiple of `_MAX_CHUNK_SIZE_BYTES` (2MiB). + """ bucket, key, generation = gcsfs._split_path(path) if not key: @@ -72,6 +80,7 @@ def __init__( bucket, key, generation, + block_size, ) else: raise NotImplementedError( @@ -109,7 +118,9 @@ async def _init_mrd(self, bucket_name, object_name, generation=None): self.gcsfs.grpc_client, bucket_name, object_name, generation ) - async def _init_aaow(self, bucket_name, object_name, generation=None): + async def _init_aaow( + self, bucket_name, object_name, generation=None, flush_interval_bytes=None + ): """ Initializes the AsyncAppendableObjectWriter. """ @@ -124,7 +135,11 @@ async def _init_aaow(self, bucket_name, object_name, generation=None): pass await self.gcsfs._get_grpc_client() return await zb_hns_utils.init_aaow( - self.gcsfs.grpc_client, bucket_name, object_name, generation + self.gcsfs.grpc_client, + bucket_name, + object_name, + generation, + flush_interval_bytes, ) def _fetch_range(self, start, end): From 2cd21fbb0cc1b3867f108104fb6bcf441e8c82cc Mon Sep 17 00:00:00 2001 From: suni72 Date: Mon, 26 Jan 2026 17:31:03 +0000 Subject: [PATCH 06/10] add few method description and update warning/error statements --- gcsfs/extended_gcsfs.py | 24 ++++++++++++++++++++++++ gcsfs/tests/test_zonal_file.py | 2 +- gcsfs/zonal_file.py | 8 +++++--- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/gcsfs/extended_gcsfs.py b/gcsfs/extended_gcsfs.py index f1570267..f9878f15 100644 --- a/gcsfs/extended_gcsfs.py +++ b/gcsfs/extended_gcsfs.py @@ -1139,6 +1139,24 @@ async def _pipe_file( self.invalidate_cache(self._parent(path)) async def _get_file(self, rpath, lpath, callback=None, **kwargs): + """ + Downloads a file from GCS to a local path. + + For Zonal buckets, it uses gRPC client for optimized downloads. + For Standard buckets, it delegates to the parent class implementation. + + Parameters + ---------- + rpath: str + Path on GCS to download the file from. 
+ lpath: str + Path to the local file to be downloaded. + callback: fsspec.callbacks.Callback, optional + Callback to monitor the download progress. + **kwargs: + For Zonal buckets, `chunksize` bytes (int) can be provided to control + the download chunk size (default is 128KB). + """ bucket, key, generation = self.split_path(rpath) if not await self._is_zonal_bucket(bucket): return await super()._get_file( @@ -1207,6 +1225,12 @@ async def _do_list_objects( versions=False, **kwargs, ): + """ + Lists objects in a bucket. + + For HNS-enabled buckets, it sets `includeFoldersAsPrefixes` to True + when the delimiter is '/'. + """ bucket, _, _ = self.split_path(path) if await self._is_bucket_hns_enabled(bucket) and delimiter == "/": kwargs["includeFoldersAsPrefixes"] = "true" diff --git a/gcsfs/tests/test_zonal_file.py b/gcsfs/tests/test_zonal_file.py index e1e55c23..f7ea3416 100644 --- a/gcsfs/tests/test_zonal_file.py +++ b/gcsfs/tests/test_zonal_file.py @@ -214,7 +214,7 @@ def test_zonal_file_double_finalize_warning( with mock.patch("gcsfs.zonal_file.logger") as mock_logger: f.commit() mock_logger.warning.assert_called_once_with( - "This file has already been finalized." + "This file has already been finalized. Ignoring commit call." ) diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py index 5e99d508..863038f5 100644 --- a/gcsfs/zonal_file.py +++ b/gcsfs/zonal_file.py @@ -164,7 +164,7 @@ def write(self, data): if self.closed: raise ValueError("I/O operation on closed file.") if not self.writable(): - raise ValueError("File not in write mode") + raise ValueError("File not in write mode.") if self.forced: raise ValueError("This file has been force-flushed, can only close") @@ -196,10 +196,12 @@ def commit(self): Commits the write by finalizing the AsyncAppendableObjectWriter. """ if not self.writable(): # No-op - logger.warning("File not in write mode.") + logger.warning("File not in write mode. Ignoring commit call.") return if self.finalized: # No-op - logger.warning("This file has already been finalized.") + logger.warning( + "This file has already been finalized. Ignoring commit call." + ) return asyn.sync(self.gcsfs.loop, self.aaow.finalize) self.finalized = True From 09f572fec42246b163a353e1baa1c372362e9e38 Mon Sep 17 00:00:00 2001 From: suni72 Date: Mon, 26 Jan 2026 17:55:24 +0000 Subject: [PATCH 07/10] corrected aaow creation logic in zonalfile init --- gcsfs/zonal_file.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py index 863038f5..91a792ba 100644 --- a/gcsfs/zonal_file.py +++ b/gcsfs/zonal_file.py @@ -73,7 +73,7 @@ def __init__( "AsyncMultiRangeDownloader (MRD) exists but has no 'persisted_size'. " "This may result in incorrect behavior for unfinalized objects." 
            )
-        elif "w" or "a" in self.mode:
+        elif "w" in self.mode or "a" in self.mode:
             self.aaow = asyn.sync(
                 self.gcsfs.loop,
                 self._init_aaow,

From dfdc0beb08987dcd99771632980c88fd1ffa5d9e Mon Sep 17 00:00:00 2001
From: suni72
Date: Mon, 26 Jan 2026 18:40:40 +0000
Subject: [PATCH 08/10] added logging for download_range to track bytes
 requested and downloaded

---
 gcsfs/zb_hns_utils.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/gcsfs/zb_hns_utils.py b/gcsfs/zb_hns_utils.py
index be1d1db9..a111b89c 100644
--- a/gcsfs/zb_hns_utils.py
+++ b/gcsfs/zb_hns_utils.py
@@ -1,3 +1,4 @@
+import logging
 from io import BytesIO

 from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
@@ -5,6 +6,8 @@
     AsyncAppendableObjectWriter,
 )

+logger = logging.getLogger("gcsfs")
+

 async def download_range(offset, length, mrd):
     """
@@ -15,7 +18,11 @@ async def download_range(offset, length, mrd):
         return b""
     buffer = BytesIO()
     await mrd.download_ranges([(offset, length, buffer)])
-    return buffer.getvalue()
+    data = buffer.getvalue()
+    logger.info(
+        f"Requested {length} bytes from offset {offset}, downloaded {len(data)} bytes"
+    )
+    return data


 async def init_aaow(

From ceb992fa8e2b977f5ac6bfa958811185f94fbdd7 Mon Sep 17 00:00:00 2001
From: suni72
Date: Fri, 30 Jan 2026 14:09:02 +0000
Subject: [PATCH 09/10] Update imports for Python SDK

Update grpc_client since MRD and AAOW now accept an AsyncGrpcClient object
---
 gcsfs/core.py                      | 2 +-
 gcsfs/extended_gcsfs.py            | 8 ++++----
 gcsfs/tests/conftest.py            | 2 +-
 gcsfs/tests/test_extended_gcsfs.py | 4 ++--
 gcsfs/tests/test_zonal_file.py     | 2 +-
 gcsfs/zb_hns_utils.py              | 2 +-
 gcsfs/zonal_file.py                | 4 ++--
 7 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/gcsfs/core.py b/gcsfs/core.py
index 564141b4..c9d7353a 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -2157,7 +2157,7 @@ async def upload_chunk(fs, location, data, offset, size, content_type):
     Uploads a chunk of data. This function has a conditional path to support
     experimental features for Zonal buckets to append data using gRPC.
""" - from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + from google.cloud.storage.asyncio.async_appendable_object_writer import ( AsyncAppendableObjectWriter, ) diff --git a/gcsfs/extended_gcsfs.py b/gcsfs/extended_gcsfs.py index f9878f15..827b2af8 100644 --- a/gcsfs/extended_gcsfs.py +++ b/gcsfs/extended_gcsfs.py @@ -12,12 +12,12 @@ from google.api_core.client_info import ClientInfo from google.auth.credentials import AnonymousCredentials from google.cloud import storage_control_v2 -from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( +from google.cloud.storage.asyncio.async_appendable_object_writer import ( _DEFAULT_FLUSH_INTERVAL_BYTES, AsyncAppendableObjectWriter, ) -from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient -from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( +from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage.asyncio.async_multi_range_downloader import ( AsyncMultiRangeDownloader, ) @@ -85,7 +85,7 @@ async def _get_grpc_client(self): self._grpc_client = AsyncGrpcClient( credentials=self.credential, client_info=ClientInfo(user_agent=f"{USER_AGENT}/{version}"), - ).grpc_client + ) return self._grpc_client async def _get_control_plane_client(self): diff --git a/gcsfs/tests/conftest.py b/gcsfs/tests/conftest.py index 4f4aaf74..669badf6 100644 --- a/gcsfs/tests/conftest.py +++ b/gcsfs/tests/conftest.py @@ -11,7 +11,7 @@ import pytest_asyncio import requests from google.cloud import storage -from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( +from google.cloud.storage.asyncio.async_appendable_object_writer import ( AsyncAppendableObjectWriter, ) diff --git a/gcsfs/tests/test_extended_gcsfs.py b/gcsfs/tests/test_extended_gcsfs.py index 5e13d1b6..f5eb2104 100644 --- a/gcsfs/tests/test_extended_gcsfs.py +++ b/gcsfs/tests/test_extended_gcsfs.py @@ -12,11 +12,11 @@ import pytest from google.api_core.exceptions import NotFound -from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( +from google.cloud.storage.asyncio.async_appendable_object_writer import ( _DEFAULT_FLUSH_INTERVAL_BYTES, AsyncAppendableObjectWriter, ) -from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( +from google.cloud.storage.asyncio.async_multi_range_downloader import ( AsyncMultiRangeDownloader, ) from google.cloud.storage.exceptions import DataCorruption diff --git a/gcsfs/tests/test_zonal_file.py b/gcsfs/tests/test_zonal_file.py index f7ea3416..3bd748a8 100644 --- a/gcsfs/tests/test_zonal_file.py +++ b/gcsfs/tests/test_zonal_file.py @@ -4,7 +4,7 @@ from unittest import mock import pytest -from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( +from google.cloud.storage.asyncio.async_appendable_object_writer import ( _DEFAULT_FLUSH_INTERVAL_BYTES, ) diff --git a/gcsfs/zb_hns_utils.py b/gcsfs/zb_hns_utils.py index a111b89c..063b441c 100644 --- a/gcsfs/zb_hns_utils.py +++ b/gcsfs/zb_hns_utils.py @@ -1,7 +1,7 @@ import logging from io import BytesIO -from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( +from google.cloud.storage.asyncio.async_appendable_object_writer import ( _DEFAULT_FLUSH_INTERVAL_BYTES, AsyncAppendableObjectWriter, ) diff --git a/gcsfs/zonal_file.py b/gcsfs/zonal_file.py index 91a792ba..488c36a0 100644 --- a/gcsfs/zonal_file.py +++ 
b/gcsfs/zonal_file.py @@ -1,10 +1,10 @@ import logging from fsspec import asyn -from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( +from google.cloud.storage.asyncio.async_appendable_object_writer import ( _DEFAULT_FLUSH_INTERVAL_BYTES, ) -from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( +from google.cloud.storage.asyncio.async_multi_range_downloader import ( AsyncMultiRangeDownloader, ) From da12664d1472dc98bbe0a08bb812ca2a1b453638 Mon Sep 17 00:00:00 2001 From: suni72 Date: Fri, 30 Jan 2026 14:21:33 +0000 Subject: [PATCH 10/10] update ci tests to use latest changes from python-storage update comment in test_zb_hns_utils.py --- .github/workflows/ci.yml | 1 + cloudbuild/e2e-tests-cloudbuild.yaml | 2 +- gcsfs/tests/test_zb_hns_utils.py | 3 ++- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3c5f92dd..35cfe645 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,6 +35,7 @@ jobs: - name: install run: | pip install -e . + pip install --upgrade --force-reinstall git+https://github.com/googleapis/python-storage.git@main - name: Run Standard Tests run: | export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/gcsfs/tests/fake-secret.json diff --git a/cloudbuild/e2e-tests-cloudbuild.yaml b/cloudbuild/e2e-tests-cloudbuild.yaml index 34a4f970..601af331 100644 --- a/cloudbuild/e2e-tests-cloudbuild.yaml +++ b/cloudbuild/e2e-tests-cloudbuild.yaml @@ -101,6 +101,7 @@ steps: pip install --upgrade pip > /dev/null # Install testing libraries explicitly, as they are not in setup.py pip install pytest pytest-timeout pytest-subtests pytest-asyncio fusepy google-cloud-storage > /dev/null + pip install --upgrade --force-reinstall git+https://github.com/googleapis/python-storage.git@main pip install -e . > /dev/null " @@ -150,7 +151,6 @@ steps: export GCSFS_TEST_KMS_KEY=projects/${PROJECT_ID}/locations/${_REGION}/keyRings/${_GCSFS_KEY_RING_NAME}/cryptoKeys/${_GCSFS_KEY_NAME} && \ echo '--- Running Zonal tests on VM ---' && \ ulimit -n 4096 && export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT='true' && \ - pip install --upgrade --force-reinstall git+https://github.com/googleapis/python-storage.git@main && \ pytest -vv -s --log-format='%(asctime)s %(levelname)s %(message)s' --log-date-format='%H:%M:%S' --color=no gcsfs/tests/test_extended_gcsfs.py gcsfs/tests/test_zonal_file.py gcsfs/tests/test_async_gcsfs.py " gcloud compute ssh gcsfs-test-vm-${_SHORT_BUILD_ID} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="$$TEST_SCRIPT" diff --git a/gcsfs/tests/test_zb_hns_utils.py b/gcsfs/tests/test_zb_hns_utils.py index a1d3a675..d7f9cb6e 100644 --- a/gcsfs/tests/test_zb_hns_utils.py +++ b/gcsfs/tests/test_zb_hns_utils.py @@ -64,7 +64,8 @@ async def test_init_aaow(): @pytest.mark.asyncio async def test_init_aaow_with_flush_interval_bytes(): """ - Docstring for test_init_aaow_with_flush_interval_bytes + Tests that init_aaow correctly passes the flush_interval_bytes + parameter to the AsyncAppendableObjectWriter. """ mock_writer_instance = mock.AsyncMock() with mock.patch(
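
Usage sketch (not part of the patch series): with these changes applied, the
Zonal flush interval can be tuned through the ordinary `block_size` argument.
The snippet below assumes the experimental flag used in the CI scripts above;
`my-zonal-bucket` is illustrative, the call shape mirrors the `open(...)`
calls in gcsfs/tests/test_zonal_file.py, and it is an assumption here that
GCSFileSystem routes opens through the Zonal-aware implementation when the
flag is set:

    import os

    import gcsfs

    # Zonal/HNS code paths are gated behind this experimental flag (as in CI).
    os.environ["GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT"] = "true"

    # Assumption: with the flag set, opens are routed to the extended,
    # Zonal-aware filesystem implementation.
    fs = gcsfs.GCSFileSystem()

    # block_size is forwarded to the appendable writer as FLUSH_INTERVAL_BYTES;
    # it must be a multiple of 2MiB (here 4MiB), matching the tests above.
    with fs.open(
        "my-zonal-bucket/logs/app.bin",
        "wb",
        block_size=4 * 1024 * 1024,
        finalize_on_close=True,
    ) as f:
        f.write(b"hello zonal")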