Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,11 @@ async def create_mrd(
client: AsyncGrpcClient,
bucket_name: str,
object_name: str,
generation_number: Optional[int] = None,
generation: Optional[int] = None,
read_handle: Optional[_storage_v2.BidiReadHandle] = None,
retry_policy: Optional[AsyncRetry] = None,
metadata: Optional[List[Tuple[str, str]]] = None,
**kwargs,
) -> AsyncMultiRangeDownloader:
"""Initializes a MultiRangeDownloader and opens the underlying bidi-gRPC
object for reading.
Expand All @@ -145,8 +146,8 @@ async def create_mrd(
:type object_name: str
:param object_name: The name of the object to be read.

:type generation_number: int
:param generation_number: (Optional) If present, selects a specific
:type generation: int
:param generation: (Optional) If present, selects a specific
revision of this object.

:type read_handle: _storage_v2.BidiReadHandle
Expand All @@ -162,7 +163,14 @@ async def create_mrd(
:rtype: :class:`~google.cloud.storage._experimental.asyncio.async_multi_range_downloader.AsyncMultiRangeDownloader`
:returns: An initialized AsyncMultiRangeDownloader instance for reading.
"""
mrd = cls(client, bucket_name, object_name, generation_number, read_handle)
mrd = cls(
client,
bucket_name,
object_name,
generation=generation,
read_handle=read_handle,
**kwargs,
)
await mrd.open(retry_policy=retry_policy, metadata=metadata)
return mrd

Expand All @@ -171,8 +179,9 @@ def __init__(
client: AsyncGrpcClient,
bucket_name: str,
object_name: str,
generation_number: Optional[int] = None,
generation: Optional[int] = None,
read_handle: Optional[_storage_v2.BidiReadHandle] = None,
**kwargs,
) -> None:
"""Constructor for AsyncMultiRangeDownloader, clients are not adviced to
use it directly. Instead it's adviced to use the classmethod `create_mrd`.
Expand All @@ -186,20 +195,27 @@ def __init__(
:type object_name: str
:param object_name: The name of the object to be read.

:type generation_number: int
:param generation_number: (Optional) If present, selects a specific revision of
:type generation: int
:param generation: (Optional) If present, selects a specific revision of
this object.

:type read_handle: _storage_v2.BidiReadHandle
:param read_handle: (Optional) An existing read handle.
"""
if "generation_number" in kwargs:
if generation is not None:
raise TypeError(
"Cannot set both 'generation' and 'generation_number'. "
"Use 'generation' for new code."
)
generation = kwargs.pop("generation_number")

raise_if_no_fast_crc32c()

self.client = client
self.bucket_name = bucket_name
self.object_name = object_name
self.generation_number = generation_number
self.generation = generation
self.read_handle: Optional[_storage_v2.BidiReadHandle] = read_handle
self.read_obj_str: Optional[_AsyncReadObjectStream] = None
self._is_stream_open: bool = False
Expand Down Expand Up @@ -276,7 +292,7 @@ async def _do_open():
client=self.client.grpc_client,
bucket_name=self.bucket_name,
object_name=self.object_name,
generation_number=self.generation_number,
generation_number=self.generation,
read_handle=self.read_handle,
)

Expand All @@ -291,7 +307,7 @@ async def _do_open():
)

if self.read_obj_str.generation_number:
self.generation_number = self.read_obj_str.generation_number
self.generation = self.read_obj_str.generation_number
if self.read_obj_str.read_handle:
self.read_handle = self.read_obj_str.read_handle
if self.read_obj_str.persisted_size is not None:
Expand Down Expand Up @@ -435,7 +451,7 @@ async def generator():
client=self.client.grpc_client,
bucket_name=self.bucket_name,
object_name=self.object_name,
generation_number=self.generation_number,
generation_number=self.generation,
read_handle=current_handle,
)

Expand Down
3 changes: 2 additions & 1 deletion tests/system/test_zonal.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ async def _run():

event_loop.run_until_complete(_run())

@pytest.mark.skip(reason='Flaky test b/478129078')

@pytest.mark.skip(reason="Flaky test b/478129078")
def test_mrd_open_with_read_handle(event_loop, grpc_client_direct):
object_name = f"test_read_handl-{str(uuid.uuid4())[:4]}"

Expand Down
54 changes: 48 additions & 6 deletions tests/unit/asyncio/test_async_multi_range_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async def _make_mock_mrd(
mock_cls_async_read_object_stream,
bucket_name=_TEST_BUCKET_NAME,
object_name=_TEST_OBJECT_NAME,
generation_number=_TEST_GENERATION_NUMBER,
generation=_TEST_GENERATION_NUMBER,
read_handle=_TEST_READ_HANDLE,
):
mock_client = mock.MagicMock()
Expand All @@ -62,7 +62,7 @@ async def _make_mock_mrd(
mock_stream.read_handle = _TEST_READ_HANDLE

mrd = await AsyncMultiRangeDownloader.create_mrd(
mock_client, bucket_name, object_name, generation_number, read_handle
mock_client, bucket_name, object_name, generation, read_handle
)

return mrd, mock_client
Expand All @@ -89,7 +89,7 @@ async def test_create_mrd(self, mock_cls_async_read_object_stream):
assert mrd.client == mock_client
assert mrd.bucket_name == _TEST_BUCKET_NAME
assert mrd.object_name == _TEST_OBJECT_NAME
assert mrd.generation_number == _TEST_GENERATION_NUMBER
assert mrd.generation == _TEST_GENERATION_NUMBER
assert mrd.read_handle == _TEST_READ_HANDLE
assert mrd.persisted_size == _TEST_OBJECT_SIZE
assert mrd.is_stream_open
Expand Down Expand Up @@ -303,9 +303,7 @@ async def test_downloading_without_opening_should_throw_error(self):
assert not mrd.is_stream_open

@mock.patch("google.cloud.storage._experimental.asyncio._utils.google_crc32c")
def test_init_raises_if_crc32c_c_extension_is_missing(
self, mock_google_crc32c
):
def test_init_raises_if_crc32c_c_extension_is_missing(self, mock_google_crc32c):
mock_google_crc32c.implementation = "python"
mock_client = mock.MagicMock()

Expand Down Expand Up @@ -405,3 +403,47 @@ async def close_side_effect():

mock_close.assert_called_once()
assert not mrd.is_stream_open

@mock.patch(
"google.cloud.storage._experimental.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
)
@pytest.mark.asyncio
async def test_create_mrd_with_generation_number(
self, mock_cls_async_read_object_stream
):
# Arrange
mock_client = mock.MagicMock()
mock_client.grpc_client = mock.AsyncMock()

mock_stream = mock_cls_async_read_object_stream.return_value
mock_stream.open = AsyncMock()
mock_stream.generation_number = _TEST_GENERATION_NUMBER
mock_stream.persisted_size = _TEST_OBJECT_SIZE
mock_stream.read_handle = _TEST_READ_HANDLE

# Act
mrd = await AsyncMultiRangeDownloader.create_mrd(
mock_client,
_TEST_BUCKET_NAME,
_TEST_OBJECT_NAME,
generation_number=_TEST_GENERATION_NUMBER,
read_handle=_TEST_READ_HANDLE,
)

# Assert
assert mrd.generation == _TEST_GENERATION_NUMBER

@pytest.mark.asyncio
async def test_create_mrd_with_both_generation_and_generation_number(self):
# Arrange
mock_client = mock.MagicMock()

# Act & Assert
with pytest.raises(TypeError):
await AsyncMultiRangeDownloader.create_mrd(
mock_client,
_TEST_BUCKET_NAME,
_TEST_OBJECT_NAME,
generation=_TEST_GENERATION_NUMBER,
generation_number=_TEST_GENERATION_NUMBER,
)