Переглянути джерело

add support for retry backoff

Grega Bremec 7 місяців тому
батько
коміт
ef49f6d5db

+ 3 - 1
code/core-api-producer/src/main/java/com/redhat/training/kafka/coreapi/producer/Producer.java

@@ -64,7 +64,7 @@ public class Producer {
         boolean idempotence = true;
         if (cf.getOptionalValue("producer.acks", String.class).isPresent()) {
             String acks = cf.getValue("producer.acks", String.class);
-            if (!acks.equals("all")) {
+            if (!acks.equals("all") && !acks.equals("-1")) {
                 LOG.info("Setting idempotence to false as acks != all.");
                 idempotence = false;
             }
@@ -88,6 +88,8 @@ public class Producer {
         props.put(ProducerConfig.RETRIES_CONFIG, cf.getOptionalValue("producer.retries", String.class).orElse("2147483647"));
         props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, cf.getOptionalValue("producer.delivery-timeout", String.class).orElse("120000"));
         props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, cf.getOptionalValue("producer.request-timeout", String.class).orElse("30000"));
+        props.put(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, cf.getOptionalValue("producer.retry-max", String.class).orElse("1000"));
+        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, cf.getOptionalValue("producer.retry-backoff", String.class).orElse("100"));
 
         // TODO?
         // if (cf.getOptionalValue("producer.partitioner", String.class).isPresent()) {