From 2ebf1380aa3b17bb5ef582bd6f43396696e07e38 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Fri, 12 Dec 2025 15:26:46 -0800 Subject: [PATCH 1/2] Added sending statement parameters in multipart data request --- .../api/internal/HttpAPIClientHelper.java | 52 +++++++++++++++---- .../clickhouse/client/HttpTransportTests.java | 51 ++++++++++++++++++ .../clickhouse/client/query/QueryTests.java | 1 + 3 files changed, 94 insertions(+), 10 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index de3f4de78..354cf4b2d 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -50,6 +50,8 @@ import org.apache.hc.core5.http.impl.io.DefaultHttpResponseParserFactory; import org.apache.hc.core5.http.io.SocketConfig; import org.apache.hc.core5.http.io.entity.EntityTemplate; +import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder; +import java.io.ByteArrayOutputStream; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.io.IOCallback; @@ -432,12 +434,45 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r // req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding addHeaders(req, requestConfig); - - // setting entity. wrapping if compression is enabled - String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; - req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, contentEncoding , writeCallback), - lz4Factory, - requestConfig)); + // Check if statement params exist - if so, send as multipart + boolean hasStatementParams = requestConfig.containsKey(KEY_STATEMENT_PARAMS); + Map statementParams = hasStatementParams ? + (Map) requestConfig.get(KEY_STATEMENT_PARAMS) : null; + + HttpEntity requestEntity; + String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? + req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; + + if (hasStatementParams && statementParams != null && !statementParams.isEmpty()) { + // Create multipart entity with query body and statement params + MultipartEntityBuilder multipartBuilder = MultipartEntityBuilder.create(); + + // Add query/body data as binary body + if (writeCallback != null) { + // Write callback output to buffer, then add as binary body + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + writeCallback.execute(buffer); + byte[] queryData = buffer.toByteArray(); + multipartBuilder.addBinaryBody("query", queryData, CONTENT_TYPE, null); + } + + // Add statement params as multipart parts + for (Map.Entry entry : statementParams.entrySet()) { + String paramName = "param_" + entry.getKey().toString(); + String paramValue = String.valueOf(entry.getValue()); + multipartBuilder.addTextBody(paramName, paramValue); + } + + requestEntity = multipartBuilder.build(); + // Update content type header for multipart + req.setHeader(HttpHeaders.CONTENT_TYPE, requestEntity.getContentType()); + } else { + // No statement params - use regular entity template + requestEntity = new EntityTemplate(-1, CONTENT_TYPE, contentEncoding, writeCallback); + } + + // Wrap entity with compression if enabled + req.setEntity(wrapRequestEntity(requestEntity, lz4Factory, requestConfig)); HttpClientContext context = HttpClientContext.create(); Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig); @@ -595,10 +630,7 @@ private void addQueryParams(URIBuilder req, Map requestConfig) { if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { req.addParameter(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); } - if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { - Map params = (Map) requestConfig.get(KEY_STATEMENT_PARAMS); - params.forEach((k, v) -> req.addParameter("param_" + k, String.valueOf(v))); - } + // Statement params are now sent as multipart, not query params boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig); diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index 49d12098e..f695aeada 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -43,7 +43,9 @@ import java.util.Arrays; import java.util.Base64; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -1102,6 +1104,55 @@ public void testTimeoutsWithRetry() { } } + @Test(groups = {"integration"}) + public void testStatementParamsSentAsMultipart() throws Exception { + if (isCloud()) { + return; // mocked server + } + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().port(9090).notifier(new ConsoleNotifier(false))); + mockServer.start(); + + try { + // Configure WireMock to capture multipart requests + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .withHeader(HttpHeaders.CONTENT_TYPE, WireMock.matching("multipart/form-data.*")) + .withRequestBody(WireMock.containing("param_testParam")) + .withRequestBody(WireMock.containing("testValue")) + .withRequestBody(WireMock.containing("param_anotherParam")) + .withRequestBody(WireMock.containing("123")) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_OK) + .withHeader("X-ClickHouse-Summary", + "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}") + .withBody("1\n")).build()); + + try (Client client = new Client.Builder() + .addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .compressClientRequest(false) + .build()) { + + // Create query with statement parameters + Map queryParams = new HashMap<>(); + queryParams.put("testParam", "testValue"); + queryParams.put("anotherParam", 123); + + QueryResponse response = client.query( + "SELECT {testParam:String} as col1, {anotherParam:Int32} as col2", + queryParams).get(10, TimeUnit.SECONDS); + + // Verify the request was made (WireMock will throw if expectations not met) + Assert.assertNotNull(response); + response.close(); + } + } finally { + mockServer.stop(); + } + } + @Test(groups = {"integration"}) public void testSNIWithCloud() throws Exception { if (!isCloud()) { diff --git a/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java b/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java index b8b1da74d..30b099dc7 100644 --- a/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java @@ -138,6 +138,7 @@ private static void delayForProfiler(long millis) { } } + @Test(groups = {"integration"}) public void testSimpleQueryWithTSV() { prepareSimpleDataSet(); From 9fdb2fb1de3c1de7906a9640ca1d734196649318 Mon Sep 17 00:00:00 2001 From: Sergey Chernov Date: Fri, 12 Dec 2025 15:42:05 -0800 Subject: [PATCH 2/2] fixed potential issue with overriding content encoding. made query body to stream instead of copy --- .../api/internal/HttpAPIClientHelper.java | 113 ++++++++++++++++-- .../clickhouse/client/HttpTransportTests.java | 2 +- 2 files changed, 106 insertions(+), 9 deletions(-) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 354cf4b2d..b556799a0 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -51,7 +51,7 @@ import org.apache.hc.core5.http.io.SocketConfig; import org.apache.hc.core5.http.io.entity.EntityTemplate; import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder; -import java.io.ByteArrayOutputStream; +import org.apache.hc.client5.http.entity.mime.ContentBody; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.io.IOCallback; @@ -447,13 +447,11 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r // Create multipart entity with query body and statement params MultipartEntityBuilder multipartBuilder = MultipartEntityBuilder.create(); - // Add query/body data as binary body + // Add query/body data as streaming binary body if (writeCallback != null) { - // Write callback output to buffer, then add as binary body - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - writeCallback.execute(buffer); - byte[] queryData = buffer.toByteArray(); - multipartBuilder.addBinaryBody("query", queryData, CONTENT_TYPE, null); + // Use streaming ContentBody to avoid buffering large queries in memory + StreamingContentBody queryBody = new StreamingContentBody(writeCallback, CONTENT_TYPE); + multipartBuilder.addPart("query", queryBody); } // Add statement params as multipart parts @@ -465,6 +463,8 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r requestEntity = multipartBuilder.build(); // Update content type header for multipart + // Note: Content-Encoding is a separate header and will be preserved + // The multipart entity's Content-Type includes the boundary which is required req.setHeader(HttpHeaders.CONTENT_TYPE, requestEntity.getContentType()); } else { // No statement params - use regular entity template @@ -472,7 +472,16 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r } // Wrap entity with compression if enabled - req.setEntity(wrapRequestEntity(requestEntity, lz4Factory, requestConfig)); + // This will set Content-Encoding header if compression is enabled + HttpEntity wrappedEntity = wrapRequestEntity(requestEntity, lz4Factory, requestConfig); + + // Ensure Content-Encoding header is set on request if entity has it + // This preserves compression settings even after setting multipart Content-Type + if (wrappedEntity.getContentEncoding() != null) { + req.setHeader(HttpHeaders.CONTENT_ENCODING, wrappedEntity.getContentEncoding()); + } + + req.setEntity(wrappedEntity); HttpClientContext context = HttpClientContext.create(); Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig); @@ -944,4 +953,92 @@ protected void prepareSocket(SSLSocket socket, HttpContext context) throws IOExc } } } + + /** + * Streaming ContentBody implementation that writes data from an IOCallback without buffering. + * This avoids OutOfMemoryError for large queries by streaming data directly to the output stream. + */ + private static class StreamingContentBody implements ContentBody { + private final IOCallback writeCallback; + private final ContentType contentType; + + StreamingContentBody(IOCallback writeCallback, ContentType contentType) { + this.writeCallback = writeCallback; + this.contentType = contentType; + } + + @Override + public String getFilename() { + return null; + } + + @Override + public String getMimeType() { + return contentType != null ? contentType.getMimeType() : null; + } + + @Override + public String getCharset() { + return contentType != null && contentType.getCharset() != null ? + contentType.getCharset().name() : null; + } + + @Override + public long getContentLength() { + return -1; // Unknown length - streaming + } + + @Override + public void writeTo(OutputStream outStream) throws IOException { + // Wrap the stream to prevent premature closure + // Multipart entities may call writeTo() multiple times, so we must not close the stream + OutputStream nonClosingStream = new OutputStream() { + @Override + public void write(int b) throws IOException { + outStream.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + outStream.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + outStream.write(b, off, len); + } + + @Override + public void flush() throws IOException { + outStream.flush(); + } + + @Override + public void close() throws IOException { + // Do not close - let the caller (multipart entity) handle stream closure + // This prevents "Stream already closed" errors when writeTo() is called multiple times + flush(); + } + }; + writeCallback.execute(nonClosingStream); + } + + @Override + public String getMediaType() { + String mimeType = getMimeType(); + if (mimeType != null && mimeType.contains("/")) { + return mimeType.substring(0, mimeType.indexOf('/')); + } + return null; + } + + @Override + public String getSubType() { + String mimeType = getMimeType(); + if (mimeType != null && mimeType.contains("/")) { + return mimeType.substring(mimeType.indexOf('/') + 1); + } + return null; + } + } } diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index f695aeada..79f833a5f 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -1111,7 +1111,7 @@ public void testStatementParamsSentAsMultipart() throws Exception { } WireMockServer mockServer = new WireMockServer(WireMockConfiguration - .options().port(9090).notifier(new ConsoleNotifier(false))); + .options().dynamicPort().notifier(new ConsoleNotifier(false))); mockServer.start(); try {