11/*
2- * Copyright 2016-2017 the original author or authors.
2+ * Copyright 2016-2019 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
6969 * invocation. Consider to use an async upstream hand off if this blocking behavior isn't appropriate.
7070 * <p>
7171 * The "request-reply" behavior is async and the {@link Transfer} result from the {@link TransferManager}
72- * operation is sent to the {@link #outputChannel }, assuming the transfer progress observation in the
72+ * operation is sent to the {@link #getOutputChannel() }, assuming the transfer progress observation in the
7373 * downstream flow.
7474 * <p>
7575 * The {@link S3ProgressListener} can be supplied to track the transfer progress.
7676 * Also the listener can be populated into the returned {@link Transfer} afterwards in the downstream flow.
77+ * If the context of the {@code requestMessage} is important in the {@code progressChanged} event, it is
78+ * recommended to use a {@link MessageS3ProgressListener} implementation instead.
7779 * * <p>
7880 * For the upload operation the {@link UploadMetadataProvider} callback can be supplied to populate required
7981 * {@link ObjectMetadata} options, as for a single entry, as well as for each file in directory to upload.
@@ -131,12 +133,8 @@ public S3MessageHandler(AmazonS3 amazonS3, Expression bucketExpression) {
131133 }
132134
133135 public S3MessageHandler (AmazonS3 amazonS3 , String bucket , boolean produceReply ) {
134- this (TransferManagerBuilder .standard ()
135- .withS3Client (amazonS3 )
136- .build (),
137- bucket ,
138- produceReply );
139- Assert .notNull (amazonS3 , "'amazonS3' must not be null" );
136+ this (amazonS3 , new LiteralExpression (bucket ), produceReply );
137+ Assert .notNull (bucket , "'bucket' must not be null" );
140138 }
141139
142140 public S3MessageHandler (AmazonS3 amazonS3 , Expression bucketExpression , boolean produceReply ) {
@@ -228,6 +226,7 @@ public void setDestinationKeyExpression(Expression destinationKeyExpression) {
228226 /**
229227 * Specify a {@link S3ProgressListener} for upload and download operations.
230228 * @param s3ProgressListener the {@link S3ProgressListener} to use.
229+ * @see MessageS3ProgressListener
231230 */
232231 public void setProgressListener (S3ProgressListener s3ProgressListener ) {
233232 this .s3ProgressListener = s3ProgressListener ;
@@ -260,9 +259,8 @@ protected void doInit() {
260259 @ Override
261260 protected Object handleRequestMessage (Message <?> requestMessage ) {
262261 Command command = this .commandExpression .getValue (this .evaluationContext , requestMessage , Command .class );
263- Assert .state (command != null , "'commandExpression' ["
264- + this .commandExpression .getExpressionString ()
265- + "] cannot evaluate to null." );
262+ Assert .state (command != null , () ->
263+ "'commandExpression' [" + this .commandExpression .getExpressionString () + "] cannot evaluate to null." );
266264
267265 Transfer transfer = null ;
268266
@@ -327,8 +325,8 @@ private Transfer upload(Message<?> requestMessage) {
327325 InputStream inputStream = (InputStream ) payload ;
328326 if (metadata .getContentMD5 () == null ) {
329327 Assert .state (inputStream .markSupported (),
330- "For an upload InputStream with no MD5 digest metadata, the " +
331- "markSupported() method must evaluate to true. " );
328+ "For an upload InputStream with no MD5 digest metadata, " +
329+ "the markSupported() method must evaluate to true." );
332330 String contentMd5 = Md5Utils .md5AsBase64 (inputStream );
333331 metadata .setContentMD5 (contentMd5 );
334332 inputStream .reset ();
@@ -350,7 +348,9 @@ else if (payload instanceof File) {
350348 if (metadata .getContentType () == null ) {
351349 metadata .setContentType (Mimetypes .getInstance ().getMimetype (fileToUpload ));
352350 }
353- putObjectRequest = new PutObjectRequest (bucketName , key , fileToUpload ).withMetadata (metadata );
351+ putObjectRequest =
352+ new PutObjectRequest (bucketName , key , fileToUpload )
353+ .withMetadata (metadata );
354354 }
355355 else if (payload instanceof byte []) {
356356 byte [] payloadBytes = (byte []) payload ;
@@ -387,12 +387,30 @@ else if (payload instanceof byte[]) {
387387 }
388388 }
389389
390- S3ProgressListener progressListener = this .s3ProgressListener ;
390+ S3ProgressListener configuredProgressListener = this .s3ProgressListener ;
391+ if (this .s3ProgressListener instanceof MessageS3ProgressListener ) {
392+ configuredProgressListener = new S3ProgressListener () {
393+
394+ @ Override
395+ public void onPersistableTransfer (PersistableTransfer persistableTransfer ) {
396+ S3MessageHandler .this .s3ProgressListener .onPersistableTransfer (persistableTransfer );
397+ }
398+
399+ @ Override
400+ public void progressChanged (ProgressEvent progressEvent ) {
401+ ((MessageS3ProgressListener ) S3MessageHandler .this .s3ProgressListener )
402+ .progressChanged (progressEvent , requestMessage );
403+ }
404+
405+ };
406+ }
407+
408+ S3ProgressListener progressListener = configuredProgressListener ;
391409
392410 if (this .objectAclExpression != null ) {
393411 Object acl = this .objectAclExpression .getValue (this .evaluationContext , requestMessage );
394- Assert .state (acl instanceof AccessControlList || acl instanceof CannedAccessControlList ,
395- "The 'objectAclExpression' ["
412+ Assert .state (acl == null || acl instanceof AccessControlList || acl instanceof CannedAccessControlList ,
413+ () -> "The 'objectAclExpression' ["
396414 + this .objectAclExpression .getExpressionString ()
397415 + "] must evaluate to com.amazonaws.services.s3.model.AccessControlList " +
398416 "or must evaluate to com.amazonaws.services.s3.model.CannedAccessControlList. " +
@@ -424,8 +442,8 @@ public void progressChanged(ProgressEvent progressEvent) {
424442
425443 };
426444
427- if (this . s3ProgressListener != null ) {
428- progressListener = new S3ProgressListenerChain (this . s3ProgressListener , progressListener );
445+ if (configuredProgressListener != null ) {
446+ progressListener = new S3ProgressListenerChain (configuredProgressListener , progressListener );
429447 }
430448
431449 }
@@ -441,8 +459,9 @@ public void progressChanged(ProgressEvent progressEvent) {
441459
442460 private Transfer download (Message <?> requestMessage ) {
443461 Object payload = requestMessage .getPayload ();
444- Assert .state (payload instanceof File , "For the 'DOWNLOAD' operation the 'payload' must be of " +
445- "'java.io.File' type, but gotten: [" + payload .getClass () + ']' );
462+ Assert .state (payload instanceof File ,
463+ () -> "For the 'DOWNLOAD' operation the 'payload' must be of " +
464+ "'java.io.File' type, but gotten: [" + payload .getClass () + ']' );
446465
447466 File targetFile = (File ) payload ;
448467
@@ -457,7 +476,7 @@ private Transfer download(Message<?> requestMessage) {
457476 }
458477
459478 Assert .state (key != null ,
460- "The 'keyExpression' must not be null for non-File payloads and can't evaluate to null. " +
479+ () -> "The 'keyExpression' must not be null for non-File payloads and can't evaluate to null. " +
461480 "Root object is: " + requestMessage );
462481
463482 if (targetFile .isDirectory ()) {
@@ -483,7 +502,7 @@ private Transfer copy(Message<?> requestMessage) {
483502 }
484503
485504 Assert .state (sourceKey != null ,
486- "The 'keyExpression' must not be null for 'copy' operation " +
505+ () -> "The 'keyExpression' must not be null for 'copy' operation " +
487506 "and 'keyExpression' can't evaluate to null. " +
488507 "Root object is: " + requestMessage );
489508
@@ -498,8 +517,8 @@ private Transfer copy(Message<?> requestMessage) {
498517 }
499518
500519 Assert .state (destinationBucketName != null ,
501- "The 'destinationBucketExpression' must not be null for 'copy' operation and can't evaluate to null. " +
502- "Root object is: " + requestMessage );
520+ () -> "The 'destinationBucketExpression' must not be null for 'copy' operation " +
521+ "and can't evaluate to null. Root object is: " + requestMessage );
503522
504523 String destinationKey = null ;
505524 if (this .destinationKeyExpression != null ) {
@@ -508,8 +527,8 @@ private Transfer copy(Message<?> requestMessage) {
508527 }
509528
510529 Assert .state (destinationKey != null ,
511- "The 'destinationKeyExpression' must not be null for 'copy' operation and can't evaluate to null. " +
512- "Root object is: " + requestMessage );
530+ () -> "The 'destinationKeyExpression' must not be null for 'copy' operation " +
531+ "and can't evaluate to null. Root object is: " + requestMessage );
513532
514533
515534 CopyObjectRequest copyObjectRequest =
@@ -525,9 +544,10 @@ private String obtainBucket(Message<?> requestMessage) {
525544 else {
526545 bucketName = this .bucketExpression .getValue (this .evaluationContext , requestMessage , String .class );
527546 }
528- Assert .state (bucketName != null , "The 'bucketExpression' ["
529- + this .bucketExpression .getExpressionString ()
530- + "] must not evaluate to null. Root object is: " + requestMessage );
547+ Assert .state (bucketName != null ,
548+ () -> "The 'bucketExpression' ["
549+ + this .bucketExpression .getExpressionString ()
550+ + "] must not evaluate to null. Root object is: " + requestMessage );
531551
532552 if (this .resourceIdResolver != null ) {
533553 bucketName = this .resourceIdResolver .resolveToPhysicalResourceId (bucketName );
@@ -560,6 +580,23 @@ public enum Command {
560580
561581 }
562582
583+ /**
584+ * An {@link S3ProgressListener} extension to provide a {@code requestMessage}
585+ * context for the {@code progressChanged} event.
586+ *
587+ * @since 2.1
588+ */
589+ public interface MessageS3ProgressListener extends S3ProgressListener {
590+
591+ @ Override
592+ default void progressChanged (ProgressEvent progressEvent ) {
593+ throw new UnsupportedOperationException ("Use progressChanged(ProgressEvent, Message<?>) instead." );
594+ }
595+
596+ void progressChanged (ProgressEvent progressEvent , Message <?> message );
597+
598+ }
599+
563600 /**
564601 * The callback to populate an {@link ObjectMetadata} for upload operation.
565602 * The message can be used as a metadata source.
0 commit comments