diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..343066c --- /dev/null +++ b/.editorconfig @@ -0,0 +1,62 @@ +root = true + +[*] +charset = utf-8 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true + +######################################## +# Java +######################################## +[*.java] +indent_style = space +indent_size = 4 +max_line_length = off + +######################################## +# XML (POM files, Spring XML, misc) +######################################## +[*.xml] +indent_style = space +indent_size = 2 + +######################################## +# JSON +######################################## +[*.json] +indent_style = space +indent_size = 2 + +######################################## +# YAML (docker-compose, GitHub actions, configs) +# YAML SHOULD NEVER USE TABS — many parsers reject them +######################################## +[*.{yml,yaml}] +indent_style = space +indent_size = 2 + + +######################################## +# Markdown +# Keep indentation small; used mainly for lists and fenced blocks. +######################################## +[*.md] +indent_style = space +indent_size = 2 +trim_trailing_whitespace = false # needed to preserve Markdown hard line breaks + +######################################## +# Properties files +# Key-value pairs; indenting rarely matters but 2 spaces is common. +######################################## +[*.properties] +indent_style = space +indent_size = 2 + +######################################## +# HTML (simple default) +######################################## +[*.html] +indent_style = space +indent_size = 2 diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs new file mode 100644 index 0000000..d381a67 --- /dev/null +++ b/.git-blame-ignore-revs @@ -0,0 +1 @@ +cabb1a4746c49db58102833a858f4152141a4c7f diff --git a/README.md b/README.md index 4f8cf84..3210ced 100644 --- a/README.md +++ b/README.md @@ -2,28 +2,41 @@ This is a standalone project providing the functionality to implement "streaming" up- and downloads. -It is primarily meant to be used within the Step product, but it could also be used standalone. +It is primarily meant to be used within the Step product, but it could also be used standalone. "Streaming" here has a slightly special meaning. The purpose is to allow downloads of resources that are still being produced. We provide a reference implementation based on Websockets. ## Websockets implementation high-level description -1. Upload clients connect to a configurable server endpoint, send a "request upload" message with the file metadata, and expect a "ready for upload" message in return, containing a reference to the endpoint identifying the resource (i.e., a URL/URI). Uploads are currently sent as a one-shot stream upload (i.e., they are not resumable). -2. On the server side, as soon as an upload is initiated, the corresponding resource will be available for download requests, i.e., download clients may connect to the endpoint indicated by the reference, and start requesting data. -3. To provide this functionality, ongoing uploads are checkpointed at regular (configurable) intervals, emitting status updates about the current size of the resource. These status updates are forwarded to all clients interested in the resource in question. -4. A download client connects to the given reference endpoint, and immediately receives the latest known status. It will also receive any potential status updates as soon as they happen on the server side. -5. Clients may at any time request any chunk of the data, identified by a start and end offset, as long as the end offset is within the limits of the last known/checkpointed size on the server. +1. Upload clients connect to a configurable server endpoint, send a "request upload" message with the file metadata, and + expect a "ready for upload" message in return, containing a reference to the endpoint identifying the resource (i.e., + a URL/URI). Uploads are currently sent as a one-shot stream upload (i.e., they are not resumable). +2. On the server side, as soon as an upload is initiated, the corresponding resource will be available for download + requests, i.e., download clients may connect to the endpoint indicated by the reference, and start requesting data. +3. To provide this functionality, ongoing uploads are checkpointed at regular (configurable) intervals, emitting status + updates about the current size of the resource. These status updates are forwarded to all clients interested in the + resource in question. +4. A download client connects to the given reference endpoint, and immediately receives the latest known status. It will + also receive any potential status updates as soon as they happen on the server side. +5. Clients may at any time request any chunk of the data, identified by a start and end offset, as long as the end + offset is within the limits of the last known/checkpointed size on the server. 6. A typical scenario for a client wishing to retrieve the full file while it is being produced is: - - Wait for new status messages to arrive - - Download the next chunk of data (start offset=offset of already received data, end offset=current file size) - - Terminate once all data is received, as indicated by the status message (`status==COMPLETED`) -7. Of course, it is also possible to retrieve data that has been fully uploaded. In that case, only a single chunk download is actually performed. -8. This implementation has an additional benefit: clients may also request only parts of a file, for instance for client-side pagination of large log files. + - Wait for new status messages to arrive + - Download the next chunk of data (start offset=offset of already received data, end offset=current file size) + - Terminate once all data is received, as indicated by the status message (`status==COMPLETED`) +7. Of course, it is also possible to retrieve data that has been fully uploaded. In that case, only a single chunk + download is actually performed. +8. This implementation has an additional benefit: clients may also request only parts of a file, for instance for + client-side pagination of large log files. ## Project structure -There are currently 7 root-level modules: 4 for the API definitions (common, server, client-upload, client-download), and 2 containing (potentially partial) implementation classes (impl-common, impl-server). The Websocket implementation (impl-websocket) is provided as another module, with submodules mirroring the API modules. -Note that nontrivial Unit tests are consolidated in the websocket server module, because a full "infrastructure" including client and server parts is required for meaningful tests. +There are currently 7 root-level modules: 4 for the API definitions (common, server, client-upload, client-download), +and 2 containing (potentially partial) implementation classes (impl-common, impl-server). The Websocket implementation ( +impl-websocket) is provided as another module, with submodules mirroring the API modules. + +Note that nontrivial Unit tests are consolidated in the websocket server module, because a full "infrastructure" +including client and server parts is required for meaningful tests. diff --git a/build_parameters.json b/build_parameters.json index 4237e92..f4748eb 100644 --- a/build_parameters.json +++ b/build_parameters.json @@ -1,17 +1,17 @@ { - "ACTION": "COMPILE", - "TYPE": "POM", - "FOLDER": ".", - "PARAMETERS": [ - { - "NAME": "DEVELOPMENT", - "URL": "nexus-staging::https://nexus-enterprise-staging.exense.ch/repository/staging-maven/", - "CONFIG": "SkipJavadoc" - }, - { - "NAME": "PRODUCTION", - "URL": "sonatype::https://obsolete/", - "CONFIG": "SignedBuild" - } - ] -} \ No newline at end of file + "ACTION": "COMPILE", + "TYPE": "POM", + "FOLDER": ".", + "PARAMETERS": [ + { + "NAME": "DEVELOPMENT", + "URL": "nexus-staging::https://nexus-enterprise-staging.exense.ch/repository/staging-maven/", + "CONFIG": "SkipJavadoc" + }, + { + "NAME": "PRODUCTION", + "URL": "sonatype::https://obsolete/", + "CONFIG": "SignedBuild" + } + ] +} diff --git a/pom.xml b/pom.xml index c99750d..68c05a0 100644 --- a/pom.xml +++ b/pom.xml @@ -1,399 +1,399 @@ - 4.0.0 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - ch.exense.streaming - step-streaming - 0.0.0-SNAPSHOT - pom + ch.exense.streaming + step-streaming + 0.0.0-SNAPSHOT + pom - - ${project.groupId}:${project.artifactId} - Step Streaming - https://github.com/exense/step-streaming + + ${project.groupId}:${project.artifactId} + Step Streaming + https://github.com/exense/step-streaming - - - The GNU Affero General Public License, Version 3 - https://www.gnu.org/licenses/#AGPL - - + + + The GNU Affero General Public License, Version 3 + https://www.gnu.org/licenses/#AGPL + + - - - Exense GmbH - Exense GmbH - https://exense.ch - support@exense.ch - - + + + Exense GmbH + Exense GmbH + https://exense.ch + support@exense.ch + + - - scm:git:https://github.com/exense/step-streaming.git - scm:git:https://github.com/exense/step-streaming.git - https://github.com/exense/step-streaming - - + + scm:git:https://github.com/exense/step-streaming.git + scm:git:https://github.com/exense/step-streaming.git + https://github.com/exense/step-streaming + + - - - step-streaming-api-common - step-streaming-api-client-upload - step-streaming-api-client-download - step-streaming-api-server - step-streaming-impl-common - step-streaming-impl-server - step-streaming-impl-websocket - - - - - nexus-staging - https://nexus-enterprise.exense.ch/repository/staging-maven/ - - + + + step-streaming-api-common + step-streaming-api-client-upload + step-streaming-api-client-download + step-streaming-api-server + step-streaming-impl-common + step-streaming-impl-server + step-streaming-impl-websocket + + + + + nexus-staging + https://nexus-enterprise.exense.ch/repository/staging-maven/ + + - - UTF-8 - 11 - 11 - 11 + + UTF-8 + 11 + 11 + 11 - - 2025.6.25 + + 2025.6.25 - - 3.14.0 - 3.4.2 - 0.8.12 - 3.0.0-M1 - 3.2.5 - 3.3.2 - 3.0.1 - 1.6 - 10.0.3 - + + 3.14.0 + 3.4.2 + 0.8.12 + 3.0.0-M1 + 3.2.5 + 3.3.2 + 3.0.1 + 1.6 + 10.0.3 + - - - - - - ch.exense.dependencies - dependencies-logging - ${dependencies.version} - pom - import - - - ch.exense.dependencies - dependencies-junit - ${dependencies.version} - pom - import - - - ch.exense.dependencies - dependencies-jackson - ${dependencies.version} - pom - import - - - ch.exense.dependencies - dependencies-jakarta - ${dependencies.version} - pom - import - - - ch.exense.dependencies - dependencies-jetty - ${dependencies.version} - pom - import - + + + + + + ch.exense.dependencies + dependencies-logging + ${dependencies.version} + pom + import + + + ch.exense.dependencies + dependencies-junit + ${dependencies.version} + pom + import + + + ch.exense.dependencies + dependencies-jackson + ${dependencies.version} + pom + import + + + ch.exense.dependencies + dependencies-jakarta + ${dependencies.version} + pom + import + + + ch.exense.dependencies + dependencies-jetty + ${dependencies.version} + pom + import + - - - ch.exense.streaming - step-streaming-api-common - ${project.version} - - - ch.exense.streaming - step-streaming-api-server - ${project.version} - - - ch.exense.streaming - step-streaming-api-client-upload - ${project.version} - - - ch.exense.streaming - step-streaming-api-client-download - ${project.version} - - - ch.exense.streaming - step-streaming-impl-common - ${project.version} - - - ch.exense.streaming - step-streaming-impl-server - ${project.version} - - - - ch.exense.streaming - step-streaming-impl-websocket - ${project.version} - - - ch.exense.streaming - step-streaming-impl-websocket-common - ${project.version} - - - ch.exense.streaming - step-streaming-impl-websocket-server - ${project.version} - - - ch.exense.streaming - step-streaming-impl-websocket-client-upload - ${project.version} - - - ch.exense.streaming - step-streaming-impl-websocket-client-download - ${project.version} - - - + + + ch.exense.streaming + step-streaming-api-common + ${project.version} + + + ch.exense.streaming + step-streaming-api-server + ${project.version} + + + ch.exense.streaming + step-streaming-api-client-upload + ${project.version} + + + ch.exense.streaming + step-streaming-api-client-download + ${project.version} + + + ch.exense.streaming + step-streaming-impl-common + ${project.version} + + + ch.exense.streaming + step-streaming-impl-server + ${project.version} + + + + ch.exense.streaming + step-streaming-impl-websocket + ${project.version} + + + ch.exense.streaming + step-streaming-impl-websocket-common + ${project.version} + + + ch.exense.streaming + step-streaming-impl-websocket-server + ${project.version} + + + ch.exense.streaming + step-streaming-impl-websocket-client-upload + ${project.version} + + + ch.exense.streaming + step-streaming-impl-websocket-client-download + ${project.version} + + + - - - - - junit - junit - test - + + + + + junit + junit + test + - - ch.qos.logback - logback-classic - test - + + ch.qos.logback + logback-classic + test + - + - - - - - org.apache.maven.plugins - maven-compiler-plugin - ${dep.mvn.compiler.version} - - - org.apache.maven.plugins - maven-jar-plugin - ${dep.mvn.jar.version} - - - org.apache.maven.plugins - maven-deploy-plugin - ${dep.mvn.deploy.version} - - - org.apache.maven.plugins - maven-source-plugin - ${dep.mvn.source.version} - - - org.apache.maven.plugins - maven-javadoc-plugin - ${dep.mvn.javadoc.version} - - - org.jacoco - jacoco-maven-plugin - ${dep.mvn.jacoco.version} - - - org.apache.maven.plugins - maven-surefire-plugin - ${dep.mvn.surefire.version} - - - org.apache.maven.surefire - surefire-junit47 - ${dep.mvn.surefire.version} - - - - - org.owasp - dependency-check-maven - ${dep.mvn.dependency-check.version} - - - org.apache.maven.plugins - maven-gpg-plugin - ${dep.mvn.gpg.version} - - - - - - org.jacoco - jacoco-maven-plugin - - - default-prepare-agent - - prepare-agent - - - - default-report - - report - - - - - - org.apache.maven.plugins - maven-surefire-plugin - - - ${basedir}/../logback-maven.xml - - - - - - org.apache.maven.plugins - maven-source-plugin - - - attach-sources - - jar-no-fork - - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - -
]]>
- + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar-no-fork + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + +
]]>
+ - false + false - - - - false + + + + false - true - 11 - - -Xdoclint:none - -
- - - attach-javadocs - - jar - - - -
-
-
+ true + 11 + + -Xdoclint:none + + + + + attach-javadocs + + jar + + + + + + - - - - SkipJavadoc - - true - - - true - - - - - org.apache.maven.plugins - maven-surefire-plugin - - - - - - SignedBuild - - false - - - - - - org.apache.maven.plugins - maven-gpg-plugin - - - sign-artifacts - verify - - sign - - - - --pinentry-mode - loopback - - - - - + + + + SkipJavadoc + + true + + + true + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + + + SignedBuild + + false + + + + + + org.apache.maven.plugins + maven-gpg-plugin + + + sign-artifacts + verify + + sign + + + + --pinentry-mode + loopback + + + + + - - - org.apache.maven.plugins - maven-deploy-plugin - - true - - + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + - - - org.sonatype.central - central-publishing-maven-plugin - 0.8.0 - true - - sonatype - - - - - - + + + org.sonatype.central + central-publishing-maven-plugin + 0.8.0 + true + + sonatype + + + + + + -
\ No newline at end of file + diff --git a/step-streaming-api-client-download/pom.xml b/step-streaming-api-client-download/pom.xml index 74a7f93..814b643 100644 --- a/step-streaming-api-client-download/pom.xml +++ b/step-streaming-api-client-download/pom.xml @@ -1,24 +1,24 @@ - 4.0.0 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - - ch.exense.streaming - step-streaming - 0.0.0-SNAPSHOT - + + ch.exense.streaming + step-streaming + 0.0.0-SNAPSHOT + - step-streaming-api-client-download - ${project.groupId}:${project.artifactId} - jar + step-streaming-api-client-download + ${project.groupId}:${project.artifactId} + jar - - - ch.exense.streaming - step-streaming-api-common - - + + + ch.exense.streaming + step-streaming-api-common + + - \ No newline at end of file + diff --git a/step-streaming-api-client-upload/pom.xml b/step-streaming-api-client-upload/pom.xml index 85319c1..5bedcdb 100644 --- a/step-streaming-api-client-upload/pom.xml +++ b/step-streaming-api-client-upload/pom.xml @@ -1,23 +1,23 @@ - 4.0.0 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - - ch.exense.streaming - step-streaming - 0.0.0-SNAPSHOT - + + ch.exense.streaming + step-streaming + 0.0.0-SNAPSHOT + - step-streaming-api-client-upload - ${project.groupId}:${project.artifactId} - jar + step-streaming-api-client-upload + ${project.groupId}:${project.artifactId} + jar - - - ch.exense.streaming - step-streaming-api-common - - - \ No newline at end of file + + + ch.exense.streaming + step-streaming-api-common + + + diff --git a/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUpload.java b/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUpload.java index f8b4032..70f7525 100644 --- a/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUpload.java +++ b/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUpload.java @@ -64,11 +64,11 @@ public StreamingUploadSession getSession() { * @param timeout the maximum time to wait for the final status; must not be {@code null}. * @return the final {@link StreamingResourceStatus} of the upload. * @throws QuotaExceededException if the upload failed due to quota limitations - * @throws NullPointerException if {@code timeout} is {@code null}. - * @throws InterruptedException if the current thread is interrupted while waiting. - * @throws ExecutionException if the upload completed exceptionally; the cause can be - * retrieved from {@link ExecutionException#getCause()}. - * @throws TimeoutException if the given timeout elapses before the final status is available. + * @throws NullPointerException if {@code timeout} is {@code null}. + * @throws InterruptedException if the current thread is interrupted while waiting. + * @throws ExecutionException if the upload completed exceptionally; the cause can be + * retrieved from {@link ExecutionException#getCause()}. + * @throws TimeoutException if the given timeout elapses before the final status is available. */ public StreamingResourceStatus complete(Duration timeout) throws QuotaExceededException, ExecutionException, InterruptedException, TimeoutException { Objects.requireNonNull(timeout); @@ -94,10 +94,9 @@ public StreamingResourceStatus complete(Duration timeout) throws QuotaExceededEx * * @return the final {@link StreamingResourceStatus} of the upload. * @throws QuotaExceededException if the upload failed due to quota limitations - * @throws InterruptedException if the current thread is interrupted while waiting. - * @throws ExecutionException if the upload completed exceptionally; the cause can be - * retrieved from {@link ExecutionException#getCause()}. - * + * @throws InterruptedException if the current thread is interrupted while waiting. + * @throws ExecutionException if the upload completed exceptionally; the cause can be + * retrieved from {@link ExecutionException#getCause()}. * @see #complete(Duration) */ public StreamingResourceStatus complete() throws QuotaExceededException, ExecutionException, InterruptedException { @@ -131,6 +130,7 @@ public void cancel(Throwable cause) { * The implementation will internally use a * {@link java.util.concurrent.CancellationException} with the message * {@code "Cancelled by user"}. + * * @see #cancel(Throwable) */ public void cancel() { diff --git a/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUploadProvider.java b/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUploadProvider.java index c676af2..958a920 100644 --- a/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUploadProvider.java +++ b/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUploadProvider.java @@ -39,7 +39,7 @@ public interface StreamingUploadProvider { * @param metadata the metadata describing the file, such as file name and content type. * @return a {@link StreamingUploadSession} instance representing the active upload. * @throws QuotaExceededException if the upload failed due to quota limitations - * @throws IOException if the file does not exist, cannot be read, or another I/O error occurs. + * @throws IOException if the file does not exist, cannot be read, or another I/O error occurs. * @see StreamingUploadSession#signalEndOfInput() * @see #startLiveTextFileUpload(File, StreamingResourceMetadata, Charset) */ @@ -72,7 +72,7 @@ public interface StreamingUploadProvider { * @param charset the {@link Charset} (encoding) of the source file; must not be {@code null}. * @return a {@link StreamingUploadSession} instance representing the active upload. * @throws QuotaExceededException if the upload failed due to quota limitations - * @throws IOException if the file does not exist, cannot be read, or another I/O error occurs. + * @throws IOException if the file does not exist, cannot be read, or another I/O error occurs. * @see StreamingUploadSession#signalEndOfInput() * @see #startLiveBinaryFileUpload(File, StreamingResourceMetadata) */ diff --git a/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUploadSession.java b/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUploadSession.java index b40d4ab..41dc43f 100644 --- a/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUploadSession.java +++ b/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUploadSession.java @@ -35,6 +35,7 @@ public interface StreamingUploadSession extends StreamingTransfer { /** * Returns the {@link StreamingResourceMetadata} of the upload being performed. + * * @return the upload metadata. */ StreamingResourceMetadata getMetadata(); diff --git a/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUploads.java b/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUploads.java index 7ff0a3d..640642d 100644 --- a/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUploads.java +++ b/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/StreamingUploads.java @@ -99,6 +99,7 @@ public StreamingUpload startTextFileUpload(File textFile, Charset charset) throw * If you need more control over the metadata, like specifying a different filename, directly use * {@link StreamingUploadProvider#startLiveTextFileUpload(File, StreamingResourceMetadata, Charset) of the provider. *

+ * * @param textFile the text file to upload. * @param charset the {@link Charset} of the source file; must not be {@code null}. * @return a {@link StreamingUpload} representing the active upload. @@ -108,12 +109,12 @@ public StreamingUpload startTextFileUpload(File textFile, Charset charset) throw */ public StreamingUpload startTextFileUpload(File textFile, Charset charset, String mimeType) throws QuotaExceededException, IOException { StreamingUploadSession session = provider.startLiveTextFileUpload( - textFile, - new StreamingResourceMetadata( - textFile.getName(), - mimeType, - true), - Objects.requireNonNull(charset)); + textFile, + new StreamingResourceMetadata( + textFile.getName(), + mimeType, + true), + Objects.requireNonNull(charset)); return new StreamingUpload(session); } @@ -154,11 +155,11 @@ public StreamingUpload startBinaryFileUpload(File file) throws QuotaExceededExce */ public StreamingUpload startBinaryFileUpload(File file, String mimeType) throws QuotaExceededException, IOException { StreamingUploadSession session = provider.startLiveBinaryFileUpload( - file, - new StreamingResourceMetadata( - file.getName(), - mimeType, - false)); + file, + new StreamingResourceMetadata( + file.getName(), + mimeType, + false)); return new StreamingUpload(session); } diff --git a/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/impl/AbstractStreamingUploadProvider.java b/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/impl/AbstractStreamingUploadProvider.java index bc77d37..68b9fe3 100644 --- a/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/impl/AbstractStreamingUploadProvider.java +++ b/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/impl/AbstractStreamingUploadProvider.java @@ -126,10 +126,10 @@ public void setUploadBufferSize(int uploadBufferSize) { * @param fileToStream the binary file to upload. * @param metadata metadata describing the file to upload. * @return a {@link StreamingUploadSession} representing the active upload. - * @throws QuotaExceededException if the server signals that the upload failed due to quota limitations - * @throws NullPointerException if any of the arguments is missing + * @throws QuotaExceededException if the server signals that the upload failed due to quota limitations + * @throws NullPointerException if any of the arguments is missing * @throws IllegalArgumentException if an invalid argument combination is provided - * @throws IOException if the file cannot be read or an error occurs during stream setup. + * @throws IOException if the file cannot be read or an error occurs during stream setup. */ @Override public StreamingUploadSession startLiveBinaryFileUpload(File fileToStream, StreamingResourceMetadata metadata) throws QuotaExceededException, IOException { @@ -149,8 +149,8 @@ public StreamingUploadSession startLiveBinaryFileUpload(File fileToStream, Strea * @param charset the character encoding of the source file. * @return a {@link StreamingUploadSession} representing the active upload. * @throws QuotaExceededException if the upload failed due to quota limitations - * @throws NullPointerException if any of the arguments is missing - * @throws IOException if the file cannot be read or an error occurs during stream setup. + * @throws NullPointerException if any of the arguments is missing + * @throws IOException if the file cannot be read or an error occurs during stream setup. */ @Override public StreamingUploadSession startLiveTextFileUpload(File textFile, StreamingResourceMetadata metadata, Charset charset) throws QuotaExceededException, IOException { @@ -172,7 +172,7 @@ public StreamingUploadSession startLiveTextFileUpload(File textFile, StreamingRe * @param convertFromCharset if non-null, the character set used to decode the file before converting to UTF-8. * @return a {@link StreamingUploadSession} representing the active upload. * @throws QuotaExceededException if the upload failed due to quota limitations - * @throws IOException if the file cannot be read or the stream cannot be created + * @throws IOException if the file cannot be read or the stream cannot be created */ protected StreamingUploadSession startLiveFileUpload(File fileToStream, StreamingResourceMetadata metadata, Charset convertFromCharset) throws QuotaExceededException, IOException { if (!admissionSemaphore.tryAcquire()) { @@ -184,8 +184,8 @@ protected StreamingUploadSession startLiveFileUpload(File fileToStream, Streamin EndOfInputSignal endOfInputSignal = new EndOfInputSignal(); LiveFileInputStream liveInputStream = new LiveFileInputStream(fileToStream, endOfInputSignal, uploadFilePollInterval); InputStream uploadInputStream = convertFromCharset == null - ? liveInputStream - : new UTF8TranscodingTextInputStream(liveInputStream, convertFromCharset); + ? liveInputStream + : new UTF8TranscodingTextInputStream(liveInputStream, convertFromCharset); InputStream limitedBufferInputStream = new LimitedBufferInputStream(uploadInputStream, uploadBufferSize); StreamingUploadSession session = startLiveFileUpload(limitedBufferInputStream, metadata, endOfInputSignal); session.onClose(ignoredMessage -> activeSessions.remove(session)); @@ -206,9 +206,9 @@ protected StreamingUploadSession startLiveFileUpload(File fileToStream, Streamin * @param sourceInputStream the input stream representing the upload content. * @param metadata the metadata describing the resource. * @param endOfInputSignal the signal used to detect upload completion or cancellation. - * @throws QuotaExceededException if the upload failed due to quota limitations * @return a {@link StreamingUploadSession} representing the active upload. - * @throws IOException if the stream setup or transfer initiation fails. + * @throws QuotaExceededException if the upload failed due to quota limitations + * @throws IOException if the stream setup or transfer initiation fails. */ protected abstract StreamingUploadSession startLiveFileUpload(InputStream sourceInputStream, StreamingResourceMetadata metadata, diff --git a/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/impl/local/LocalStreamingUploadSession.java b/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/impl/local/LocalStreamingUploadSession.java index 1065c70..caa0901 100644 --- a/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/impl/local/LocalStreamingUploadSession.java +++ b/step-streaming-api-client-upload/src/main/java/step/streaming/client/upload/impl/local/LocalStreamingUploadSession.java @@ -41,11 +41,13 @@ void transfer() { } finally { try { inputStream.close(); - } catch (IOException ignored) {} + } catch (IOException ignored) { + } if (actualOutputStream != null) { try { actualOutputStream.close(); - } catch (IOException ignored) {} + } catch (IOException ignored) { + } } } } diff --git a/step-streaming-api-client-upload/src/test/java/step/streaming/client/upload/impl/AbstractStreamingUploadProviderTests.java b/step-streaming-api-client-upload/src/test/java/step/streaming/client/upload/impl/AbstractStreamingUploadProviderTests.java index 57b9082..d11c00f 100644 --- a/step-streaming-api-client-upload/src/test/java/step/streaming/client/upload/impl/AbstractStreamingUploadProviderTests.java +++ b/step-streaming-api-client-upload/src/test/java/step/streaming/client/upload/impl/AbstractStreamingUploadProviderTests.java @@ -53,7 +53,8 @@ protected LocalStreamingUploadSession startLiveFileUpload(InputStream sourceInpu // Here, we're "inside" the semaphore holding a permit uploadStarted.complete(null); Thread.sleep(250); - } catch (InterruptedException ignored) {} + } catch (InterruptedException ignored) { + } return session; } }); diff --git a/step-streaming-api-client-upload/src/test/java/step/streaming/client/upload/impl/local/LocalUploadTests.java b/step-streaming-api-client-upload/src/test/java/step/streaming/client/upload/impl/local/LocalUploadTests.java index 7b63b8c..cacc436 100644 --- a/step-streaming-api-client-upload/src/test/java/step/streaming/client/upload/impl/local/LocalUploadTests.java +++ b/step-streaming-api-client-upload/src/test/java/step/streaming/client/upload/impl/local/LocalUploadTests.java @@ -29,7 +29,8 @@ public void testDiscardingUpload() throws Exception { } finally { try { producer.stop(); - } catch (Exception ignored) {} + } catch (Exception ignored) { + } Files.deleteIfExists(liveFile.toPath()); } } @@ -52,17 +53,19 @@ public void testDirectoryUpload() throws Exception { } finally { try { producer.stop(); - } catch (Exception ignored) {} + } catch (Exception ignored) { + } Files.deleteIfExists(liveFile.toPath()); Files.deleteIfExists(tmpDir.toPath()); } } - private static class LineProducer extends Thread{ + private static class LineProducer extends Thread { private final File outputFile; private final int numberOfLines; private final long sleepBetween; + LineProducer(File outputFile, int numberOfLines, long sleepBetweenMs) { this.outputFile = outputFile; this.numberOfLines = numberOfLines; @@ -77,7 +80,7 @@ public void run() { Thread.sleep(sleepBetween); } } catch (Exception e) { - throw new RuntimeException(e); + throw new RuntimeException(e); } } } diff --git a/step-streaming-api-common/pom.xml b/step-streaming-api-common/pom.xml index 0652e2f..519aefb 100644 --- a/step-streaming-api-common/pom.xml +++ b/step-streaming-api-common/pom.xml @@ -1,17 +1,17 @@ - 4.0.0 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - - ch.exense.streaming - step-streaming - 0.0.0-SNAPSHOT - + + ch.exense.streaming + step-streaming + 0.0.0-SNAPSHOT + - step-streaming-api-common - ${project.groupId}:${project.artifactId} - jar + step-streaming-api-common + ${project.groupId}:${project.artifactId} + jar - \ No newline at end of file + diff --git a/step-streaming-api-common/src/main/java/step/streaming/client/AbstractStreamingTransfer.java b/step-streaming-api-common/src/main/java/step/streaming/client/AbstractStreamingTransfer.java index a1ecdc1..3cbd2e1 100644 --- a/step-streaming-api-common/src/main/java/step/streaming/client/AbstractStreamingTransfer.java +++ b/step-streaming-api-common/src/main/java/step/streaming/client/AbstractStreamingTransfer.java @@ -68,10 +68,11 @@ public void setCurrentStatus(StreamingResourceStatus status) { } } - /** Invoked when a status callback failed. + /** + * Invoked when a status callback failed. * The default implementation does nothing; override in subclasses e.g. for logging purposes. * - * @param callback the failed callback + * @param callback the failed callback * @param exception the exception that was thrown */ protected void onStatusCallbackFailed(Consumer callback, Exception exception) { diff --git a/step-streaming-api-common/src/main/java/step/streaming/client/StreamingTransfer.java b/step-streaming-api-common/src/main/java/step/streaming/client/StreamingTransfer.java index 4e28cd9..81b8fea 100644 --- a/step-streaming-api-common/src/main/java/step/streaming/client/StreamingTransfer.java +++ b/step-streaming-api-common/src/main/java/step/streaming/client/StreamingTransfer.java @@ -37,7 +37,7 @@ public interface StreamingTransfer extends AutoCloseable { * may be useful e.g. if you're only interested in FAILED events. * If no filter is given, all status updates will be signaled. * - * @param callback the listener to notify on status changes + * @param callback the listener to notify on status changes * @param optionalTransferStatusFilter if provided, the callback will only be invoked * for these {@link StreamingResourceTransferStatus} values */ diff --git a/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceMetadata.java b/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceMetadata.java index 03fc696..6cf1fcb 100644 --- a/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceMetadata.java +++ b/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceMetadata.java @@ -31,8 +31,8 @@ public StreamingResourceMetadata() { * Constructs a {@code StreamingResourceMetadata} with the given information. * Validates the MIME type format and filename upon construction. * - * @param filename the name of the file (must not be {@code null} or empty) - * @param mimeType the MIME type of the file + * @param filename the name of the file (must not be {@code null} or empty) + * @param mimeType the MIME type of the file * @param supportsLineAccess flag to indicate whether access by line number is possible for this resource * @throws NullPointerException if filename is {@code null} * @throws IllegalArgumentException if MIME type is {@code null} or invalid, or filename is empty @@ -56,7 +56,7 @@ public String getFilename() { * Sets the filename. * * @param filename the filename to set (must not be {@code null}) - * @throws NullPointerException if filename is {@code null} + * @throws NullPointerException if filename is {@code null} * @throws IllegalArgumentException if filename is empty */ public void setFilename(String filename) { @@ -99,9 +99,9 @@ public void setSupportsLineAccess(boolean supportsLineAccess) { @Override public String toString() { return "StreamingResourceMetadata{" + - "filename='" + filename + '\'' + - ", mimeType='" + mimeType + '\'' + - ", supportsLineAccess=" + supportsLineAccess + - '}'; + "filename='" + filename + '\'' + + ", mimeType='" + mimeType + '\'' + + ", supportsLineAccess=" + supportsLineAccess + + '}'; } } diff --git a/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceStatus.java b/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceStatus.java index be307b3..67b5b48 100644 --- a/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceStatus.java +++ b/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceStatus.java @@ -8,6 +8,7 @@ * This includes both the transfer state (e.g., in-progress, completed, failed), * the last known size, and the number of lines (if applicable) of the resource, * at the time the status was generated. + * * @see StreamingResourceTransferStatus */ public class StreamingResourceStatus { @@ -27,7 +28,7 @@ public StreamingResourceStatus() { * * @param transferStatus the transfer status (must not be null) * @param currentSize the current size of the resource - * @param numberOfLines the current number of lines, if applicable + * @param numberOfLines the current number of lines, if applicable * @throws NullPointerException if {@code transferStatus} is null */ public StreamingResourceStatus(StreamingResourceTransferStatus transferStatus, long currentSize, Long numberOfLines) { @@ -94,10 +95,10 @@ public void setNumberOfLines(Long numberOfLines) { @Override public String toString() { return "StreamingResourceStatus{" + - "transferStatus=" + transferStatus + - ", currentSize=" + currentSize + - ", numberOfLines=" + numberOfLines + - '}'; + "transferStatus=" + transferStatus + + ", currentSize=" + currentSize + + ", numberOfLines=" + numberOfLines + + '}'; } @Override diff --git a/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceTransferStatus.java b/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceTransferStatus.java index 8d69453..1ede021 100644 --- a/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceTransferStatus.java +++ b/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceTransferStatus.java @@ -1,6 +1,7 @@ package step.streaming.common; -/** Transfer status of a streaming resource. +/** + * Transfer status of a streaming resource. */ public enum StreamingResourceTransferStatus { INITIATED, diff --git a/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceUploadContext.java b/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceUploadContext.java index 42ea7fb..29129d6 100644 --- a/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceUploadContext.java +++ b/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceUploadContext.java @@ -9,7 +9,7 @@ * may happen, for instance a keyword call in Step. A context's lifecycle MUST be registered using * {@link StreamingResourceUploadContexts#registerContext(StreamingResourceUploadContext)} * on instantiation, and unregistered using {@link StreamingResourceUploadContexts#unregisterContext(StreamingResourceUploadContext)} - * + *

* A context is identified by a unique auto-generated ID, which may be passed around to identify it e.g. when an upload happens etc. * A context can carry arbitrary attributes (exposed as a read/write Map) * @@ -34,8 +34,8 @@ public Map getAttributes() { @Override public String toString() { return "StreamingResourceUploadContext{" + - "contextId='" + contextId + '\'' + - ", attributes=" + attributes + - '}'; + "contextId='" + contextId + '\'' + + ", attributes=" + attributes + + '}'; } } diff --git a/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceUploadContextListener.java b/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceUploadContextListener.java index 96eebc6..e316284 100644 --- a/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceUploadContextListener.java +++ b/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceUploadContextListener.java @@ -32,7 +32,8 @@ public interface StreamingResourceUploadContextListener { /** * Called when a streaming upload was requested by a client, but was refused by the server, for instance * due to quota restrictions. - * @param metadata the resource metadata associated with the rejected upload request + * + * @param metadata the resource metadata associated with the rejected upload request * @param reasonPhrase a human-readable explanation for refusing the upload */ void onResourceCreationRefused(StreamingResourceMetadata metadata, String reasonPhrase); diff --git a/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceUploadContexts.java b/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceUploadContexts.java index 0b574e8..1ffda35 100644 --- a/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceUploadContexts.java +++ b/step-streaming-api-common/src/main/java/step/streaming/common/StreamingResourceUploadContexts.java @@ -115,7 +115,8 @@ public void onResourceCreated(String uploadContextId, String resourceId, Streami for (StreamingResourceUploadContextListener listener : getListeners(uploadContextId)) { try { listener.onResourceCreated(resourceId, metadata); - } catch (Exception ignored) {} // we don't have access to a logger here, but these methods must not throw exceptions anyway; this is just a safeguard. + } catch (Exception ignored) { + } // we don't have access to a logger here, but these methods must not throw exceptions anyway; this is just a safeguard. } } @@ -135,22 +136,25 @@ public void onResourceStatusChanged(String uploadContextId, String resourceId, S for (StreamingResourceUploadContextListener listener : getListeners(uploadContextId)) { try { listener.onResourceStatusChanged(resourceId, status); - } catch (Exception ignored) {} // we don't have access to a logger here, but these methods must not throw exceptions anyway; this is just a safeguard. + } catch (Exception ignored) { + } // we don't have access to a logger here, but these methods must not throw exceptions anyway; this is just a safeguard. } } /** * Notifies all registered listeners that an attempt to create a resource within the given context was refused. * This might happen for instance if there are quota restrictions on the number of permitted resources per context. + * * @param uploadContextId the upload context ID - * @param metadata metadata of the attempted upload - * @param reasonPhrase human-readable reason for refusing the upload + * @param metadata metadata of the attempted upload + * @param reasonPhrase human-readable reason for refusing the upload */ public void onResourceCreationRefused(String uploadContextId, StreamingResourceMetadata metadata, String reasonPhrase) { for (StreamingResourceUploadContextListener listener : getListeners(uploadContextId)) { try { listener.onResourceCreationRefused(metadata, reasonPhrase); - } catch (Exception ignored) {} // we don't have access to a logger here, but these methods must not throw exceptions anyway; this is just a safeguard. + } catch (Exception ignored) { + } // we don't have access to a logger here, but these methods must not throw exceptions anyway; this is just a safeguard. } } diff --git a/step-streaming-api-common/src/main/java/step/streaming/data/EndOfInputSignal.java b/step-streaming-api-common/src/main/java/step/streaming/data/EndOfInputSignal.java index 79fd0a9..5259005 100644 --- a/step-streaming-api-common/src/main/java/step/streaming/data/EndOfInputSignal.java +++ b/step-streaming-api-common/src/main/java/step/streaming/data/EndOfInputSignal.java @@ -4,6 +4,7 @@ /** * Class for externally signalling that input is finished. + * * @see EndOfInputRequiringInputStream * @see CompletableFuture */ diff --git a/step-streaming-api-common/src/main/java/step/streaming/data/LiveFileInputStream.java b/step-streaming-api-common/src/main/java/step/streaming/data/LiveFileInputStream.java index 674db4a..dc66d91 100644 --- a/step-streaming-api-common/src/main/java/step/streaming/data/LiveFileInputStream.java +++ b/step-streaming-api-common/src/main/java/step/streaming/data/LiveFileInputStream.java @@ -32,8 +32,8 @@ public class LiveFileInputStream extends EndOfInputRequiringInputStream { * Constructs a new {@code LiveFileInputStream} for the given file, using the provided signal * to determine when the end of the input is truly reached. * - * @param file the file to read from; must not be {@code null} - * @param finishedSignal the signal that determines when the file is finished being written + * @param file the file to read from; must not be {@code null} + * @param finishedSignal the signal that determines when the file is finished being written * @param pollIntervalMillis the polling interval in milliseconds to check the signal when at EOF * @throws IOException if the file cannot be opened for reading */ diff --git a/step-streaming-api-common/src/main/java/step/streaming/data/UTF8TranscodingTextInputStream.java b/step-streaming-api-common/src/main/java/step/streaming/data/UTF8TranscodingTextInputStream.java index 4eedcb8..7137a09 100644 --- a/step-streaming-api-common/src/main/java/step/streaming/data/UTF8TranscodingTextInputStream.java +++ b/step-streaming-api-common/src/main/java/step/streaming/data/UTF8TranscodingTextInputStream.java @@ -23,8 +23,8 @@ public class UTF8TranscodingTextInputStream extends InputStream { private InputStream sourceStream; private final Charset declaredCharset; private final CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder() - .onMalformedInput(CodingErrorAction.REPLACE) - .onUnmappableCharacter(CodingErrorAction.REPLACE); + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE); private final CharBuffer charBuffer = CharBuffer.allocate(1024); private final ByteBuffer byteBuffer = ByteBuffer.allocate(2048); @@ -67,9 +67,9 @@ public UTF8TranscodingTextInputStream(InputStream sourceStream, Charset declared private boolean charsetSupportsBOM(Charset charset) { return charset.equals(StandardCharsets.UTF_8) - || charset.equals(StandardCharsets.UTF_16) - || charset.equals(StandardCharsets.UTF_16LE) - || charset.equals(StandardCharsets.UTF_16BE); + || charset.equals(StandardCharsets.UTF_16) + || charset.equals(StandardCharsets.UTF_16LE) + || charset.equals(StandardCharsets.UTF_16BE); } // This method is called lazily on read (and will only perform actual initialization on first read). diff --git a/step-streaming-api-common/src/main/java/step/streaming/util/ExceptionsUtil.java b/step-streaming-api-common/src/main/java/step/streaming/util/ExceptionsUtil.java index ac2aba2..5a12cd8 100644 --- a/step-streaming-api-common/src/main/java/step/streaming/util/ExceptionsUtil.java +++ b/step-streaming-api-common/src/main/java/step/streaming/util/ExceptionsUtil.java @@ -45,7 +45,7 @@ public static T as(Exception e, Class targetType) { } throw new IllegalArgumentException( - "Cannot construct " + targetType.getName() + " to wrap " + e.getClass().getName(), e); + "Cannot construct " + targetType.getName() + " to wrap " + e.getClass().getName(), e); } private static T tryNew(Class type, Class[] sig, Object[] args) { diff --git a/step-streaming-api-common/src/test/java/step/streaming/data/LinebreakDetectingOutputStreamTests.java b/step-streaming-api-common/src/test/java/step/streaming/data/LinebreakDetectingOutputStreamTests.java index 1fe5dd2..8f0a91d 100644 --- a/step-streaming-api-common/src/test/java/step/streaming/data/LinebreakDetectingOutputStreamTests.java +++ b/step-streaming-api-common/src/test/java/step/streaming/data/LinebreakDetectingOutputStreamTests.java @@ -18,7 +18,7 @@ public void testLinebreakPositionsSingleBytes() throws IOException { List detectedPositions = new ArrayList<>(); try (LinebreakDetectingOutputStream stream = - new LinebreakDetectingOutputStream(target, detectedPositions::add)) { + new LinebreakDetectingOutputStream(target, detectedPositions::add)) { String input = "\nabc\ndef\nghi"; byte[] bytes = input.getBytes(StandardCharsets.UTF_8); @@ -40,7 +40,7 @@ public void testLinebreakPositionsBulkWrite() throws IOException { byte[] data = "line1\nline2\nno linebreak".getBytes(StandardCharsets.UTF_8); try (LinebreakDetectingOutputStream stream = - new LinebreakDetectingOutputStream(target, detectedPositions::add)) { + new LinebreakDetectingOutputStream(target, detectedPositions::add)) { stream.write(data, 0, data.length); } @@ -54,7 +54,7 @@ public void testMixedWrites() throws IOException { List detectedPositions = new ArrayList<>(); try (LinebreakDetectingOutputStream stream = - new LinebreakDetectingOutputStream(target, detectedPositions::add)) { + new LinebreakDetectingOutputStream(target, detectedPositions::add)) { stream.write("abc\n".getBytes(StandardCharsets.UTF_8)); stream.write('x'); @@ -73,7 +73,7 @@ public void testNoLinebreaks() throws IOException { List detectedPositions = new ArrayList<>(); try (LinebreakDetectingOutputStream stream = - new LinebreakDetectingOutputStream(target, detectedPositions::add)) { + new LinebreakDetectingOutputStream(target, detectedPositions::add)) { stream.write("abcdefg".getBytes(StandardCharsets.UTF_8)); } @@ -90,7 +90,7 @@ public void testMultipleLinebreaksInOneWrite() throws IOException { byte[] data = input.getBytes(StandardCharsets.UTF_8); try (LinebreakDetectingOutputStream stream = - new LinebreakDetectingOutputStream(target, detectedPositions::add)) { + new LinebreakDetectingOutputStream(target, detectedPositions::add)) { stream.write(data); } diff --git a/step-streaming-api-common/src/test/java/step/streaming/data/UTF8TranscodingTextInputStreamTests.java b/step-streaming-api-common/src/test/java/step/streaming/data/UTF8TranscodingTextInputStreamTests.java index 366a87c..10bc43c 100644 --- a/step-streaming-api-common/src/test/java/step/streaming/data/UTF8TranscodingTextInputStreamTests.java +++ b/step-streaming-api-common/src/test/java/step/streaming/data/UTF8TranscodingTextInputStreamTests.java @@ -34,8 +34,8 @@ public void testLatin1ToUtf8() throws Exception { public void testUtf16ToUtf8() throws Exception { // "こんにちは" (Hello in Japanese) in UTF-16LE byte[] utf16le = { - (byte) 0x53, (byte) 0x30, (byte) 0x93, (byte) 0x30, (byte) 0x6B, (byte) 0x30, - (byte) 0x61, (byte) 0x30, (byte) 0x6F, (byte) 0x30 + (byte) 0x53, (byte) 0x30, (byte) 0x93, (byte) 0x30, (byte) 0x6B, (byte) 0x30, + (byte) 0x61, (byte) 0x30, (byte) 0x6F, (byte) 0x30 }; InputStream in = new UTF8TranscodingTextInputStream(new ByteArrayInputStream(utf16le), StandardCharsets.UTF_16LE); String result = readAll(in); @@ -78,7 +78,6 @@ public void testMalformedUtf8InputIsReplaced() throws Exception { } - @Test public void testUtf8WithBom() throws Exception { byte[] bomUtf8 = new byte[]{(byte) 0xEF, (byte) 0xBB, (byte) 0xBF, 'H', 'i'}; @@ -91,8 +90,8 @@ public void testUtf8WithBom() throws Exception { public void testUtf16LEWithBom() throws Exception { // UTF-16LE BOM + "Hi" byte[] bomUtf16LE = new byte[]{ - (byte) 0xFF, (byte) 0xFE, // BOM - 'H', 0x00, 'i', 0x00 // "H", "i" + (byte) 0xFF, (byte) 0xFE, // BOM + 'H', 0x00, 'i', 0x00 // "H", "i" }; InputStream in = new UTF8TranscodingTextInputStream(new ByteArrayInputStream(bomUtf16LE), StandardCharsets.UTF_16LE); String result = readAll(in); @@ -103,8 +102,8 @@ public void testUtf16LEWithBom() throws Exception { public void testUtf16LEWithConflictingBom() throws Exception { // UTF-16LE BOM + "Hi" byte[] bomUtf16LE = new byte[]{ - (byte) 0xFF, (byte) 0xFE, // BOM - 'H', 0x00, 'i', 0x00 // "H", "i" + (byte) 0xFF, (byte) 0xFE, // BOM + 'H', 0x00, 'i', 0x00 // "H", "i" }; // Wrong encoding given as argument! InputStream in = new UTF8TranscodingTextInputStream(new ByteArrayInputStream(bomUtf16LE), StandardCharsets.UTF_16BE); @@ -120,8 +119,8 @@ public void testUtf16LEWithConflictingBom() throws Exception { public void testUtf16BEWithBom() throws Exception { // UTF-16BE BOM + "Hi" byte[] bomUtf16BE = new byte[]{ - (byte) 0xFE, (byte) 0xFF, // BOM - 0x00, 'H', 0x00, 'i' // "H", "i" + (byte) 0xFE, (byte) 0xFF, // BOM + 0x00, 'H', 0x00, 'i' // "H", "i" }; InputStream in = new UTF8TranscodingTextInputStream(new ByteArrayInputStream(bomUtf16BE), StandardCharsets.UTF_16BE); String result = readAll(in); diff --git a/step-streaming-api-server/pom.xml b/step-streaming-api-server/pom.xml index 7e5981a..85b89cc 100644 --- a/step-streaming-api-server/pom.xml +++ b/step-streaming-api-server/pom.xml @@ -1,23 +1,23 @@ - 4.0.0 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - - ch.exense.streaming - step-streaming - 0.0.0-SNAPSHOT - + + ch.exense.streaming + step-streaming + 0.0.0-SNAPSHOT + - step-streaming-api-server - ${project.groupId}:${project.artifactId} - jar + step-streaming-api-server + ${project.groupId}:${project.artifactId} + jar - - - ch.exense.streaming - step-streaming-api-common - - - \ No newline at end of file + + + ch.exense.streaming + step-streaming-api-common + + + diff --git a/step-streaming-api-server/src/main/java/step/streaming/server/StreamingResourceStatusUpdate.java b/step-streaming-api-server/src/main/java/step/streaming/server/StreamingResourceStatusUpdate.java index 7bf853f..13f4e04 100644 --- a/step-streaming-api-server/src/main/java/step/streaming/server/StreamingResourceStatusUpdate.java +++ b/step-streaming-api-server/src/main/java/step/streaming/server/StreamingResourceStatusUpdate.java @@ -39,9 +39,9 @@ public StreamingResourceStatus applyTo(StreamingResourceStatus status) { @Override public String toString() { return "StreamingResourceStatusUpdate{" + - "transferStatus=" + transferStatus + - ", currentSize=" + currentSize + - ", numberOfLines=" + numberOfLines + - '}'; + "transferStatus=" + transferStatus + + ", currentSize=" + currentSize + + ", numberOfLines=" + numberOfLines + + '}'; } } diff --git a/step-streaming-api-server/src/main/java/step/streaming/server/StreamingResourcesCatalogBackend.java b/step-streaming-api-server/src/main/java/step/streaming/server/StreamingResourcesCatalogBackend.java index 49a19b3..8d0ba99 100644 --- a/step-streaming-api-server/src/main/java/step/streaming/server/StreamingResourcesCatalogBackend.java +++ b/step-streaming-api-server/src/main/java/step/streaming/server/StreamingResourcesCatalogBackend.java @@ -4,7 +4,8 @@ import step.streaming.common.StreamingResourceStatus; import step.streaming.common.StreamingResourceUploadContext; -/** Catalog backend used on the server side. +/** + * Catalog backend used on the server side. * The catalog keeps track of resource metadata. */ public interface StreamingResourcesCatalogBackend { @@ -12,7 +13,7 @@ public interface StreamingResourcesCatalogBackend { /** * Creates a new resource and registers its metadata. * - * @param metadata resource metadata + * @param metadata resource metadata * @param uploadContext upload context, potentially null depending on configuration * @return a unique internal resource ID */ @@ -21,7 +22,7 @@ public interface StreamingResourcesCatalogBackend { /** * Updates the current transfer status and size of the resource. * - * @param resourceId internal resource identifier + * @param resourceId internal resource identifier * @param statusUpdate the update to perform on the object */ StreamingResourceStatus updateStatus(String resourceId, StreamingResourceStatusUpdate statusUpdate); @@ -36,6 +37,7 @@ public interface StreamingResourcesCatalogBackend { /** * Deletes a resource. + * * @param resourceId internal resource identifier */ void delete(String resourceId); diff --git a/step-streaming-api-server/src/main/java/step/streaming/server/StreamingResourcesStorageBackend.java b/step-streaming-api-server/src/main/java/step/streaming/server/StreamingResourcesStorageBackend.java index 31727b1..d44ba0c 100644 --- a/step-streaming-api-server/src/main/java/step/streaming/server/StreamingResourcesStorageBackend.java +++ b/step-streaming-api-server/src/main/java/step/streaming/server/StreamingResourcesStorageBackend.java @@ -28,7 +28,7 @@ public interface StreamingResourcesStorageBackend { /** * Prepares for writing data to a new resource. * - * @param resourceId internal resource identifier + * @param resourceId internal resource identifier * @param enableLineCounting flag to indicate whether line counting must be supported * @throws IOException if initialization fails */ @@ -70,6 +70,7 @@ public interface StreamingResourcesStorageBackend { /** * Handles a failed upload. Return value indicates whether the data still exists ({@code true}), or was deleted ({@code false}) + * * @param resourceId internal resource identifier * @return return value indicating if the data still exists */ diff --git a/step-streaming-impl-common/pom.xml b/step-streaming-impl-common/pom.xml index 243c4e7..bc3fc8b 100644 --- a/step-streaming-impl-common/pom.xml +++ b/step-streaming-impl-common/pom.xml @@ -1,28 +1,28 @@ - 4.0.0 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - - ch.exense.streaming - step-streaming - 0.0.0-SNAPSHOT - + + ch.exense.streaming + step-streaming + 0.0.0-SNAPSHOT + - step-streaming-impl-common - ${project.groupId}:${project.artifactId} - jar + step-streaming-impl-common + ${project.groupId}:${project.artifactId} + jar - - - org.slf4j - slf4j-api - + + + org.slf4j + slf4j-api + - - ch.exense.streaming - step-streaming-api-common - - - \ No newline at end of file + + ch.exense.streaming + step-streaming-api-common + + + diff --git a/step-streaming-impl-common/src/main/java/step/streaming/data/FileChunk.java b/step-streaming-impl-common/src/main/java/step/streaming/data/FileChunk.java index 4ae9202..2aba77b 100644 --- a/step-streaming-impl-common/src/main/java/step/streaming/data/FileChunk.java +++ b/step-streaming-impl-common/src/main/java/step/streaming/data/FileChunk.java @@ -40,5 +40,6 @@ public static OutputStream getOutputStream(File file, long startPosition, boolea } // Utility class - private FileChunk() {} + private FileChunk() { + } } diff --git a/step-streaming-impl-common/src/main/java/step/streaming/data/FileChunkInputStream.java b/step-streaming-impl-common/src/main/java/step/streaming/data/FileChunkInputStream.java index d44073c..f6aed67 100644 --- a/step-streaming-impl-common/src/main/java/step/streaming/data/FileChunkInputStream.java +++ b/step-streaming-impl-common/src/main/java/step/streaming/data/FileChunkInputStream.java @@ -38,7 +38,8 @@ public FileChunkInputStream(File file, long start, long end) throws IOException if (end > raf.length()) { try { raf.close(); - } catch (IOException ignored) {} + } catch (IOException ignored) { + } throw new IllegalArgumentException("End position exceeds file length"); } diff --git a/step-streaming-impl-common/src/main/java/step/streaming/data/FileChunkOutputStream.java b/step-streaming-impl-common/src/main/java/step/streaming/data/FileChunkOutputStream.java index 3d3ad39..5fad0ef 100644 --- a/step-streaming-impl-common/src/main/java/step/streaming/data/FileChunkOutputStream.java +++ b/step-streaming-impl-common/src/main/java/step/streaming/data/FileChunkOutputStream.java @@ -23,7 +23,7 @@ public class FileChunkOutputStream extends OutputStream { * * @param file the file to write to * @param startPosition the position in the file to begin writing - * @param paranoidSync if {@code true}, "rwd" mode is used for the backing {@link RandomAccessFile}, otherwise "rw". + * @param paranoidSync if {@code true}, "rwd" mode is used for the backing {@link RandomAccessFile}, otherwise "rw". * @throws IOException if the file can't be opened or positioned */ public FileChunkOutputStream(File file, long startPosition, boolean paranoidSync) throws IOException { diff --git a/step-streaming-impl-server/pom.xml b/step-streaming-impl-server/pom.xml index a1fc5d7..ffd13ca 100644 --- a/step-streaming-impl-server/pom.xml +++ b/step-streaming-impl-server/pom.xml @@ -1,48 +1,48 @@ - 4.0.0 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - - ch.exense.streaming - step-streaming - 0.0.0-SNAPSHOT - + + ch.exense.streaming + step-streaming + 0.0.0-SNAPSHOT + - step-streaming-impl-server - ${project.groupId}:${project.artifactId} - jar + step-streaming-impl-server + ${project.groupId}:${project.artifactId} + jar - - - ch.exense.streaming - step-streaming-api-server - + + + ch.exense.streaming + step-streaming-api-server + - - ch.exense.streaming - step-streaming-impl-common - + + ch.exense.streaming + step-streaming-impl-common + - + - - - - org.apache.maven.plugins - maven-jar-plugin - 3.3.0 - - - test-jar - package - - test-jar - - - - - - - \ No newline at end of file + + + + org.apache.maven.plugins + maven-jar-plugin + 3.3.0 + + + test-jar + package + + test-jar + + + + + + + diff --git a/step-streaming-impl-server/src/main/java/step/streaming/server/DefaultStreamingResourceManager.java b/step-streaming-impl-server/src/main/java/step/streaming/server/DefaultStreamingResourceManager.java index 5754352..366c54a 100644 --- a/step-streaming-impl-server/src/main/java/step/streaming/server/DefaultStreamingResourceManager.java +++ b/step-streaming-impl-server/src/main/java/step/streaming/server/DefaultStreamingResourceManager.java @@ -149,7 +149,7 @@ public long writeChunk(String resourceId, InputStream input, boolean isFinal) th private void persistAndNotifyOnSizeChange(String resourceId, long updatedSize, Long linebreakCount) throws QuotaExceededException, IOException { onSizeChanged(resourceId, updatedSize); StreamingResourceStatusUpdate update = new StreamingResourceStatusUpdate( - StreamingResourceTransferStatus.IN_PROGRESS, updatedSize, linebreakCount + StreamingResourceTransferStatus.IN_PROGRESS, updatedSize, linebreakCount ); logger.debug("Updating streaming resource: {}, statusUpdate={}", resourceId, update); StreamingResourceStatus status = catalog.updateStatus(resourceId, update); @@ -239,8 +239,8 @@ public InputStream openStream(String resourceId, long start, long end) throws IO @Override public void registerStatusListener(String resourceId, Consumer listener) { statusListeners - .computeIfAbsent(resourceId, k -> new CopyOnWriteArrayList<>()) - .add(listener); + .computeIfAbsent(resourceId, k -> new CopyOnWriteArrayList<>()) + .add(listener); logger.debug("Registered status listener for {}", resourceId); @@ -344,7 +344,7 @@ public List getLines(String resourceId, long startingLineIndex, long cou // Get required linebreaks -- this method performs validations and inserts an artificial LB at EOF if needed. List linebreakPositions = getLinebreakPositions(resourceId, firstLbIndex, linebreaksCount); - List lines = new ArrayList<>((int)count); + List lines = new ArrayList<>((int) count); // Determine the complete range of bytes we need to read: // Start **after** the previous linebreak (or at beginning of file) diff --git a/step-streaming-impl-server/src/main/java/step/streaming/server/FilesystemStreamingResourcesStorageBackend.java b/step-streaming-impl-server/src/main/java/step/streaming/server/FilesystemStreamingResourcesStorageBackend.java index 416d770..c08b0c1 100644 --- a/step-streaming-impl-server/src/main/java/step/streaming/server/FilesystemStreamingResourcesStorageBackend.java +++ b/step-streaming-impl-server/src/main/java/step/streaming/server/FilesystemStreamingResourcesStorageBackend.java @@ -152,12 +152,12 @@ public void writeChunk(String resourceId, InputStream input, ThrowingConsumer= 4 - ? hashedPath.substring(0, 2) + File.separator + hashedPath.substring(2, 4) - : "xx" + File.separator + "yy"; + ? hashedPath.substring(0, 2) + File.separator + hashedPath.substring(2, 4) + : "xx" + File.separator + "yy"; return new File(baseDirectory, relPath + File.separator + originalId + ".bin"); } diff --git a/step-streaming-impl-server/src/main/java/step/streaming/server/data/LinebreakIndexFile.java b/step-streaming-impl-server/src/main/java/step/streaming/server/data/LinebreakIndexFile.java index 0797844..81c3388 100644 --- a/step-streaming-impl-server/src/main/java/step/streaming/server/data/LinebreakIndexFile.java +++ b/step-streaming-impl-server/src/main/java/step/streaming/server/data/LinebreakIndexFile.java @@ -100,10 +100,10 @@ private void encodeToBuffer(long value) throws IOException { private long decodeFromBuffer() throws IOException { return ((long) (entryBuffer[0] & 0xFF) << 32) - | ((long) (entryBuffer[1] & 0xFF) << 24) - | ((long) (entryBuffer[2] & 0xFF) << 16) - | ((long) (entryBuffer[3] & 0xFF) << 8) - | ((long) (entryBuffer[4] & 0xFF)); + | ((long) (entryBuffer[1] & 0xFF) << 24) + | ((long) (entryBuffer[2] & 0xFF) << 16) + | ((long) (entryBuffer[3] & 0xFF) << 8) + | ((long) (entryBuffer[4] & 0xFF)); } @@ -140,8 +140,8 @@ public List getLinebreakPositions(long startIndex, long count) throws IOEx if (startIndex < 0 || count < 0 || startIndex + count > entries) { throw new IndexOutOfBoundsException(String.format( - "Invalid range: startIndex=%d, count=%d, totalEntries=%d. Valid range is [0, %d]", - startIndex, count, entries, entries + "Invalid range: startIndex=%d, count=%d, totalEntries=%d. Valid range is [0, %d]", + startIndex, count, entries, entries )); } diff --git a/step-streaming-impl-server/src/test/java/step/streaming/server/BackendTests.java b/step-streaming-impl-server/src/test/java/step/streaming/server/BackendTests.java index 8da81f0..57e38ce 100644 --- a/step-streaming-impl-server/src/test/java/step/streaming/server/BackendTests.java +++ b/step-streaming-impl-server/src/test/java/step/streaming/server/BackendTests.java @@ -23,8 +23,8 @@ public void cleanup() { public void testUploadWithTwoDownloads() throws Exception { StreamingResourcesCatalogBackend catalog = new InMemoryCatalogBackend(); StreamingResourceManager manager = new DefaultStreamingResourceManager(catalog, storage, - s -> null, - null + s -> null, + null ); storage.cleanup(); } diff --git a/step-streaming-impl-server/src/test/java/step/streaming/server/DefaultStreamingResourceManagerTest.java b/step-streaming-impl-server/src/test/java/step/streaming/server/DefaultStreamingResourceManagerTest.java index 6b44c26..fe41e03 100644 --- a/step-streaming-impl-server/src/test/java/step/streaming/server/DefaultStreamingResourceManagerTest.java +++ b/step-streaming-impl-server/src/test/java/step/streaming/server/DefaultStreamingResourceManagerTest.java @@ -33,8 +33,8 @@ public void setUp() throws IOException { storageBackend = new TestingStorageBackend(StreamingResourcesStorageBackend.DEFAULT_NOTIFY_INTERVAL_MILLIS, false); catalogBackend = new InMemoryCatalogBackend(); manager = new DefaultStreamingResourceManager(catalogBackend, storageBackend, - s -> null, - null + s -> null, + null ); } diff --git a/step-streaming-impl-server/src/test/java/step/streaming/server/data/LinebreakIndexFileTests.java b/step-streaming-impl-server/src/test/java/step/streaming/server/data/LinebreakIndexFileTests.java index 60b4a0a..bd3266a 100644 --- a/step-streaming-impl-server/src/test/java/step/streaming/server/data/LinebreakIndexFileTests.java +++ b/step-streaming-impl-server/src/test/java/step/streaming/server/data/LinebreakIndexFileTests.java @@ -116,7 +116,7 @@ public void testConcurrentReadWhileWriting() throws Exception { long tailStart = Math.max(0, available - assertTailLength); long tailCount = available - tailStart; List actual = reader - .getLinebreakPositions(tailStart, (int) tailCount); + .getLinebreakPositions(tailStart, (int) tailCount); // System.out.println(actual); // in case someone wants to debug :-D -- examples: // beginning: [1000, 1010, 1020, 1030, 1040, 1050] @@ -131,12 +131,12 @@ public void testConcurrentReadWhileWriting() throws Exception { long tailStart = Math.max(0, available - assertTailLength); long tailCount = available - tailStart; List actualFromLongLived = longLivedReader - .getLinebreakPositions(tailStart, (int) tailCount); + .getLinebreakPositions(tailStart, (int) tailCount); for (int i = 0; i < actualFromLongLived.size(); i++) { long expected = baseOffset + (tailStart + i) * 10L; assertEquals("Mismatch from long-lived reader at index " + (tailStart + i), - expected, (long) actualFromLongLived.get(i)); + expected, (long) actualFromLongLived.get(i)); } } @@ -166,7 +166,7 @@ public void testConcurrentReadWhileWriting() throws Exception { @Test public void performanceTest() throws Exception { long count = 1_000_000L; - try(LinebreakIndexFile index = new LinebreakIndexFile(rawIndexFile, 0, LinebreakIndexFile.Mode.WRITE)) { + try (LinebreakIndexFile index = new LinebreakIndexFile(rawIndexFile, 0, LinebreakIndexFile.Mode.WRITE)) { long d = System.currentTimeMillis(); for (long i = 0; i < count; ++i) { Assert.assertEquals(i + 1, index.addLinebreakPosition(i)); diff --git a/step-streaming-impl-server/src/test/java/step/streaming/server/test/InMemoryCatalogBackend.java b/step-streaming-impl-server/src/test/java/step/streaming/server/test/InMemoryCatalogBackend.java index e5ff785..41be298 100644 --- a/step-streaming-impl-server/src/test/java/step/streaming/server/test/InMemoryCatalogBackend.java +++ b/step-streaming-impl-server/src/test/java/step/streaming/server/test/InMemoryCatalogBackend.java @@ -31,9 +31,9 @@ public static class CatalogEntry { public String createResource(StreamingResourceMetadata metadata, StreamingResourceUploadContext uploadContext) { String resourceId = UUID.randomUUID().toString(); catalog.put(resourceId, new CatalogEntry( - metadata.getFilename(), - metadata.getMimeType(), - new StreamingResourceStatus(StreamingResourceTransferStatus.INITIATED, 0L, metadata.getSupportsLineAccess() ? 0L: null) + metadata.getFilename(), + metadata.getMimeType(), + new StreamingResourceStatus(StreamingResourceTransferStatus.INITIATED, 0L, metadata.getSupportsLineAccess() ? 0L : null) )); return resourceId; } diff --git a/step-streaming-impl-server/src/test/java/step/streaming/server/test/TestingStorageBackend.java b/step-streaming-impl-server/src/test/java/step/streaming/server/test/TestingStorageBackend.java index bf5df25..8b35ec7 100644 --- a/step-streaming-impl-server/src/test/java/step/streaming/server/test/TestingStorageBackend.java +++ b/step-streaming-impl-server/src/test/java/step/streaming/server/test/TestingStorageBackend.java @@ -53,15 +53,15 @@ private void deleteRecursively(Path path) throws IOException { if (!Files.exists(path)) return; Files.walk(path) - .sorted((a, b) -> b.compareTo(a)) // delete children before parents - .forEach(p -> { - try { - logger.debug("Cleaning up: deleting {} {}", p.toFile().isDirectory() ? "directory": "file", p); - Files.delete(p); - } catch (IOException e) { - throw new RuntimeException("Failed to delete: " + p, e); - } - }); + .sorted((a, b) -> b.compareTo(a)) // delete children before parents + .forEach(p -> { + try { + logger.debug("Cleaning up: deleting {} {}", p.toFile().isDirectory() ? "directory" : "file", p); + Files.delete(p); + } catch (IOException e) { + throw new RuntimeException("Failed to delete: " + p, e); + } + }); } /** diff --git a/step-streaming-impl-websocket/pom.xml b/step-streaming-impl-websocket/pom.xml index cbb2d60..5735ca7 100644 --- a/step-streaming-impl-websocket/pom.xml +++ b/step-streaming-impl-websocket/pom.xml @@ -2,22 +2,22 @@ - 4.0.0 + 4.0.0 - - ch.exense.streaming - step-streaming - 0.0.0-SNAPSHOT - + + ch.exense.streaming + step-streaming + 0.0.0-SNAPSHOT + - step-streaming-impl-websocket - ${project.groupId}:${project.artifactId} - pom + step-streaming-impl-websocket + ${project.groupId}:${project.artifactId} + pom - - step-streaming-impl-websocket-common - step-streaming-impl-websocket-server - step-streaming-impl-websocket-client-upload - step-streaming-impl-websocket-client-download - - \ No newline at end of file + + step-streaming-impl-websocket-common + step-streaming-impl-websocket-server + step-streaming-impl-websocket-client-upload + step-streaming-impl-websocket-client-download + + diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-client-download/pom.xml b/step-streaming-impl-websocket/step-streaming-impl-websocket-client-download/pom.xml index 94b1e81..9f7d0a3 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-client-download/pom.xml +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-client-download/pom.xml @@ -2,35 +2,35 @@ - 4.0.0 + 4.0.0 - step-streaming-impl-websocket-client-download - ${project.groupId}:${project.artifactId} - jar + step-streaming-impl-websocket-client-download + ${project.groupId}:${project.artifactId} + jar - - ch.exense.streaming - step-streaming-impl-websocket - 0.0.0-SNAPSHOT - + + ch.exense.streaming + step-streaming-impl-websocket + 0.0.0-SNAPSHOT + - - - ch.exense.streaming - step-streaming-api-client-download - + + + ch.exense.streaming + step-streaming-api-client-download + - - ch.exense.streaming - step-streaming-impl-websocket-common - + + ch.exense.streaming + step-streaming-impl-websocket-common + - - org.eclipse.jetty.websocket - websocket-jakarta-client - + + org.eclipse.jetty.websocket + websocket-jakarta-client + - + - \ No newline at end of file + diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-client-download/src/main/java/step/streaming/client/download/WebsocketDownload.java b/step-streaming-impl-websocket/step-streaming-impl-websocket-client-download/src/main/java/step/streaming/client/download/WebsocketDownload.java index d8952d5..aa22fb1 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-client-download/src/main/java/step/streaming/client/download/WebsocketDownload.java +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-client-download/src/main/java/step/streaming/client/download/WebsocketDownload.java @@ -105,7 +105,7 @@ private void onClientClose() { * Fails if another chunk stream is still active and open. * * @param startOffset the starting byte offset (inclusive) - * @param endOffset the ending byte offset (exclusive) + * @param endOffset the ending byte offset (exclusive) * @return an input stream for the requested chunk * @throws IOException if the chunk range is invalid or the transfer cannot proceed */ @@ -232,9 +232,9 @@ private TrackablePipedInputStream getStreamForStatus(StreamingResourceStatus sta return getChunkStream(bytesRead, status.getCurrentSize()); } if (status.getTransferStatus() == StreamingResourceTransferStatus.COMPLETED - || status.getTransferStatus() == StreamingResourceTransferStatus.FAILED) { + || status.getTransferStatus() == StreamingResourceTransferStatus.FAILED) { logger.debug("Transfer status is {} and all bytes were received, signaling end of data", - status.getTransferStatus()); + status.getTransferStatus()); return null; // normal EOF } diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-client-download/src/main/java/step/streaming/client/download/WebsocketDownloadClient.java b/step-streaming-impl-websocket/step-streaming-impl-websocket-client-download/src/main/java/step/streaming/client/download/WebsocketDownloadClient.java index 075577b..dccfff1 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-client-download/src/main/java/step/streaming/client/download/WebsocketDownloadClient.java +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-client-download/src/main/java/step/streaming/client/download/WebsocketDownloadClient.java @@ -50,8 +50,10 @@ private enum State {CREATED, READY, AWAITING_DOWNLOAD, DOWNLOADING, CLOSED} // Request queue (serialized on event loop) private abstract static class Request { abstract void send(Session s) throws Exception; + abstract String hint(); } + private final Deque requests = new ArrayDeque<>(); private final AtomicLong requestIds = new AtomicLong(); private volatile long inFlightReqId; @@ -77,8 +79,8 @@ public WebsocketDownloadClient(URI endpointUri, WebSocketContainer container) th try { endpoint = new Remote(); session = container.connectToServer(endpoint, - ClientEndpointConfig.Builder.create().build(), - endpointUri); + ClientEndpointConfig.Builder.create().build(), + endpointUri); logger.info("Connected to {}, waiting for initial status...", endpointUri); try { initialStatus.get(30, TimeUnit.SECONDS); @@ -118,7 +120,7 @@ public void close() { state = State.CLOSED; if (session.isOpen()) { endpoint.closeSession(session, CloseReasonUtil.makeSafeCloseReason( - CloseReason.CloseCodes.NORMAL_CLOSURE, "Client Session closed")); + CloseReason.CloseCodes.NORMAL_CLOSURE, "Client Session closed")); } requests.clear(); // We will shut down the executors in onSessionClose after fan-out. @@ -137,7 +139,8 @@ public void requestChunkStream(long startOffset, long endOffset, Consumer { if (state == State.CLOSED) return; logger.debug("[TEXT IN] state={}, busy={}, q={}, inFlightReqId={}, loop=ws-dl-client-loop", - state, busy, requests.size(), inFlightReqId); + state, busy, requests.size(), inFlightReqId); DownloadServerMessage msg = DownloadServerMessage.fromString(messageString); if (msg instanceof StatusChangedMessage) { @@ -320,7 +332,7 @@ private void sendNow(Request req) { private void finishCurrent() { logger.debug("[FINISH start] state={}, busy={}, queueSize={}, inFlightReqId={}", - state, busy, requests.size(), inFlightReqId); + state, busy, requests.size(), inFlightReqId); state = State.READY; Request next = requests.pollFirst(); if (next != null) { diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-client-upload/pom.xml b/step-streaming-impl-websocket/step-streaming-impl-websocket-client-upload/pom.xml index 663dc2f..665fc1e 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-client-upload/pom.xml +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-client-upload/pom.xml @@ -2,35 +2,35 @@ - 4.0.0 + 4.0.0 - step-streaming-impl-websocket-client-upload - ${project.groupId}:${project.artifactId} - jar + step-streaming-impl-websocket-client-upload + ${project.groupId}:${project.artifactId} + jar - - ch.exense.streaming - step-streaming-impl-websocket - 0.0.0-SNAPSHOT - + + ch.exense.streaming + step-streaming-impl-websocket + 0.0.0-SNAPSHOT + - - - ch.exense.streaming - step-streaming-api-client-upload - + + + ch.exense.streaming + step-streaming-api-client-upload + - - ch.exense.streaming - step-streaming-impl-websocket-common - + + ch.exense.streaming + step-streaming-impl-websocket-common + - - org.eclipse.jetty.websocket - websocket-jakarta-client - + + org.eclipse.jetty.websocket + websocket-jakarta-client + - + - \ No newline at end of file + diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-client-upload/src/main/java/step/streaming/websocket/client/upload/WebsocketUploadClient.java b/step-streaming-impl-websocket/step-streaming-impl-websocket-client-upload/src/main/java/step/streaming/websocket/client/upload/WebsocketUploadClient.java index 9c8a379..c9fb39b 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-client-upload/src/main/java/step/streaming/websocket/client/upload/WebsocketUploadClient.java +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-client-upload/src/main/java/step/streaming/websocket/client/upload/WebsocketUploadClient.java @@ -116,7 +116,7 @@ private void sendText(String text) throws IOException { if (e instanceof ClosedChannelException || e.getCause() instanceof ClosedChannelException) { if (closeReason != null) { throw new IOException( - "WebSocket closed by server: " + closeReason, e + "WebSocket closed by server: " + closeReason, e ); } else { throw new IOException("WebSocket channel closed unexpectedly", e); diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-common/pom.xml b/step-streaming-impl-websocket/step-streaming-impl-websocket-common/pom.xml index f1302b3..995555f 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-common/pom.xml +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-common/pom.xml @@ -2,50 +2,50 @@ - 4.0.0 - - step-streaming-impl-websocket-common - ${project.groupId}:${project.artifactId} - jar - - - ch.exense.streaming - step-streaming-impl-websocket - 0.0.0-SNAPSHOT - - - - - ch.exense.streaming - step-streaming-api-common - - - - ch.exense.streaming - step-streaming-impl-common - - - - com.fasterxml.jackson.core - jackson-annotations - - - - com.fasterxml.jackson.core - jackson-databind - - - - jakarta.websocket - jakarta.websocket-api - - - - org.eclipse.jetty.toolchain - jetty-jakarta-websocket-api - - 2.0.0 - - - - \ No newline at end of file + 4.0.0 + + step-streaming-impl-websocket-common + ${project.groupId}:${project.artifactId} + jar + + + ch.exense.streaming + step-streaming-impl-websocket + 0.0.0-SNAPSHOT + + + + + ch.exense.streaming + step-streaming-api-common + + + + ch.exense.streaming + step-streaming-impl-common + + + + com.fasterxml.jackson.core + jackson-annotations + + + + com.fasterxml.jackson.core + jackson-databind + + + + jakarta.websocket + jakarta.websocket-api + + + + org.eclipse.jetty.toolchain + jetty-jakarta-websocket-api + + 2.0.0 + + + + diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-common/src/main/java/step/streaming/websocket/CloseReasonUtil.java b/step-streaming-impl-websocket/step-streaming-impl-websocket-common/src/main/java/step/streaming/websocket/CloseReasonUtil.java index 853f9bd..f7e1257 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-common/src/main/java/step/streaming/websocket/CloseReasonUtil.java +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-common/src/main/java/step/streaming/websocket/CloseReasonUtil.java @@ -23,7 +23,9 @@ */ public class CloseReasonUtil { - /** Maximum byte length of the reason phrase, as defined in the specification. */ + /** + * Maximum byte length of the reason phrase, as defined in the specification. + */ public static final int MAX_REASON_BYTES = 123; diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-common/src/main/java/step/streaming/websocket/protocol/download/DownloadProtocolMessage.java b/step-streaming-impl-websocket/step-streaming-impl-websocket-common/src/main/java/step/streaming/websocket/protocol/download/DownloadProtocolMessage.java index 1130b2e..109cb78 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-common/src/main/java/step/streaming/websocket/protocol/download/DownloadProtocolMessage.java +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-common/src/main/java/step/streaming/websocket/protocol/download/DownloadProtocolMessage.java @@ -5,7 +5,8 @@ import java.util.concurrent.atomic.AtomicBoolean; -/** Protocol messages used for downloads. +/** + * Protocol messages used for downloads. * Only for clarity and type-safety, an additional layer of (abstract) classes named * {@link DownloadServerMessage} and {@link DownloadClientMessage} exists to * indicate which end of the connection produces the messages. @@ -20,10 +21,10 @@ public class DownloadProtocolMessage extends ProtocolMessage { public static void initialize() { if (!registered.getAndSet(true)) { registerSubtypes( - new NamedType(StatusChangedMessage.class, "StatusChanged"), - new NamedType(RequestChunkMessage.class, "RequestChunk"), - new NamedType(RequestLinesMessage.class, "RequestLines"), - new NamedType(LinesMessage.class, "Lines") + new NamedType(StatusChangedMessage.class, "StatusChanged"), + new NamedType(RequestChunkMessage.class, "RequestChunk"), + new NamedType(RequestLinesMessage.class, "RequestLines"), + new NamedType(LinesMessage.class, "Lines") ); } } diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-common/src/main/java/step/streaming/websocket/protocol/upload/UploadProtocolMessage.java b/step-streaming-impl-websocket/step-streaming-impl-websocket-common/src/main/java/step/streaming/websocket/protocol/upload/UploadProtocolMessage.java index f234f80..f452677 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-common/src/main/java/step/streaming/websocket/protocol/upload/UploadProtocolMessage.java +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-common/src/main/java/step/streaming/websocket/protocol/upload/UploadProtocolMessage.java @@ -5,7 +5,8 @@ import java.util.concurrent.atomic.AtomicBoolean; -/** Protocol messages used for uploads. +/** + * Protocol messages used for uploads. * Only for clarity and type-safety, an additional layer of (abstract) classes named * {@link UploadServerMessage} and {@link UploadClientMessage} exists to * indicate which end of the connection produces the messages. @@ -14,6 +15,7 @@ public class UploadProtocolMessage extends ProtocolMessage { public static final String CLOSEREASON_PHRASE_UPLOAD_COMPLETED = "Upload completed"; private static final AtomicBoolean registered = new AtomicBoolean(false); + static { initialize(); } @@ -21,12 +23,12 @@ public class UploadProtocolMessage extends ProtocolMessage { public static void initialize() { if (!registered.getAndSet(true)) { registerSubtypes( - // unfortunately we cannot put the names as constants in the subclasses, as - // that leads to a "potential JVM deadlock" warning - new NamedType(StartUploadMessage.class, "StartUpload"), - new NamedType(ReadyForUploadMessage.class, "ReadyForUpload"), - new NamedType(FinishUploadMessage.class, "FinishUpload"), - new NamedType(UploadAcknowledgedMessage.class, "UploadAcknowledged") + // unfortunately we cannot put the names as constants in the subclasses, as + // that leads to a "potential JVM deadlock" warning + new NamedType(StartUploadMessage.class, "StartUpload"), + new NamedType(ReadyForUploadMessage.class, "ReadyForUpload"), + new NamedType(FinishUploadMessage.class, "FinishUpload"), + new NamedType(UploadAcknowledgedMessage.class, "UploadAcknowledged") ); } } diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/pom.xml b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/pom.xml index 9f7340b..55ff4cb 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/pom.xml +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/pom.xml @@ -2,89 +2,89 @@ - 4.0.0 + 4.0.0 - step-streaming-impl-websocket-server - ${project.groupId}:${project.artifactId} - jar + step-streaming-impl-websocket-server + ${project.groupId}:${project.artifactId} + jar - - ch.exense.streaming - step-streaming-impl-websocket - 0.0.0-SNAPSHOT - + + ch.exense.streaming + step-streaming-impl-websocket + 0.0.0-SNAPSHOT + - - - ch.exense.streaming - step-streaming-impl-websocket-common - + + + ch.exense.streaming + step-streaming-impl-websocket-common + - - ch.exense.streaming - step-streaming-api-server - + + ch.exense.streaming + step-streaming-api-server + - - org.eclipse.jetty.websocket - websocket-jakarta-server - + + org.eclipse.jetty.websocket + websocket-jakarta-server + - - - org.eclipse.jetty.websocket - websocket-jetty-server - test - + + + org.eclipse.jetty.websocket + websocket-jetty-server + test + - - org.eclipse.jetty.websocket - websocket-core-server - test - + + org.eclipse.jetty.websocket + websocket-core-server + test + - - ch.exense.streaming - step-streaming-impl-server - test - + + ch.exense.streaming + step-streaming-impl-server + test + - - ch.exense.streaming - step-streaming-impl-server - ${project.version} - tests - test - + + ch.exense.streaming + step-streaming-impl-server + ${project.version} + tests + test + - - ch.exense.streaming - step-streaming-impl-websocket-client-upload - test - + + ch.exense.streaming + step-streaming-impl-websocket-client-upload + test + - - ch.exense.streaming - step-streaming-impl-websocket-client-download - test - + + ch.exense.streaming + step-streaming-impl-websocket-client-download + test + - + - - - - org.apache.maven.plugins - maven-surefire-plugin - - - none - 1 - false - - - - + + + + org.apache.maven.plugins + maven-surefire-plugin + + + none + 1 + false + + + + - \ No newline at end of file + diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/main/java/step/streaming/websocket/server/WebsocketDownloadEndpoint.java b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/main/java/step/streaming/websocket/server/WebsocketDownloadEndpoint.java index 36085f3..19a861f 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/main/java/step/streaming/websocket/server/WebsocketDownloadEndpoint.java +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/main/java/step/streaming/websocket/server/WebsocketDownloadEndpoint.java @@ -141,7 +141,7 @@ private void sendStatusUpdate(StreamingResourceStatus status) { logger.debug("Notification sent."); } else { logger.debug("Status notify failed (possibly client closed): {}", - result.getException() != null ? result.getException().toString() : "unknown"); + result.getException() != null ? result.getException().toString() : "unknown"); } }); } diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/main/java/step/streaming/websocket/server/WebsocketUploadEndpoint.java b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/main/java/step/streaming/websocket/server/WebsocketUploadEndpoint.java index 16bf70e..b99dfeb 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/main/java/step/streaming/websocket/server/WebsocketUploadEndpoint.java +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/main/java/step/streaming/websocket/server/WebsocketUploadEndpoint.java @@ -61,12 +61,12 @@ public WebsocketUploadEndpoint(StreamingResourceManager manager, WebsocketServer public void onOpen(Session session, EndpointConfig config) { this.session = session; session.getRequestParameterMap() - .getOrDefault(StreamingResourceUploadContext.PARAMETER_NAME, List.of()) - .stream().findFirst().ifPresent(ctx -> uploadContextId = ctx); + .getOrDefault(StreamingResourceUploadContext.PARAMETER_NAME, List.of()) + .stream().findFirst().ifPresent(ctx -> uploadContextId = ctx); if (manager.isUploadContextRequired() && uploadContextId == null) { closeSession(session, CloseReasonUtil.makeSafeCloseReason( - CloseReason.CloseCodes.VIOLATED_POLICY, - "Missing parameter " + StreamingResourceUploadContext.PARAMETER_NAME)); + CloseReason.CloseCodes.VIOLATED_POLICY, + "Missing parameter " + StreamingResourceUploadContext.PARAMETER_NAME)); return; } Optional.ofNullable(sessionsHandler).ifPresent(handler -> handler.register(session)); @@ -75,14 +75,14 @@ public void onOpen(Session session, EndpointConfig config) { // conceptually the same as reading the data directly from a stream, but better for // performance as it does not block Jetty threads. session.addMessageHandler( - ByteBuffer.class, - // DO NOT REPLACE WITH A LAMBDA -- this will trip up Jetty at runtime - new MessageHandler.Partial() { - @Override - public void onMessage(ByteBuffer data, boolean last) { - onDataPartial(data, last); - } + ByteBuffer.class, + // DO NOT REPLACE WITH A LAMBDA -- this will trip up Jetty at runtime + new MessageHandler.Partial() { + @Override + public void onMessage(ByteBuffer data, boolean last) { + onDataPartial(data, last); } + } ); logger.debug("Session opened: {}", session.getId()); @@ -96,13 +96,13 @@ private void closeSession(Throwable exception) { if (exception instanceof QuotaExceededException) { // this one is handled specially by the client, and also logged again with a shorter warning on the server closeSession(session, CloseReasonUtil.makeSafeCloseReason( - CloseReason.CloseCodes.VIOLATED_POLICY, - "QuotaExceededException: " + exception.getMessage())); + CloseReason.CloseCodes.VIOLATED_POLICY, + "QuotaExceededException: " + exception.getMessage())); } else { logger.error("Closing Websocket Session for resource {} with error: {}", resourceId, exception.getMessage(), exception); closeSession(session, CloseReasonUtil.makeSafeCloseReason( - CloseReason.CloseCodes.UNEXPECTED_CONDITION, - exception.getMessage())); + CloseReason.CloseCodes.UNEXPECTED_CONDITION, + exception.getMessage())); } } @@ -145,8 +145,8 @@ private void onMessage(String messageString) { } if (!ack.checksum.equals(finishMessage.checksum)) { closeSession(new RuntimeException( - String.format("Checksum mismatch after upload! Client checksum=%s, server checksum=%s", - finishMessage.checksum, ack.checksum))); + String.format("Checksum mismatch after upload! Client checksum=%s, server checksum=%s", + finishMessage.checksum, ack.checksum))); return; } var finalStatus = manager.markCompleted(resourceId); @@ -206,21 +206,21 @@ public void onSessionClose(Session session, CloseReason closeReason) { super.onClose(session, closeReason); if (resourceId != null) { if (closeReason.getCloseCode() == CloseReason.CloseCodes.NORMAL_CLOSURE - && closeReason.getReasonPhrase().equals(UploadProtocolMessage.CLOSEREASON_PHRASE_UPLOAD_COMPLETED) - && state == State.FINISHED) { + && closeReason.getReasonPhrase().equals(UploadProtocolMessage.CLOSEREASON_PHRASE_UPLOAD_COMPLETED) + && state == State.FINISHED) { logger.debug("Session closed: {}, resourceId={}, reason={}", - session.getId(), resourceId, closeReason); + session.getId(), resourceId, closeReason); } else { StreamingResourceTransferStatus status = manager.getStatus(resourceId).getTransferStatus(); logger.warn("Upload session for resource {}, state {} was closed with reason {}, resource was left with status {}; marking upload as failed.", - resourceId, state, closeReason, status); + resourceId, state, closeReason, status); if (uploadPipeline != null) { // 1) ensure no more input is expected; let consumer drain what's already queued uploadPipeline.closeInput(); try { (uploadAcknowledgedMessage != null ? uploadAcknowledgedMessage : CompletableFuture.completedFuture(null)) - .get(ABNORMAL_CLOSE_DRAIN_TIMEOUT_MS, java.util.concurrent.TimeUnit.MILLISECONDS); + .get(ABNORMAL_CLOSE_DRAIN_TIMEOUT_MS, java.util.concurrent.TimeUnit.MILLISECONDS); } catch (Exception ignoredBecauseWereFailingAnyway) { } } @@ -229,7 +229,7 @@ public void onSessionClose(Session session, CloseReason closeReason) { } } else { logger.warn("Incomplete session (no resource ID) closed: session={}, state={}, reason={}", - session.getId(), state, closeReason); + session.getId(), state, closeReason); } } @@ -304,7 +304,7 @@ private void processChunks(List chunks) { try { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); long processed = 0; - for (byte[] chunk: chunks) { + for (byte[] chunk : chunks) { md5.update(chunk); bytes.write(chunk); processed += chunk.length; @@ -337,7 +337,7 @@ private static String toHex(byte[] digest) { StringBuilder sb = new StringBuilder(digest.length * 2); for (byte b : digest) { sb.append(Character.forDigit((b >>> 4) & 0xF, 16)) - .append(Character.forDigit(b & 0xF, 16)); + .append(Character.forDigit(b & 0xF, 16)); } return sb.toString(); } diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/java/step/streaming/websocket/server/upload/StreamingResourceEndpointTests.java b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/java/step/streaming/websocket/server/upload/StreamingResourceEndpointTests.java index ef06ba8..00adebf 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/java/step/streaming/websocket/server/upload/StreamingResourceEndpointTests.java +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/java/step/streaming/websocket/server/upload/StreamingResourceEndpointTests.java @@ -94,8 +94,8 @@ public void setUp() throws Exception { catalogBackend = new InMemoryCatalogBackend(); referenceProducer = new URITemplateBasedReferenceProducer(null, WebsocketDownloadEndpoint.DEFAULT_ENDPOINT_URL, WebsocketDownloadEndpoint.DEFAULT_PARAMETER_NAME); manager = new TestingResourceManager(catalogBackend, storageBackend, - referenceProducer, - new StreamingResourceUploadContexts() + referenceProducer, + new StreamingResourceUploadContexts() ); server = new TestingWebsocketServer().withEndpointConfigs(uploadConfig(), downloadConfig()).start(); referenceProducer.setBaseUri(server.getURI()); @@ -114,24 +114,24 @@ public void tearDown() throws Exception { private ServerEndpointConfig uploadConfig() { return ServerEndpointConfig.Builder.create(WebsocketUploadEndpoint.class, WebsocketUploadEndpoint.DEFAULT_ENDPOINT_URL) - .configurator(new ServerEndpointConfig.Configurator() { - @Override - public T getEndpointInstance(Class endpointClass) throws InstantiationException { - return endpointClass.cast(new WebsocketUploadEndpoint(manager, sessionsHandler)); - } - }) - .build(); + .configurator(new ServerEndpointConfig.Configurator() { + @Override + public T getEndpointInstance(Class endpointClass) throws InstantiationException { + return endpointClass.cast(new WebsocketUploadEndpoint(manager, sessionsHandler)); + } + }) + .build(); } private ServerEndpointConfig downloadConfig() { return ServerEndpointConfig.Builder.create(WebsocketDownloadEndpoint.class, WebsocketDownloadEndpoint.DEFAULT_ENDPOINT_URL) - .configurator(new ServerEndpointConfig.Configurator() { - @Override - public T getEndpointInstance(Class endpointClass) throws InstantiationException { - return endpointClass.cast(new WebsocketDownloadEndpoint(manager, sessionsHandler, WebsocketDownloadEndpoint.DEFAULT_PARAMETER_NAME)); - } - }) - .build(); + .configurator(new ServerEndpointConfig.Configurator() { + @Override + public T getEndpointInstance(Class endpointClass) throws InstantiationException { + return endpointClass.cast(new WebsocketDownloadEndpoint(manager, sessionsHandler, WebsocketDownloadEndpoint.DEFAULT_PARAMETER_NAME)); + } + }) + .build(); } @Test @@ -227,9 +227,9 @@ public void testHighLevelUploadWithSimultaneousDownloadsRandomData() throws Exce AtomicReference downloadChecksum = new AtomicReference<>(); Thread downloadThread = new Thread(() -> { try ( - CheckpointingOutputStream out = new CheckpointingOutputStream(OutputStream.nullOutputStream(), 500, read -> - logger.info("Download status: {} bytes currently transferred", read)); - MD5CalculatingInputStream downloadStream = new MD5CalculatingInputStream(download.getInputStream()); + CheckpointingOutputStream out = new CheckpointingOutputStream(OutputStream.nullOutputStream(), 500, read -> + logger.info("Download status: {} bytes currently transferred", read)); + MD5CalculatingInputStream downloadStream = new MD5CalculatingInputStream(download.getInputStream()); ) { long downloadSize = downloadStream.transferTo(out); downloadStream.close(); @@ -272,9 +272,9 @@ public void testHighLevelUploadWithSimultaneousDownloadsWithTextConversion() thr AtomicReference downloadChecksum = new AtomicReference<>(); Thread downloadThread = new Thread(() -> { try ( - CheckpointingOutputStream out = new CheckpointingOutputStream(OutputStream.nullOutputStream(), 500, read -> - logger.info("Download status: {} bytes currently transferred", read)); - MD5CalculatingInputStream downloadStream = new MD5CalculatingInputStream(download.getInputStream()); + CheckpointingOutputStream out = new CheckpointingOutputStream(OutputStream.nullOutputStream(), 500, read -> + logger.info("Download status: {} bytes currently transferred", read)); + MD5CalculatingInputStream downloadStream = new MD5CalculatingInputStream(download.getInputStream()); ) { long downloadSize = downloadStream.transferTo(out); downloadStream.close(); diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/java/step/streaming/websocket/test/ThreadPools.java b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/java/step/streaming/websocket/test/ThreadPools.java index 3e32504..9119736 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/java/step/streaming/websocket/test/ThreadPools.java +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/java/step/streaming/websocket/test/ThreadPools.java @@ -18,11 +18,11 @@ public static ExecutorService createPoolExecutor(String threadPrefix, int poolSi public static ExecutorService createPoolExecutor(String threadPrefix, int poolSize, int queueCapacity) { return new ThreadPoolExecutor( - poolSize, - poolSize, - 0L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(queueCapacity), - namedDaemon(threadPrefix) + poolSize, + poolSize, + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(queueCapacity), + namedDaemon(threadPrefix) ); } diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/java/step/streaming/websocket/test/TricklingDelegatingInputStream.java b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/java/step/streaming/websocket/test/TricklingDelegatingInputStream.java index 823b750..74c9367 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/java/step/streaming/websocket/test/TricklingDelegatingInputStream.java +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/java/step/streaming/websocket/test/TricklingDelegatingInputStream.java @@ -1,4 +1,5 @@ package step.streaming.websocket.test; + import java.io.IOException; import java.io.InputStream; import java.util.Objects; @@ -21,10 +22,10 @@ public class TricklingDelegatingInputStream extends InputStream { /** * Constructs a TricklingDelegatingInputStream. * - * @param source the underlying InputStream to read from - * @param totalBytes the total number of bytes expected from the source - * @param duration the duration over which to trickle the data - * @param timeUnit the time unit of the duration + * @param source the underlying InputStream to read from + * @param totalBytes the total number of bytes expected from the source + * @param duration the duration over which to trickle the data + * @param timeUnit the time unit of the duration */ public TricklingDelegatingInputStream(InputStream source, long totalBytes, long duration, TimeUnit timeUnit) { this.source = Objects.requireNonNull(source, "source InputStream cannot be null"); diff --git a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/resources/logback-test.xml b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/resources/logback-test.xml index a877f13..9000d71 100644 --- a/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/resources/logback-test.xml +++ b/step-streaming-impl-websocket/step-streaming-impl-websocket-server/src/test/resources/logback-test.xml @@ -1,17 +1,17 @@ - - - - ${LOG_PATTERN} - - + + + + ${LOG_PATTERN} + + - - - + + + - - - + + +