瀏覽代碼

sample ser/des for weather + enum, an avro avsc

Grega Bremec 7 月之前
父節點
當前提交
f1adea1e83

+ 55 - 0
weather-consumer/pom.xml

@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.redhat.training.kafka</groupId>
+  <artifactId>weather-consumer</artifactId>
+  <version>1.0.1</version>
+  <name>weather consumer</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <maven.compiler.source>17</maven.compiler.source>
+    <maven.compiler.target>17</maven.compiler.target>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>3.7.0.redhat-00007</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>2.0.7.redhat-00003</version>
+    </dependency>
+    <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-databind</artifactId>
+        <version>2.17.2.redhat-00001</version>
+      </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>3.4.1</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>java</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <mainClass>com.redhat.training.kafka.ConsumerApp</mainClass>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 65 - 0
weather-consumer/src/main/java/com/redhat/training/kafka/ConsumerApp.java

@@ -0,0 +1,65 @@
+package com.redhat.training.kafka;
+
+import java.time.Duration;
+import java.util.Properties;
+import java.util.Collections;
+
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+
+public class ConsumerApp
+{
+    public static void main(String[] args) {
+        Consumer<Void, Weather> consumer = new KafkaConsumer<>(configureProperties());
+        consumer.subscribe(Collections.singletonList("weather-forecast"));
+
+        while (true) {
+            ConsumerRecords<Void, Weather> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
+
+            for (ConsumerRecord<Void, Weather> record : records) {
+                System.out.println("Current weather forecast from partition " + record.partition() + "): " + record.value().toString());
+            }
+
+            // consumer.commitSync();
+            // consumer.commitAsync();
+        }
+    }
+
+    private static Properties configureProperties() {
+        Properties props = new Properties();
+
+        // configuration properties
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+            "localhost:9092,localhost:9192");
+            //"my-cluster-kafka-bootstrap-gcbtjd-kafka-cluster.apps.eu46a.prod.ole.redhat.com:443");
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "weatherApp");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                     org.apache.kafka.common.serialization.VoidDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                        com.redhat.training.kafka.WeatherDeserializer.class.getName());
+        // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 500);
+
+        // custom partition assignment strategy
+        // props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+        //                         org.apache.kafka.clients.consumer.RangeAssignor.class.getName());
+        // props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+        //                         org.apache.kafka.clients.consumer.RoundRobinAssignor.class.getName());
+        // props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+        //                         org.apache.kafka.clients.consumer.StickyAssignor.class.getName());
+        // props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+        //                         org.apache.kafka.clients.consumer.CooperativeStickyAssignor.class.getName());
+
+        // TLS properties, if necessary
+        // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+        // props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/home/johndoe/AD482/truststore.jks");
+        // props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+
+        return props;
+    }
+}

+ 19 - 0
weather-consumer/src/main/java/com/redhat/training/kafka/Weather.java

@@ -0,0 +1,19 @@
+package com.redhat.training.kafka;
+
+import java.io.Serializable;
+
+public class Weather implements Serializable {
+    private WeatherType weather;
+
+    public WeatherType getWeather() {
+        return weather;
+    }
+
+    public void setWeather(WeatherType weather) {
+        this.weather = weather;
+    }
+
+    public String toString() {
+        return "Weather will be " + this.getWeather().toString();
+    }
+}

+ 19 - 0
weather-consumer/src/main/java/com/redhat/training/kafka/WeatherDeserializer.java

@@ -0,0 +1,19 @@
+package com.redhat.training.kafka;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class WeatherDeserializer implements Deserializer<Weather> {
+    private ObjectMapper om = new ObjectMapper();
+
+    @Override
+    public Weather deserialize(String topic, byte[] data) {
+        try {
+            return om.readValue(data, Weather.class);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}

+ 5 - 0
weather-consumer/src/main/java/com/redhat/training/kafka/WeatherType.java

@@ -0,0 +1,5 @@
+package com.redhat.training.kafka;
+
+public enum WeatherType {
+    SUNNY, CLOUDY, RAINY, MIXED, WEIRD;
+}

+ 55 - 0
weather-producer/pom.xml

@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.redhat.training.kafka</groupId>
+  <artifactId>weather-producer</artifactId>
+  <version>1.0.1</version>
+  <name>weather producer</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <maven.compiler.source>17</maven.compiler.source>
+    <maven.compiler.target>17</maven.compiler.target>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>3.7.0.redhat-00007</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>2.0.7.redhat-00003</version>
+    </dependency>
+    <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-databind</artifactId>
+        <version>2.17.2.redhat-00001</version>
+      </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>3.4.1</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>java</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <mainClass>com.redhat.training.kafka.ProducerApp</mainClass>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 14 - 0
weather-producer/src/main/avro/Weather.avsc

@@ -0,0 +1,14 @@
+{
+    "type": "record",
+    "name": "Weather",
+    "fields": [
+        {
+            "name": "weather",
+            "type": {
+                "type": "enum",
+                "name": "WeatherType",
+                "symbols": ["SUNNY", "CLOUDY", "RAINY", "MIXED", "WEIRD"]
+            }
+        }
+    ]
+}

+ 81 - 0
weather-producer/src/main/java/com/redhat/training/kafka/ProducerApp.java

@@ -0,0 +1,81 @@
+package com.redhat.training.kafka;
+
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SslConfigs;
+
+public class ProducerApp {
+    public static Properties configureProperties() {
+        Properties props = new Properties();
+
+        // the bootstrap server(s)
+        props.put(
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                "localhost:9092,localhost:9192"
+                // "my-cluster-kafka-bootstrap-gcbtjd-kafka-cluster.apps.eu46a.prod.ole.redhat.com:443"
+        );
+
+        // key and value serializers
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                    org.apache.kafka.common.serialization.VoidSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                    com.redhat.training.kafka.WeatherSerializer.class.getName());
+
+        // configure the SSL connection (if necessary)
+        // props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+        // props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+        //              "/home/johndoe/AD482/truststore.jks");
+        // props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
+
+        // acknowledgment level
+        // props.put(ProducerConfig.ACKS_CONFIG, "1"); // ask leader to ack
+
+        // use a custom partitioner
+        // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
+        //             org.apache.kafka.clients.producer.RoundRobinPartitioner.class.getName());
+
+        return props;
+    }
+
+    public static void main(String[] args) {
+        Producer<Void, Weather> producer = new KafkaProducer<>(configureProperties());
+        Random wgen = new Random();
+
+        while (true) {
+            Weather w = new Weather();
+            w.setWeather(WeatherType.values()[wgen.nextInt(WeatherType.values().length)]);
+            ProducerRecord<Void, Weather> r = new ProducerRecord<Void,Weather>("weather-forecast", w);
+            producer.send(r,
+                (rm, e) -> {
+                    if (e != null) {
+                        throw new RuntimeException(e);
+                    } else {
+                        printRecord(r, rm);
+                    }
+                });
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ie) {
+                throw new RuntimeException(ie);
+            }
+        }
+        // it's an endless loop, so this is unreachable code
+        // producer.close();
+    }
+
+    private static void printRecord(ProducerRecord<Void,Weather> record, RecordMetadata meta) {
+        System.out.println("Sent record:");
+        System.out.println("\tKey = " + record.key());
+        System.out.println("\tValue = " + record.value());
+        System.out.println("\tTopic = " + meta.topic());
+        System.out.println("\tPartition = " + meta.partition());
+    }
+}

+ 19 - 0
weather-producer/src/main/java/com/redhat/training/kafka/Weather.java

@@ -0,0 +1,19 @@
+package com.redhat.training.kafka;
+
+import java.io.Serializable;
+
+public class Weather implements Serializable {
+    private WeatherType weather;
+
+    public WeatherType getWeather() {
+        return weather;
+    }
+
+    public void setWeather(WeatherType weather) {
+        this.weather = weather;
+    }
+
+    public String toString() {
+        return "Weather will be " + this.getWeather().toString();
+    }
+}

+ 20 - 0
weather-producer/src/main/java/com/redhat/training/kafka/WeatherSerializer.java

@@ -0,0 +1,20 @@
+package com.redhat.training.kafka;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class WeatherSerializer implements Serializer<Weather> {
+    private ObjectMapper om = new ObjectMapper();
+
+    @Override
+    public byte[] serialize(String arg0, Weather arg1) {
+        try {
+            return om.writeValueAsBytes(arg1);
+        } catch (JsonProcessingException jpe) {
+            throw new RuntimeException(jpe);
+        }
+    }
+
+}

+ 5 - 0
weather-producer/src/main/java/com/redhat/training/kafka/WeatherType.java

@@ -0,0 +1,5 @@
+package com.redhat.training.kafka;
+
+public enum WeatherType {
+    SUNNY, CLOUDY, RAINY, MIXED, WEIRD;
+}