Skip to content

Commit f84a02a

Browse files
oskardudyczw1am
authored andcommitted
[DEVEX-250] Reshaped the API to handle regular and wrapped messages, together with explicit Message type
1 parent 7cfe053 commit f84a02a

File tree

4 files changed

+196
-41
lines changed

4 files changed

+196
-41
lines changed

db-client-java/src/main/java/io/kurrent/dbclient/AppendToStream.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,20 @@
1717
class AppendToStream {
1818
private final GrpcClient client;
1919
private final String streamName;
20+
private final StreamState streamState;
2021
private final List<MessageData> events;
2122
private final AppendToStreamOptions options;
2223

23-
public AppendToStream(GrpcClient client, String streamName, Iterator<MessageData> events, AppendToStreamOptions options) {
24+
public AppendToStream(
25+
GrpcClient client,
26+
String streamName,
27+
StreamState streamState,
28+
Iterator<MessageData> events,
29+
AppendToStreamOptions options
30+
) {
2431
this.client = client;
2532
this.streamName = streamName;
33+
this.streamState = streamState;
2634
this.events = new ArrayList<>();
2735
while (events.hasNext()) {
2836
this.events.add(events.next());
@@ -42,7 +50,7 @@ public CompletableFuture<WriteResult> execute() {
4250

4351
private CompletableFuture<WriteResult> append(ManagedChannel channel, List<MessageData> events) {
4452
CompletableFuture<WriteResult> result = new CompletableFuture<>();
45-
StreamsOuterClass.AppendReq.Options.Builder options = this.options.getStreamState().applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder()
53+
StreamsOuterClass.AppendReq.Options.Builder options = this.streamState.applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder()
4654
.setStreamIdentifier(Shared.StreamIdentifier.newBuilder()
4755
.setStreamName(ByteString.copyFromUtf8(streamName))
4856
.build()));
@@ -117,7 +125,7 @@ private CompletableFuture<WriteResult> append(ManagedChannel channel, List<Messa
117125
String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));
118126

119127
if (leaderHost != null && leaderPort != null) {
120-
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
128+
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.parseInt(leaderPort));
121129
result.completeExceptionally(reason);
122130
} else {
123131
result.completeExceptionally(e);

db-client-java/src/main/java/io/kurrent/dbclient/KurrentDBClient.java

Lines changed: 169 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.util.*;
1010
import java.util.concurrent.CompletableFuture;
1111
import java.util.stream.Collectors;
12-
import java.util.stream.Stream;
1312
import java.util.stream.StreamSupport;
1413

1514
import static io.kurrent.dbclient.serialization.MessageTypeNamingResolutionContext.fromStreamName;
@@ -33,8 +32,9 @@ public static KurrentDBClient create(KurrentDBClientSettings settings) {
3332
/**
3433
* Appends events to a given stream.
3534
*
35+
* @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter.
3636
* @param streamName stream's name.
37-
* @param events events to send.
37+
* @param events events to store.
3838
* @return a write result if successful.
3939
* @see WriteResult
4040
*/
@@ -45,8 +45,9 @@ public CompletableFuture<WriteResult> appendToStream(String streamName, EventDat
4545
/**
4646
* Appends events to a given stream.
4747
*
48+
* @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter.
4849
* @param streamName stream's name.
49-
* @param events events to send.
50+
* @param events events to store.
5051
* @return a write result if successful.
5152
* @see WriteResult
5253
*/
@@ -57,22 +58,24 @@ public CompletableFuture<WriteResult> appendToStream(String streamName, Iterator
5758
/**
5859
* Appends events to a given stream.
5960
*
61+
* @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter.
6062
* @param streamName stream's name.
6163
* @param options append stream request's options.
62-
* @param events events to send.
64+
* @param events events to store.
6365
* @return a write result if successful.
6466
* @see WriteResult
6567
*/
6668
public CompletableFuture<WriteResult> appendToStream(String streamName, AppendToStreamOptions options, EventData... events) {
6769
return this.appendToStream(streamName, options, Arrays.stream(events).iterator());
6870
}
69-
71+
7072
/**
7173
* Appends events to a given stream.
7274
*
75+
* @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter.
7376
* @param streamName stream's name.
7477
* @param options append stream request's options.
75-
* @param events events to send.
78+
* @param events events to store.
7679
* @return a write result if successful.
7780
* @see WriteResult
7881
*/
@@ -85,73 +88,209 @@ public CompletableFuture<WriteResult> appendToStream(String streamName, AppendTo
8588
.map(EventData::toMessageData)
8689
.iterator();
8790

88-
return new AppendToStream(this.getGrpcClient(), streamName, messageData, options).execute();
91+
return new AppendToStream(this.getGrpcClient(), streamName, options.getStreamState(), messageData, options).execute();
8992
}
9093

9194
/**
9295
* Appends messages to a given stream.
9396
*
94-
* @param streamName stream's name.
95-
* @param messages messages to send.
97+
* @param streamName stream's name.
98+
* @param streamState expected stream's state in the database
99+
* @param messages messages to store.
96100
* @return a write result if successful.
97101
* @see WriteResult
98102
*/
99-
public CompletableFuture<WriteResult> appendToStream(String streamName, List<Message> messages) {
100-
// TODO: Sort type erasure issue with Iterator
101-
return this.appendToStream(streamName, AppendToStreamOptions.get(), messages);
103+
public CompletableFuture<WriteResult> appendToStream(
104+
String streamName,
105+
StreamState streamState,
106+
Object... messages
107+
) {
108+
return this.appendToStream(
109+
streamName,
110+
streamState,
111+
Arrays.stream(messages).collect(Collectors.toList()),
112+
AppendToStreamOptions.get()
113+
);
102114
}
103115

116+
104117
/**
105118
* Appends messages to a given stream.
106119
*
107-
* @param streamName stream's name.
108-
* @param options append stream request's options.
109-
* @param messages messages to send.
120+
* @param streamName stream's name.
121+
* @param streamState expected stream's state in the database
122+
* @param messages messages to store.
110123
* @return a write result if successful.
111124
* @see WriteResult
112125
*/
113-
public CompletableFuture<WriteResult> appendToStream(String streamName, AppendToStreamOptions options, Message... messages) {
114-
return this.appendToStream(streamName, options, Stream.of(messages).collect(Collectors.toList()));
126+
public CompletableFuture<WriteResult> appendToStream(
127+
String streamName,
128+
StreamState streamState,
129+
Message... messages
130+
) {
131+
List<Message> toAppend = Arrays.stream(messages).collect(Collectors.toList());
132+
return this.appendToStream(
133+
streamName,
134+
streamState,
135+
toAppend,
136+
AppendToStreamOptions.get()
137+
);
115138
}
116139

117140
/**
118141
* Appends messages to a given stream.
119142
*
120-
* @param streamName stream's name.
121-
* @param options append stream request's options.
122-
* @param messages messages to send.
143+
* @param streamName stream's name.
144+
* @param streamState expected stream's state in the database
145+
* @param messages messages to store.
123146
* @return a write result if successful.
124147
* @see WriteResult
125148
*/
126-
public CompletableFuture<WriteResult> appendToStream(String streamName, AppendToStreamOptions options, Object... messages) {
127-
return this.appendToStream(streamName, options, Stream.of(messages).map(Message::from).collect(Collectors.toList()));
149+
public CompletableFuture<WriteResult> appendToStream(
150+
String streamName,
151+
StreamState streamState,
152+
Iterable<Object> messages
153+
) {
154+
return this.appendToStream(
155+
streamName,
156+
streamState,
157+
messages,
158+
AppendToStreamOptions.get()
159+
);
128160
}
129161

162+
130163
/**
131164
* Appends messages to a given stream.
132165
*
133-
* @param streamName stream's name.
134-
* @param options append stream request's options.
135-
* @param messages messages to send.
166+
* @param streamName stream's name.
167+
* @param streamState expected stream's state in the database
168+
* @param messages messages to store.
169+
* @param options append stream request's options.
170+
* @return a write result if successful.
171+
* @see WriteResult
172+
*/
173+
public CompletableFuture<WriteResult> appendToStream(
174+
String streamName,
175+
StreamState streamState,
176+
Object[] messages,
177+
AppendToStreamOptions options
178+
) {
179+
List<Message> toAppend =
180+
Arrays.stream(messages)
181+
.map(Message::from)
182+
.collect(Collectors.toList());
183+
184+
return this.appendToStream(
185+
streamName,
186+
streamState,
187+
toAppend,
188+
options
189+
);
190+
}
191+
192+
/**
193+
* Appends messages to a given stream.
194+
*
195+
* @param streamName stream's name.
196+
* @param streamState expected stream's state in the database
197+
* @param messages messages to store.
198+
* @param options append stream request's options.
136199
* @return a write result if successful.
137200
* @see WriteResult
138201
*/
139-
public CompletableFuture<WriteResult> appendToStream(String streamName, AppendToStreamOptions options, List<Message> messages) {
140-
// TODO: Sort type erasure issue with Iterator
202+
public CompletableFuture<WriteResult> appendToStream(
203+
String streamName,
204+
StreamState streamState,
205+
Iterable<Object> messages,
206+
AppendToStreamOptions options
207+
) {
208+
List<Message> toAppend =
209+
StreamSupport.stream(messages.spliterator(), false)
210+
.map(Message::from)
211+
.collect(Collectors.toList());
212+
213+
return this.appendToStream(
214+
streamName,
215+
streamState,
216+
toAppend,
217+
options
218+
);
219+
}
220+
221+
/**
222+
* Appends messages to a given stream.
223+
*
224+
* @param streamName stream's name.
225+
* @param streamState expected stream's state in the database
226+
* @param messages messages to store.
227+
* @return a write result if successful.
228+
* @see WriteResult
229+
*/
230+
public CompletableFuture<WriteResult> appendToStream(
231+
String streamName,
232+
StreamState streamState,
233+
List<Message> messages
234+
) {
235+
return this.appendToStream(streamName, streamState, messages, AppendToStreamOptions.get());
236+
}
237+
238+
/**
239+
* Appends messages to a given stream.
240+
*
241+
* @param streamName stream's name.
242+
* @param streamState expected stream's state in the database.
243+
* @param messages messages to store.
244+
* @param options append stream request's options.
245+
* @return a write result if successful.
246+
* @see WriteResult
247+
*/
248+
public CompletableFuture<WriteResult> appendToStream(
249+
String streamName,
250+
StreamState streamState,
251+
Message[] messages,
252+
AppendToStreamOptions options
253+
) {
254+
List<Message> toAppend = Arrays.stream(messages).collect(Collectors.toList());
255+
256+
return this.appendToStream(
257+
streamName,
258+
streamState,
259+
toAppend,
260+
options
261+
);
262+
}
263+
264+
/**
265+
* Appends messages to a given stream.
266+
*
267+
* @param streamName stream's name.
268+
* @param streamState expected stream's state in the database.
269+
* @param messages messages to store.
270+
* @param options append stream request's options.
271+
* @return a write result if successful.
272+
* @see WriteResult
273+
*/
274+
public CompletableFuture<WriteResult> appendToStream(
275+
String streamName,
276+
StreamState streamState,
277+
List<Message> messages,
278+
AppendToStreamOptions options
279+
) {
141280
if (options == null)
142281
options = AppendToStreamOptions.get();
143282

144-
MessageSerializationContext serializationContext =
283+
MessageSerializationContext serializationContext =
145284
new MessageSerializationContext(fromStreamName(streamName));
146285

147286
MessageSerializer serializer = getGrpcClient()
148287
.getSerializer(options.serializationSettings().orElse(null));
149-
288+
150289
Iterator<MessageData> messageData = serializer
151290
.serialize(messages, serializationContext)
152291
.iterator();
153292

154-
return new AppendToStream(this.getGrpcClient(), streamName, messageData, options).execute();
293+
return new AppendToStream(this.getGrpcClient(), streamName, streamState, messageData, options).execute();
155294
}
156295

157296
/**

db-client-java/src/main/java/io/kurrent/dbclient/OptionsWithStreamStateBase.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,32 @@
11
package io.kurrent.dbclient;
22

3+
/**
4+
* @deprecated This class may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter
5+
*/
6+
@Deprecated
37
class OptionsWithStreamStateBase<T> extends OptionsBase<T> {
48
private StreamState streamState;
59

610
protected OptionsWithStreamStateBase() {
711
this.streamState = StreamState.any();
812
}
913

14+
/**
15+
* @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter.
16+
*/
17+
@Deprecated
1018
StreamState getStreamState() {
1119
return this.streamState;
1220
}
1321

1422
/**
1523
* Asks the server to check that the stream receiving is at the expected state.
16-
24+
* @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter.
1725
* @param state - expected revision.
1826
* @return updated options.
1927
*/
2028
@SuppressWarnings("unchecked")
29+
@Deprecated
2130
public T streamState(StreamState state) {
2231
this.streamState = state;
2332
return (T) this;
@@ -26,10 +35,11 @@ public T streamState(StreamState state) {
2635

2736
/**
2837
* Asks the server to check that the stream receiving is at the given expected revision.
29-
38+
* @deprecated This method may be removed in future releases. Prefer using appendToStream method with explicit stream state parameter.
3039
* @param revision - expected revision.
3140
* @return updated options.
3241
*/
42+
@Deprecated
3343
public T streamRevision(long revision) {
3444
return streamState(StreamState.streamRevision(revision));
3545
}

0 commit comments

Comments
 (0)