Skip to content

Commit

Permalink
fix(kafka): fix Kafka connector deactivate method
Browse files Browse the repository at this point in the history
  • Loading branch information
johnBgood committed Jan 23, 2025
1 parent 365b0ad commit 1c97e87
Showing 1 changed file with 14 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -102,7 +99,9 @@ public void startConsumer() {
} catch (Exception ex) {
LOG.error("Consumer loop failure, retry pending: {}", ex.getMessage(), ex);
try {
consumer.close();
if (consumer != null) {
consumer.close();
}
} catch (Exception e) {
LOG.error(
"Failed to close consumer before retrying, reason: {}. "
Expand Down Expand Up @@ -134,7 +133,7 @@ private void prepareConsumer() {
new OffsetUpdateRequiredListener(topicName, consumer, elementProps.offsets()));
reportUp();
} catch (Exception ex) {
LOG.error("Failed to initialize connector: {}", ex.getMessage());
LOG.error("Failed to initialize connector", ex);
context.log(
Activity.level(Severity.ERROR)
.tag("Subscription")
Expand Down Expand Up @@ -198,12 +197,18 @@ private void handleCorrelationResult(CorrelationResult result) {
}
}

public void stopConsumer() throws ExecutionException, InterruptedException {
public void stopConsumer() {
this.shouldLoop = false;
if (this.future != null && !this.future.isDone()) {
this.future.get();
try {
this.future.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.error("Timeout while waiting for retryableFuture to stop", e);
}
}
if (this.consumer != null) {
this.consumer.close();
}
this.consumer.close();
if (this.executorService != null) {
this.executorService.shutdownNow();
}
Expand Down

0 comments on commit 1c97e87

Please sign in to comment.