
implemented stream splitting, added src data producer and result consumer
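
All three classes in this commit pull their connection settings from MicroProfile Config. As a rough sketch of how those lookups resolve (the key names are the ones used in the committed code; the class name ConfigLookupSketch and the printed output are purely illustrative), values come from the standard default sources: system properties, environment variables, and META-INF/microprofile-config.properties.

package com.redhat.training.kafka;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;

public class ConfigLookupSketch {
    public static void main(String[] args) {
        Config cf = ConfigProvider.getConfig();

        // required keys: getValue() throws if they cannot be resolved
        String bootstrap  = cf.getValue("kafka.server", String.class);
        String truststore = cf.getValue("ssl.truststore", String.class);
        String trustpass  = cf.getValue("ssl.password", String.class);

        // optional keys with the same defaults the producer, consumer, and stream use
        String srcTopic = cf.getOptionalValue("kafka.topic.payments", String.class).orElse("payments");
        String dstTopic = cf.getOptionalValue("kafka.topic.largepayments", String.class).orElse("large-payments");

        System.out.println("bootstrap=" + bootstrap + ", src=" + srcTopic + ", dst=" + dstTopic);
        System.out.println("truststore=" + truststore + " (password length " + trustpass.length() + ")");
    }
}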

Grega Bremec 2 years ago
commit 10d39455f4

+ 47 - 0
streams/src/main/java/com/redhat/training/kafka/LargePaymentConsumer.java

@@ -0,0 +1,47 @@
+package com.redhat.training.kafka;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.SslConfigs;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+public class LargePaymentConsumer {
+    public static Properties configureProperties() {
+        Config cf = ConfigProvider.getConfig();
+
+        Properties props = new Properties();
+
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cf.getValue("kafka.server", String.class));
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "largePaymentConsumer");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, cf.getValue("ssl.truststore", String.class));
+        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, cf.getValue("ssl.password", String.class));
+
+        return props;
+    }
+
+    public static void main(String... args) {
+        Consumer<String, Integer> consumer = new KafkaConsumer<>(configureProperties());
+
+        String topic = ConfigProvider.getConfig().getOptionalValue("kafka.topic.largepayments", String.class).orElse("large-payments");
+        consumer.subscribe(Collections.singleton(topic));
+
+        while (true) {
+            ConsumerRecords<String, Integer> crs = consumer.poll(Duration.ofMillis(1000));
+            for (ConsumerRecord<String, Integer> cr : crs) {
+                System.out.println("Got large payment: " + cr.value());
+            }
+        }
+    }
+}
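
The consumer above polls in an endless loop and never closes. Should a clean shutdown ever be wanted, the usual pattern (not part of this commit) is to call wakeup() from a shutdown hook and catch the resulting WakeupException; a minimal sketch, reusing configureProperties() from the class above under a hypothetical class name:

package com.redhat.training.kafka;

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class LargePaymentConsumerWithShutdown {
    public static void main(String... args) {
        Consumer<String, Integer> consumer = new KafkaConsumer<>(LargePaymentConsumer.configureProperties());
        final Thread mainThread = Thread.currentThread();

        // wakeup() is the one consumer method that may be called from another thread;
        // it makes a blocked poll() throw WakeupException so the loop below can exit
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try { mainThread.join(); } catch (InterruptedException ignored) { }
        }));

        consumer.subscribe(Collections.singleton("large-payments"));
        try {
            while (true) {
                ConsumerRecords<String, Integer> crs = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, Integer> cr : crs) {
                    System.out.println("Got large payment: " + cr.value());
                }
            }
        } catch (WakeupException we) {
            // expected during shutdown, nothing to handle
        } finally {
            consumer.close();
        }
    }
}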

+ 66 - 0
streams/src/main/java/com/redhat/training/kafka/RandomPaymentProducer.java

@@ -0,0 +1,66 @@
+package com.redhat.training.kafka;
+
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SslConfigs;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+public class RandomPaymentProducer {
+    public static Properties configureProperties() {
+        Config cf = ConfigProvider.getConfig();
+
+        Properties props = new Properties();
+
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cf.getValue("kafka.server", String.class));
+        props.put(ProducerConfig.ACKS_CONFIG, "1");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, cf.getValue("ssl.truststore", String.class));
+        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, cf.getValue("ssl.password", String.class));
+
+        return props;
+    }
+
+    public static void main(String... args) {
+        Random random = new Random();
+        Producer<String, Integer> producer = new KafkaProducer<>(configureProperties());
+
+        String topic = ConfigProvider.getConfig().getOptionalValue("kafka.topic.payments", String.class).orElse("payments");
+
+        while (true) {
+            int sum = random.nextInt(5000);
+            ProducerRecord<String, Integer> record = new ProducerRecord<>(
+                    topic,
+                    sum
+                );
+
+            producer.send(record, new Callback() {
+                public void onCompletion(RecordMetadata rm, Exception e) {
+                    // if there was a problem, "e" contains the exception that occurred
+                    if (e != null) {
+                        e.printStackTrace();
+                    } else {
+                        System.out.println("Sent new payment: " + record.value());
+                    }
+                }
+            });
+
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ie) {
+                break;
+            }
+        }
+        producer.close();
+    }
+}
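
The producer sends value-only records, so every consumer downstream (including the foreach logging in SimpleStream) sees a null key. If per-key partitioning were ever needed, the record could carry a key; the sketch below would replace the send() call inside the loop above and reuses producer, topic, random, and sum from it. The "customer-" key is invented purely for illustration, not something the committed code does.

// hypothetical keyed variant of the send above, with the Callback written as a lambda;
// e.printStackTrace() keeps the full stack trace instead of printing the array reference
String key = "customer-" + random.nextInt(10);
ProducerRecord<String, Integer> keyed = new ProducerRecord<>(topic, key, sum);

producer.send(keyed, (rm, e) -> {
    if (e != null) {
        e.printStackTrace();
    } else {
        System.out.println("Sent payment " + keyed.value() + " for key " + keyed.key());
    }
});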

+ 19 - 9
streams/src/main/java/com/redhat/training/kafka/SimpleStream.java

@@ -40,30 +40,40 @@ public class SimpleStream {
     public static void main(String[] args) {
         // Check properties first.
         Properties cfg = configureProperties();
+        String srcTopic = ConfigProvider.getConfig().getOptionalValue("kafka.topic.payments", String.class).orElse("payments");
+        String dstTopic = ConfigProvider.getConfig().getOptionalValue("kafka.topic.largepayments", String.class).orElse("large-payments");
 
         Serde<String> ks = Serdes.String();
         Serde<Integer> vs = Serdes.Integer();
 
         StreamsBuilder b = new StreamsBuilder();
 
-        KStream<String, Integer> src = b.stream("payments", Consumed.with(ks, vs));
+        // just logs everything received
+        // b.stream(srcTopic, Consumed.with(ks, vs))
+        //         .foreach((key, val) -> System.out.println("Received key: " + key + ", value: " + val));
 
+        // this is the same for the following two examples
+        KStream<String, Integer> src = b.stream(srcTopic, Consumed.with(ks, vs));
+
+        // logs everything and, via a filter, forwards records above the threshold (2500 below) for further processing
+        // src.foreach((key, val) -> System.out.println("Received key: " + key + ", value: " + val));
+        // src.filter((key, value) -> value > 2500)
+        //         .to(dstTopic, Produced.with(ks, vs));
+
+        // uses the split processor (Kafka Streams 2.8.0+) to create a sub-stream per criterion and attach processors to each
         Map<String, KStream<String, Integer>> splits = src.split(Named.as("stream-"))
-                        .branch((k, v) -> true, Branched.as("orig"))
-                        .branch((k, v) -> true, Branched.as("copy"))
-                        .noDefaultBranch();
+                        .branch((k, v) -> v <= 2500, Branched.as("log"))
+                        .defaultBranch(Branched.as("proc"));
 
         System.out.println("Got the following streams:");
         for (String x : splits.keySet()) {
             System.out.println(" - " + x);
         }
 
-        splits.get("stream-copy")
-                .foreach((key, val) -> System.out.println("Received key: " + key + ", value: " + val));
+        splits.get("stream-log").foreach((key, val) -> System.out.println("Received LOW PAYMENT key: " + key + ", value: " + val));
 
-        splits.get("stream-orig")
-                .filter((key, value) -> value > 1000)
-                .to("large-payments", Produced.with(ks, vs));
+        splits.get("stream-proc").foreach((key, val) -> System.out.println("Received HIGH PAYMENT key: " + key + ", value: " + val));
+        splits.get("stream-proc").to(dstTopic, Produced.with(ks, vs));
 
         Topology t = b.build();
         TopologyDescription td = t.describe();
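
The hunk ends right after the topology is built, so the start-up code is not visible here; presumably the rest of SimpleStream.java already starts the topology in the usual way. A minimal sketch of that boilerplate, assuming the t, td, and cfg variables from above plus org.apache.kafka.streams.KafkaStreams and java.util.concurrent.CountDownLatch:

// print the topology description so the stream-log / stream-proc wiring can be checked
System.out.println(td);

CountDownLatch latch = new CountDownLatch(1);
KafkaStreams streams = new KafkaStreams(t, cfg);

// close the streams client and release the latch when the JVM is asked to stop
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    streams.close();
    latch.countDown();
}));

streams.start();
try {
    latch.await();
} catch (InterruptedException ie) {
    System.exit(1);
}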