diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index de3f4de78..b556799a0 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -50,6 +50,8 @@ import org.apache.hc.core5.http.impl.io.DefaultHttpResponseParserFactory; import org.apache.hc.core5.http.io.SocketConfig; import org.apache.hc.core5.http.io.entity.EntityTemplate; +import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder; +import org.apache.hc.client5.http.entity.mime.ContentBody; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.io.IOCallback; @@ -432,12 +434,54 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r // req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding addHeaders(req, requestConfig); - - // setting entity. wrapping if compression is enabled - String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; - req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, contentEncoding , writeCallback), - lz4Factory, - requestConfig)); + // Check if statement params exist - if so, send as multipart + boolean hasStatementParams = requestConfig.containsKey(KEY_STATEMENT_PARAMS); + Map statementParams = hasStatementParams ? + (Map) requestConfig.get(KEY_STATEMENT_PARAMS) : null; + + HttpEntity requestEntity; + String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? + req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; + + if (hasStatementParams && statementParams != null && !statementParams.isEmpty()) { + // Create multipart entity with query body and statement params + MultipartEntityBuilder multipartBuilder = MultipartEntityBuilder.create(); + + // Add query/body data as streaming binary body + if (writeCallback != null) { + // Use streaming ContentBody to avoid buffering large queries in memory + StreamingContentBody queryBody = new StreamingContentBody(writeCallback, CONTENT_TYPE); + multipartBuilder.addPart("query", queryBody); + } + + // Add statement params as multipart parts + for (Map.Entry entry : statementParams.entrySet()) { + String paramName = "param_" + entry.getKey().toString(); + String paramValue = String.valueOf(entry.getValue()); + multipartBuilder.addTextBody(paramName, paramValue); + } + + requestEntity = multipartBuilder.build(); + // Update content type header for multipart + // Note: Content-Encoding is a separate header and will be preserved + // The multipart entity's Content-Type includes the boundary which is required + req.setHeader(HttpHeaders.CONTENT_TYPE, requestEntity.getContentType()); + } else { + // No statement params - use regular entity template + requestEntity = new EntityTemplate(-1, CONTENT_TYPE, contentEncoding, writeCallback); + } + + // Wrap entity with compression if enabled + // This will set Content-Encoding header if compression is enabled + HttpEntity wrappedEntity = wrapRequestEntity(requestEntity, lz4Factory, requestConfig); + + // Ensure Content-Encoding header is set on request if entity has it + // This preserves compression settings even after setting multipart Content-Type + if (wrappedEntity.getContentEncoding() != null) { + req.setHeader(HttpHeaders.CONTENT_ENCODING, wrappedEntity.getContentEncoding()); + } + + req.setEntity(wrappedEntity); HttpClientContext context = HttpClientContext.create(); Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig); @@ -595,10 +639,7 @@ private void addQueryParams(URIBuilder req, Map requestConfig) { if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { req.addParameter(ClickHouseHttpProto.QPARAM_QUERY_ID, requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()).toString()); } - if (requestConfig.containsKey(KEY_STATEMENT_PARAMS)) { - Map params = (Map) requestConfig.get(KEY_STATEMENT_PARAMS); - params.forEach((k, v) -> req.addParameter("param_" + k, String.valueOf(v))); - } + // Statement params are now sent as multipart, not query params boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig); @@ -912,4 +953,92 @@ protected void prepareSocket(SSLSocket socket, HttpContext context) throws IOExc } } } + + /** + * Streaming ContentBody implementation that writes data from an IOCallback without buffering. + * This avoids OutOfMemoryError for large queries by streaming data directly to the output stream. + */ + private static class StreamingContentBody implements ContentBody { + private final IOCallback writeCallback; + private final ContentType contentType; + + StreamingContentBody(IOCallback writeCallback, ContentType contentType) { + this.writeCallback = writeCallback; + this.contentType = contentType; + } + + @Override + public String getFilename() { + return null; + } + + @Override + public String getMimeType() { + return contentType != null ? contentType.getMimeType() : null; + } + + @Override + public String getCharset() { + return contentType != null && contentType.getCharset() != null ? + contentType.getCharset().name() : null; + } + + @Override + public long getContentLength() { + return -1; // Unknown length - streaming + } + + @Override + public void writeTo(OutputStream outStream) throws IOException { + // Wrap the stream to prevent premature closure + // Multipart entities may call writeTo() multiple times, so we must not close the stream + OutputStream nonClosingStream = new OutputStream() { + @Override + public void write(int b) throws IOException { + outStream.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + outStream.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + outStream.write(b, off, len); + } + + @Override + public void flush() throws IOException { + outStream.flush(); + } + + @Override + public void close() throws IOException { + // Do not close - let the caller (multipart entity) handle stream closure + // This prevents "Stream already closed" errors when writeTo() is called multiple times + flush(); + } + }; + writeCallback.execute(nonClosingStream); + } + + @Override + public String getMediaType() { + String mimeType = getMimeType(); + if (mimeType != null && mimeType.contains("/")) { + return mimeType.substring(0, mimeType.indexOf('/')); + } + return null; + } + + @Override + public String getSubType() { + String mimeType = getMimeType(); + if (mimeType != null && mimeType.contains("/")) { + return mimeType.substring(mimeType.indexOf('/') + 1); + } + return null; + } + } } diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index 49d12098e..79f833a5f 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -43,7 +43,9 @@ import java.util.Arrays; import java.util.Base64; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -1102,6 +1104,55 @@ public void testTimeoutsWithRetry() { } } + @Test(groups = {"integration"}) + public void testStatementParamsSentAsMultipart() throws Exception { + if (isCloud()) { + return; // mocked server + } + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration + .options().dynamicPort().notifier(new ConsoleNotifier(false))); + mockServer.start(); + + try { + // Configure WireMock to capture multipart requests + mockServer.addStubMapping(WireMock.post(WireMock.anyUrl()) + .withHeader(HttpHeaders.CONTENT_TYPE, WireMock.matching("multipart/form-data.*")) + .withRequestBody(WireMock.containing("param_testParam")) + .withRequestBody(WireMock.containing("testValue")) + .withRequestBody(WireMock.containing("param_anotherParam")) + .withRequestBody(WireMock.containing("123")) + .willReturn(WireMock.aResponse() + .withStatus(HttpStatus.SC_OK) + .withHeader("X-ClickHouse-Summary", + "{ \"read_bytes\": \"10\", \"read_rows\": \"1\"}") + .withBody("1\n")).build()); + + try (Client client = new Client.Builder() + .addEndpoint(Protocol.HTTP, "localhost", mockServer.port(), false) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .compressClientRequest(false) + .build()) { + + // Create query with statement parameters + Map queryParams = new HashMap<>(); + queryParams.put("testParam", "testValue"); + queryParams.put("anotherParam", 123); + + QueryResponse response = client.query( + "SELECT {testParam:String} as col1, {anotherParam:Int32} as col2", + queryParams).get(10, TimeUnit.SECONDS); + + // Verify the request was made (WireMock will throw if expectations not met) + Assert.assertNotNull(response); + response.close(); + } + } finally { + mockServer.stop(); + } + } + @Test(groups = {"integration"}) public void testSNIWithCloud() throws Exception { if (!isCloud()) { diff --git a/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java b/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java index b8b1da74d..30b099dc7 100644 --- a/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java @@ -138,6 +138,7 @@ private static void delayForProfiler(long millis) { } } + @Test(groups = {"integration"}) public void testSimpleQueryWithTSV() { prepareSimpleDataSet();