瀏覽代碼

core api producer

Grega Bremec 7 月之前
父節點
當前提交
6ad5c26bae

+ 53 - 0
code/core-api-producer/pom.xml

@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>com.redhat.training.kafka</groupId>
+        <artifactId>parent</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>core-api-producer</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.release}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.smallrye.config</groupId>
+            <artifactId>smallrye-config</artifactId>
+            <version>${smallrye.release}</version>
+        </dependency>
+        <!-- <dependency>
+            <groupId>javax.annotation</groupId>
+            <artifactId>javax.annotation-api</artifactId>
+            <version>1.3.2</version>
+        </dependency> -->
+        <dependency>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+          <version>${slf4j.release}</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>3.4.1</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>java</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <mainClass>com.redhat.training.kafka.coreapi.producer.Producer</mainClass>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 103 - 0
code/core-api-producer/src/main/java/com/redhat/training/kafka/coreapi/producer/Producer.java

@@ -0,0 +1,103 @@
+package com.redhat.training.kafka.coreapi.producer;
+
+import java.util.Properties;
+import java.util.Random;
+import java.util.logging.Logger;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SslConfigs;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+public class Producer {
+    static final Logger LOG = Logger.getLogger(Producer.class.getName());
+
+    public static final String[] quotes = {
+        "\"I agree with everything you say, but I would attack to the death your right to say it.\" -- Tom Stoppard (1937 - )",
+        "\"Any fool can tell the truth, but it requires a man of some sense to know how to lie well.\" -- Samuel Butler (1835 - 1902)",
+        "\"There is no nonsense so gross that society will not, at some time, make a doctrine of it and defend it with every weapon of communal stupidity.\" -- Robertson Davies",
+        "\"The nation behaves well if it treats the natural resources as assets which it must turn over to the next generation increased, and not impaired, in value.\" -- Theodore Roosevelt (1858 - 1919), Speech before the Colorado Live Stock Association, Denver, Colorado, August 19, 1910",
+        "\"I wish you sunshine on your path and storms to season your journey. I wish you peace in the world in which you live... More I cannot wish you except perhaps love to make all the rest worthwhile.\" -- Robert A. Ward",
+        "\"Fall seven times, stand up eight.\" -- Japanese Proverb",
+        "\"I think the world is run by 'C' students.\" -- Al McGuire",
+        "\"What makes the engine go? Desire, desire, desire.\" -- Stanley Kunitz, O Magazine, September 2003",
+        "\"Do not pursue what is illusory - property and position: all that is gained at the expense of your nerves decade after decade and can be confiscated in one fell night. Live with a steady superiority over life - don't be afraid of misfortune, and do not yearn after happiness; it is after all, all the same: the bitter doesn't last forever, and the sweet never fills the cup to overflowing.\" -- Alexander Solzhenitsyn (1918 - )",
+        "\"Anyone who goes to a psychiatrist ought to have his head examined.\" -- Samuel Goldwyn (1882 - 1974)",
+        "\"An expert is a person who has made all the mistakes that can be made in a very narrow field.\" -- Niels Bohr (1885 - 1962)",
+        "\"It's going to come true like you knew it, but it's not going to feel like you think.\" -- Rosie O'Donnell, Today Show interview, 04-08-08",
+        "\"Your primary goal should be to have a great life. You can still have a good day, enjoy your child, and ultimately find happiness, whether your ex is acting like a jerk or a responsible person. Your happiness is not dependent upon someone else.\" -- Julie A., M.A. Ross and Judy Corcoran, Joint Custody with a Jerk: Raising a Child with an Uncooperative Ex, 2011",
+        "\"You have to keep plugging away. We are all growing. There is no shortcut. You have to put time into it to build an audience\" -- John Gruber, How to Blog for Money by Learning from Comics, SXSW 2006",
+        "\"We are advertis'd by our loving friends.\" -- William Shakespeare (1564 - 1616)",
+        "\"We shall find peace. We shall hear the angels, we shall see the sky sparkling with diamonds.\" -- Anton Chekhov (1860 - 1904), 1897",
+        "\"Do not be fooled into believing that because a man is rich he is necessarily smart. There is ample proof to the contrary.\" -- Julius Rosenwald (1862 - 1932)",
+        "\"See, that's all you're thinking about, is winning. You're confirming your sense of self- worth through outward reward instead of through inner appreciation.\" -- Barbara Hall, Northern Exposure, Gran Prix, 1994",
+        "\"If you think you can do a thing or think you can't do a thing, you're right.\" -- Henry Ford (1863 - 1947), (attributed)",
+        "\"I am here and you will know that I am the best and will hear me.\" -- Leontyne Price, O Magazine, December 2003 ",
+    };
+
+    public static Properties configProperties() {
+        Config cf = ConfigProvider.getConfig();
+        Properties props = new Properties();
+
+        // Standard mandatory configs.
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, cf.getValue("security.protocol", String.class));
+        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, cf.getOptionalValue("ssl.truststore.location", String.class).orElse("SSL"));
+        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, cf.getValue("ssl.truststore.password", String.class));
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cf.getValue("kafka.bootstrap.server", String.class));
+
+        // Fixed config, not changeable.
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.IntegerSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
+
+        // Optional stuff.
+        props.put(ProducerConfig.ACKS_CONFIG, cf.getOptionalValue("producer.acks", String.class).orElse("1"));
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, cf.getOptionalValue("producer.batch", String.class).orElse("16384"));
+        props.put(ProducerConfig.LINGER_MS_CONFIG, cf.getOptionalValue("producer.linger", String.class).orElse("0"));
+        props.put(ProducerConfig.RETRIES_CONFIG, cf.getOptionalValue("producer.retries", String.class).orElse("2147483647"));
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, cf.getOptionalValue("producer.max-inflight", String.class).orElse("5"));
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, cf.getOptionalValue("producer.idempotent", String.class).orElse("true"));
+        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, cf.getOptionalValue("producer.delivery-timeout", String.class).orElse("120000"));
+
+        return props;
+    }
+    public static void main(String... args) {
+        int sendSize = ConfigProvider.getConfig().getOptionalValue("producer.num-records-per-roll", Integer.class).orElse(100);
+        int waitAfterBatch = ConfigProvider.getConfig().getOptionalValue("producer.wait-after-roll", Integer.class).orElse(500);
+        int howManyrolls = ConfigProvider.getConfig().getOptionalValue("producer.num-rolls", Integer.class).orElse(1);
+        String topic = ConfigProvider.getConfig().getOptionalValue("producer.topic", String.class).orElse("test-topic");
+
+        Random rnd = new Random();
+        KafkaProducer<Integer, String> prod = new KafkaProducer<>(configProperties());
+
+        for (int x = 0; x < howManyrolls; x++) {
+            for (int y = 0; y < sendSize; y++) {
+                int idx = rnd.nextInt(quotes.length);
+                ProducerRecord<Integer, String> rec = new ProducerRecord<>(topic, y, quotes[idx]);
+                prod.send(rec, new Callback() {
+                    public void onCompletion(RecordMetadata rm, Exception e) {
+                        if (e != null) {
+                            LOG.warning(e.getMessage());
+                        } else {
+                            LOG.info(String.format("Sent: T:%s P:%d K:%d V:%s", rm.topic(), rm.partition(), rec.key(), rec.value()));
+                        }
+                    }
+                });
+            }
+            if (x < (howManyrolls - 1)) {
+                LOG.info(String.format("Trying to sleep for %d ms...", waitAfterBatch));;
+                try {
+                    Thread.sleep(waitAfterBatch);
+                } catch (InterruptedException ie) {
+                    LOG.warning("Interrupted: " + ie.getMessage());
+                }
+            }
+        }
+
+        prod.close();
+    }
+}

+ 16 - 0
code/core-api-producer/src/main/resources/META-INF/microprofile-config.properties

@@ -0,0 +1,16 @@
+kafka.bootstrap.server = bf-brokers-kafka-bootstrap-kafka-break-fix.apps.ocp4.example.com:443
+ssl.truststore.location = bf-brokers.p12
+ssl.truststore.password = FIXME
+
+# producer.acks = 
+# producer.batch = 
+# producer.linger = 
+# producer.retries = 
+# producer.max-inflight = 
+# producer.idempotent = 
+# producer.delivery-timeout = 
+
+# producer.num-records-per-roll = 
+# producer.wait-after-roll = 
+# producer.num-rolls = 
+# producer.topic = 

+ 63 - 0
code/pom.xml

@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.redhat.training.kafka</groupId>
+    <artifactId>parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <properties>
+        <compiler-plugin.version>3.12.1</compiler-plugin.version>
+        <maven.compiler.release>17</maven.compiler.release>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
+        <quarkus.platform.group-id>com.redhat.quarkus.platform</quarkus.platform.group-id>
+        <quarkus.platform.version>3.8.5.SP1-redhat-00001</quarkus.platform.version>
+        <kafka.release>3.7.0.redhat-00007</kafka.release>
+        <slf4j.release>1.7.36</slf4j.release>
+        <smallrye.release>3.9.1</smallrye.release>
+        <skipITs>true</skipITs>
+        <surefire-plugin.version>3.2.5</surefire-plugin.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>${quarkus.platform.group-id}</groupId>
+                <artifactId>${quarkus.platform.artifact-id}</artifactId>
+                <version>${quarkus.platform.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <repositories>
+        <repository>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+            <id>redhat</id>
+            <url>https://maven.repository.redhat.com/ga</url>
+        </repository>
+    </repositories>
+
+    <pluginRepositories>
+        <pluginRepository>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+            <id>redhat</id>
+            <url>https://maven.repository.redhat.com/ga</url>
+        </pluginRepository>
+    </pluginRepositories>
+</project>