sort of done with the apps

Grega Bremec 7 months ago
parent
commit
bdf4500f1f

+ 48 - 0
code/core-api-consumer/pom.xml

@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>com.redhat.training.kafka</groupId>
+        <artifactId>parent</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>core-api-consumer</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.release}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.smallrye.config</groupId>
+            <artifactId>smallrye-config</artifactId>
+            <version>${smallrye.release}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j.release}</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>3.4.1</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>java</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <mainClass>com.redhat.training.kafka.coreapi.consumer.Consumer</mainClass>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
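
A note for context, not part of the commit: with exec-maven-plugin bound to the Consumer main class above, the module can be launched straight from Maven, for example with "mvn -pl core-api-consumer exec:java" from the code/ directory (assuming the parent and sibling modules were built first). Any MicroProfile Config key the application reads can be overridden on that command line with -D system properties.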

+ 151 - 0
code/core-api-consumer/src/main/java/com/redhat/training/kafka/coreapi/consumer/Consumer.java

@@ -0,0 +1,151 @@
+package com.redhat.training.kafka.coreapi.consumer;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.SslConfigs;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Consumer {
+    static final Logger LOG = LoggerFactory.getLogger(Consumer.class.getName());
+
+    public static Properties configProperties() {
+        Config cf = ConfigProvider.getConfig();
+        Properties props = new Properties();
+
+        // Standard mandatory configs.
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cf.getValue("bootstrap.server", String.class));
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, cf.getOptionalValue("security.protocol", String.class).orElse("PLAINTEXT"));
+        if (props.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG).equals("SSL")) {
+            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, cf.getValue("ssl.truststore.location", String.class));
+            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, cf.getValue("ssl.truststore.password", String.class));
+        }
+
+        // Fixed config, not changeable.
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.IntegerDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
+
+        // Optional stuff.
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, cf.getOptionalValue("consumer.group-id", String.class).orElse("test-app"));
+        if (cf.getOptionalValue("consumer.instance-id", String.class).isPresent()) {
+            props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, cf.getValue("consumer.instance-id", String.class));
+        }
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, cf.getOptionalValue("consumer.auto-commit", String.class).orElse("true"));
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, cf.getOptionalValue("consumer.ac-interval", String.class).orElse("5000"));
+        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, cf.getOptionalValue("consumer.fetch-min-bytes", String.class).orElse("1"));
+
+        switch (cf.getOptionalValue("consumer.assignment-strategy", String.class).orElse("cooperative")) {
+            case "range":
+                props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+                            org.apache.kafka.clients.consumer.RangeAssignor.class.getName());
+                break;
+            case "rr":
+                props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+                            org.apache.kafka.clients.consumer.RoundRobinAssignor.class.getName());
+                break;
+            case "sticky":
+                props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+                            org.apache.kafka.clients.consumer.StickyAssignor.class.getName());
+                break;
+            case "cooperative":
+                props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+                            org.apache.kafka.clients.consumer.CooperativeStickyAssignor.class.getName());
+        }
+
+        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, cf.getOptionalValue("consumer.heartbeat-interval", String.class).orElse("3000"));
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, cf.getOptionalValue("consumer.session-timeout", String.class).orElse("45000"));
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, cf.getOptionalValue("consumer.auto-offset-reset", String.class).orElse("latest"));
+
+        return props;
+    }
+
+    public static void main(String... args) {
+        // the remaining configurations
+        String topic = ConfigProvider.getConfig().getOptionalValue("consumer.topic", String.class).orElse("test-topic");
+        int pollPeriod = ConfigProvider.getConfig().getOptionalValue("consumer.poll-period", Integer.class).orElse(1000);
+        int waitAfterRecv = ConfigProvider.getConfig().getOptionalValue("consumer.wait-after-recv", Integer.class).orElse(0);
+        int waitPeriod = ConfigProvider.getConfig().getOptionalValue("consumer.wait-period", Integer.class).orElse(5000);
+        int localId = ConfigProvider.getConfig().getOptionalValue("consumer.local-id", Integer.class).orElse(-1);
+        boolean truncPayload = ConfigProvider.getConfig().getOptionalValue("consumer.payload-trunc", Boolean.class).orElse(false);
+
+        // keep a payload log for each run, truncate it
+        LOG.info("Opening payload log...");
+        PrintWriter pl;
+        try {
+            String logfile = "payload.log";
+            if (localId > -1) {
+                logfile = "payload-" + localId + ".log";
+            }
+            File payloadLog = new File(logfile);
+            if (truncPayload) {
+                LOG.info("Truncating payload log per request.");
+                payloadLog.delete();
+                payloadLog.createNewFile();
+            }
+            // append unless truncation was requested; new PrintWriter(File) always truncates
+            pl = new PrintWriter(new FileWriter(payloadLog, !truncPayload));
+        } catch (IOException ioe) {
+            throw new RuntimeException("Could not (re)create payload log: " + ioe.getMessage());
+        }
+
+        // create a consumer and subscribe to topic
+        KafkaConsumer<Integer, String> kc = new KafkaConsumer<>(configProperties());
+        kc.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListenerImpl());
+
+        LOG.info(String.format("Starting to poll for records of up to %d ms...", pollPeriod));
+        boolean exitRequest = false;
+        while (true) {
+            ConsumerRecords<Integer, String> recs = kc.poll(Duration.ofMillis(pollPeriod));
+            for (ConsumerRecord<Integer, String> rec : recs) {
+                if (rec.value().equals("quit")) {
+                    LOG.info("Received \"quit\" message. Exiting.");
+                    exitRequest = true;
+                    break;
+                }
+                if (rec.value().equals("crash")) {
+                    LOG.info("Received \"crash\" message. Crashing.");
+                    throw new RuntimeException("User requested crash.");
+                }
+                if (rec.value().equals("wait")) {
+                    LOG.info(String.format("Received \"wait\" message. Yielding for %d ms.", waitPeriod));
+                    try {
+                        Thread.sleep(waitPeriod);
+                    } catch (InterruptedException ie) {
+                        LOG.warn("Interrupted in wait-upon-request: " + ie.getMessage());
+                    }
+                    break;
+                }
+
+                // so it wasn't a control message - make sense of what we received
+                LOG.info(String.format("Received: T:%s P:%d K:%d V:%s", rec.topic(), rec.partition(), rec.key(), rec.value()));
+                pl.println(String.format("%s,%d,%d,%s", rec.topic(), rec.partition(), rec.key(), rec.value()));
+            }
+            if (exitRequest) {
+                break;
+            }
+            try {
+                Thread.sleep(waitAfterRecv);
+            } catch (InterruptedException ie) {
+                LOG.warn("Interrupted in sleep-after-recv: " + ie.getMessage());
+            }
+        }
+        kc.close();
+        pl.close();
+    }
+}
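
The record values "quit", "crash" and "wait" act as control messages for the poll loop above. A minimal sketch of a throwaway sender to drive them; this is not part of the commit, the class name is made up, and it assumes a plaintext listener on localhost:9092 plus the default test-topic:

    package com.redhat.training.kafka.coreapi.consumer;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical helper: sends a single control message that the
    // consumer matches on by record value.
    public class ControlSender {
        public static void main(String... args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    org.apache.kafka.common.serialization.IntegerSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    org.apache.kafka.common.serialization.StringSerializer.class.getName());
            // try-with-resources: close() flushes the in-flight send on exit
            try (KafkaProducer<Integer, String> p = new KafkaProducer<>(props)) {
                p.send(new ProducerRecord<>("test-topic", 0, args.length > 0 ? args[0] : "quit"));
            }
        }
    }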

+ 22 - 0
code/core-api-consumer/src/main/java/com/redhat/training/kafka/coreapi/consumer/ConsumerRebalanceListenerImpl.java

@@ -0,0 +1,22 @@
+package com.redhat.training.kafka.coreapi.consumer;
+
+import java.util.Collection;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumerRebalanceListenerImpl implements ConsumerRebalanceListener {
+    private final Logger LOG = LoggerFactory.getLogger(ConsumerRebalanceListenerImpl.class.getName());
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        LOG.info("NEW PARTITIONS ASSIGNED: " + partitions.toString());
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        LOG.info("OLD PARTITIONS REVOKED: " + partitions.toString());
+    }
+}
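
The listener only logs membership changes. If auto-commit were disabled (consumer.auto-commit=false), onPartitionsRevoked would be the natural place to commit the last processed offsets before another group member takes the partitions over. A sketch under that assumption; it presumes a "consumer" field handed in via the constructor, which the class above does not have:

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOG.info("OLD PARTITIONS REVOKED: " + partitions);
        // hypothetical field: commit synchronously while this member still owns the partitions
        consumer.commitSync();
    }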

+ 37 - 0
code/core-api-consumer/src/main/resources/META-INF/microprofile-config.properties

@@ -0,0 +1,37 @@
+bootstrap.server = localhost:9092,localhost:9192,localhost:9292
+
+# PLAINTEXT is the default, use SSL and specify the below two if using it
+security.protocol = PLAINTEXT
+ssl.truststore.location = bf-brokers.p12
+ssl.truststore.password = FIXME
+
+# producer.topic =                          # test-topic
+# producer.num-rolls =                      # 1
+# producer.num-records-per-roll =           # 100
+# producer.wait-after-roll =                # 5000
+# producer.wait-after-send =                # 500
+
+# producer.acks =                           # all
+# producer.max-inflight =                   # 5
+# producer.idempotent =                     # true
+# producer.batch =                          # 16384
+# producer.linger =                         # 0
+# producer.retries =                        # 2147483647
+# producer.delivery-timeout =               # 120000
+
+# consumer.topic =                          # test-topic
+# consumer.poll-period =                    # 1000
+# consumer.wait-after-recv =                # 0
+# consumer.wait-period =                    # 5000
+# consumer.local-id =                       # -1 (for log file only)
+# consumer.payload-trunc =                  # false
+
+# consumer.group-id =                       # test-app
+# consumer.instance-id =                    # null
+# consumer.auto-commit =                    # true
+# consumer.ac-interval =                    # 5000
+# consumer.fetch-min-bytes =                # 1
+# consumer.assignment-strategy =            # cooperative (range, rr, sticky)
+# consumer.heartbeat-interval =             # 3000
+# consumer.session-timeout =                # 45000
+# consumer.auto-offset-reset =              # latest
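
MicroProfile Config resolves these keys from Java system properties and environment variables as well, so any commented default above can be overridden per run without editing this file: consumer.group-id, for instance, can come from -Dconsumer.group-id=demo on the exec:java command line, or from a CONSUMER_GROUP_ID environment variable (non-alphanumeric characters map to underscores in the environment form).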

+ 56 - 9
code/core-api-producer/src/main/java/com/redhat/training/kafka/coreapi/producer/Producer.java

@@ -1,8 +1,10 @@
 package com.redhat.training.kafka.coreapi.producer;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.Properties;
 import java.util.Random;
-import java.util.logging.Logger;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.Callback;
@@ -13,9 +15,11 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.config.SslConfigs;
 import org.eclipse.microprofile.config.Config;
 import org.eclipse.microprofile.config.ConfigProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Producer {
-    static final Logger LOG = Logger.getLogger(Producer.class.getName());
+    static final Logger LOG = LoggerFactory.getLogger(Producer.class.getName());
 
     public static final String[] quotes = {
         "\"I agree with everything you say, but I would attack to the death your right to say it.\" -- Tom Stoppard (1937 - )",
@@ -45,7 +49,7 @@ public class Producer {
         Properties props = new Properties();
 
         // Standard mandatory configs.
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cf.getValue("kafka.bootstrap.server", String.class));
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cf.getValue("bootstrap.server", String.class));
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, cf.getOptionalValue("security.protocol", String.class).orElse("PLAINTEXT"));
         if (props.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG).equals("SSL")) {
             props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, cf.getValue("ssl.truststore.location", String.class));
@@ -57,26 +61,67 @@ public class Producer {
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
 
         // Optional stuff.
-        props.put(ProducerConfig.ACKS_CONFIG, cf.getOptionalValue("producer.acks", String.class).orElse("all"));
-        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, cf.getOptionalValue("producer.max-inflight", String.class).orElse("5"));
-        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, cf.getOptionalValue("producer.idempotent", String.class).orElse("true"));
+        boolean idempotence = true;
+        if (cf.getOptionalValue("producer.acks", String.class).isPresent()) {
+            String acks = cf.getValue("producer.acks", String.class);
+            if (!acks.equals("all")) {
+                LOG.info("Setting idempotence to false as acks != all.");
+                idempotence = false;
+            }
+            props.put(ProducerConfig.ACKS_CONFIG, acks);
+        } else {
+            props.put(ProducerConfig.ACKS_CONFIG, "all");
+        }
+        if (cf.getOptionalValue("producer.max-inflight", Integer.class).isPresent()) {
+            Integer maxInflight = cf.getValue("producer.max-inflight", Integer.class);
+            if (maxInflight > 5) {
+                LOG.info("Setting idempotence to false as max-inflight > 5.");
+                idempotence = false;
+            }
+            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInflight);
+        } else {
+            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
+        }
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, cf.getOptionalValue("producer.idempotent", Boolean.class).orElse(idempotence));
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, cf.getOptionalValue("producer.batch", String.class).orElse("16384"));
         props.put(ProducerConfig.LINGER_MS_CONFIG, cf.getOptionalValue("producer.linger", String.class).orElse("0"));
         props.put(ProducerConfig.RETRIES_CONFIG, cf.getOptionalValue("producer.retries", String.class).orElse("2147483647"));
         props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, cf.getOptionalValue("producer.delivery-timeout", String.class).orElse("120000"));
+        // if (cf.getOptionalValue("producer.partitioner", String.class).isPresent()) {
+        //     switch (cf.getValue("producer.partitioner", String.class)) {
+        //         case ""
+        //     }
+        // }
 
         return props;
     }
     public static void main(String... args) {
+        // the remaining configurables
         String topic = ConfigProvider.getConfig().getOptionalValue("producer.topic", String.class).orElse("test-topic");
         int howManyrolls = ConfigProvider.getConfig().getOptionalValue("producer.num-rolls", Integer.class).orElse(1);
         int sendSize = ConfigProvider.getConfig().getOptionalValue("producer.num-records-per-roll", Integer.class).orElse(100);
         int waitAfterBatch = ConfigProvider.getConfig().getOptionalValue("producer.wait-after-roll", Integer.class).orElse(5000);
         int waitAfterSend = ConfigProvider.getConfig().getOptionalValue("producer.wait-after-send", Integer.class).orElse(500);
 
+        // keep a payload log for each run, truncate it
+        LOG.info("Opening payload log...");
+        PrintWriter pl;
+        try {
+            File payloadLog = new File("payload.log");
+            payloadLog.delete();
+            payloadLog.createNewFile();
+            pl = new PrintWriter(payloadLog);
+        } catch (IOException ioe) {
+            throw new RuntimeException("Could not (re)create payload log: " + ioe.getMessage());
+        }
+
+        // pick random quotes
         Random rnd = new Random();
+
+        // producer that will send the records
         KafkaProducer<Integer, String> prod = new KafkaProducer<>(configProperties());
 
+        LOG.info(String.format("Starting to produce %d records per roll, %d rolls...", sendSize, howManyrolls));
         for (int x = 0; x < howManyrolls; x++) {
             for (int y = 0; y < sendSize; y++) {
                 int idx = rnd.nextInt(quotes.length);
@@ -84,8 +129,9 @@ public class Producer {
                 prod.send(rec, new Callback() {
                     public void onCompletion(RecordMetadata rm, Exception e) {
                         if (e != null) {
-                            LOG.warning(e.getMessage());
+                            LOG.warn(e.getMessage());
                         } else {
+                            pl.println(String.format("%s,%d,%d,%s", rm.topic(), rm.partition(), rec.key(), rec.value()));
                             LOG.info(String.format("Sent: T:%s P:%d K:%d V:%s", rm.topic(), rm.partition(), rec.key(), rec.value()));
                         }
                     }
@@ -93,7 +139,7 @@ public class Producer {
                 try {
                     Thread.sleep(waitAfterSend);
                 } catch (InterruptedException ie) {
-                    LOG.warning("Interrupted in sleep-after-send: " + ie.getMessage());
+                    LOG.warn("Interrupted in sleep-after-send: " + ie.getMessage());
                 }
             }
             if (x < (howManyrolls - 1)) {
@@ -101,11 +147,12 @@ public class Producer {
                 try {
                     Thread.sleep(waitAfterBatch);
                 } catch (InterruptedException ie) {
-                    LOG.warning("Interrupted in sleep-after-roll: " + ie.getMessage());
+                    LOG.warn("Interrupted in sleep-after-roll: " + ie.getMessage());
                 }
             }
         }
 
         prod.close();
+        pl.close();
     }
 }
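
The derived idempotence default above mirrors the client's own constraint: idempotence requires acks=all and at most five in-flight requests per connection. Forcing producer.idempotent=true together with a weaker acks value is rejected by the Kafka client itself. A sketch of that failure mode, reusing the public configProperties() above and assuming the configured bootstrap.server is usable as-is:

    Properties bad = Producer.configProperties();
    bad.put(ProducerConfig.ACKS_CONFIG, "1");
    bad.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    try (KafkaProducer<Integer, String> p = new KafkaProducer<>(bad)) {
        // never reached: the constructor validates the conflicting settings
    } catch (org.apache.kafka.common.KafkaException e) {
        // a ConfigException (a KafkaException subtype) complaining that
        // idempotence requires acks=all
        System.out.println("rejected: " + e.getMessage());
    }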

+ 22 - 4
code/core-api-producer/src/main/resources/META-INF/microprofile-config.properties

@@ -1,8 +1,9 @@
-kafka.bootstrap.server = localhost:9092,localhost:9192,localhost:9292
+bootstrap.server = localhost:9092,localhost:9192,localhost:9292
 
-security.protocol = PLAINTEXT               # this is the default
-ssl.truststore.location = bf-brokers.p12    # only used if the above is SSL
-ssl.truststore.password = FIXME             # ditto
+# PLAINTEXT is the default, use SSL and specify the below two if using it
+security.protocol = PLAINTEXT
+ssl.truststore.location = bf-brokers.p12
+ssl.truststore.password = FIXME
 
 # producer.topic =                          # test-topic
 # producer.num-rolls =                      # 1
@@ -17,3 +18,20 @@ ssl.truststore.password = FIXME             # ditto
 # producer.linger =                         # 0
 # producer.retries =                        # 2147483647
 # producer.delivery-timeout =               # 120000
+
+# consumer.topic =                          # test-topic
+# consumer.poll-period =                    # 1000
+# consumer.wait-after-recv =                # 0
+# consumer.wait-period =                    # 5000
+# consumer.local-id =                       # -1 (for log file only)
+# consumer.payload-trunc =                  # false
+
+# consumer.group-id =                       # test-app
+# consumer.instance-id =                    # null
+# consumer.auto-commit =                    # true
+# consumer.ac-interval =                    # 5000
+# consumer.fetch-min-bytes =                # 1
+# consumer.assignment-strategy =            # cooperative (range, rr, sticky)
+# consumer.heartbeat-interval =             # 3000
+# consumer.session-timeout =                # 45000
+# consumer.auto-offset-reset =              # latest

+ 30 - 0
code/get-logs.sh

@@ -0,0 +1,30 @@
+#!/bin/bash
+# bash rather than plain sh: the "select" prompt below is not POSIX
+MYDIR=$(dirname "$0")
+
+if [ ! -e "${MYDIR}/core-api-producer/payload.log" ]; then
+    echo "ERROR: Missing producer log."
+    exit 1
+fi
+if [ "$(echo ${MYDIR}/core-api-consumer/payload*.log)" = "${MYDIR}/core-api-consumer/payload*.log" ]; then
+    echo "ERROR: Missing consumer log."
+    exit 1
+fi
+
+if [ -e "${MYDIR}/producer.log" ] || [ -e "${MYDIR}/consumerl.log" ]; then
+    echo "WARNING: Existing logs will be overwritten. Do you want to continue?"
+    select resp in "Y" "N"; do
+        if [ "${resp}" = "N" ]; then
+            echo "Exiting."
+            exit 0
+        elif [ "${resp}" = "Y" ]; then
+            break
+        fi
+    done
+    rm -f "${MYDIR}/producer.log" "${MYDIR}/consumer.log"
+fi
+
+sort -gk2,3 -t, ${MYDIR}/core-api-producer/payload.log > ${MYDIR}/producer.log
+sort -gk2,3 -t, ${MYDIR}/core-api-consumer/payload*.log > ${MYDIR}/consumer.log
+rm -f ${MYDIR}/core-api-producer/payload.log ${MYDIR}/core-api-consumer/payload*.log
+echo "Producer and Consumer logs available in ${MYDIR} - old logs removed."

+ 1 - 1
code/pom.xml

@@ -17,7 +17,7 @@
         <quarkus.platform.group-id>com.redhat.quarkus.platform</quarkus.platform.group-id>
         <quarkus.platform.version>3.8.5.SP1-redhat-00001</quarkus.platform.version>
         <kafka.release>3.7.0.redhat-00007</kafka.release>
-        <slf4j.release>1.7.36</slf4j.release>
+        <slf4j.release>2.0.16</slf4j.release>
         <smallrye.release>3.9.1</smallrye.release>
         <skipITs>true</skipITs>
         <surefire-plugin.version>3.2.5</surefire-plugin.version>