Selaa lähdekoodia

refactor producer into three separate ones, add mvn exec profiles, introduce config

Grega Bremec 7 kuukautta sitten
vanhempi
commit
6fcf09db70

+ 116 - 53
payments-producer/pom.xml

@@ -1,59 +1,122 @@
 <?xml version="1.0" encoding="UTF-8"?>
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
 
-  <groupId>com.redhat.training.kafka.coreapi</groupId>
-  <artifactId>payments-producer</artifactId>
-  <version>1.0.0-SNAPSHOT</version>
-  <name>payment producer</name>
-  <packaging>jar</packaging>
+    <groupId>com.redhat.training.kafka.coreapi</groupId>
+    <artifactId>payments-producer</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <name>payment producer</name>
+    <packaging>jar</packaging>
 
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <maven.compiler.source>17</maven.compiler.source>
-    <maven.compiler.target>17</maven.compiler.target>
-  </properties>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+    </properties>
 
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-      <version>3.7.0.redhat-00007</version>
-    </dependency>
-    <dependency>
-        <groupId>org.slf4j</groupId>
-        <artifactId>slf4j-api</artifactId>
-        <version>2.0.7.redhat-00003</version>
-      </dependency>
-      <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-simple</artifactId>
-      <version>2.0.7.redhat-00003</version>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-      <version>2.17.2.redhat-00001</version>
-    </dependency>
-  </dependencies>
-  <build>
-    <plugins>
-        <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>exec-maven-plugin</artifactId>
-            <version>3.4.1</version>
-            <executions>
-                <execution>
-                    <goals>
-                        <goal>java</goal>
-                    </goals>
-                </execution>
-            </executions>
-            <configuration>
-                <mainClass>com.redhat.training.kafka.coreapi.ProducerApp</mainClass>
-            </configuration>
-        </plugin>
-    </plugins>
-</build>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.7.0.redhat-00007</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>2.0.7.redhat-00003</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>2.0.7.redhat-00003</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.17.2.redhat-00001</version>
+        </dependency>
+        <dependency>
+            <groupId>io.smallrye.config</groupId>
+            <artifactId>smallrye-config</artifactId>
+            <version>3.5.4.redhat-00001</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>3.4.1</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>java</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <mainClass>com.redhat.training.kafka.coreapi.PaymentProducer</mainClass>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <profiles>
+        <profile>
+            <id>risk-assessment-updates</id>
+            <activation>
+                <property>
+                    <name>risk-assessment-updates</name>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>exec-maven-plugin</artifactId>
+                        <version>3.4.1</version>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>java</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <mainClass>com.redhat.training.kafka.coreapi.RiskAssessmentProducer</mainClass>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>bank-account-data</id>
+            <activation>
+                <property>
+                    <name>bank-account-data</name>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>exec-maven-plugin</artifactId>
+                        <version>3.4.1</version>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>java</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                        <configuration>
+                            <mainClass>com.redhat.training.kafka.coreapi.BankAccountProducer</mainClass>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
 </project>

+ 48 - 0
payments-producer/src/main/java/com/redhat/training/kafka/coreapi/BankAccountProducer.java

@@ -0,0 +1,48 @@
+package com.redhat.training.kafka.coreapi;
+
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Logger;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+import com.redhat.training.kafka.model.BankAccount;
+
+public class BankAccountProducer {
+    private static final Logger LOG = Logger.getLogger(BankAccountProducer.class.getName());
+
+    public static void main(String... args) {
+        LOG.info("Initializing account data...");
+
+        String topic = ConfigProvider.getConfig().getOptionalValue("topic.bank-accounts", String.class).orElse("account-data");
+
+        Producer<String,BankAccount> adp = new KafkaProducer<>(ProducerSettings.configureAccountProperties());
+
+        int ctr = 0;
+        while (true) {
+            BankAccount ba = GeneratedData.getBankAccount(ctr);
+            if (ba == null) {
+                break;
+            } else {
+                ctr++;
+            }
+
+            try {
+                LOG.info("Sending account: " + ba.toString());
+                ProducerRecord<String,BankAccount> ad = new ProducerRecord<String,BankAccount>(topic, ba.getAccountNumber(), ba);
+                adp.send(ad).get();
+            } catch (ExecutionException | InterruptedException e) {
+                LOG.warning(e.getMessage());
+                continue;
+            }
+        }
+
+        LOG.info("Done sending account data. Closing producer.");
+
+        adp.close();
+
+        LOG.info("Finished. Good bye.");
+    }
+}

+ 63 - 0
payments-producer/src/main/java/com/redhat/training/kafka/coreapi/GeneratedData.java

@@ -0,0 +1,63 @@
+package com.redhat.training.kafka.coreapi;
+
+import java.util.Random;
+
+import com.redhat.training.kafka.model.BankAccount;
+
+public class GeneratedData {
+    private static final String[] accounts = {
+        "4f1fabc1-2dfc-475d-ad59-dbe9be76f381",
+        "c2119898-eae8-45a8-b24a-83e964c3440f",
+        "a29112f1-ffc8-486d-b8aa-07f14daa4ea1",
+        "961eb104-ef35-46f6-9fa5-9493513157ca",
+        "70998997-6acf-43f5-98c7-41315975c5cc",
+        "96686115-6ca7-4739-9198-5dd52084f563",
+        "cc151e37-694c-46ef-a5e5-3ece1939485c",
+        "8ae565a0-0d76-464b-8f32-be4a116c0d4c",
+        "ea4e728a-a33c-4fcc-a43b-aba37b58f598",
+        "8e81d57f-eb56-4a39-80c3-89b0019ea316",
+    };
+
+    private static final String[] usernames = {
+        "jdoe",
+        "janed",
+        "tjones",
+        "ljohnson",
+        "mikep",
+        "catbat",
+        "qmd",
+        "py",
+        "aletter",
+        "abug",
+    };
+
+    private static final String[] userfullnames = {
+        "John Doe",
+        "Jane Doe",
+        "Tom Jones",
+        "Linda Johnson",
+        "Mike Pearson",
+        "Cathy Bates",
+        "Quasi Modo",
+        "老百姓",
+        "Anita Letterback",
+        "Aida Bugg",
+    };
+
+    private static final Random rand = new Random();
+
+    public static String getRandomCustomerId() {
+        return usernames[rand.nextInt(usernames.length)];
+    }
+
+    public static String getRandomAccountId() {
+        return accounts[rand.nextInt(accounts.length)];
+    }
+
+    public static BankAccount getBankAccount(int id) {
+        if (id >= accounts.length) {
+            return null;
+        }
+        return new BankAccount(accounts[id], usernames[id], userfullnames[id], 0);
+    }
+}

+ 48 - 0
payments-producer/src/main/java/com/redhat/training/kafka/coreapi/PaymentProducer.java

@@ -0,0 +1,48 @@
+package com.redhat.training.kafka.coreapi;
+
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Logger;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+public class PaymentProducer {
+    static final Logger LOG = Logger.getLogger(PaymentProducer.class.getName());
+
+    public static void main(String[] args) {
+        LOG.info("Sending payment transaction information...");
+
+        Random random = new Random();
+        String topic = ConfigProvider.getConfig().getOptionalValue("topic.payments", String.class).orElse("payments");
+
+        Producer<String,Integer> pp = new KafkaProducer<>(ProducerSettings.configurePaymentProperties());
+        while (true) {
+            String k = GeneratedData.getRandomAccountId();
+            Integer v = random.nextInt(100000);
+            try {
+                LOG.info("Sending payment: " + k + " -> " + v);
+                ProducerRecord<String,Integer> pr = new ProducerRecord<String,Integer>(topic, k, v);
+                pp.send(pr).get();
+            } catch (ExecutionException | InterruptedException e) {
+                LOG.warning(e.getMessage());
+                continue;
+            }
+
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ie) {
+                LOG.warning(ie.getMessage());
+                break;
+            }
+        }
+
+        LOG.info("Done sending payment transactions. Closing producer.");
+
+        pp.close();
+
+        LOG.info("Finished. So long.");
+    }
+}

+ 0 - 191
payments-producer/src/main/java/com/redhat/training/kafka/coreapi/ProducerApp.java

@@ -1,191 +0,0 @@
-package com.redhat.training.kafka.coreapi;
-
-import java.util.Properties;
-import java.util.Random;
-import java.util.logging.Logger;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import com.redhat.training.kafka.model.BankAccount;
-import com.redhat.training.kafka.model.RiskAssessment;
-
-public class ProducerApp {
-    static final Logger LOG = Logger.getLogger(ProducerApp.class.getName());
-
-    private static final String[] accounts = {
-        "4f1fabc1-2dfc-475d-ad59-dbe9be76f381",
-        "c2119898-eae8-45a8-b24a-83e964c3440f",
-        "a29112f1-ffc8-486d-b8aa-07f14daa4ea1",
-        "961eb104-ef35-46f6-9fa5-9493513157ca",
-        "70998997-6acf-43f5-98c7-41315975c5cc",
-        "96686115-6ca7-4739-9198-5dd52084f563",
-        "cc151e37-694c-46ef-a5e5-3ece1939485c",
-        "8ae565a0-0d76-464b-8f32-be4a116c0d4c",
-        "ea4e728a-a33c-4fcc-a43b-aba37b58f598",
-        "8e81d57f-eb56-4a39-80c3-89b0019ea316",
-    };
-
-    private static final String[] users = {
-        "jdoe",
-        "janed",
-        "tjones",
-        "ljohnson",
-        "mikep",
-        "catbat",
-        "qmd",
-        "py",
-        "aletter",
-        "abug",
-    };
-
-    public static Properties configureGenericProperties() {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                "localhost:9092,localhost:9192,localhost:9292");
-        props.put(ProducerConfig.ACKS_CONFIG, "1");
-        return props;
-    }
-    public static Properties configureAccountProperties() {
-        Properties props = configureGenericProperties();
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                org.apache.kafka.common.serialization.StringSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                com.redhat.training.kafka.serdes.BankAccountSerializer.class.getName());
-        return props;
-    }
-    public static Properties configureRiskProperties() {
-        Properties props = configureGenericProperties();
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                org.apache.kafka.common.serialization.StringSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                com.redhat.training.kafka.serdes.RiskAssessmentSerializer.class.getName());
-        return props;
-    }
-    public static Properties configurePaymentProperties() {
-        Properties props = configureGenericProperties();
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                org.apache.kafka.common.serialization.StringSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                org.apache.kafka.common.serialization.IntegerSerializer.class.getName());
-        return props;
-    }
-
-    public static void main(String[] args) throws Exception {
-        Random random = new Random();
-
-        // 
-        if (System.getProperty("initialize.accounts") != null &&
-                System.getProperty("initialize.accounts").equals("true")) {
-            LOG.info("Initializing account data...");
-            Producer<String,BankAccount> adp = new KafkaProducer<>(configureAccountProperties());
-            ProducerRecord<String,BankAccount> ad =
-                new ProducerRecord<String,BankAccount>("account-data",
-                    "4f1fabc1-2dfc-475d-ad59-dbe9be76f381",
-                    new BankAccount("4f1fabc1-2dfc-475d-ad59-dbe9be76f381",
-                                    "jdoe",
-                                    "John Doe",
-                                    0));
-            adp.send(ad).get();
-
-            ad = new ProducerRecord<String,BankAccount>("account-data",
-                    "c2119898-eae8-45a8-b24a-83e964c3440f",
-                    new BankAccount("c2119898-eae8-45a8-b24a-83e964c3440f",
-                                    "janed",
-                                    "Jane Doe",
-                                    0));
-            adp.send(ad).get();
-
-            ad = new ProducerRecord<String,BankAccount>("account-data",
-                    "a29112f1-ffc8-486d-b8aa-07f14daa4ea1",
-                    new BankAccount("a29112f1-ffc8-486d-b8aa-07f14daa4ea1",
-                                    "tjones",
-                                    "Tom Jones",
-                                    0));
-            adp.send(ad).get();
-
-            ad = new ProducerRecord<String,BankAccount>("account-data",
-                    "961eb104-ef35-46f6-9fa5-9493513157ca",
-                    new BankAccount("961eb104-ef35-46f6-9fa5-9493513157ca",
-                                    "ljohnson",
-                                    "Linda Johnson",
-                                    0));
-            adp.send(ad).get();
-
-            ad = new ProducerRecord<String,BankAccount>("account-data",
-                    "70998997-6acf-43f5-98c7-41315975c5cc",
-                    new BankAccount("70998997-6acf-43f5-98c7-41315975c5cc",
-                                    "mikep",
-                                    "Mike Pearson",
-                                    0));
-            adp.send(ad).get();
-
-            ad = new ProducerRecord<String,BankAccount>("account-data",
-                    "96686115-6ca7-4739-9198-5dd52084f563",
-                    new BankAccount("96686115-6ca7-4739-9198-5dd52084f563",
-                                    "catbat",
-                                    "Cathy Bates",
-                                    0));
-            adp.send(ad).get();
-
-            ad = new ProducerRecord<String,BankAccount>("account-data",
-                    "cc151e37-694c-46ef-a5e5-3ece1939485c",
-                    new BankAccount("cc151e37-694c-46ef-a5e5-3ece1939485c",
-                                    "qmd",
-                                    "Quasi Modo",
-                                    0));
-            adp.send(ad).get();
-
-            ad = new ProducerRecord<String,BankAccount>("account-data",
-                    "8ae565a0-0d76-464b-8f32-be4a116c0d4c",
-                    new BankAccount("8ae565a0-0d76-464b-8f32-be4a116c0d4c",
-                                    "py",
-                                    "老百姓",
-                                    0));
-            adp.send(ad).get();
-
-            ad = new ProducerRecord<String,BankAccount>("account-data",
-                    "ea4e728a-a33c-4fcc-a43b-aba37b58f598",
-                    new BankAccount("ea4e728a-a33c-4fcc-a43b-aba37b58f598",
-                                    "aletter",
-                                    "Anita Letterback",
-                                    0));
-            adp.send(ad).get();
-
-            ad = new ProducerRecord<String,BankAccount>("account-data",
-                    "8e81d57f-eb56-4a39-80c3-89b0019ea316",
-                    new BankAccount("8e81d57f-eb56-4a39-80c3-89b0019ea316",
-                                    "abug",
-                                    "Aida Bugg",
-                                    0));
-            adp.send(ad).get();
-            adp.close();
-        }
-
-        Producer<String,Integer> pp = new KafkaProducer<>(configurePaymentProperties());
-        Producer<String,RiskAssessment> rap = new KafkaProducer<>(configureRiskProperties());
-        while (true) {
-            // send a random payment event to a random account
-            String k = accounts[random.nextInt(accounts.length)];
-            Integer v = random.nextInt(100000);
-            ProducerRecord<String,Integer> pr = new ProducerRecord<String,Integer>("payments", k, v);
-            pp.send(pr).get();
-            LOG.info("Sent record " + k + " -> " + v);
-
-            // sometimes, also send a customer risk assessment update
-            if (random.nextBoolean()) {
-                RiskAssessment ra = new RiskAssessment();
-                ra.setCustomerId(users[random.nextInt(users.length)]);
-                ra.setAssessmentScore(random.nextInt(10) + 1);
-                LOG.info("Updating customer risk status for " + ra.getCustomerId() + " to " + ra.getAssessmentScore());
-                ProducerRecord<String,RiskAssessment> rapr =
-                    new ProducerRecord<String,RiskAssessment>("customer-risk-status", ra.getCustomerId(), ra);
-                rap.send(rapr);
-            }
-
-            Thread.sleep(1000);
-        }
-    }
-}

+ 50 - 0
payments-producer/src/main/java/com/redhat/training/kafka/coreapi/ProducerSettings.java

@@ -0,0 +1,50 @@
+package com.redhat.training.kafka.coreapi;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+public class ProducerSettings {
+    private static final Config cf = ConfigProvider.getConfig();
+
+    public static Properties configureGenericProperties() {
+        Properties props = new Properties();
+
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                    cf.getValue("bootstrap.server", String.class));
+
+        props.put(ProducerConfig.ACKS_CONFIG,
+                    cf.getOptionalValue("producer.acks", String.class).orElse("all"));
+        props.put(ProducerConfig.LINGER_MS_CONFIG,
+                    cf.getOptionalValue("producer.linger", Integer.class).orElse(0));
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG,
+                    cf.getOptionalValue("producer.batch", Integer.class).orElse(16384));
+        return props;
+    }
+    public static Properties configureAccountProperties() {
+        Properties props = configureGenericProperties();
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                org.apache.kafka.common.serialization.StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                com.redhat.training.kafka.serdes.BankAccountSerializer.class.getName());
+        return props;
+    }
+    public static Properties configureRiskProperties() {
+        Properties props = configureGenericProperties();
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                org.apache.kafka.common.serialization.StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                com.redhat.training.kafka.serdes.RiskAssessmentSerializer.class.getName());
+        return props;
+    }
+    public static Properties configurePaymentProperties() {
+        Properties props = configureGenericProperties();
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                org.apache.kafka.common.serialization.StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                org.apache.kafka.common.serialization.IntegerSerializer.class.getName());
+        return props;
+    }
+}

+ 55 - 0
payments-producer/src/main/java/com/redhat/training/kafka/coreapi/RiskAssessmentProducer.java

@@ -0,0 +1,55 @@
+package com.redhat.training.kafka.coreapi;
+
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.logging.Logger;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+import com.redhat.training.kafka.model.RiskAssessment;
+
+public class RiskAssessmentProducer {
+    private static final Logger LOG = Logger.getLogger(RiskAssessmentProducer.class.getName());
+
+    public static void main(String... args) {
+        LOG.info("Updating risk assessment data...");
+
+        Random rand = new Random();
+        String topic = ConfigProvider.getConfig().getOptionalValue("topic.risk-assessments", String.class).orElse("customer-risk-status");
+
+        Producer<String,RiskAssessment> rap = new KafkaProducer<>(ProducerSettings.configureRiskProperties());
+
+        while (true) {
+            RiskAssessment ra = new RiskAssessment();
+
+            ra.setCustomerId(GeneratedData.getRandomCustomerId());
+            ra.setAssessmentScore(rand.nextInt(10) + 1);
+
+            try {
+                LOG.info("Updating customer risk status for " + ra.getCustomerId() + " to " + ra.getAssessmentScore());
+                ProducerRecord<String,RiskAssessment> rapr =
+                    new ProducerRecord<String,RiskAssessment>(topic, ra.getCustomerId(), ra);
+                rap.send(rapr).get();
+            } catch (ExecutionException | InterruptedException e) {
+                LOG.warning(e.getMessage());
+                continue;
+            }
+
+            try {
+                Thread.sleep(rand.nextInt(5000));
+            } catch (InterruptedException ie) {
+                LOG.warning(ie.getMessage());
+                break;
+            }
+        }
+
+        LOG.info("Done sending risk assessments. Closing producer.");
+
+        rap.close();
+
+        LOG.info("Finished. Exiting.");
+    }
+}

+ 13 - 0
payments-producer/src/main/resources/META-INF/microprofile-config.properties

@@ -0,0 +1,13 @@
+bootstrap.server = localhost:9092,localhost:9192,localhost:9292
+
+#security.protocol = PLAINTEXT/SSL (defaults to PLAINTEXT)
+#ssl.truststore.location = somewhere.p12
+#ssl.truststore.password = verysecret
+
+producer.acks = 1
+# producer.linger = 0
+# producer.batch = 16384
+
+topic.payments = payments
+topic.bank-accounts = account-data
+topic.risk-assessments = customer-risk-status