瀏覽代碼

sample quarkus weather data producer and consumer

Grega Bremec 7 月之前
父節點
當前提交
9789d91c52
共有 18 個文件被更改,包括 743 次插入0 次删除
  1. 129 0
      quarkus-weather-consumer/pom.xml
  2. 19 0
      quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/model/Weather.java
  3. 10 0
      quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/model/WeatherObjectDeserializer.java
  4. 5 0
      quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/model/WeatherType.java
  5. 44 0
      quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/quarkus/KafkaClientConfigsProducer.java
  6. 75 0
      quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/quarkus/WeatherForecastConsumer.java
  7. 25 0
      quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/quarkus/WeatherForecastResource.java
  8. 27 0
      quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/quarkus/WeatherForecastStats.java
  9. 38 0
      quarkus-weather-consumer/src/main/resources/application.properties
  10. 129 0
      quarkus-weather-producer/pom.xml
  11. 54 0
      quarkus-weather-producer/src/main/docker/Dockerfile.jvm
  12. 51 0
      quarkus-weather-producer/src/main/docker/Dockerfile.legacy-jar
  13. 27 0
      quarkus-weather-producer/src/main/docker/Dockerfile.native
  14. 23 0
      quarkus-weather-producer/src/main/docker/Dockerfile.native-distroless
  15. 19 0
      quarkus-weather-producer/src/main/java/com/redhat/training/kafka/model/Weather.java
  16. 5 0
      quarkus-weather-producer/src/main/java/com/redhat/training/kafka/model/WeatherType.java
  17. 36 0
      quarkus-weather-producer/src/main/java/com/redhat/training/kafka/quarkus/ProducerApp.java
  18. 27 0
      quarkus-weather-producer/src/main/resources/application.properties

+ 129 - 0
quarkus-weather-consumer/pom.xml

@@ -0,0 +1,129 @@
+<?xml version="1.0"?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.redhat.training.kafka</groupId>
+  <artifactId>quarkus-weather-consumer</artifactId>
+  <version>1.0.0-SNAPSHOT</version>
+  <properties>
+    <compiler-plugin.version>3.12.1</compiler-plugin.version>
+    <maven.compiler.release>17</maven.compiler.release>
+    <surefire-plugin.version>3.2.5</surefire-plugin.version>
+    <skipITs>true</skipITs>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <!-- Community BOM -->
+    <!-- <quarkus.platform.artifact-id>quarkus-universe-bom</quarkus.platform.artifact-id>
+    <quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id> -->
+    <!-- RHBoK BOM -->
+    <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
+    <quarkus.platform.group-id>com.redhat.quarkus.platform</quarkus.platform.group-id>
+    <quarkus.platform.version>3.8.5.SP1-redhat-00001</quarkus.platform.version>
+  </properties>
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>${quarkus.platform.group-id}</groupId>
+        <artifactId>${quarkus.platform.artifact-id}</artifactId>
+        <version>${quarkus.platform.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+  <dependencies>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-arc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-resteasy-jackson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-junit5</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.rest-assured</groupId>
+      <artifactId>rest-assured</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>${quarkus.platform.group-id}</groupId>
+        <artifactId>quarkus-maven-plugin</artifactId>
+        <version>${quarkus.platform.version}</version>
+        <extensions>true</extensions>
+        <executions>
+          <execution>
+            <goals>
+              <goal>build</goal>
+              <goal>generate-code</goal>
+              <goal>generate-code-tests</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>${compiler-plugin.version}</version>
+        <configuration>
+            <compilerArgs>
+                <arg>-parameters</arg>
+            </compilerArgs>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${surefire-plugin.version}</version>
+        <configuration>
+          <systemPropertyVariables>
+            <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
+            <maven.home>${maven.home}</maven.home>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <version>${surefire-plugin.version}</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>integration-test</goal>
+              <goal>verify</goal>
+            </goals>
+            <configuration>
+              <systemPropertyVariables>
+                <native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
+                <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
+                <maven.home>${maven.home}</maven.home>
+              </systemPropertyVariables>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+</plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>native</id>
+      <activation>
+        <property>
+          <name>native</name>
+        </property>
+      </activation>
+      <properties>
+        <skipITs>false</skipITs>
+        <quarkus.native.enabled>true</quarkus.native.enabled>
+      </properties>
+    </profile>
+  </profiles>
+</project>

+ 19 - 0
quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/model/Weather.java

@@ -0,0 +1,19 @@
+package com.redhat.training.kafka.model;
+
+import java.io.Serializable;
+
+public class Weather implements Serializable {
+    private WeatherType weather;
+
+    public WeatherType getWeather() {
+        return weather;
+    }
+
+    public void setWeather(WeatherType weather) {
+        this.weather = weather;
+    }
+
+    public String toString() {
+        return "Weather will be " + this.getWeather().toString();
+    }
+}

+ 10 - 0
quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/model/WeatherObjectDeserializer.java

@@ -0,0 +1,10 @@
+package com.redhat.training.kafka.model;
+
+import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
+
+public class WeatherObjectDeserializer extends ObjectMapperDeserializer<Weather> {
+
+    public WeatherObjectDeserializer() {
+        super(Weather.class);
+    }
+}

+ 5 - 0
quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/model/WeatherType.java

@@ -0,0 +1,5 @@
+package com.redhat.training.kafka.model;
+
+public enum WeatherType {
+    SUNNY, CLOUDY, RAINY, MIXED, WEIRD;
+}

+ 44 - 0
quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/quarkus/KafkaClientConfigsProducer.java

@@ -0,0 +1,44 @@
+package com.redhat.training.kafka.quarkus;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.jboss.logging.Logger;
+
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Produces;
+
+@ApplicationScoped
+public class KafkaClientConfigsProducer {
+    private static final Logger LOG = Logger.getLogger(KafkaClientConfigsProducer.class);
+
+    @ConfigProperty(name = "consumer.client.id")
+    Optional<String> clientId;
+
+    @Produces
+    @Identifier("quarkus-consumer-configs")
+    public Map<String, Object> produceConsumerConfig() {
+        LOG.info("Producing consumer default settings...");
+
+        HashMap<String,Object> config = new HashMap<>();
+
+        if (this.clientId.isPresent()) {
+            LOG.warnv("Setting client ID from consumer.client.id: {0}", this.clientId.get());
+            config.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId.get());
+            config.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, this.clientId.get());
+        } else {
+            LOG.warn("Client ID not provided by consumer.client.id, not assigning.");
+        }
+        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.CooperativeStickyAssignor.class.getName());
+
+        return config;
+    }
+}

+ 75 - 0
quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/quarkus/WeatherForecastConsumer.java

@@ -0,0 +1,75 @@
+package com.redhat.training.kafka.quarkus;
+
+import jakarta.enterprise.event.Observes;
+import jakarta.inject.Inject;
+import jakarta.inject.Singleton;
+
+import java.util.concurrent.CompletionStage;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
+
+import com.redhat.training.kafka.model.Weather;
+
+import io.quarkus.logging.Log;
+import io.quarkus.runtime.StartupEvent;
+import io.smallrye.reactive.messaging.kafka.KafkaClientService;
+
+
+@Singleton
+public class WeatherForecastConsumer {
+
+    @Inject
+    WeatherForecastStats stats;
+
+    // Injection approach:
+    // @Inject
+    // @Channel("weather-forecast")
+    // Publisher<Weather> valuesPublisher;
+    // Multi<Weather> values;
+
+    // Do low-level client manipulation:
+    @Inject
+    KafkaClientService kcs;
+    void onStartup(@Observes StartupEvent se) {
+        Log.infov("Consumer starting up, consumer for channel \"weather-forecast\" has an ID of {0}",
+                    kcs.getConsumer("weather-forecast").configuration().get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
+    }
+
+    @Incoming("weather-forecast")
+    @Acknowledgment(Acknowledgment.Strategy.MANUAL)
+    public CompletionStage<Void> processReading(Message<ConsumerRecord<String,Weather>> weather) {
+        String r = weather.getPayload().key();
+        Weather w = weather.getPayload().value();
+        stats.addForecast(w);
+        Log.infov("Received weather forecast run {0}: {1}", r, w.getWeather());
+
+        // use KafkaClientService directly to commit offsets to broker
+        // WARNING: this is VERY fishy as consumer is not thread-safe
+        // weather.getPayload().partition() -> this record's partition
+        // weather.getPayload().offset() -> this record's offset
+        // LOG.info("Committing current offsets...");
+        // kcs.getConsumer("weather-forecast").commit(Map<TopicPartition,OffsetAndMetadata>);
+        // kcs.getConsumer("weather-forecast").commitAsync(Map<TopicPartition,OffsetAndMetadata>);
+        return weather.ack();
+    }
+
+    // Above method can also be:
+    // public void consume(Weather forecast) {
+    // public Uni<Void> consume(Message<Weather> msg) {
+    // public Uni<Void> consume(ConsumerRecord<String, Weather> rec) {
+    // public CompletionStage<Void> processReading(Message<Weather> weather) {
+
+    // If long-running, annotate with either:
+    // @Blocking
+    // @Transactional
+    // public Multi<Integer> getEvents() {
+    //     // stats.add(humidityValue);
+    //     // System.out.println("Received humidity value: " + humidityValue);
+    //     return values;
+    // }
+
+}

+ 25 - 0
quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/quarkus/WeatherForecastResource.java

@@ -0,0 +1,25 @@
+package com.redhat.training.kafka.quarkus;
+
+import java.util.Map;
+
+import com.redhat.training.kafka.model.WeatherType;
+
+import io.quarkus.logging.Log;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+
+@ApplicationScoped
+@Path("/weather")
+public class WeatherForecastResource {
+
+    @Inject
+    WeatherForecastStats stats;
+
+    @GET
+    public Map<WeatherType,Integer> getWeatherStats() {
+        Log.info("Returning current forecast histogram: " + stats.getHistogram());
+        return stats.getHistogram();
+    }
+}

+ 27 - 0
quarkus-weather-consumer/src/main/java/com/redhat/training/kafka/quarkus/WeatherForecastStats.java

@@ -0,0 +1,27 @@
+package com.redhat.training.kafka.quarkus;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.redhat.training.kafka.model.Weather;
+import com.redhat.training.kafka.model.WeatherType;
+
+import jakarta.inject.Singleton;
+
+@Singleton
+public class WeatherForecastStats {
+    HashMap<WeatherType,Integer> weatherStats = new HashMap<>();
+
+    public void addForecast(Weather w) {
+        int currentCount = 0;
+        if (this.weatherStats.keySet().contains(w.getWeather())) {
+            currentCount = this.weatherStats.get(w.getWeather());
+        }
+        currentCount++;
+        this.weatherStats.put(w.getWeather(), currentCount);
+    }
+
+    public Map<WeatherType,Integer> getHistogram() {
+        return this.weatherStats;
+    }
+}

+ 38 - 0
quarkus-weather-consumer/src/main/resources/application.properties

@@ -0,0 +1,38 @@
+# global bootstrap server list (there can be one per-channel)
+kafka.bootstrap.servers = localhost:9092,localhost:9192,localhost:9292
+
+# SSL/PLAINTEXT connection
+kafka.security.protocol = PLAINTEXT
+# kafka.ssl.truststore.location = ABSOLUTE_PATH_TO_WORKSPACE_FOLDER/truststore.jks
+# kafka.ssl.truststore.password = password
+
+# client id
+consumer.client.id = unique-345642
+
+# Kafka connector
+mp.messaging.incoming.weather-forecast.connector = smallrye-kafka
+mp.messaging.incoming.weather-forecast.topic = weather-forecast
+mp.messaging.incoming.weather-forecast.group.id = weatherApp
+mp.messaging.incoming.weather-forecast.poll-timeout = 1000
+
+# arbitrary client config can be provided by this bean as defaults, but is overridden by any property here
+mp.messaging.incoming.weather-forecast.kafka-configuration = quarkus-consumer-configs
+
+# mp.messaging.incoming.weather-forecast.commit-strategy = ignore/latest/throttled
+# mp.messaging.incoming.weather-forecast.throttled.unprocessed-record-max-age.ms = 60000
+
+# mp.messaging.incoming.weather-forecast.failure-strategy = fail/ignore/dead-letter-queue
+# mp.messaging.incoming.weather-forecast.dead-letter-queue.topic = DLQ
+# mp.messaging.incoming.weather-forecast.dead-letter-queue.key.serializer =
+# mp.messaging.incoming.weather-forecast.dead-letter-queue.value.serializer =
+
+mp.messaging.incoming.weather-forecast.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+mp.messaging.incoming.weather-forecast.value.deserializer = com.redhat.training.kafka.model.WeatherObjectDeserializer
+
+# AVRO & schema registry
+# mp.messaging.incoming.weather-forecast.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer
+# mp.messaging.incoming.weather-forecast.apicurio.registry.url = https:.../
+# mp.messaging.incoming.weather-forecast.apicurio.registry.use-specific-avro-reader = true
+
+# disable dev services
+quarkus.kafka.devservices.enabled=false

+ 129 - 0
quarkus-weather-producer/pom.xml

@@ -0,0 +1,129 @@
+<?xml version="1.0"?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>com.redhat.training.kafka</groupId>
+  <artifactId>quarkus-weather-producer</artifactId>
+  <version>1.0.0-SNAPSHOT</version>
+  <properties>
+    <compiler-plugin.version>3.12.1</compiler-plugin.version>
+    <maven.compiler.release>17</maven.compiler.release>
+    <surefire-plugin.version>3.2.5</surefire-plugin.version>
+    <skipITs>true</skipITs>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <!-- Community BOM -->
+    <!-- <quarkus.platform.artifact-id>quarkus-universe-bom</quarkus.platform.artifact-id>
+    <quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id> -->
+    <!-- RHBoK BOM -->
+    <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
+    <quarkus.platform.group-id>com.redhat.quarkus.platform</quarkus.platform.group-id>
+    <quarkus.platform.version>3.8.5.SP1-redhat-00001</quarkus.platform.version>
+  </properties>
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>${quarkus.platform.group-id}</groupId>
+        <artifactId>${quarkus.platform.artifact-id}</artifactId>
+        <version>${quarkus.platform.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+  <dependencies>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-arc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-resteasy-jackson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.quarkus</groupId>
+      <artifactId>quarkus-junit5</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.rest-assured</groupId>
+      <artifactId>rest-assured</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>${quarkus.platform.group-id}</groupId>
+        <artifactId>quarkus-maven-plugin</artifactId>
+        <version>${quarkus.platform.version}</version>
+        <extensions>true</extensions>
+        <executions>
+          <execution>
+            <goals>
+              <goal>build</goal>
+              <goal>generate-code</goal>
+              <goal>generate-code-tests</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>${compiler-plugin.version}</version>
+        <configuration>
+            <compilerArgs>
+                <arg>-parameters</arg>
+            </compilerArgs>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${surefire-plugin.version}</version>
+        <configuration>
+          <systemPropertyVariables>
+            <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
+            <maven.home>${maven.home}</maven.home>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <version>${surefire-plugin.version}</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>integration-test</goal>
+              <goal>verify</goal>
+            </goals>
+            <configuration>
+              <systemPropertyVariables>
+                <native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
+                <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
+                <maven.home>${maven.home}</maven.home>
+              </systemPropertyVariables>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+</plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>native</id>
+      <activation>
+        <property>
+          <name>native</name>
+        </property>
+      </activation>
+      <properties>
+        <skipITs>false</skipITs>
+        <quarkus.native.enabled>true</quarkus.native.enabled>
+      </properties>
+    </profile>
+  </profiles>
+</project>

+ 54 - 0
quarkus-weather-producer/src/main/docker/Dockerfile.jvm

@@ -0,0 +1,54 @@
+####
+# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode
+#
+# Before building the container image run:
+#
+# ./mvnw package
+#
+# Then, build the image with:
+#
+# docker build -f src/main/docker/Dockerfile.jvm -t quarkus/kafka-quickstart-jvm .
+#
+# Then run the container using:
+#
+# docker run -i --rm -p 8080:8080 quarkus/kafka-quickstart-jvm
+#
+# If you want to include the debug port into your docker image
+# you will have to expose the debug port (default 5005) like this :  EXPOSE 8080 5050
+#
+# Then run the container using :
+#
+# docker run -i --rm -p 8080:8080 -p 5005:5005 -e JAVA_ENABLE_DEBUG="true" quarkus/kafka-quickstart-jvm
+#
+###
+FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3 
+
+ARG JAVA_PACKAGE=java-11-openjdk-headless
+ARG RUN_JAVA_VERSION=1.3.8
+ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en'
+# Install java and the run-java script
+# Also set up permissions for user `1001`
+RUN microdnf install curl ca-certificates ${JAVA_PACKAGE} \
+    && microdnf update \
+    && microdnf clean all \
+    && mkdir /deployments \
+    && chown 1001 /deployments \
+    && chmod "g+rwX" /deployments \
+    && chown 1001:root /deployments \
+    && curl https://repo1.maven.org/maven2/io/fabric8/run-java-sh/${RUN_JAVA_VERSION}/run-java-sh-${RUN_JAVA_VERSION}-sh.sh -o /deployments/run-java.sh \
+    && chown 1001 /deployments/run-java.sh \
+    && chmod 540 /deployments/run-java.sh \
+    && echo "securerandom.source=file:/dev/urandom" >> /etc/alternatives/jre/lib/security/java.security
+
+# Configure the JAVA_OPTIONS, you can add -XshowSettings:vm to also display the heap size.
+ENV JAVA_OPTIONS="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager"
+# We make four distinct layers so if there are application changes the library layers can be re-used
+COPY --chown=1001 target/quarkus-app/lib/ /deployments/lib/
+COPY --chown=1001 target/quarkus-app/*.jar /deployments/
+COPY --chown=1001 target/quarkus-app/app/ /deployments/app/
+COPY --chown=1001 target/quarkus-app/quarkus/ /deployments/quarkus/
+
+EXPOSE 8080
+USER 1001
+
+ENTRYPOINT [ "/deployments/run-java.sh" ]

+ 51 - 0
quarkus-weather-producer/src/main/docker/Dockerfile.legacy-jar

@@ -0,0 +1,51 @@
+####
+# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode
+#
+# Before building the container image run:
+#
+# ./mvnw package -Dquarkus.package.type=legacy-jar
+#
+# Then, build the image with:
+#
+# docker build -f src/main/docker/Dockerfile.legacy-jar -t quarkus/kafka-quickstart-legacy-jar .
+#
+# Then run the container using:
+#
+# docker run -i --rm -p 8080:8080 quarkus/kafka-quickstart-legacy-jar
+#
+# If you want to include the debug port into your docker image
+# you will have to expose the debug port (default 5005) like this :  EXPOSE 8080 5050
+#
+# Then run the container using :
+#
+# docker run -i --rm -p 8080:8080 -p 5005:5005 -e JAVA_ENABLE_DEBUG="true" quarkus/kafka-quickstart-legacy-jar
+#
+###
+FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3 
+
+ARG JAVA_PACKAGE=java-11-openjdk-headless
+ARG RUN_JAVA_VERSION=1.3.8
+ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en'
+# Install java and the run-java script
+# Also set up permissions for user `1001`
+RUN microdnf install curl ca-certificates ${JAVA_PACKAGE} \
+    && microdnf update \
+    && microdnf clean all \
+    && mkdir /deployments \
+    && chown 1001 /deployments \
+    && chmod "g+rwX" /deployments \
+    && chown 1001:root /deployments \
+    && curl https://repo1.maven.org/maven2/io/fabric8/run-java-sh/${RUN_JAVA_VERSION}/run-java-sh-${RUN_JAVA_VERSION}-sh.sh -o /deployments/run-java.sh \
+    && chown 1001 /deployments/run-java.sh \
+    && chmod 540 /deployments/run-java.sh \
+    && echo "securerandom.source=file:/dev/urandom" >> /etc/alternatives/jre/lib/security/java.security
+
+# Configure the JAVA_OPTIONS, you can add -XshowSettings:vm to also display the heap size.
+ENV JAVA_OPTIONS="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager"
+COPY target/lib/* /deployments/lib/
+COPY target/*-runner.jar /deployments/app.jar
+
+EXPOSE 8080
+USER 1001
+
+ENTRYPOINT [ "/deployments/run-java.sh" ]

+ 27 - 0
quarkus-weather-producer/src/main/docker/Dockerfile.native

@@ -0,0 +1,27 @@
+####
+# This Dockerfile is used in order to build a container that runs the Quarkus application in native (no JVM) mode
+#
+# Before building the container image run:
+#
+# ./mvnw package -Pnative
+#
+# Then, build the image with:
+#
+# docker build -f src/main/docker/Dockerfile.native -t quarkus/kafka-quickstart .
+#
+# Then run the container using:
+#
+# docker run -i --rm -p 8080:8080 quarkus/kafka-quickstart
+#
+###
+FROM registry.access.redhat.com/ubi8/ubi-minimal:8.3
+WORKDIR /work/
+RUN chown 1001 /work \
+    && chmod "g+rwX" /work \
+    && chown 1001:root /work
+COPY --chown=1001:root target/*-runner /work/application
+
+EXPOSE 8080
+USER 1001
+
+CMD ["./application", "-Dquarkus.http.host=0.0.0.0"]

+ 23 - 0
quarkus-weather-producer/src/main/docker/Dockerfile.native-distroless

@@ -0,0 +1,23 @@
+####
+# This Dockerfile is used in order to build a distroless container that runs the Quarkus application in native (no JVM) mode
+#
+# Before building the container image run:
+#
+# ./mvnw package -Pnative
+#
+# Then, build the image with:
+#
+# docker build -f src/main/docker/Dockerfile.native-distroless -t quarkus/kafka-quickstart .
+#
+# Then run the container using:
+#
+# docker run -i --rm -p 8080:8080 quarkus/kafka-quickstart
+#
+###
+FROM quay.io/quarkus/quarkus-distroless-image:1.0
+COPY target/*-runner /application
+
+EXPOSE 8080
+USER nonroot
+
+CMD ["./application", "-Dquarkus.http.host=0.0.0.0"]

+ 19 - 0
quarkus-weather-producer/src/main/java/com/redhat/training/kafka/model/Weather.java

@@ -0,0 +1,19 @@
+package com.redhat.training.kafka.model;
+
+import java.io.Serializable;
+
+public class Weather implements Serializable {
+    private WeatherType weather;
+
+    public WeatherType getWeather() {
+        return weather;
+    }
+
+    public void setWeather(WeatherType weather) {
+        this.weather = weather;
+    }
+
+    public String toString() {
+        return "Weather will be " + this.getWeather().toString();
+    }
+}

+ 5 - 0
quarkus-weather-producer/src/main/java/com/redhat/training/kafka/model/WeatherType.java

@@ -0,0 +1,5 @@
+package com.redhat.training.kafka.model;
+
+public enum WeatherType {
+    SUNNY, CLOUDY, RAINY, MIXED, WEIRD;
+}

+ 36 - 0
quarkus-weather-producer/src/main/java/com/redhat/training/kafka/quarkus/ProducerApp.java

@@ -0,0 +1,36 @@
+package com.redhat.training.kafka.quarkus;
+
+import java.time.Duration;
+import java.util.Random;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.reactive.messaging.kafka.Record;
+import org.eclipse.microprofile.reactive.messaging.Outgoing;
+import org.jboss.logging.Logger;
+
+import com.redhat.training.kafka.model.Weather;
+import com.redhat.training.kafka.model.WeatherType;
+
+@ApplicationScoped
+public class ProducerApp {
+    private static final Logger LOG = Logger.getLogger(ProducerApp.class);
+
+    private final Random random = new Random();
+
+    @Outgoing("weather-forecast")
+    public Multi<Record<String, Weather>> generate() {
+        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
+                .onOverflow().drop()
+                .map(tick -> {
+                    String fcast = "fcast-run-" + random.nextInt(10);
+
+                    Weather w = new Weather();
+                    w.setWeather(WeatherType.values()[random.nextInt(WeatherType.values().length)]);
+
+                    LOG.infov("Forecast run: {0}, forecast: {1}", fcast, w);
+                    return Record.of(fcast, w);
+                });
+    }
+}

+ 27 - 0
quarkus-weather-producer/src/main/resources/application.properties

@@ -0,0 +1,27 @@
+# global bootstrap server list (there can be one per-channel)
+kafka.bootstrap.servers = localhost:9092,localhost:9192,localhost:9292
+
+# SSL/PLAINTEXT connection
+kafka.security.protocol = PLAINTEXT
+# kafka.ssl.truststore.location = ABSOLUTE_PATH_TO_WORKSPACE_FOLDER/truststore.jks
+# kafka.ssl.truststore.password = password
+
+# Kafka connector + some low-level configuration
+mp.messaging.outgoing.weather-forecast.connector = smallrye-kafka
+mp.messaging.outgoing.weather-forecast.topic = weather-forecast
+mp.messaging.outgoing.weather-forecast.acks = 1
+mp.messaging.outgoing.weather-forecast.max-inflight-messages = 5
+
+# arbitrary client config can be provided by this bean as defaults, but is overridden by any property here
+# mp.messaging.outgoing.weather-forecast.kafka-configuration = bean-identifier-of-producer-class
+
+# key and value serializers
+mp.messaging.outgoing.weather-forecast.key.serializer = org.apache.kafka.common.serialization.StringSerializer
+mp.messaging.outgoing.weather-forecast.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer
+
+# AVRO & schema registry
+# mp.messaging.outgoing.weather-forecast.value.serializer = io.apicurio.registry.utils.serde.AvroKafkaSerializer
+# mp.messaging.outgoing.weather-forecast.schema.registry.url = https://../
+
+# disable Quarkus dev services
+quarkus.kafka.devservices.enabled = false