From 1429c4c175aaf4c9af7bcfcfbb42f03db43bf467 Mon Sep 17 00:00:00 2001 From: Pulkit Aggarwal Date: Fri, 30 Jan 2026 08:34:12 +0000 Subject: [PATCH 1/4] fix: receive eof while closing reads stream --- .../asyncio/async_write_object_stream.py | 21 +++++++++- .../asyncio/test_async_write_object_stream.py | 41 +++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/asyncio/async_write_object_stream.py b/google/cloud/storage/asyncio/async_write_object_stream.py index 721183962..c814e8c15 100644 --- a/google/cloud/storage/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/asyncio/async_write_object_stream.py @@ -181,9 +181,26 @@ async def close(self) -> None: async def requests_done(self): """Signals that all requests have been sent.""" - await self.socket_like_rpc.send(None) - _utils.update_write_handle_if_exists(self, await self.socket_like_rpc.recv()) + + # The server may send a final "EOF" response immediately, or it may + # first send an intermediate response followed by the EOF response depending on whether the object was finalized or not. + first_resp = await self.socket_like_rpc.recv() + + is_eof = ( + first_resp is None + or ( + getattr(first_resp, "persisted_size", None) is None + ) + ) + _utils.update_write_handle_if_exists(self, first_resp) + + if not is_eof: + self.persisted_size = first_resp.persisted_size + second_resp = await self.socket_like_rpc.recv() + + if second_resp is not None: + _utils.update_write_handle_if_exists(self, second_resp) async def send( self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index 77e2ef091..91b7a57ca 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -209,3 +209,44 @@ async def test_methods_require_open_raises(self, mock_client): await stream.recv() with pytest.raises(ValueError, match="Stream is not open"): await stream.close() + + @pytest.mark.asyncio + async def test_close_with_persisted_size_then_eof(self, mock_client): + """Test close when first recv has persisted_size, second is EOF.""" + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + stream._is_stream_open = True + stream.socket_like_rpc = AsyncMock() + + # First response has persisted_size (NOT EOF, intermediate) + persisted_resp = _storage_v2.BidiWriteObjectResponse(persisted_size=500) + # Second response is EOF (None) + eof_resp = None + + stream.socket_like_rpc.send = AsyncMock() + stream.socket_like_rpc.recv = AsyncMock(side_effect=[persisted_resp, eof_resp]) + stream.socket_like_rpc.close = AsyncMock() + + await stream.close() + + # Verify two recv calls: first has persisted_size (NOT EOF), so read second (EOF) + assert stream.socket_like_rpc.recv.await_count == 2 + assert stream.persisted_size == 500 + assert not stream.is_stream_open + + @pytest.mark.asyncio + async def test_close_with_eof_response(self, mock_client): + """Test close when first recv is EOF or None.""" + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + stream._is_stream_open = True + stream.socket_like_rpc = AsyncMock() + + # First recv returns None (gRPC EOF, stream is already closed) + stream.socket_like_rpc.send = AsyncMock() + stream.socket_like_rpc.recv = AsyncMock(return_value=None) + stream.socket_like_rpc.close = AsyncMock() + + await stream.close() + + # Verify only one recv call (None=EOF, so don't read second) + assert stream.socket_like_rpc.recv.await_count == 1 + assert not stream.is_stream_open From 2e88a02d10319456d24c380427fc2336c0ea5a6a Mon Sep 17 00:00:00 2001 From: Pulkit Aggarwal Date: Fri, 30 Jan 2026 10:32:53 +0000 Subject: [PATCH 2/4] add check for EOF --- .../asyncio/async_write_object_stream.py | 3 ++- .../asyncio/test_async_write_object_stream.py | 24 +++++++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/google/cloud/storage/asyncio/async_write_object_stream.py b/google/cloud/storage/asyncio/async_write_object_stream.py index c814e8c15..96cbef624 100644 --- a/google/cloud/storage/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/asyncio/async_write_object_stream.py @@ -13,6 +13,7 @@ # limitations under the License. from typing import List, Optional, Tuple +import grpc from google.cloud import _storage_v2 from google.cloud.storage.asyncio import _utils from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient @@ -188,7 +189,7 @@ async def requests_done(self): first_resp = await self.socket_like_rpc.recv() is_eof = ( - first_resp is None + first_resp is None or first_resp == grpc.aio.EOF or ( getattr(first_resp, "persisted_size", None) is None ) diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index 91b7a57ca..ba85f1bd2 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -234,8 +234,8 @@ async def test_close_with_persisted_size_then_eof(self, mock_client): assert not stream.is_stream_open @pytest.mark.asyncio - async def test_close_with_eof_response(self, mock_client): - """Test close when first recv is EOF or None.""" + async def test_close_with_none_response(self, mock_client): + """Test close when first recv is None.""" stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) stream._is_stream_open = True stream.socket_like_rpc = AsyncMock() @@ -250,3 +250,23 @@ async def test_close_with_eof_response(self, mock_client): # Verify only one recv call (None=EOF, so don't read second) assert stream.socket_like_rpc.recv.await_count == 1 assert not stream.is_stream_open + + @pytest.mark.asyncio + async def test_close_with_grpc_aio_eof_response(self, mock_client): + """Test close when first recv is grpc.aio.EOF sentinel.""" + import grpc + + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + stream._is_stream_open = True + stream.socket_like_rpc = AsyncMock() + + # First recv returns grpc.aio.EOF (explicit sentinel from finalize) + stream.socket_like_rpc.send = AsyncMock() + stream.socket_like_rpc.recv = AsyncMock(return_value=grpc.aio.EOF) + stream.socket_like_rpc.close = AsyncMock() + + await stream.close() + + # Verify only one recv call (grpc.aio.EOF=EOF, so don't read second) + assert stream.socket_like_rpc.recv.await_count == 1 + assert not stream.is_stream_open From 2d1243299dbe413d90e8b3cc386879957359d7f7 Mon Sep 17 00:00:00 2001 From: Pulkit Aggarwal Date: Fri, 30 Jan 2026 10:47:44 +0000 Subject: [PATCH 3/4] simplify requests_done --- .../asyncio/async_write_object_stream.py | 13 ++--------- .../asyncio/test_async_write_object_stream.py | 22 +++---------------- 2 files changed, 5 insertions(+), 30 deletions(-) diff --git a/google/cloud/storage/asyncio/async_write_object_stream.py b/google/cloud/storage/asyncio/async_write_object_stream.py index 96cbef624..305e9b301 100644 --- a/google/cloud/storage/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/asyncio/async_write_object_stream.py @@ -187,21 +187,12 @@ async def requests_done(self): # The server may send a final "EOF" response immediately, or it may # first send an intermediate response followed by the EOF response depending on whether the object was finalized or not. first_resp = await self.socket_like_rpc.recv() - - is_eof = ( - first_resp is None or first_resp == grpc.aio.EOF - or ( - getattr(first_resp, "persisted_size", None) is None - ) - ) _utils.update_write_handle_if_exists(self, first_resp) - if not is_eof: + if first_resp != grpc.aio.EOF: self.persisted_size = first_resp.persisted_size second_resp = await self.socket_like_rpc.recv() - - if second_resp is not None: - _utils.update_write_handle_if_exists(self, second_resp) + _utils.update_write_handle_if_exists(self, second_resp) async def send( self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index ba85f1bd2..153c550a2 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -213,6 +213,8 @@ async def test_methods_require_open_raises(self, mock_client): @pytest.mark.asyncio async def test_close_with_persisted_size_then_eof(self, mock_client): """Test close when first recv has persisted_size, second is EOF.""" + import grpc + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) stream._is_stream_open = True stream.socket_like_rpc = AsyncMock() @@ -220,7 +222,7 @@ async def test_close_with_persisted_size_then_eof(self, mock_client): # First response has persisted_size (NOT EOF, intermediate) persisted_resp = _storage_v2.BidiWriteObjectResponse(persisted_size=500) # Second response is EOF (None) - eof_resp = None + eof_resp = grpc.aio.EOF stream.socket_like_rpc.send = AsyncMock() stream.socket_like_rpc.recv = AsyncMock(side_effect=[persisted_resp, eof_resp]) @@ -233,24 +235,6 @@ async def test_close_with_persisted_size_then_eof(self, mock_client): assert stream.persisted_size == 500 assert not stream.is_stream_open - @pytest.mark.asyncio - async def test_close_with_none_response(self, mock_client): - """Test close when first recv is None.""" - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) - stream._is_stream_open = True - stream.socket_like_rpc = AsyncMock() - - # First recv returns None (gRPC EOF, stream is already closed) - stream.socket_like_rpc.send = AsyncMock() - stream.socket_like_rpc.recv = AsyncMock(return_value=None) - stream.socket_like_rpc.close = AsyncMock() - - await stream.close() - - # Verify only one recv call (None=EOF, so don't read second) - assert stream.socket_like_rpc.recv.await_count == 1 - assert not stream.is_stream_open - @pytest.mark.asyncio async def test_close_with_grpc_aio_eof_response(self, mock_client): """Test close when first recv is grpc.aio.EOF sentinel.""" From a7438a9b0bd8c0fad8a3ae8b89b92ece23a022c5 Mon Sep 17 00:00:00 2001 From: Pulkit Aggarwal Date: Fri, 30 Jan 2026 16:02:10 +0000 Subject: [PATCH 4/4] resolve comments --- .../asyncio/async_write_object_stream.py | 2 +- .../asyncio/test_async_write_object_stream.py | 31 ++++++++++--------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/google/cloud/storage/asyncio/async_write_object_stream.py b/google/cloud/storage/asyncio/async_write_object_stream.py index 305e9b301..319f394dd 100644 --- a/google/cloud/storage/asyncio/async_write_object_stream.py +++ b/google/cloud/storage/asyncio/async_write_object_stream.py @@ -192,7 +192,7 @@ async def requests_done(self): if first_resp != grpc.aio.EOF: self.persisted_size = first_resp.persisted_size second_resp = await self.socket_like_rpc.recv() - _utils.update_write_handle_if_exists(self, second_resp) + assert second_resp == grpc.aio.EOF async def send( self, bidi_write_object_request: _storage_v2.BidiWriteObjectRequest diff --git a/tests/unit/asyncio/test_async_write_object_stream.py b/tests/unit/asyncio/test_async_write_object_stream.py index 153c550a2..4e952336b 100644 --- a/tests/unit/asyncio/test_async_write_object_stream.py +++ b/tests/unit/asyncio/test_async_write_object_stream.py @@ -15,6 +15,8 @@ import unittest.mock as mock from unittest.mock import AsyncMock, MagicMock import pytest +import grpc + from google.cloud.storage.asyncio.async_write_object_stream import ( _AsyncWriteObjectStream, @@ -194,27 +196,20 @@ async def test_close_success(self, mock_client): stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) stream._is_stream_open = True stream.socket_like_rpc = AsyncMock() + + stream.socket_like_rpc.send = AsyncMock() + first_resp = _storage_v2.BidiWriteObjectResponse(persisted_size=100) + stream.socket_like_rpc.recv = AsyncMock(side_effect=[first_resp, grpc.aio.EOF]) stream.socket_like_rpc.close = AsyncMock() await stream.close() stream.socket_like_rpc.close.assert_awaited_once() assert not stream.is_stream_open - - @pytest.mark.asyncio - async def test_methods_require_open_raises(self, mock_client): - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) - with pytest.raises(ValueError, match="Stream is not open"): - await stream.send(MagicMock()) - with pytest.raises(ValueError, match="Stream is not open"): - await stream.recv() - with pytest.raises(ValueError, match="Stream is not open"): - await stream.close() + assert stream.persisted_size == 100 @pytest.mark.asyncio async def test_close_with_persisted_size_then_eof(self, mock_client): """Test close when first recv has persisted_size, second is EOF.""" - import grpc - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) stream._is_stream_open = True stream.socket_like_rpc = AsyncMock() @@ -238,8 +233,6 @@ async def test_close_with_persisted_size_then_eof(self, mock_client): @pytest.mark.asyncio async def test_close_with_grpc_aio_eof_response(self, mock_client): """Test close when first recv is grpc.aio.EOF sentinel.""" - import grpc - stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) stream._is_stream_open = True stream.socket_like_rpc = AsyncMock() @@ -254,3 +247,13 @@ async def test_close_with_grpc_aio_eof_response(self, mock_client): # Verify only one recv call (grpc.aio.EOF=EOF, so don't read second) assert stream.socket_like_rpc.recv.await_count == 1 assert not stream.is_stream_open + + @pytest.mark.asyncio + async def test_methods_require_open_raises(self, mock_client): + stream = _AsyncWriteObjectStream(mock_client, BUCKET, OBJECT) + with pytest.raises(ValueError, match="Stream is not open"): + await stream.send(MagicMock()) + with pytest.raises(ValueError, match="Stream is not open"): + await stream.recv() + with pytest.raises(ValueError, match="Stream is not open"): + await stream.close()