From 0576a053bfab0636d90eae431acabc7ad3214341 Mon Sep 17 00:00:00 2001 From: Jonathan Fox Date: Fri, 23 Aug 2024 12:15:15 +0100 Subject: [PATCH] Update ManagedSubscribe.java to handle termination When terminating, if the connection is closed, immediate resub results in the following error 'An active managed subscription with the same ID already exists. If a previous subscription with the same ID was stopped, there can be a delay before it expires. Try subscribing again later.'. This change aims to allow ctrl+c gracefully closes the connection. --- .../java/genericpubsub/ManagedSubscribe.java | 49 ++++++++++++++++--- 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/java/src/main/java/genericpubsub/ManagedSubscribe.java b/java/src/main/java/genericpubsub/ManagedSubscribe.java index 344382f..52c055f 100644 --- a/java/src/main/java/genericpubsub/ManagedSubscribe.java +++ b/java/src/main/java/genericpubsub/ManagedSubscribe.java @@ -1,6 +1,7 @@ package genericpubsub; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.UUID; @@ -11,6 +12,9 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.protobuf.ByteString; import com.salesforce.eventbus.protobuf.*; @@ -39,6 +43,7 @@ public class ManagedSubscribe extends CommonContext implements StreamObserver recordMap = new HashMap<>(); + for (Schema.Field field : writerSchema.getFields()) { + String fieldName = field.name(); + Object value = record.get(fieldName); + recordMap.put(fieldName, value); + } + + ObjectMapper mapper = new ObjectMapper(); + mapper.enable(SerializationFeature.INDENT_OUTPUT); + String jsonString = mapper.writeValueAsString(recordMap); + + logger.info("Received event with payload:\n" + jsonString); + logger.info("Schema name: " + writerSchema.getName()); + + //logger.info("Received event: {}", record.toString()); + if (processChangedFields) { // This example expands the changedFields bitmap field in ChangeEventHeader. // To expand the other bitmap fields, i.e., diffFields and nulledFields, replicate or modify this code. @@ -225,12 +247,15 @@ public synchronized void close() { } serverOnCompletedLatch.await(6, TimeUnit.SECONDS); } catch (InterruptedException e) { - logger.warn("interrupted while waiting to close ", e); + logger.warn("Interrupted while waiting to close", e); + } catch (Exception e) { + logger.error("Error during shutdown", e); } } super.close(); } + /** * Helper function to terminate the client on errors. */ @@ -253,15 +278,25 @@ public void waitInMillis(long duration) { } } - public static void main(String args[]) throws IOException { + public static void main(String args[]) throws IOException { ExampleConfigurations exampleConfigurations = new ExampleConfigurations("arguments.yaml"); - - // Using the try-with-resource statement. The CommonContext class implements AutoCloseable in - // order to close the resources used. - try (ManagedSubscribe subscribe = new ManagedSubscribe(exampleConfigurations)) { + + // Create an instance of ManagedSubscribe + ManagedSubscribe subscribe = new ManagedSubscribe(exampleConfigurations); + + // Add a shutdown hook to close the connection when Ctrl+C is pressed + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + if (ManagedSubscribe.isActive.get()) { + System.out.println("Shutdown hook triggered, closing connection..."); + subscribe.close(); + } + })); + + try { subscribe.startManagedSubscription(); } catch (Exception e) { printStatusRuntimeException("Error during ManagedSubscribe", e); } } + }