13 changes: 13 additions & 0 deletions .github/.codecov.yml
@@ -0,0 +1,13 @@
codecov:
require_ci_to_pass: yes

coverage:
status:
project:
default:
target: auto
threshold: 0% # Do not allow any coverage drop.
patch:
default:
target: 80%
threshold: 0%
8 changes: 8 additions & 0 deletions .github/workflows/ci.yml
@@ -35,6 +35,7 @@ jobs:
- name: install
run: |
pip install -e .
pip install --upgrade --force-reinstall git+https://github.com/googleapis/python-storage.git@main
- name: Run Standard Tests
run: |
export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/gcsfs/tests/fake-secret.json
@@ -51,9 +52,16 @@
export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/gcsfs/tests/fake-secret.json
export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT="true"
pytest -vv -s \
--cov=gcsfs --cov-report=xml \
--log-format="%(asctime)s %(levelname)s %(message)s" \
--log-date-format="%H:%M:%S" \
gcsfs/
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: ./coverage.xml
fail_ci_if_error: true

lint:
name: lint
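The new coverage flags can be exercised locally before pushing. A minimal sketch (assumes pytest-cov is installed, as added to environment_gcsfs.yaml below):

import pytest

# Mirror the CI invocation: collect coverage for the gcsfs package and write
# the coverage.xml report that codecov-action later uploads.
pytest.main(["--cov=gcsfs", "--cov-report=xml", "gcsfs/"])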
8 changes: 6 additions & 2 deletions cloudbuild/e2e-tests-cloudbuild.yaml
@@ -29,6 +29,8 @@ steps:
set -e
echo "--- Creating standard bucket ---"
gcloud storage buckets create gs://gcsfs-test-standard-${_SHORT_BUILD_ID} --project=${PROJECT_ID} --location=${_REGION} &
echo "--- Creating standard bucket for zonal tests ---"
gcloud storage buckets create gs://gcsfs-test-standard-for-zonal-${_SHORT_BUILD_ID} --project=${PROJECT_ID} --location=${_REGION} &
echo "--- Creating versioned bucket ---"
gcloud storage buckets create gs://gcsfs-test-versioned-${_SHORT_BUILD_ID} --project=${PROJECT_ID} --location=${_REGION} &
echo "--- Creating HNS bucket ---"
@@ -99,6 +101,7 @@ steps:
pip install --upgrade pip > /dev/null
# Install testing libraries explicitly, as they are not in setup.py
pip install pytest pytest-timeout pytest-subtests pytest-asyncio fusepy google-cloud-storage > /dev/null
pip install --upgrade --force-reinstall git+https://github.com/googleapis/python-storage.git@main
pip install -e . > /dev/null
"

@@ -141,13 +144,13 @@
TEST_SCRIPT="
source env/bin/activate && \
echo '--- Preparing test environment for zonal tests ---' && \
export GCSFS_TEST_BUCKET='gcsfs-test-standard-for-zonal-${_SHORT_BUILD_ID}' && \
export GCSFS_ZONAL_TEST_BUCKET='gcsfs-test-zonal-${_SHORT_BUILD_ID}' && \
export STORAGE_EMULATOR_HOST=https://storage.googleapis.com && \
export GCSFS_TEST_PROJECT=${PROJECT_ID} && \
export GCSFS_TEST_KMS_KEY=projects/${PROJECT_ID}/locations/${_REGION}/keyRings/${_GCSFS_KEY_RING_NAME}/cryptoKeys/${_GCSFS_KEY_NAME} && \
echo '--- Running Zonal tests on VM ---' && \
ulimit -n 2048 && export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT='true' && \
pip install --upgrade --force-reinstall git+https://github.com/googleapis/python-storage.git@main && \
ulimit -n 4096 && export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT='true' && \
pytest -vv -s --log-format='%(asctime)s %(levelname)s %(message)s' --log-date-format='%H:%M:%S' --color=no gcsfs/tests/test_extended_gcsfs.py gcsfs/tests/test_zonal_file.py gcsfs/tests/test_async_gcsfs.py
"
gcloud compute ssh gcsfs-test-vm-${_SHORT_BUILD_ID} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="$$TEST_SCRIPT"
@@ -221,6 +224,7 @@ steps:
set -e
echo "--- Deleting test buckets in parallel ---"
gcloud storage rm --recursive gs://gcsfs-test-standard-${_SHORT_BUILD_ID} &
gcloud storage rm --recursive gs://gcsfs-test-standard-for-zonal-${_SHORT_BUILD_ID} &
gcloud storage rm --recursive gs://gcsfs-test-versioned-${_SHORT_BUILD_ID} &
gcloud storage rm --recursive gs://gcsfs-test-hns-${_SHORT_BUILD_ID} &
gcloud storage rm --recursive gs://gcsfs-test-zonal-${_SHORT_BUILD_ID} &
1 change: 1 addition & 0 deletions environment_gcsfs.yaml
@@ -17,6 +17,7 @@ dependencies:
- pytest
- pytest-timeout
- pytest-asyncio
- pytest-cov
- pytest-subtests
- requests
- ujson
2 changes: 1 addition & 1 deletion gcsfs/core.py
@@ -2157,7 +2157,7 @@ async def upload_chunk(fs, location, data, offset, size, content_type):
Uploads a chunk of data. For Zonal buckets, this function takes an
experimental conditional path that appends data using gRPC.
"""
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
from google.cloud.storage.asyncio.async_appendable_object_writer import (
AsyncAppendableObjectWriter,
)

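Both module paths appear in this change (the removed `_experimental` location and the graduated one). For environments that may still carry an older google-cloud-storage release, a hedged compatibility shim could look like this (illustrative, not part of the PR):

try:
    # Graduated path used by this PR (google-cloud-storage installed from main).
    from google.cloud.storage.asyncio.async_appendable_object_writer import (
        AsyncAppendableObjectWriter,
    )
except ImportError:
    # Pre-graduation experimental location in older releases.
    from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
        AsyncAppendableObjectWriter,
    )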
80 changes: 74 additions & 6 deletions gcsfs/extended_gcsfs.py
@@ -12,17 +12,18 @@
from google.api_core.client_info import ClientInfo
from google.auth.credentials import AnonymousCredentials
from google.cloud import storage_control_v2
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
from google.cloud.storage.asyncio.async_appendable_object_writer import (
_DEFAULT_FLUSH_INTERVAL_BYTES,
AsyncAppendableObjectWriter,
)
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)

from gcsfs import __version__ as version
from gcsfs import zb_hns_utils
from gcsfs.core import GCSFile, GCSFileSystem
from gcsfs.core import DEFAULT_BLOCK_SIZE, GCSFile, GCSFileSystem
from gcsfs.zonal_file import ZonalFile

logger = logging.getLogger("gcsfs")
@@ -84,7 +85,7 @@ async def _get_grpc_client(self):
self._grpc_client = AsyncGrpcClient(
credentials=self.credential,
client_info=ClientInfo(user_agent=f"{USER_AGENT}/{version}"),
).grpc_client
)
return self._grpc_client

async def _get_control_plane_client(self):
Expand Down Expand Up @@ -173,11 +174,23 @@ def _open(
"""
bucket, _, _ = self.split_path(path)
bucket_type = self._sync_lookup_bucket_type(bucket)

# Choose correct block_size if not explicitly provided
if block_size is None:
block_size = self.default_block_size
# If we are using the generic default (user didn't override it),
# switch to the Zonal-optimized default for Zonal buckets.
if (
bucket_type == BucketType.ZONAL_HIERARCHICAL
and block_size == DEFAULT_BLOCK_SIZE
):
block_size = _DEFAULT_FLUSH_INTERVAL_BYTES

return gcs_file_types[bucket_type](
self,
path,
mode,
block_size=block_size or self.default_block_size,
block_size=block_size,
cache_type=cache_type,
cache_options=cache_options,
consistency=consistency or self.consistency,
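A sketch of the resulting defaults, mirroring the test change below (`fs` is a placeholder for an extended gcsfs filesystem instance; bucket names are illustrative):

from google.cloud.storage.asyncio.async_appendable_object_writer import (
    _DEFAULT_FLUSH_INTERVAL_BYTES,
)

# With no explicit block_size, a Zonal bucket gets the flush-interval default,
# while a Standard bucket keeps the filesystem-wide default block size.
with fs.open("zonal-bucket/data.bin", "rb") as f:
    assert f.blocksize == _DEFAULT_FLUSH_INTERVAL_BYTES
with fs.open("standard-bucket/data.bin", "rb") as f:
    assert f.blocksize == fs.default_block_size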
Expand Down Expand Up @@ -1126,6 +1139,24 @@ async def _pipe_file(
self.invalidate_cache(self._parent(path))

async def _get_file(self, rpath, lpath, callback=None, **kwargs):
"""
Downloads a file from GCS to a local path.

For Zonal buckets, it uses the gRPC client for optimized downloads.
For Standard buckets, it delegates to the parent class implementation.

Parameters
----------
rpath: str
Path on GCS to download the file from.
lpath: str
Local path to write the downloaded file to.
callback: fsspec.callbacks.Callback, optional
Callback to monitor the download progress.
**kwargs:
For Zonal buckets, `chunksize` (int, in bytes) can be provided to
control the download chunk size (default is 128KB).
"""
bucket, key, generation = self.split_path(rpath)
if not await self._is_zonal_bucket(bucket):
return await super()._get_file(
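A usage sketch for the Zonal download path (placeholder names; `chunksize` is the kwarg documented above):

# Download from a Zonal bucket in 1 MiB chunks instead of the 128KB default.
await fs._get_file(
    "zonal-bucket/data.bin",
    "/tmp/data.bin",
    chunksize=1024 * 1024,
)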
Expand Down Expand Up @@ -1194,6 +1225,12 @@ async def _do_list_objects(
versions=False,
**kwargs,
):
"""
Lists objects in a bucket.

For HNS-enabled buckets, it sets `includeFoldersAsPrefixes` to True
when the delimiter is '/'.
"""
bucket, _, _ = self.split_path(path)
if await self._is_bucket_hns_enabled(bucket) and delimiter == "/":
kwargs["includeFoldersAsPrefixes"] = "true"
@@ -1207,6 +1244,37 @@
**kwargs,
)
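In practice (a sketch with placeholder names), a plain listing against an HNS-enabled bucket now surfaces folders as directory entries, because the request carries includeFoldersAsPrefixes:

# With the default "/" delimiter, folders show up as prefixes even when they
# contain no objects yet.
entries = await fs._ls("hns-bucket/some/prefix/")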

async def _cp_file(self, path1, path2, acl=None, **kwargs):
"""Duplicate remote file.

For Standard GCS buckets, this falls back to the parent class's implementation.

Zonal Bucket Support:
Server-side copy is currently NOT supported for Zonal buckets because
the `RewriteObject` API is unavailable for them.

The following scenarios will raise a `NotImplementedError`:
* Intra-zonal: Copying within the same Zonal bucket.
* Inter-zonal: Copying between two different Zonal buckets.
* Mixed: Copying between a Zonal bucket and a Standard bucket.

"""
b1, _, _ = self.split_path(path1)
b2, _, _ = self.split_path(path2)

is_zonal_source = await self._is_zonal_bucket(b1)
is_zonal_dest = await self._is_zonal_bucket(b2)

# 1. Standard -> Standard (Delegate to core implementation)
if not is_zonal_source and not is_zonal_dest:
return await super()._cp_file(path1, path2, acl=acl, **kwargs)

# 2. Zonal Scenarios (Currently Unsupported)
raise NotImplementedError(
"Server-side copy involving Zonal buckets is not supported. "
"Zonal objects do not support rewrite."
)
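
A behavior sketch covering the combinations exercised by the new test below (bucket names are placeholders):

# Standard -> Standard still copies server-side via the parent implementation.
await fs._cp_file("standard-bucket/a.txt", "standard-bucket/b.txt")

# Any combination involving a Zonal bucket raises NotImplementedError, since
# RewriteObject is unavailable for Zonal objects.
try:
    await fs._cp_file("zonal-bucket/a.txt", "standard-bucket/b.txt")
except NotImplementedError:
    pass  # expected for intra-zonal, inter-zonal, and mixed copies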


async def upload_chunk(fs, location, data, offset, size, content_type):
"""
2 changes: 1 addition & 1 deletion gcsfs/tests/conftest.py
@@ -11,7 +11,7 @@
import pytest_asyncio
import requests
from google.cloud import storage
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
from google.cloud.storage.asyncio.async_appendable_object_writer import (
AsyncAppendableObjectWriter,
)

69 changes: 66 additions & 3 deletions gcsfs/tests/test_extended_gcsfs.py
@@ -5,16 +5,18 @@
import os
import random
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor
from itertools import chain
from unittest import mock

import pytest
from google.api_core.exceptions import NotFound
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
from google.cloud.storage.asyncio.async_appendable_object_writer import (
_DEFAULT_FLUSH_INTERVAL_BYTES,
AsyncAppendableObjectWriter,
)
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
from google.cloud.storage.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)
from google.cloud.storage.exceptions import DataCorruption
@@ -201,13 +203,18 @@ def test_open_uses_correct_blocksize_and_consistency_for_all_bucket_types(
def test_open_uses_default_blocksize_and_consistency_from_fs(
extended_gcsfs, gcs_bucket_mocks, bucket_type_val
):
if extended_gcsfs.on_google:
pytest.skip("Cannot mock bucket_types on real GCS")
csv_file = "2014-01-01.csv"
csv_file_path = f"{TEST_ZONAL_BUCKET}/{csv_file}"
csv_data = csv_files[csv_file]

with gcs_bucket_mocks(csv_data, bucket_type_val=bucket_type_val):
with extended_gcsfs.open(csv_file_path, "rb") as f:
assert f.blocksize == extended_gcsfs.default_block_size
if bucket_type_val == BucketType.ZONAL_HIERARCHICAL:
assert f.blocksize == _DEFAULT_FLUSH_INTERVAL_BYTES
else:
assert f.blocksize == extended_gcsfs.default_block_size
assert type(f.checker) is ConsistencyChecker


@@ -1597,3 +1604,59 @@ async def test_get_file_exception_cleanup(

# The local file should not exist after the failed download
assert not lpath.exists()


@pytest.mark.asyncio
@pytest.mark.parametrize(
"source_bucket, dest_bucket, should_fail",
[
(TEST_ZONAL_BUCKET, TEST_ZONAL_BUCKET, True),
(TEST_ZONAL_BUCKET, TEST_BUCKET, True),
(TEST_BUCKET, TEST_ZONAL_BUCKET, True),
(TEST_BUCKET, TEST_BUCKET, False),
],
)
async def test_cp_file_not_implemented_error(
async_gcs, source_bucket, dest_bucket, should_fail
):
"""
Tests _cp_file behavior for combinations of Zonal and Standard buckets.
"""
short_uuid = str(uuid.uuid4())[:8]
source_path = f"{source_bucket}/source_{short_uuid}"
dest_path = f"{dest_bucket}/dest_{short_uuid}"
is_real_gcs = os.getenv("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com"

# The source file must exist for the Standard -> Standard case, where the
# parent implementation performs a real copy against GCS
if is_real_gcs:
await async_gcs._pipe_file(source_path, b"test data", finalize_on_close=True)

async def mock_is_zonal(bucket):
return bucket == TEST_ZONAL_BUCKET

is_zonal_patch_cm = (
mock.patch.object(async_gcs, "_is_zonal_bucket", side_effect=mock_is_zonal)
if not is_real_gcs
else contextlib.nullcontext()
)

with is_zonal_patch_cm:
if should_fail:
with pytest.raises(
NotImplementedError,
match=(
r"Server-side copy involving Zonal buckets is not supported. "
r"Zonal objects do not support rewrite."
),
):
await async_gcs._cp_file(source_path, dest_path)
else: # Standard -> Standard
if is_real_gcs:
await async_gcs._cp_file(source_path, dest_path)
assert await async_gcs._cat(dest_path) == b"test data"
else:
with mock.patch(
"gcsfs.core.GCSFileSystem._cp_file", new_callable=mock.AsyncMock
) as mock_super_cp:
await async_gcs._cp_file(source_path, dest_path)
mock_super_cp.assert_awaited_once()
36 changes: 34 additions & 2 deletions gcsfs/tests/test_zb_hns_utils.py
@@ -42,8 +42,8 @@ async def test_init_aaow():
"""
mock_writer_instance = mock.AsyncMock()
with mock.patch(
"gcsfs.zb_hns_utils.AsyncAppendableObjectWriter", # The class to patch
new_callable=mock.Mock, # Use a regular Mock for the class
"gcsfs.zb_hns_utils.AsyncAppendableObjectWriter",
new_callable=mock.Mock,
return_value=mock_writer_instance,
) as mock_writer_class:
result = await zb_hns_utils.init_aaow(
@@ -55,6 +55,38 @@
bucket_name=bucket_name,
object_name=object_name,
generation=generation,
writer_options={},
)
mock_writer_instance.open.assert_awaited_once()
assert result is mock_writer_instance


@pytest.mark.asyncio
async def test_init_aaow_with_flush_interval_bytes():
"""
Tests that init_aaow correctly passes the flush_interval_bytes
parameter to the AsyncAppendableObjectWriter.
"""
mock_writer_instance = mock.AsyncMock()
with mock.patch(
"gcsfs.zb_hns_utils.AsyncAppendableObjectWriter",
new_callable=mock.Mock,
return_value=mock_writer_instance,
) as mock_writer_class:
result = await zb_hns_utils.init_aaow(
mock_grpc_client,
bucket_name,
object_name,
generation,
flush_interval_bytes=1024,
)

mock_writer_class.assert_called_once_with(
client=mock_grpc_client,
bucket_name=bucket_name,
object_name=object_name,
generation=generation,
writer_options={"FLUSH_INTERVAL_BYTES": 1024},
)
mock_writer_instance.open.assert_awaited_once()
assert result is mock_writer_instance
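
From the two tests above, the expected shape of `zb_hns_utils.init_aaow` can be inferred. A minimal sketch (the real implementation lives in gcsfs/zb_hns_utils.py and may differ in details):

from google.cloud.storage.asyncio.async_appendable_object_writer import (
    AsyncAppendableObjectWriter,
)

async def init_aaow(
    client, bucket_name, object_name, generation, flush_interval_bytes=None
):
    # Map the optional flush interval onto the writer_options dict asserted
    # by the tests; an empty dict leaves the library defaults in place.
    writer_options = {}
    if flush_interval_bytes is not None:
        writer_options["FLUSH_INTERVAL_BYTES"] = flush_interval_bytes
    writer = AsyncAppendableObjectWriter(
        client=client,
        bucket_name=bucket_name,
        object_name=object_name,
        generation=generation,
        writer_options=writer_options,
    )
    await writer.open()
    return writer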