Forráskód Böngészése

rename wait-after-recv to wait-after-batch; flush logs after each write

Grega Bremec 8 hónapja
szülő
commit
c056a165f6

+ 2 - 1
code/core-api-consumer/src/main/java/com/redhat/training/kafka/coreapi/consumer/Consumer.java

@@ -78,7 +78,7 @@ public class Consumer {
         String topic = ConfigProvider.getConfig().getOptionalValue("consumer.topic", String.class).orElse("test-topic");
         int pollPeriod = ConfigProvider.getConfig().getOptionalValue("consumer.poll-period", Integer.class).orElse(1000);
         int waitAfterRecord = ConfigProvider.getConfig().getOptionalValue("consumer.wait-after-record", Integer.class).orElse(0);
-        int waitAfterRecv = ConfigProvider.getConfig().getOptionalValue("consumer.wait-after-recv", Integer.class).orElse(0);
+        int waitAfterRecv = ConfigProvider.getConfig().getOptionalValue("consumer.wait-after-batch", Integer.class).orElse(0);
         int waitPeriod = ConfigProvider.getConfig().getOptionalValue("consumer.wait-period", Integer.class).orElse(5000);
         int localId = ConfigProvider.getConfig().getOptionalValue("consumer.local-id", Integer.class).orElse(-1);
         boolean truncPayload = ConfigProvider.getConfig().getOptionalValue("consumer.payload-trunc", Boolean.class).orElse(false);
@@ -141,6 +141,7 @@ public class Consumer {
                 // so it wasn't a control message - make sense of what we received
                 LOG.info(String.format("Received: T:%s P:%d K:%d V:%s", rec.topic(), rec.partition(), rec.key(), rec.value()));
                 pl.println(String.format("%s,%d,%d,%s", rec.topic(), rec.partition(), rec.key(), rec.value()));
+                pl.flush();
 
                 if (ackEveryNum != 0 && (recsSeen % ackEveryNum) == 0) {
                     LOG.info("Seen {} records, committing offsets as ackEveryNum == {}", recsSeen, ackEveryNum);

+ 2 - 2
code/core-api-consumer/src/main/resources/META-INF/microprofile-config.properties

@@ -21,10 +21,10 @@ ssl.truststore.password = FIXME
 
 # consumer.topic =                          # test-topic
 # consumer.poll-period =                    # 1000
-# consumer.wait-after-recv =                # 0
+# consumer.wait-after-batch =               # 0
 # consumer.wait-after-record =              # 0
-# consumer.wait-period =                    # 5000
 # consumer.ack-every-x-msgs =               # 0
+# consumer.wait-period =                    # 5000
 # consumer.local-id =                       # -1 (for log file only)
 # consumer.payload-trunc =                  # false
 

+ 1 - 0
code/core-api-producer/src/main/java/com/redhat/training/kafka/coreapi/producer/Producer.java

@@ -132,6 +132,7 @@ public class Producer {
                             LOG.warn(e.getMessage());
                         } else {
                             pl.println(String.format("%s,%d,%d,%s", rm.topic(), rm.partition(), rec.key(), rec.value()));
+                            pl.flush();
                             LOG.info(String.format("Sent: T:%s P:%d K:%d V:%s", rm.topic(), rm.partition(), rec.key(), rec.value()));
                         }
                     }

+ 3 - 2
code/core-api-producer/src/main/resources/META-INF/microprofile-config.properties

@@ -21,9 +21,10 @@ ssl.truststore.password = FIXME
 
 # consumer.topic =                          # test-topic
 # consumer.poll-period =                    # 1000
-# consumer.wait-after-recv =                # 0
-# consumer.wait-period =                    # 5000
+# consumer.wait-after-batch =               # 0
+# consumer.wait-after-record =              # 0
 # consumer.ack-every-x-msgs =               # 0
+# consumer.wait-period =                    # 5000
 # consumer.local-id =                       # -1 (for log file only)
 # consumer.payload-trunc =                  # false