|
@@ -1,7 +1,6 @@
|
|
|
-package com.redhat.training.kafka.streams;
|
|
|
+package com.redhat.training.kafka.quarkus.streams;
|
|
|
|
|
|
import java.time.Duration;
|
|
|
-import java.util.Map;
|
|
|
import java.util.logging.Logger;
|
|
|
|
|
|
import jakarta.enterprise.context.ApplicationScoped;
|
|
@@ -12,10 +11,8 @@ import org.apache.kafka.common.serialization.Serdes;
|
|
|
import org.apache.kafka.streams.KeyValue;
|
|
|
import org.apache.kafka.streams.StreamsBuilder;
|
|
|
import org.apache.kafka.streams.Topology;
|
|
|
-import org.apache.kafka.streams.kstream.Branched;
|
|
|
import org.apache.kafka.streams.kstream.Consumed;
|
|
|
import org.apache.kafka.streams.kstream.GlobalKTable;
|
|
|
-import org.apache.kafka.streams.kstream.Joined;
|
|
|
import org.apache.kafka.streams.kstream.KStream;
|
|
|
import org.apache.kafka.streams.kstream.KTable;
|
|
|
import org.apache.kafka.streams.kstream.KeyValueMapper;
|
|
@@ -24,8 +21,6 @@ import org.apache.kafka.streams.kstream.Named;
|
|
|
import org.apache.kafka.streams.kstream.Produced;
|
|
|
import org.apache.kafka.streams.kstream.Suppressed;
|
|
|
import org.apache.kafka.streams.kstream.TimeWindows;
|
|
|
-import org.apache.kafka.streams.kstream.Windowed;
|
|
|
-import org.apache.kafka.streams.kstream.WindowedSerdes;
|
|
|
|
|
|
import com.redhat.training.kafka.model.AggregatePaymentData;
|
|
|
import com.redhat.training.kafka.model.BankAccount;
|
|
@@ -44,15 +39,14 @@ public class PaymentsStream {
|
|
|
// Deserializer for message keys.
|
|
|
private final Serde<String> stringSerde = Serdes.String();
|
|
|
|
|
|
- // Serializer for message values
|
|
|
+ // Serializers for message values
|
|
|
private final Serde<Integer> integerSerde = Serdes.Integer();
|
|
|
-
|
|
|
+ private final Serde<Long> longSerde = Serdes.Long();
|
|
|
private final ObjectMapperSerde<PaymentTransaction> ptSerde = new ObjectMapperSerde<>(PaymentTransaction.class);
|
|
|
private final ObjectMapperSerde<BankAccount> baSerde = new ObjectMapperSerde<>(BankAccount.class);
|
|
|
private final ObjectMapperSerde<RiskAssessment> raSerde = new ObjectMapperSerde<>(RiskAssessment.class);
|
|
|
private final ObjectMapperSerde<EnrichedRiskAssessment> eraSerde = new ObjectMapperSerde<>(EnrichedRiskAssessment.class);
|
|
|
private final ObjectMapperSerde<AggregatePaymentData> apdSerde = new ObjectMapperSerde<>(AggregatePaymentData.class);
|
|
|
- private final Serde<Long> longSerde = Serdes.Long();
|
|
|
|
|
|
@Produces
|
|
|
public Topology buildTopology() {
|
|
@@ -61,7 +55,7 @@ public class PaymentsStream {
|
|
|
// Input data: payments topic (use producer's default profile exec:java),
|
|
|
// bank account data (use producer -Pbank-account-data exec:java to initialize), and
|
|
|
// risk status updates (use producer -Prisk-assessment-updates exec:java to start updates)
|
|
|
- KStream<String,Integer> payments = builder.stream("payments", Consumed.with(stringSerde, integerSerde));
|
|
|
+ KStream<String,Integer> payments = builder.stream("transactions", Consumed.with(stringSerde, integerSerde));
|
|
|
KTable<String,BankAccount> acctTable = builder.table("account-data",
|
|
|
Consumed.with(stringSerde, baSerde));
|
|
|
GlobalKTable<String,RiskAssessment> riskStatusTable = builder.globalTable("customer-risk-status",
|
|
@@ -85,6 +79,13 @@ public class PaymentsStream {
|
|
|
// - join on account table and enrich data;
|
|
|
// - join on risk status table, enrich again;
|
|
|
// Finally, apply risk assessment logic in a filter and send to "large-payments" if needed.
|
|
|
+ //
|
|
|
+ // TODO: There is potential for improvement here:
|
|
|
+ // - branch instead of filtering
|
|
|
+ // - mark low transactions as auto-approved and send them to transaction-data (branch 1)
|
|
|
+ // - mark high transactions with acceptable risk as auto-approved + risky, same destination (branch 2-1)
|
|
|
+ // - send high transactions with high risk to large-payments topic (branch 2-2)
|
|
|
+ // - receive transactions from approved-payments, mark them as manually approved and send them to transaction-data
|
|
|
payments
|
|
|
.filter((acctId, amt) -> amt > 10000)
|
|
|
.mapValues((amt) -> {
|
|
@@ -127,6 +128,7 @@ public class PaymentsStream {
|
|
|
.to("large-payments", Produced.with(stringSerde, eraSerde));
|
|
|
|
|
|
// Produce aggregated payment data (how many aggregate transactions in the last 30 seconds of time window).
|
|
|
+ // TODO:
|
|
|
payments
|
|
|
.groupByKey()
|
|
|
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(30), Duration.ofSeconds(5)))
|
|
@@ -144,6 +146,8 @@ public class PaymentsStream {
|
|
|
})
|
|
|
.to("aggregate-data",
|
|
|
Produced.with(stringSerde, apdSerde));
|
|
|
+ // TODO: There is potential for continuation of the story here:
|
|
|
+ // implement fraud detection logic for accounts that have seen above x amt transactions in the last window
|
|
|
|
|
|
// Update each account with the latest balance available.
|
|
|
payments.join(acctTable,
|