-
Notifications
You must be signed in to change notification settings - Fork 396
feat: Spanner.V1 BatchWrite #15122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: Spanner.V1 BatchWrite #15122
Conversation
4c23ade to
f2c5f92
Compare
606bdfd to
d803b83
Compare
apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs
Outdated
Show resolved
Hide resolved
apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs
Outdated
Show resolved
Hide resolved
d803b83 to
a91b1b4
Compare
| /// Executes a BatchWrite RPC asynchronously, returning a stream of responses. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// This operation does not participate in the session's active transaction. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this true? Or does this operation requires a session with no active transaction?
Even if this is true, would this operation also work with a session with no active transaction?
The answer to these questions is fundamental to how we implement this, so I'll wait for the answers before continuing the review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will need to explicitly verify with spanner team, but I've tested this against my own spanner instance (non-emulated):
BatchWritesucceeds when working on a fresh session with no active transaction.BatchWritesucceeds when working on a session on which I've begun a transaction.
Will verify with spanner team since transaction management is opaque to the caller for BatchWrite.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For 2. what happens to that transaction, any changes made within it, etc.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to use that transaction for read-write and received the following. It looks like batch write invalidates the currently active transaction
Error Message: Grpc.Core.RpcException : Status(StatusCode="FailedPre
condition", Detail="This transaction has been invalidated by a later transaction in the same session."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, so if using a session with a transaction breaks that transaction, let's not allowed that. BatchWrite should only run on a session with no transaction, that is a pooled session with TransactionMode.None. The way to validate this is to go through ExecuteMaybeWithTransaction and fail on all the callbacks that attempt to either set or extract a transaction from the BatchWrite request/response. Let me know if you want to chat about that. Else, let me know when the code reflects these changes by re-requesting my review. Thanks.
a91b1b4 to
89082c8
Compare
89082c8 to
0ccf88d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've left all the comments, but I think we should wait for MUX to implement this, let's chat. I'm marking as do not merge for the time being.
| /// should set this parameter to false.</param> | ||
| /// <param name="cancellationToken">The cancellation token for the operation.</param> | ||
| /// <returns>A task whose result will be the result from having executed <paramref name="command"/>.</returns> | ||
| internal TResponse ExecuteMaybeWithTransactionSelector<TResponse>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need a sync version of this method. If the actual exection is syncrhonous, the async method will return synchronously anyway, and here, . Please undo and call the async version instead.
| /// </remarks> | ||
| /// <param name="request">The batch write request. Must not be null.</param> | ||
| /// <param name="callSettings">If not null, applies overrides to this RPC call.</param> | ||
| /// <returns>An asynchronous stream of <see cref="BatchWriteResponse"/> messages.</returns> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| /// <returns>An asynchronous stream of <see cref="BatchWriteResponse"/> messages.</returns> | |
| /// <returns>An asynchronous stream of <see cref="BatchWriteResponse"/> objects.</returns> |
| CheckNotDisposed(); | ||
| GaxPreconditions.CheckNotNull(request, nameof(request)); | ||
| // Fail early to prevent the creation of an unused transaction on the session. | ||
| GaxPreconditions.CheckState(TransactionOptions.ModeCase == ModeOneofCase.None, $"{nameof(BatchWriteAsync)} can only be used when the session is configured to use no transaction."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need this, the ExecuteMaybeWithTransactionSelector will fail early enough (i.e. without making any RPC) if we configure the right callbacks. See below.
| } | ||
|
|
||
| // Batch write gRPC manages the transaction itself, if we are trying to explicitly set a transaction or inline, this is a bug. | ||
| void SetCommandTransaction(TransactionSelector transactionSelector) => throw new InvalidOperationException($"{nameof(BatchWriteAsync)} must be executed with no transaction. If you see this there is a bug in the code."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is sufficient to fail early. This would be our validation point, BatchWrite has not caused any RPCs to be executed at this point.
Message, or similar:
"BatchWrite does not support explicitly set transactions, it won't be executed within an active transaction. The sessions most be acquired with nameof(ModeOneOfCase.TransactionNone) or BatchWrite should complete execution before any other commands that would create a transaction are executed."
|
|
||
| // Batch write gRPC manages the transaction itself, if we are trying to explicitly set a transaction or inline, this is a bug. | ||
| void SetCommandTransaction(TransactionSelector transactionSelector) => throw new InvalidOperationException($"{nameof(BatchWriteAsync)} must be executed with no transaction. If you see this there is a bug in the code."); | ||
| Transaction GetInlinedTransaction(IAsyncEnumerable<BatchWriteResponse> response) => throw new InvalidOperationException($"{nameof(BatchWriteAsync)} cannot be executed with an inlined transaction. If you see this there is a bug in the code."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this delegate may be set to null on the paremeter call. That's a signal to the "ExecuteMaybeWithTransactionAsync" that this method does not support inlining. Because we are also asking it to skip transaction creation, this will never be called.
SetCommandTransaction will be called is some other method created a transaction and that's when we want to fail.
| @@ -876,6 +944,22 @@ private async Task RecordSuccessAndExpiredSessions(Task task) | |||
| UpdateRefreshTime(); | |||
| } | |||
|
|
|||
| private async IAsyncEnumerable<TResponse> StreamResponsesAndRecordSuccessAsync<TResponse>(AsyncResponseStream<TResponse> responseStream) | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure we want to do this on V1. AsyncResponseStream is a wrapper that works for both pre and post C# 8 users. And we don't want to loose that just so we can keep better track of the refresh time/expired session.
I think we should not wrap the response, and then, to make certain that we don't return a bad session to the pool, after the succesful call to BatchWrite, before returning the AsyncResponseStream, I'd update the refresh time to now, to make certain the session will be refreshed before releasing.
But, wrting about all of this, and knowing that we are about to release MUX, I think we should pause. Let's chat.
User need access to the
BatchWriteAPI through the Spanner SDK. This is important becauseBatchWriteallows users to submit a set of bulk mutations in batch for efficient processing. To address this, We introduce aBatchWriteAsyncmethod inPooledSessionexposing the API in Spanner.V1.Details
BatchWriteAsyncmethod avoids transaction management.SpannerClient.BatchWrite,AsyncStream<>, is a sealed class (i.e. cannot be mocked) with no public constructor (i.e. cannot be explicitly created in our tests). We could introduce wrappers to allow for mocking, but that adds a lot of abstraction layers I'm not sure we want.Non Goals