|
@@ -121,12 +121,23 @@ public class Consumer {
|
|
|
int recsSeen = 0;
|
|
|
while (true) {
|
|
|
ConsumerRecords<Integer, String> recs = kc.poll(Duration.ofMillis(pollPeriod));
|
|
|
- try {
|
|
|
- LOG.info("Received {} records. Sleeping for {} ms as per instructions...", recs.count(), waitAfterRecv);
|
|
|
- Thread.sleep(waitAfterRecv);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- LOG.warn("Interrupted in sleep-after-recv: " + ie.getMessage());
|
|
|
+
|
|
|
+ // no records? skip the rest of this loop
|
|
|
+ if (recs.count() == 0) {
|
|
|
+ continue;
|
|
|
}
|
|
|
+
|
|
|
+ // consumer.wait-after-recv
|
|
|
+ if (waitAfterRecv > 0) {
|
|
|
+ try {
|
|
|
+ LOG.info("Received {} records. Sleeping for {} ms as per instructions...", recs.count(), waitAfterRecv);
|
|
|
+ Thread.sleep(waitAfterRecv);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.warn("Interrupted in sleep-after-recv: " + ie.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // process records
|
|
|
for (ConsumerRecord<Integer, String> rec : recs) {
|
|
|
recsSeen++;
|
|
|
if (rec.value().equals("quit")) {
|
|
@@ -153,25 +164,36 @@ public class Consumer {
|
|
|
pl.println(String.format("%s,%d,%d,%s", rec.topic(), rec.partition(), rec.key(), rec.value()));
|
|
|
pl.flush();
|
|
|
|
|
|
+ // consumer.ack-every-x-msgs
|
|
|
if (ackEveryNum != 0 && (recsSeen % ackEveryNum) == 0) {
|
|
|
LOG.info("Seen {} records, committing offsets as ackEveryNum == {}", recsSeen, ackEveryNum);
|
|
|
kc.commitSync();
|
|
|
}
|
|
|
- try {
|
|
|
- LOG.info("Record processed. Sleeping for {} ms as per instructions...", waitAfterRecord);
|
|
|
- Thread.sleep(waitAfterRecord);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- LOG.warn("Interrupted in sleep-after-record: " + ie.getMessage());
|
|
|
+
|
|
|
+ // consumer.wait-after-record
|
|
|
+ if (waitAfterRecord > 0) {
|
|
|
+ try {
|
|
|
+ LOG.info("Record processed. Sleeping for {} ms as per instructions...", waitAfterRecord);
|
|
|
+ Thread.sleep(waitAfterRecord);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.warn("Interrupted in sleep-after-record: " + ie.getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ // clean shutdown command?
|
|
|
if (exitRequest) {
|
|
|
break;
|
|
|
}
|
|
|
- try {
|
|
|
- LOG.info("Batch of {} processed. Sleeping for {} ms as per instructions...", recs.count(), waitAfterBatch);
|
|
|
- Thread.sleep(waitAfterBatch);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- LOG.warn("Interrupted in sleep-after-batch: " + ie.getMessage());
|
|
|
+
|
|
|
+ // consumer.wait-after-batch
|
|
|
+ if (waitAfterBatch > 0) {
|
|
|
+ try {
|
|
|
+ LOG.info("Batch of {} processed. Sleeping for {} ms as per instructions...", recs.count(), waitAfterBatch);
|
|
|
+ Thread.sleep(waitAfterBatch);
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.warn("Interrupted in sleep-after-batch: " + ie.getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
kc.close();
|