|
@@ -77,6 +77,7 @@ public class Consumer {
|
|
|
// the remaining configurations
|
|
|
String topic = ConfigProvider.getConfig().getOptionalValue("consumer.topic", String.class).orElse("test-topic");
|
|
|
int pollPeriod = ConfigProvider.getConfig().getOptionalValue("consumer.poll-period", Integer.class).orElse(1000);
|
|
|
+ int waitAfterRecord = ConfigProvider.getConfig().getOptionalValue("consumer.wait-after-record", Integer.class).orElse(0);
|
|
|
int waitAfterRecv = ConfigProvider.getConfig().getOptionalValue("consumer.wait-after-recv", Integer.class).orElse(0);
|
|
|
int waitPeriod = ConfigProvider.getConfig().getOptionalValue("consumer.wait-period", Integer.class).orElse(5000);
|
|
|
int localId = ConfigProvider.getConfig().getOptionalValue("consumer.local-id", Integer.class).orElse(-1);
|
|
@@ -145,7 +146,12 @@ public class Consumer {
|
|
|
LOG.info("Seen {} records, committing offsets as ackEveryNum == {}", recsSeen, ackEveryNum);
|
|
|
kc.commitSync();
|
|
|
}
|
|
|
- }
|
|
|
+ try {
|
|
|
+ Thread.sleep(waitAfterRecord);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.warn("Interrupted in sleep-after-record: " + ie.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
if (exitRequest) {
|
|
|
break;
|
|
|
}
|