Parcourir la source

initial import

Grega Bremec il y a 2 ans
commit
24789a17b5

+ 12 - 0
.gitignore

@@ -0,0 +1,12 @@
+.DS_Store
+.*.sw?
+*.vim
+.vscode
+.venv
+target
+bin
+obj
+tmp*
+broker
+truststore.jks
+*-workspace.json

+ 30 - 0
consumer/pom.xml

@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.redhat.training.kafka</groupId>
+  <artifactId>sample-consumer</artifactId>
+  <version>1.0</version>
+  <name>consumer</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <maven.compiler.source>11</maven.compiler.source>
+    <maven.compiler.target>11</maven.compiler.target>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>2.7.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>1.7.30</version>
+    </dependency>
+  </dependencies>
+</project>

+ 49 - 0
consumer/src/main/java/com/redhat/training/kafka/ConsumerApp.java

@@ -0,0 +1,49 @@
+package com.redhat.training.kafka;
+
+import java.time.Duration;
+import java.util.Properties;
+import java.util.Collections;
+
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+
+public class ConsumerApp
+{
+    public static void main(String[] args) {
+        Consumer<Integer, String> consumer = new KafkaConsumer<>(configureProperties());
+        consumer.subscribe(Collections.singletonList("my-topic"));
+
+        while (true) {
+            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
+
+            for (ConsumerRecord<Integer, String> record : records) {
+                System.out.println("Received quote (partition " + record.partition() + "): " + record.key() + " -> " + record.value());
+            }
+        }
+    }
+
+    private static Properties configureProperties() {
+        Properties props = new Properties();
+
+        // TODO: Add Kafka configuration properties
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+            "localhost:9092");
+            //"my-cluster-kafka-bootstrap-gcbtjd-kafka-cluster.apps.eu46a.prod.ole.redhat.com:443");
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "famousQuotes");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 500);
+        // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+        // props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/home/johndoe/AD482/truststore.jks");
+        // props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+
+        return props;
+    }
+}

+ 35 - 0
producer/pom.xml

@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.redhat.training.kafka</groupId>
+  <artifactId>sample-producer</artifactId>
+  <version>1.0</version>
+  <name>producer</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <maven.compiler.source>11</maven.compiler.source>
+    <maven.compiler.target>11</maven.compiler.target>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>2.7.0</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.13.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>1.7.30</version>
+    </dependency>
+  </dependencies>
+</project>

+ 121 - 0
producer/src/main/java/com/redhat/training/kafka/ProducerApp.java

@@ -0,0 +1,121 @@
+package com.redhat.training.kafka;
+
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SslConfigs;
+
+public class ProducerApp {
+    public static final String[] quotes = {
+"\"I agree with everything you say, but I would attack to the death your right to say it.\" -- Tom Stoppard (1937 - )",
+"\"Any fool can tell the truth, but it requires a man of some sense to know how to lie well.\" -- Samuel Butler (1835 - 1902)",
+"\"There is no nonsense so gross that society will not, at some time, make a doctrine of it and defend it with every weapon of communal stupidity.\" -- Robertson Davies",
+"\"The nation behaves well if it treats the natural resources as assets which it must turn over to the next generation increased, and not impaired, in value.\" -- Theodore Roosevelt (1858 - 1919), Speech before the Colorado Live Stock Association, Denver, Colorado, August 19, 1910",
+"\"I wish you sunshine on your path and storms to season your journey. I wish you peace in the world in which you live... More I cannot wish you except perhaps love to make all the rest worthwhile.\" -- Robert A. Ward",
+"\"Fall seven times, stand up eight.\" -- Japanese Proverb",
+"\"I think the world is run by 'C' students.\" -- Al McGuire",
+"\"What makes the engine go? Desire, desire, desire.\" -- Stanley Kunitz, O Magazine, September 2003",
+"\"Do not pursue what is illusory - property and position: all that is gained at the expense of your nerves decade after decade and can be confiscated in one fell night. Live with a steady superiority over life - don't be afraid of misfortune, and do not yearn after happiness; it is after all, all the same: the bitter doesn't last forever, and the sweet never fills the cup to overflowing.\" -- Alexander Solzhenitsyn (1918 - )",
+"\"Anyone who goes to a psychiatrist ought to have his head examined.\" -- Samuel Goldwyn (1882 - 1974)",
+"\"An expert is a person who has made all the mistakes that can be made in a very narrow field.\" -- Niels Bohr (1885 - 1962)",
+"\"It's going to come true like you knew it, but it's not going to feel like you think.\" -- Rosie O'Donnell, Today Show interview, 04-08-08",
+"\"Your primary goal should be to have a great life. You can still have a good day, enjoy your child, and ultimately find happiness, whether your ex is acting like a jerk or a responsible person. Your happiness is not dependent upon someone else.\" -- Julie A., M.A. Ross and Judy Corcoran, Joint Custody with a Jerk: Raising a Child with an Uncooperative Ex, 2011",
+"\"You have to keep plugging away. We are all growing. There is no shortcut. You have to put time into it to build an audience\" -- John Gruber, How to Blog for Money by Learning from Comics, SXSW 2006",
+"\"We are advertis'd by our loving friends.\" -- William Shakespeare (1564 - 1616)",
+"\"We shall find peace. We shall hear the angels, we shall see the sky sparkling with diamonds.\" -- Anton Chekhov (1860 - 1904), 1897",
+"\"Do not be fooled into believing that because a man is rich he is necessarily smart. There is ample proof to the contrary.\" -- Julius Rosenwald (1862 - 1932)",
+"\"See, that's all you're thinking about, is winning. You're confirming your sense of self- worth through outward reward instead of through inner appreciation.\" -- Barbara Hall, Northern Exposure, Gran Prix, 1994",
+"\"If you think you can do a thing or think you can't do a thing, you're right.\" -- Henry Ford (1863 - 1947), (attributed)",
+"\"I am here and you will know that I am the best and will hear me.\" -- Leontyne Price, O Magazine, December 2003 ",
+    };
+
+    public static Properties configureProperties() {
+        Properties props = new Properties();
+
+        // TODO: configure the bootstrap server
+        props.put(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                "localhost:9092,localhost:9192"
+                // "my-cluster-kafka-bootstrap-gcbtjd-kafka-cluster.apps.eu46a.prod.ole.redhat.com:443"
+        );
+
+        // TODO: configure the key and value serializers
+        props.put(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.IntegerSerializer"
+        );
+        props.put(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer"
+        );
+
+        // TODO: configure the SSL connection
+        // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+        // props.put(
+        //         SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+        //         "/home/johndoe/AD482/truststore.jks"
+        // );
+        // props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+
+        // acknowledgment level
+        props.put(ProducerConfig.ACKS_CONFIG, "1"); // ask leader to ack
+
+        return props;
+    }
+
+    public static void main(String[] args) {
+        Random random = new Random();
+        Producer<Integer, String> producer = new KafkaProducer<>(
+                configureProperties()
+        );
+
+        for (int i = 0; i < 100; i++) {
+	        int idx = random.nextInt(quotes.length);
+            ProducerRecord<Integer, String> record = new ProducerRecord<>(
+                    "my-topic",
+                    idx,
+		            quotes[idx]
+            );
+
+            // fire-and-forget
+            //producer.send(record);
+
+            // synchronous
+        //     try {
+        //         producer.send(record).get();
+        //     } catch (Exception e) {
+        //         e.printStackTrace();
+        //     }
+
+            // asynchronous
+            producer.send(record, new Callback() {
+                public void onCompletion(RecordMetadata rm, Exception e) {
+                        // if there was a problem, "e" will contain the exception that happened
+                        if (e != null) {
+                                System.out.println(e.getStackTrace());
+                        } else {
+				            printRecord(record, rm);
+                        }
+                }
+            });
+
+        }
+
+        producer.close();
+    }
+
+    private static void printRecord(ProducerRecord record, RecordMetadata meta) {
+        System.out.println("Sent record:");
+        System.out.println("\tKey = " + record.key());
+        System.out.println("\tValue = " + record.value());
+        System.out.println("\tTopic = " + meta.topic());
+        System.out.println("\tPartition = " + meta.partition());
+    }
+}

+ 46 - 0
streams/pom.xml

@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.redhat.training.kafka</groupId>
+  <artifactId>sample-stream</artifactId>
+  <version>1.0</version>
+  <name>producer</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <maven.compiler.source>11</maven.compiler.source>
+    <maven.compiler.target>11</maven.compiler.target>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>2.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-streams</artifactId>
+      <version>2.8.0</version>
+    </dependency>
+    <dependency>
+        <groupId>io.smallrye.config</groupId>
+        <artifactId>smallrye-config</artifactId>
+        <version>2.13.3</version>
+    </dependency>
+    <!-- smallrye-config declares everything but this as transitive -->
+    <dependency>
+        <groupId>javax.annotation</groupId>
+        <artifactId>javax.annotation-api</artifactId>
+        <version>1.3.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>1.7.30</version>
+    </dependency>
+  </dependencies>
+</project>

+ 89 - 0
streams/src/main/java/com/redhat/training/kafka/SimpleStream.java

@@ -0,0 +1,89 @@
+package com.redhat.training.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyDescription;
+import org.apache.kafka.streams.kstream.Branched;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.Produced;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+public class SimpleStream {
+
+    public static Properties configureProperties() {
+        Config cf = ConfigProvider.getConfig();
+
+        Properties props = new Properties();
+
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simpleStreamProcessor");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cf.getValue("kafka.server", String.class));
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, cf.getValue("ssl.truststore", String.class));
+        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, cf.getValue("ssl.password", String.class));
+
+        return props;
+    }
+
+    public static void main(String[] args) {
+        // Check properties first.
+        Properties cfg = configureProperties();
+
+        Serde<String> ks = Serdes.String();
+        Serde<Integer> vs = Serdes.Integer();
+
+        StreamsBuilder b = new StreamsBuilder();
+
+        KStream<String, Integer> src = b.stream("payments", Consumed.with(ks, vs));
+
+        Map<String, KStream<String, Integer>> splits = src.split(Named.as("stream-"))
+                        .branch((k, v) -> true, Branched.as("orig"))
+                        .branch((k, v) -> true, Branched.as("copy"))
+                        .noDefaultBranch();
+
+        System.out.println("Got the following streams:");
+        for (String x : splits.keySet()) {
+            System.out.println(" - " + x);
+        }
+
+        splits.get("stream-copy")
+                .foreach((key, val) -> System.out.println("Received key: " + key + ", value: " + val));
+
+        splits.get("stream-orig")
+                .filter((key, value) -> value > 1000)
+                .to("large-payments", Produced.with(ks, vs));
+
+        Topology t = b.build();
+        TopologyDescription td = t.describe();
+        System.out.println("**** TOPOLOGY ****\n" + td.toString());
+
+        KafkaStreams str = new KafkaStreams(t, cfg);
+        final CountDownLatch cd = new CountDownLatch(1);
+        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown") {
+            @Override
+            public void run() {
+                str.close();
+                cd.countDown();
+            }
+        });
+
+        try {
+            str.start();
+            cd.await();
+        } catch (InterruptedException ie) {
+            System.out.println("Interrupted during await()...");
+        }
+    }
+}

+ 3 - 0
streams/src/main/resources/META-INF/microprofile-config.properties

@@ -0,0 +1,3 @@
+kafka.server = my-kafka-cluster:port
+ssl.truststore = path/to/truststore.jks
+ssl.password = truststorepass