浏览代码

add ack-after-batch to consumer; ensure print writers append

Grega Bremec 7 月之前
父节点
当前提交
f1b1c75fa4

+ 21 - 1
code/core-api-consumer/src/main/java/com/redhat/training/kafka/coreapi/consumer/Consumer.java

@@ -1,6 +1,7 @@
 package com.redhat.training.kafka.coreapi.consumer;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.time.Duration;
@@ -90,6 +91,18 @@ public class Consumer {
             System.setProperty("consumer.auto-commit", "false");
             ackEveryNum = ConfigProvider.getConfig().getValue("consumer.ack-every-x-msgs", Integer.class);
         }
+        boolean ackAfterBatch = false;
+        if (ConfigProvider.getConfig().getOptionalValue("consumer.ack-after-batch", Boolean.class).isPresent()) {
+            if (ConfigProvider.getConfig().getValue("consumer.ack-after-batch", Boolean.class)) {
+                LOG.warn("ack-after-batch is set, turning autocommit off.");
+                System.setProperty("consumer.auto-commit", "false");
+                ackAfterBatch = true;
+                if (ackEveryNum != 0) {
+                    LOG.warn("ack-every-x-msgs and ack-after-batch are exclusive; turning the former off.");
+                    ackEveryNum = 0;
+                }
+            }
+        }
 
         // keep a payload log for each run, truncate it
         LOG.info("Opening payload log...");
@@ -105,7 +118,8 @@ public class Consumer {
                 payloadLog.delete();
                 payloadLog.createNewFile();
             }
-            pl = new PrintWriter(payloadLog);
+            FileWriter fl = new FileWriter(logfile, true);
+            pl = new PrintWriter(fl);
         } catch (IOException ioe) {
             throw new RuntimeException("Could not (re)create payload log: " + ioe.getMessage());
         }
@@ -197,6 +211,12 @@ public class Consumer {
                     LOG.warn("Interrupted in sleep-after-batch: " + ie.getMessage());
                 }
             }
+
+            // consumer.ack-after-batch
+            if (ackAfterBatch) {
+                LOG.info("Batch completed, committing offsets as ackAfterBatch == true");
+                kc.commitSync();
+            }
         }
         kc.close();
         pl.close();

+ 1 - 0
code/core-api-consumer/src/main/resources/META-INF/microprofile-config.properties

@@ -37,6 +37,7 @@ ssl.truststore.password = FIXME
 # consumer.wait-after-batch =               # 0
 # consumer.wait-after-record =              # 0
 # consumer.ack-every-x-msgs =               # 0
+# consumer.ack-after-batch =                # false
 # consumer.wait-cmd-period =                # 5000
 # consumer.local-id =                       # -1 (for log file only)
 # consumer.payload-trunc =                  # false

+ 3 - 1
code/core-api-producer/src/main/java/com/redhat/training/kafka/coreapi/producer/Producer.java

@@ -1,6 +1,7 @@
 package com.redhat.training.kafka.coreapi.producer;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Properties;
@@ -126,7 +127,8 @@ public class Producer {
                 payloadLog.createNewFile();
             }
 
-            pl = new PrintWriter(payloadLog);
+            FileWriter fl = new FileWriter(logfile, true);
+            pl = new PrintWriter(fl);
         } catch (IOException ioe) {
             throw new RuntimeException("Could not (re)create payload log: " + ioe.getMessage());
         }

+ 1 - 0
code/core-api-producer/src/main/resources/META-INF/microprofile-config.properties

@@ -37,6 +37,7 @@ ssl.truststore.password = FIXME
 # consumer.wait-after-batch =               # 0
 # consumer.wait-after-record =              # 0
 # consumer.ack-every-x-msgs =               # 0
+# consumer.ack-after-batch =                # false
 # consumer.wait-cmd-period =                # 5000
 # consumer.local-id =                       # -1 (for log file only)
 # consumer.payload-trunc =                  # false