Browse Source

add support for new producer options

Grega Bremec 7 months ago
parent
commit
15cc1cba62

+ 1 - 1
code/core-api-consumer/src/main/java/com/redhat/training/kafka/coreapi/consumer/Consumer.java

@@ -79,7 +79,7 @@ public class Consumer {
         int pollPeriod = ConfigProvider.getConfig().getOptionalValue("consumer.poll-period", Integer.class).orElse(1000);
         int waitAfterRecord = ConfigProvider.getConfig().getOptionalValue("consumer.wait-after-record", Integer.class).orElse(0);
         int waitAfterRecv = ConfigProvider.getConfig().getOptionalValue("consumer.wait-after-batch", Integer.class).orElse(0);
-        int waitPeriod = ConfigProvider.getConfig().getOptionalValue("consumer.wait-period", Integer.class).orElse(5000);
+        int waitPeriod = ConfigProvider.getConfig().getOptionalValue("consumer.wait-cmd-period", Integer.class).orElse(5000);
         int localId = ConfigProvider.getConfig().getOptionalValue("consumer.local-id", Integer.class).orElse(-1);
         boolean truncPayload = ConfigProvider.getConfig().getOptionalValue("consumer.payload-trunc", Boolean.class).orElse(false);
         int ackEveryNum = 0;

+ 14 - 1
code/core-api-consumer/src/main/resources/META-INF/microprofile-config.properties

@@ -10,6 +10,8 @@ ssl.truststore.password = FIXME
 # producer.num-records-per-roll =           # 100
 # producer.wait-after-roll =                # 5000
 # producer.wait-after-send =                # 500
+# producer.local-id =                       # -1 (for log file only)
+# producer.payload-trunc =                  # false
 
 # producer.acks =                           # all
 # producer.max-inflight =                   # 5
@@ -18,13 +20,24 @@ ssl.truststore.password = FIXME
 # producer.linger =                         # 0
 # producer.retries =                        # 2147483647
 # producer.delivery-timeout =               # 120000
+# producer.request-timeout =                # 30000
+# producer.retry-max =                      # 1000
+# producer.retry-backoff =                  # 100
+
+# TODO
+# producer.partitioner
+# producer.transaction-id
+# producer.transaction-timeout
+# producer.send-wait
+# producer.send-crash
+# producer.send-quit
 
 # consumer.topic =                          # test-topic
 # consumer.poll-period =                    # 1000
 # consumer.wait-after-batch =               # 0
 # consumer.wait-after-record =              # 0
 # consumer.ack-every-x-msgs =               # 0
-# consumer.wait-period =                    # 5000
+# consumer.wait-cmd-period =                # 5000
 # consumer.local-id =                       # -1 (for log file only)
 # consumer.payload-trunc =                  # false
 

+ 17 - 3
code/core-api-producer/src/main/java/com/redhat/training/kafka/coreapi/producer/Producer.java

@@ -87,6 +87,9 @@ public class Producer {
         props.put(ProducerConfig.LINGER_MS_CONFIG, cf.getOptionalValue("producer.linger", String.class).orElse("0"));
         props.put(ProducerConfig.RETRIES_CONFIG, cf.getOptionalValue("producer.retries", String.class).orElse("2147483647"));
         props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, cf.getOptionalValue("producer.delivery-timeout", String.class).orElse("120000"));
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, cf.getOptionalValue("producer.request-timeout", String.class).orElse("30000"));
+
+        // TODO?
         // if (cf.getOptionalValue("producer.partitioner", String.class).isPresent()) {
         //     switch (cf.getValue("producer.partitioner", String.class)) {
         //         case ""
@@ -102,14 +105,25 @@ public class Producer {
         int sendSize = ConfigProvider.getConfig().getOptionalValue("producer.num-records-per-roll", Integer.class).orElse(100);
         int waitAfterBatch = ConfigProvider.getConfig().getOptionalValue("producer.wait-after-roll", Integer.class).orElse(5000);
         int waitAfterSend = ConfigProvider.getConfig().getOptionalValue("producer.wait-after-send", Integer.class).orElse(500);
+        int localId = ConfigProvider.getConfig().getOptionalValue("producer.local-id", Integer.class).orElse(-1);
+        boolean truncPayload = ConfigProvider.getConfig().getOptionalValue("producer.payload-trunc", Boolean.class).orElse(false);
 
         // keep a payload log for each run, truncate it
         LOG.info("Opening payload log...");
         PrintWriter pl;
         try {
-            File payloadLog = new File("payload.log");
-            payloadLog.delete();
-            payloadLog.createNewFile();
+            String logfile = "payload.log";
+            if (localId > -1) {
+                logfile = "payload-" + localId + ".log";
+            }
+            File payloadLog = new File(logfile);
+
+            if (truncPayload) {
+                LOG.info("Truncating payload log per request.");
+                payloadLog.delete();
+                payloadLog.createNewFile();
+            }
+
             pl = new PrintWriter(payloadLog);
         } catch (IOException ioe) {
             throw new RuntimeException("Could not (re)create payload log: " + ioe.getMessage());

+ 14 - 1
code/core-api-producer/src/main/resources/META-INF/microprofile-config.properties

@@ -10,6 +10,8 @@ ssl.truststore.password = FIXME
 # producer.num-records-per-roll =           # 100
 # producer.wait-after-roll =                # 5000
 # producer.wait-after-send =                # 500
+# producer.local-id =                       # -1 (for log file only)
+# producer.payload-trunc =                  # false
 
 # producer.acks =                           # all
 # producer.max-inflight =                   # 5
@@ -18,13 +20,24 @@ ssl.truststore.password = FIXME
 # producer.linger =                         # 0
 # producer.retries =                        # 2147483647
 # producer.delivery-timeout =               # 120000
+# producer.request-timeout =                # 30000
+# producer.retry-max =                      # 1000
+# producer.retry-backoff =                  # 100
+
+# TODO
+# producer.partitioner
+# producer.transaction-id
+# producer.transaction-timeout
+# producer.send-wait
+# producer.send-crash
+# producer.send-quit
 
 # consumer.topic =                          # test-topic
 # consumer.poll-period =                    # 1000
 # consumer.wait-after-batch =               # 0
 # consumer.wait-after-record =              # 0
 # consumer.ack-every-x-msgs =               # 0
-# consumer.wait-period =                    # 5000
+# consumer.wait-cmd-period =                # 5000
 # consumer.local-id =                       # -1 (for log file only)
 # consumer.payload-trunc =                  # false