|
@@ -30,9 +30,11 @@ public class SimpleStream {
|
|
|
|
|
|
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simpleStreamProcessor");
|
|
|
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cf.getValue("kafka.server", String.class));
|
|
|
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
|
|
|
- props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, cf.getValue("ssl.truststore", String.class));
|
|
|
- props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, cf.getValue("ssl.password", String.class));
|
|
|
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, cf.getOptionalValue("kafka.protocol", String.class).orElse("PLAINTEXT"));
|
|
|
+ if (props.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG).equals("SSL")) {
|
|
|
+ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, cf.getValue("ssl.truststore", String.class));
|
|
|
+ props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, cf.getValue("ssl.password", String.class));
|
|
|
+ }
|
|
|
|
|
|
return props;
|
|
|
}
|
|
@@ -48,19 +50,17 @@ public class SimpleStream {
|
|
|
|
|
|
StreamsBuilder b = new StreamsBuilder();
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
KStream<String, Integer> src = b.stream(srcTopic, Consumed.with(ks, vs));
|
|
|
|
|
|
|
|
|
-
|
|
|
+ src.foreach((key, val) -> System.out.println("Received key: " + key + ", value: " + val));
|
|
|
+
|
|
|
+
|
|
|
|
|
|
|
|
|
|
|
|
-
|
|
|
+
|
|
|
Map<String, KStream<String, Integer>> splits = src.split(Named.as("stream-"))
|
|
|
.branch((k, v) -> v <= 2500, Branched.as("log"))
|
|
|
.defaultBranch(Branched.as("proc"));
|
|
@@ -75,11 +75,17 @@ public class SimpleStream {
|
|
|
splits.get("stream-proc").foreach((key, val) -> System.out.println("Received HIGH PAYMENT key: " + key + ", value: " + val));
|
|
|
splits.get("stream-proc").to(dstTopic, Produced.with(ks, vs));
|
|
|
|
|
|
+
|
|
|
Topology t = b.build();
|
|
|
+
|
|
|
+
|
|
|
TopologyDescription td = t.describe();
|
|
|
System.out.println("**** TOPOLOGY ****\n" + td.toString());
|
|
|
|
|
|
+
|
|
|
KafkaStreams str = new KafkaStreams(t, cfg);
|
|
|
+
|
|
|
+
|
|
|
final CountDownLatch cd = new CountDownLatch(1);
|
|
|
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown") {
|
|
|
@Override
|
|
@@ -88,7 +94,6 @@ public class SimpleStream {
|
|
|
cd.countDown();
|
|
|
}
|
|
|
});
|
|
|
-
|
|
|
try {
|
|
|
str.start();
|
|
|
cd.await();
|