|
@@ -80,6 +80,12 @@ public class Consumer {
|
|
|
int waitPeriod = ConfigProvider.getConfig().getOptionalValue("consumer.wait-period", Integer.class).orElse(5000);
|
|
|
int localId = ConfigProvider.getConfig().getOptionalValue("consumer.local-id", Integer.class).orElse(-1);
|
|
|
boolean truncPayload = ConfigProvider.getConfig().getOptionalValue("consumer.payload-trunc", Boolean.class).orElse(false);
|
|
|
+ int ackEveryNum = 0;
|
|
|
+ if (ConfigProvider.getConfig().getOptionalValue("consumer.ack-every-x-msgs", Integer.class).isPresent()) {
|
|
|
+ LOG.warn("ack-every-x-msgs is set, turning autocommit off.");
|
|
|
+ System.setProperty("consumer.auto-commit", "false");
|
|
|
+ ackEveryNum = ConfigProvider.getConfig().getValue("consumer.ack-every-x-msgs", Integer.class);
|
|
|
+ }
|
|
|
|
|
|
// keep a payload log for each run, truncate it
|
|
|
LOG.info("Opening payload log...");
|
|
@@ -106,9 +112,11 @@ public class Consumer {
|
|
|
|
|
|
LOG.info(String.format("Starting to poll for records of up to %d ms...", pollPeriod));
|
|
|
boolean exitRequest = false;
|
|
|
+ int recsSeen = 0;
|
|
|
while (true) {
|
|
|
ConsumerRecords<Integer, String> recs = kc.poll(Duration.ofMillis(pollPeriod));
|
|
|
for (ConsumerRecord<Integer, String> rec : recs) {
|
|
|
+ recsSeen++;
|
|
|
if (rec.value().equals("quit")) {
|
|
|
LOG.info("Received \"quit\" message. Exiting.");
|
|
|
exitRequest = true;
|
|
@@ -131,6 +139,11 @@ public class Consumer {
|
|
|
// so it wasn't a control message - make sense of what we received
|
|
|
LOG.info(String.format("Received: T:%s P:%d K:%d V:%s", rec.topic(), rec.partition(), rec.key(), rec.value()));
|
|
|
pl.println(String.format("%s,%d,%d,%s", rec.topic(), rec.partition(), rec.key(), rec.value()));
|
|
|
+
|
|
|
+ if (ackEveryNum != 0 && (recsSeen % ackEveryNum) == 0) {
|
|
|
+ LOG.info("Seen {} records, committing offsets as ackEveryNum == {}", recsSeen, ackEveryNum);
|
|
|
+ kc.commitSync();
|
|
|
+ }
|
|
|
}
|
|
|
if (exitRequest) {
|
|
|
break;
|