Преглед изворни кода

add support for waits at several key places

Grega Bremec пре 7 месеци
родитељ
комит
18db3d9f78

+ 17 - 6
code/core-api-consumer/src/main/java/com/redhat/training/kafka/coreapi/consumer/Consumer.java

@@ -78,7 +78,8 @@ public class Consumer {
         String topic = ConfigProvider.getConfig().getOptionalValue("consumer.topic", String.class).orElse("test-topic");
         int pollPeriod = ConfigProvider.getConfig().getOptionalValue("consumer.poll-period", Integer.class).orElse(1000);
         int waitAfterRecord = ConfigProvider.getConfig().getOptionalValue("consumer.wait-after-record", Integer.class).orElse(0);
-        int waitAfterRecv = ConfigProvider.getConfig().getOptionalValue("consumer.wait-after-batch", Integer.class).orElse(0);
+        int waitAfterRecv = ConfigProvider.getConfig().getOptionalValue("consumer.wait-after-recv", Integer.class).orElse(0);
+        int waitAfterBatch = ConfigProvider.getConfig().getOptionalValue("consumer.wait-after-batch", Integer.class).orElse(0);
         int waitPeriod = ConfigProvider.getConfig().getOptionalValue("consumer.wait-cmd-period", Integer.class).orElse(5000);
         int localId = ConfigProvider.getConfig().getOptionalValue("consumer.local-id", Integer.class).orElse(-1);
         boolean truncPayload = ConfigProvider.getConfig().getOptionalValue("consumer.payload-trunc", Boolean.class).orElse(false);
@@ -112,11 +113,19 @@ public class Consumer {
         KafkaConsumer<Integer, String> kc = new KafkaConsumer<>(configProperties());
         kc.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListenerImpl());
 
-        LOG.info(String.format("Starting to poll for records of up to %d ms...", pollPeriod));
+        LOG.info("Starting to poll for batches of up to {} records / up to {} ms...",
+                    ConfigProvider.getConfig().getOptionalValue("consumer.max-poll-recs", String.class).orElse("500"),
+                    pollPeriod);
         boolean exitRequest = false;
         int recsSeen = 0;
         while (true) {
             ConsumerRecords<Integer, String> recs = kc.poll(Duration.ofMillis(pollPeriod));
+            try {
+                LOG.info("Received {} records. Sleeping for {} ms as per instructions...", recs.count(), waitAfterRecv);
+                Thread.sleep(waitAfterRecv);
+            } catch (InterruptedException ie) {
+                LOG.warn("Interrupted in sleep-after-recv: " + ie.getMessage());
+            }
             for (ConsumerRecord<Integer, String> rec : recs) {
                 recsSeen++;
                 if (rec.value().equals("quit")) {
@@ -148,20 +157,22 @@ public class Consumer {
                     kc.commitSync();
                 }
                 try {
+                    LOG.info("Record processed. Sleeping for {} ms as per instructions...", waitAfterRecord);
                     Thread.sleep(waitAfterRecord);
                 } catch (InterruptedException ie) {
                     LOG.warn("Interrupted in sleep-after-record: " + ie.getMessage());
                 }
-                }
+            }
             if (exitRequest) {
                 break;
             }
             try {
-                Thread.sleep(waitAfterRecv);
+                LOG.info("Batch of {} processed. Sleeping for {} ms as per instructions...", recs.count(), waitAfterBatch);
+                Thread.sleep(waitAfterBatch);
             } catch (InterruptedException ie) {
-                LOG.warn("Interrupted in sleep-after-recv: " + ie.getMessage());
+                LOG.warn("Interrupted in sleep-after-batch: " + ie.getMessage());
             }
-        };
+        }
         kc.close();
         pl.close();
     }