Преглед на файлове

fixed default bootstrap servers, add slf4j api, not just simple, fix optional settings, add a bit more output

Grega Bremec преди 7 месеца
родител
ревизия
85c29e42f8

+ 5 - 6
sample-stream/pom.xml

@@ -26,12 +26,11 @@
         <artifactId>smallrye-config</artifactId>
         <version>3.5.4.redhat-00001</version>
     </dependency>
-    <!-- smallrye-config declares everything but this as transitive -->
-    <!-- <dependency>
-        <groupId>javax.annotation</groupId>
-        <artifactId>javax.annotation-api</artifactId>
-        <version>1.3.2</version>
-    </dependency> -->
+    <dependency>
+        <groupId>org.slf4j</groupId>
+        <artifactId>slf4j-api</artifactId>
+        <version>2.0.7.redhat-00003</version>
+      </dependency>
     <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>

+ 7 - 5
sample-stream/src/main/java/com/redhat/training/kafka/LargePaymentConsumer.java

@@ -24,10 +24,11 @@ public class LargePaymentConsumer {
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "largePaymentConsumer");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
-        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
-        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, cf.getValue("ssl.truststore", String.class));
-        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, cf.getValue("ssl.password", String.class));
-
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, cf.getOptionalValue("kafka.protocol", String.class).orElse("PLAINTEXT"));
+        if (props.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG).equals("SSL")) {
+            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, cf.getValue("ssl.truststore", String.class));
+            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, cf.getValue("ssl.password", String.class));
+        }
         return props;
     }
 
@@ -37,10 +38,11 @@ public class LargePaymentConsumer {
         String topic = ConfigProvider.getConfig().getOptionalValue("kafka.topic.largepayments", String.class).orElse("large-payments");
         consumer.subscribe(Collections.singleton(topic));
 
+        System.out.println("Consuming large payment data from \"" + topic + "\"...");
         while (true) {
             ConsumerRecords<String, Integer> crs = consumer.poll(Duration.ofMillis(1000));
             for (ConsumerRecord<String, Integer> cr : crs) {
-                System.out.println("Got large payment: " + cr.value());
+                System.out.println("Got large payment: " + cr.key() + " -> " + cr.value());
             }
         }
     }

+ 8 - 9
sample-stream/src/main/java/com/redhat/training/kafka/RandomPaymentProducer.java

@@ -24,10 +24,11 @@ public class RandomPaymentProducer {
         props.put(ProducerConfig.ACKS_CONFIG, "1");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
-        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
-        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, cf.getValue("ssl.truststore", String.class));
-        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, cf.getValue("ssl.password", String.class));
-
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, cf.getOptionalValue("kafka.protocol", String.class).orElse("PLAINTEXT"));
+        if (props.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG).equals("SSL")) {
+            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, cf.getValue("ssl.truststore", String.class));
+            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, cf.getValue("ssl.password", String.class));
+        }
         return props;
     }
 
@@ -37,6 +38,7 @@ public class RandomPaymentProducer {
 
         String topic = ConfigProvider.getConfig().getOptionalValue("kafka.topic.payments", String.class).orElse("payments");
 
+        System.out.println("Sending random payment data to \"" + topic + "\"...");
         while (true) {
 	        int sum = random.nextInt(5000);
             ProducerRecord<String, Integer> record = new ProducerRecord<>(
@@ -44,16 +46,13 @@ public class RandomPaymentProducer {
                     sum
                 );
 
-            producer.send(record, new Callback() {
-                public void onCompletion(RecordMetadata rm, Exception e) {
-                        // if there was a problem, "e" will contain the exception that happened
+            producer.send(record, (rm, e) -> {
                         if (e != null) {
                                 System.out.println(e.getStackTrace());
                         } else {
 				            System.out.println("Sent new payment: " + record.value());
                         }
-                }
-            });
+                    });
 
             try {
                 Thread.sleep(1000);

+ 3 - 0
sample-stream/src/main/java/com/redhat/training/kafka/SimpleStream.java

@@ -94,7 +94,10 @@ public class SimpleStream {
                 cd.countDown();
             }
         });
+
+        // start the application
         try {
+            System.out.println("Starting payment stream processor...");
             str.start();
             cd.await();
         } catch (InterruptedException ie) {

+ 1 - 1
sample-stream/src/main/resources/META-INF/microprofile-config.properties

@@ -1,4 +1,4 @@
-kafka.server = localhost:9091
+kafka.server = localhost:9092,localhost:9192,localhost:9292
 # kafka.protocol = PLAINTEXT
 # ssl.truststore = path/to/truststore.jks
 # ssl.password = truststorepass