Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions google/cloud/storage/asyncio/async_multi_range_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,11 @@ async def create_mrd(
client: AsyncGrpcClient,
bucket_name: str,
object_name: str,
generation_number: Optional[int] = None,
generation: Optional[int] = None,
read_handle: Optional[_storage_v2.BidiReadHandle] = None,
retry_policy: Optional[AsyncRetry] = None,
metadata: Optional[List[Tuple[str, str]]] = None,
**kwargs,
) -> AsyncMultiRangeDownloader:
"""Initializes a MultiRangeDownloader and opens the underlying bidi-gRPC
object for reading.
Expand All @@ -145,8 +146,8 @@ async def create_mrd(
:type object_name: str
:param object_name: The name of the object to be read.

:type generation_number: int
:param generation_number: (Optional) If present, selects a specific
:type generation: int
:param generation: (Optional) If present, selects a specific
revision of this object.

:type read_handle: _storage_v2.BidiReadHandle
Expand All @@ -162,7 +163,14 @@ async def create_mrd(
:rtype: :class:`~google.cloud.storage.asyncio.async_multi_range_downloader.AsyncMultiRangeDownloader`
:returns: An initialized AsyncMultiRangeDownloader instance for reading.
"""
mrd = cls(client, bucket_name, object_name, generation_number, read_handle)
mrd = cls(
client,
bucket_name,
object_name,
generation=generation,
read_handle=read_handle,
**kwargs,
)
await mrd.open(retry_policy=retry_policy, metadata=metadata)
return mrd

Expand All @@ -171,8 +179,9 @@ def __init__(
client: AsyncGrpcClient,
bucket_name: str,
object_name: str,
generation_number: Optional[int] = None,
generation: Optional[int] = None,
read_handle: Optional[_storage_v2.BidiReadHandle] = None,
**kwargs,
) -> None:
"""Constructor for AsyncMultiRangeDownloader, clients are not adviced to
use it directly. Instead it's adviced to use the classmethod `create_mrd`.
Expand All @@ -186,20 +195,27 @@ def __init__(
:type object_name: str
:param object_name: The name of the object to be read.

:type generation_number: int
:param generation_number: (Optional) If present, selects a specific revision of
:type generation: int
:param generation: (Optional) If present, selects a specific revision of
this object.

:type read_handle: _storage_v2.BidiReadHandle
:param read_handle: (Optional) An existing read handle.
"""
if "generation_number" in kwargs:
if generation is not None:
raise TypeError(
"Cannot set both 'generation' and 'generation_number'. "
"Use 'generation' for new code."
)
generation = kwargs.pop("generation_number")

raise_if_no_fast_crc32c()

self.client = client
self.bucket_name = bucket_name
self.object_name = object_name
self.generation_number = generation_number
self.generation = generation
self.read_handle: Optional[_storage_v2.BidiReadHandle] = read_handle
self.read_obj_str: Optional[_AsyncReadObjectStream] = None
self._is_stream_open: bool = False
Expand Down Expand Up @@ -276,7 +292,7 @@ async def _do_open():
client=self.client.grpc_client,
bucket_name=self.bucket_name,
object_name=self.object_name,
generation_number=self.generation_number,
generation_number=self.generation,
read_handle=self.read_handle,
)

Expand All @@ -291,7 +307,7 @@ async def _do_open():
)

if self.read_obj_str.generation_number:
self.generation_number = self.read_obj_str.generation_number
self.generation = self.read_obj_str.generation_number
if self.read_obj_str.read_handle:
self.read_handle = self.read_obj_str.read_handle
if self.read_obj_str.persisted_size is not None:
Expand Down Expand Up @@ -435,7 +451,7 @@ async def generator():
client=self.client.grpc_client,
bucket_name=self.bucket_name,
object_name=self.object_name,
generation_number=self.generation_number,
generation_number=self.generation,
read_handle=current_handle,
)

Expand Down
3 changes: 2 additions & 1 deletion tests/system/test_zonal.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ async def _run():

event_loop.run_until_complete(_run())

@pytest.mark.skip(reason='Flaky test b/478129078')

@pytest.mark.skip(reason="Flaky test b/478129078")
def test_mrd_open_with_read_handle(event_loop, grpc_client_direct):
object_name = f"test_read_handl-{str(uuid.uuid4())[:4]}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
from google.cloud._storage_v2.types.storage import BidiReadObjectRedirectedError

_READ_ID = 1
LOGGER_NAME = (
"google.cloud.storage.asyncio.retry.reads_resumption_strategy"
)
LOGGER_NAME = "google.cloud.storage.asyncio.retry.reads_resumption_strategy"


class TestDownloadState(unittest.TestCase):
Expand Down
58 changes: 49 additions & 9 deletions tests/unit/asyncio/test_async_multi_range_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async def _make_mock_mrd(
mock_cls_async_read_object_stream,
bucket_name=_TEST_BUCKET_NAME,
object_name=_TEST_OBJECT_NAME,
generation_number=_TEST_GENERATION_NUMBER,
generation=_TEST_GENERATION_NUMBER,
read_handle=_TEST_READ_HANDLE,
):
mock_client = mock.MagicMock()
Expand All @@ -62,7 +62,7 @@ async def _make_mock_mrd(
mock_stream.read_handle = _TEST_READ_HANDLE

mrd = await AsyncMultiRangeDownloader.create_mrd(
mock_client, bucket_name, object_name, generation_number, read_handle
mock_client, bucket_name, object_name, generation, read_handle
)

return mrd, mock_client
Expand All @@ -89,7 +89,7 @@ async def test_create_mrd(self, mock_cls_async_read_object_stream):
assert mrd.client == mock_client
assert mrd.bucket_name == _TEST_BUCKET_NAME
assert mrd.object_name == _TEST_OBJECT_NAME
assert mrd.generation_number == _TEST_GENERATION_NUMBER
assert mrd.generation == _TEST_GENERATION_NUMBER
assert mrd.read_handle == _TEST_READ_HANDLE
assert mrd.persisted_size == _TEST_OBJECT_SIZE
assert mrd.is_stream_open
Expand Down Expand Up @@ -303,9 +303,7 @@ async def test_downloading_without_opening_should_throw_error(self):
assert not mrd.is_stream_open

@mock.patch("google.cloud.storage.asyncio._utils.google_crc32c")
def test_init_raises_if_crc32c_c_extension_is_missing(
self, mock_google_crc32c
):
def test_init_raises_if_crc32c_c_extension_is_missing(self, mock_google_crc32c):
mock_google_crc32c.implementation = "python"
mock_client = mock.MagicMock()

Expand All @@ -317,9 +315,7 @@ def test_init_raises_if_crc32c_c_extension_is_missing(
)

@pytest.mark.asyncio
@mock.patch(
"google.cloud.storage.asyncio.retry.reads_resumption_strategy.Checksum"
)
@mock.patch("google.cloud.storage.asyncio.retry.reads_resumption_strategy.Checksum")
async def test_download_ranges_raises_on_checksum_mismatch(
self, mock_checksum_class
):
Expand Down Expand Up @@ -405,3 +401,47 @@ async def close_side_effect():

mock_close.assert_called_once()
assert not mrd.is_stream_open

@mock.patch(
"google.cloud.storage.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
)
@pytest.mark.asyncio
async def test_create_mrd_with_generation_number(
self, mock_cls_async_read_object_stream
):
# Arrange
mock_client = mock.MagicMock()
mock_client.grpc_client = mock.AsyncMock()

mock_stream = mock_cls_async_read_object_stream.return_value
mock_stream.open = AsyncMock()
mock_stream.generation_number = _TEST_GENERATION_NUMBER
mock_stream.persisted_size = _TEST_OBJECT_SIZE
mock_stream.read_handle = _TEST_READ_HANDLE

# Act
mrd = await AsyncMultiRangeDownloader.create_mrd(
mock_client,
_TEST_BUCKET_NAME,
_TEST_OBJECT_NAME,
generation_number=_TEST_GENERATION_NUMBER,
read_handle=_TEST_READ_HANDLE,
)

# Assert
assert mrd.generation == _TEST_GENERATION_NUMBER

@pytest.mark.asyncio
async def test_create_mrd_with_both_generation_and_generation_number(self):
# Arrange
mock_client = mock.MagicMock()

# Act & Assert
with pytest.raises(TypeError):
await AsyncMultiRangeDownloader.create_mrd(
mock_client,
_TEST_BUCKET_NAME,
_TEST_OBJECT_NAME,
generation=_TEST_GENERATION_NUMBER,
generation_number=_TEST_GENERATION_NUMBER,
)
48 changes: 12 additions & 36 deletions tests/unit/asyncio/test_async_read_object_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ async def instantiate_read_obj_stream_with_read_handle(
return read_obj_stream


@mock.patch(
"google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc")
@mock.patch(
"google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
Expand Down Expand Up @@ -110,9 +108,7 @@ def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc):
assert read_obj_stream.rpc == rpc_sentinel


@mock.patch(
"google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc")
@mock.patch(
"google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
Expand All @@ -136,9 +132,7 @@ async def test_open(mock_client, mock_cls_async_bidi_rpc):
assert read_obj_stream.is_stream_open


@mock.patch(
"google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc")
@mock.patch(
"google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
Expand All @@ -162,9 +156,7 @@ async def test_open_with_read_handle(mock_client, mock_cls_async_bidi_rpc):
assert read_obj_stream.is_stream_open


@mock.patch(
"google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc")
@mock.patch(
"google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
Expand All @@ -185,9 +177,7 @@ async def test_open_when_already_open_should_raise_error(
assert str(exc.value) == "Stream is already open"


@mock.patch(
"google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc")
@mock.patch(
"google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
Expand All @@ -208,9 +198,7 @@ async def test_close(mock_client, mock_cls_async_bidi_rpc):
assert not read_obj_stream.is_stream_open


@mock.patch(
"google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc")
@mock.patch(
"google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
Expand All @@ -232,9 +220,7 @@ async def test_requests_done(mock_client, mock_cls_async_bidi_rpc):
read_obj_stream.socket_like_rpc.recv.assert_called_once()


@mock.patch(
"google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc")
@mock.patch(
"google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
Expand All @@ -255,9 +241,7 @@ async def test_close_without_open_should_raise_error(
assert str(exc.value) == "Stream is not open"


@mock.patch(
"google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc")
@mock.patch(
"google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
Expand All @@ -278,9 +262,7 @@ async def test_send(mock_client, mock_cls_async_bidi_rpc):
)


@mock.patch(
"google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc")
@mock.patch(
"google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
Expand All @@ -301,9 +283,7 @@ async def test_send_without_open_should_raise_error(
assert str(exc.value) == "Stream is not open"


@mock.patch(
"google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc")
@mock.patch(
"google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
Expand All @@ -326,9 +306,7 @@ async def test_recv(mock_client, mock_cls_async_bidi_rpc):
assert response == bidi_read_object_response


@mock.patch(
"google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc")
@mock.patch(
"google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
Expand All @@ -349,9 +327,7 @@ async def test_recv_without_open_should_raise_error(
assert str(exc.value) == "Stream is not open"


@mock.patch(
"google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc"
)
@mock.patch("google.cloud.storage.asyncio.async_read_object_stream.AsyncBidiRpc")
@mock.patch(
"google.cloud.storage.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
Expand Down
Loading