-
Notifications
You must be signed in to change notification settings - Fork 614
✨ [client-v2] Statement parameters sent via multipart data request. #2693
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -50,6 +50,8 @@ | |||||||||||||||
| import org.apache.hc.core5.http.impl.io.DefaultHttpResponseParserFactory; | ||||||||||||||||
| import org.apache.hc.core5.http.io.SocketConfig; | ||||||||||||||||
| import org.apache.hc.core5.http.io.entity.EntityTemplate; | ||||||||||||||||
| import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder; | ||||||||||||||||
| import org.apache.hc.client5.http.entity.mime.ContentBody; | ||||||||||||||||
| import org.apache.hc.core5.http.protocol.HttpContext; | ||||||||||||||||
| import org.apache.hc.core5.io.CloseMode; | ||||||||||||||||
| import org.apache.hc.core5.io.IOCallback; | ||||||||||||||||
|
|
@@ -410,7 +412,7 @@ | |||||||||||||||
| private static final long POOL_VENT_TIMEOUT = 10000L; | ||||||||||||||||
| private final AtomicLong timeToPoolVent = new AtomicLong(0); | ||||||||||||||||
|
|
||||||||||||||||
| public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> requestConfig, LZ4Factory lz4Factory, | ||||||||||||||||
|
Check failure on line 415 in client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java
|
||||||||||||||||
| IOCallback<OutputStream> writeCallback) throws Exception { | ||||||||||||||||
| if (poolControl != null && timeToPoolVent.get() < System.currentTimeMillis()) { | ||||||||||||||||
| timeToPoolVent.set(System.currentTimeMillis() + POOL_VENT_TIMEOUT); | ||||||||||||||||
|
|
@@ -432,12 +434,54 @@ | |||||||||||||||
| // req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding | ||||||||||||||||
| addHeaders(req, requestConfig); | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| // setting entity. wrapping if compression is enabled | ||||||||||||||||
| String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; | ||||||||||||||||
| req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, contentEncoding , writeCallback), | ||||||||||||||||
| lz4Factory, | ||||||||||||||||
| requestConfig)); | ||||||||||||||||
| // Check if statement params exist - if so, send as multipart | ||||||||||||||||
| boolean hasStatementParams = requestConfig.containsKey(KEY_STATEMENT_PARAMS); | ||||||||||||||||
| Map<?, ?> statementParams = hasStatementParams ? | ||||||||||||||||
| (Map<?, ?>) requestConfig.get(KEY_STATEMENT_PARAMS) : null; | ||||||||||||||||
|
|
||||||||||||||||
| HttpEntity requestEntity; | ||||||||||||||||
| String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? | ||||||||||||||||
| req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; | ||||||||||||||||
|
|
||||||||||||||||
| if (hasStatementParams && statementParams != null && !statementParams.isEmpty()) { | ||||||||||||||||
| // Create multipart entity with query body and statement params | ||||||||||||||||
| MultipartEntityBuilder multipartBuilder = MultipartEntityBuilder.create(); | ||||||||||||||||
|
|
||||||||||||||||
| // Add query/body data as streaming binary body | ||||||||||||||||
| if (writeCallback != null) { | ||||||||||||||||
| // Use streaming ContentBody to avoid buffering large queries in memory | ||||||||||||||||
| StreamingContentBody queryBody = new StreamingContentBody(writeCallback, CONTENT_TYPE); | ||||||||||||||||
| multipartBuilder.addPart("query", queryBody); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // Add statement params as multipart parts | ||||||||||||||||
| for (Map.Entry<?, ?> entry : statementParams.entrySet()) { | ||||||||||||||||
| String paramName = "param_" + entry.getKey().toString(); | ||||||||||||||||
| String paramValue = String.valueOf(entry.getValue()); | ||||||||||||||||
| multipartBuilder.addTextBody(paramName, paramValue); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| requestEntity = multipartBuilder.build(); | ||||||||||||||||
| // Update content type header for multipart | ||||||||||||||||
| // Note: Content-Encoding is a separate header and will be preserved | ||||||||||||||||
| // The multipart entity's Content-Type includes the boundary which is required | ||||||||||||||||
| req.setHeader(HttpHeaders.CONTENT_TYPE, requestEntity.getContentType()); | ||||||||||||||||
| } else { | ||||||||||||||||
| // No statement params - use regular entity template | ||||||||||||||||
| requestEntity = new EntityTemplate(-1, CONTENT_TYPE, contentEncoding, writeCallback); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| // Wrap entity with compression if enabled | ||||||||||||||||
| // This will set Content-Encoding header if compression is enabled | ||||||||||||||||
| HttpEntity wrappedEntity = wrapRequestEntity(requestEntity, lz4Factory, requestConfig); | ||||||||||||||||
|
|
||||||||||||||||
| // Ensure Content-Encoding header is set on request if entity has it | ||||||||||||||||
| // This preserves compression settings even after setting multipart Content-Type | ||||||||||||||||
| if (wrappedEntity.getContentEncoding() != null) { | ||||||||||||||||
| req.setHeader(HttpHeaders.CONTENT_ENCODING, wrappedEntity.getContentEncoding()); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| req.setEntity(wrappedEntity); | ||||||||||||||||
|
|
||||||||||||||||
| HttpClientContext context = HttpClientContext.create(); | ||||||||||||||||
| Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig); | ||||||||||||||||
|
|
@@ -595,10 +639,7 @@ | |||||||||||||||
| if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { | ||||||||||||||||
| req.addParameter(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); | ||||||||||||||||
| } | ||||||||||||||||
| if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { | ||||||||||||||||
| Map<?, ?> params = (Map<?, ?>) requestConfig.get(KEY_STATEMENT_PARAMS); | ||||||||||||||||
| params.forEach((k, v) -> req.addParameter("param_" + k, String.valueOf(v))); | ||||||||||||||||
| } | ||||||||||||||||
| // Statement params are now sent as multipart, not query params | ||||||||||||||||
|
|
||||||||||||||||
| boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); | ||||||||||||||||
| boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig); | ||||||||||||||||
|
|
@@ -912,4 +953,92 @@ | |||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| /** | ||||||||||||||||
| * Streaming ContentBody implementation that writes data from an IOCallback without buffering. | ||||||||||||||||
| * This avoids OutOfMemoryError for large queries by streaming data directly to the output stream. | ||||||||||||||||
| */ | ||||||||||||||||
| private static class StreamingContentBody implements ContentBody { | ||||||||||||||||
| private final IOCallback<OutputStream> writeCallback; | ||||||||||||||||
| private final ContentType contentType; | ||||||||||||||||
|
|
||||||||||||||||
| StreamingContentBody(IOCallback<OutputStream> writeCallback, ContentType contentType) { | ||||||||||||||||
| this.writeCallback = writeCallback; | ||||||||||||||||
| this.contentType = contentType; | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| @Override | ||||||||||||||||
| public String getFilename() { | ||||||||||||||||
| return null; | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| @Override | ||||||||||||||||
| public String getMimeType() { | ||||||||||||||||
| return contentType != null ? contentType.getMimeType() : null; | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| @Override | ||||||||||||||||
| public String getCharset() { | ||||||||||||||||
| return contentType != null && contentType.getCharset() != null ? | ||||||||||||||||
| contentType.getCharset().name() : null; | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| @Override | ||||||||||||||||
| public long getContentLength() { | ||||||||||||||||
| return -1; // Unknown length - streaming | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| @Override | ||||||||||||||||
| public void writeTo(OutputStream outStream) throws IOException { | ||||||||||||||||
| // Wrap the stream to prevent premature closure | ||||||||||||||||
| // Multipart entities may call writeTo() multiple times, so we must not close the stream | ||||||||||||||||
| OutputStream nonClosingStream = new OutputStream() { | ||||||||||||||||
| @Override | ||||||||||||||||
| public void write(int b) throws IOException { | ||||||||||||||||
| outStream.write(b); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| @Override | ||||||||||||||||
| public void write(byte[] b) throws IOException { | ||||||||||||||||
| outStream.write(b); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| @Override | ||||||||||||||||
| public void write(byte[] b, int off, int len) throws IOException { | ||||||||||||||||
| outStream.write(b, off, len); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| @Override | ||||||||||||||||
| public void flush() throws IOException { | ||||||||||||||||
| outStream.flush(); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| @Override | ||||||||||||||||
| public void close() throws IOException { | ||||||||||||||||
| // Do not close - let the caller (multipart entity) handle stream closure | ||||||||||||||||
| // This prevents "Stream already closed" errors when writeTo() is called multiple times | ||||||||||||||||
| flush(); | ||||||||||||||||
| } | ||||||||||||||||
| }; | ||||||||||||||||
| writeCallback.execute(nonClosingStream); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| @Override | ||||||||||||||||
| public String getMediaType() { | ||||||||||||||||
| String mimeType = getMimeType(); | ||||||||||||||||
| if (mimeType != null && mimeType.contains("/")) { | ||||||||||||||||
| return mimeType.substring(0, mimeType.indexOf('/')); | ||||||||||||||||
|
Comment on lines
+1029
to
+1030
|
||||||||||||||||
| if (mimeType != null && mimeType.contains("/")) { | |
| return mimeType.substring(0, mimeType.indexOf('/')); | |
| if (mimeType != null) { | |
| int idx = mimeType.indexOf('/'); | |
| if (idx >= 0) { | |
| return mimeType.substring(0, idx); | |
| } |
Copilot
AI
Dec 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The getSubType() method uses contains('/') check before indexOf('/'), but both checks could fail for the same input. If contains('/') returns true, indexOf('/') is guaranteed to return a valid index >= 0, but using indexOf directly would be more efficient. Consider checking indexOf('/') >= 0 instead to avoid two string traversals.
| if (mimeType != null && mimeType.contains("/")) { | |
| return mimeType.substring(mimeType.indexOf('/') + 1); | |
| if (mimeType != null) { | |
| int idx = mimeType.indexOf('/'); | |
| if (idx >= 0) { | |
| return mimeType.substring(idx + 1); | |
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,7 +43,9 @@ | |
| import java.util.Arrays; | ||
| import java.util.Base64; | ||
| import java.util.EnumSet; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Random; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.CompletableFuture; | ||
|
|
@@ -1102,6 +1104,55 @@ public void testTimeoutsWithRetry() { | |
| } | ||
| } | ||
|
|
||
| @Test(groups = {"integration"}) | ||
| public void testStatementParamsSentAsMultipart() throws Exception { | ||
| if (isCloud()) { | ||
| return; // mocked server | ||
| } | ||
|
|
||
| WireMockServer mockServer = new WireMockServer(WireMockConfiguration | ||
| .options().dynamicPort().notifier(new ConsoleNotifier(false))); | ||
| mockServer.start(); | ||
|
|
||
| try { | ||
| // Configure WireMock to capture multipart requests | ||
| mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) | ||
| .withHeader(HttpHeaders.CONTENT_TYPE, WireMock.matching("multipart/form-data.*")) | ||
| .withRequestBody(WireMock.containing("param_testParam")) | ||
| .withRequestBody(WireMock.containing("testValue")) | ||
| .withRequestBody(WireMock.containing("param_anotherParam")) | ||
| .withRequestBody(WireMock.containing("123")) | ||
| .willReturn(WireMock.aResponse() | ||
| .withStatus(HttpStatus.SC_OK) | ||
| .withHeader("X-ClickHouse-Summary", | ||
| "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}") | ||
| .withBody("1\n")).build()); | ||
|
|
||
| try (Client client = new Client.Builder() | ||
| .addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false) | ||
| .setUsername("default") | ||
| .setPassword(ClickHouseServerForTest.getPassword()) | ||
| .compressClientRequest(false) | ||
| .build()) { | ||
|
|
||
| // Create query with statement parameters | ||
| Map<String, Object> queryParams = new HashMap<>(); | ||
| queryParams.put("testParam", "testValue"); | ||
| queryParams.put("anotherParam", 123); | ||
|
|
||
| QueryResponse response = client.query( | ||
| "SELECT {testParam:String} as col1, {anotherParam:Int32} as col2", | ||
| queryParams).get(10, TimeUnit.SECONDS); | ||
|
||
|
|
||
| // Verify the request was made (WireMock will throw if expectations not met) | ||
| Assert.assertNotNull(response); | ||
| response.close(); | ||
| } | ||
| } finally { | ||
| mockServer.stop(); | ||
| } | ||
| } | ||
|
|
||
| @Test(groups = {"integration"}) | ||
| public void testSNIWithCloud() throws Exception { | ||
| if (!isCloud()) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -138,6 +138,7 @@ private static void delayForProfiler(long millis) { | |||
| } | ||||
| } | ||||
|
|
||||
|
|
||||
|
||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overwriting the Content-Type header may discard the content encoding that was previously set. If compression is enabled and a Content-Encoding header was added earlier, this replacement could cause issues with the multipart boundary. Consider preserving or merging headers appropriately.