Kaynağa Gözat

reset risk status for everyone to 0 at startup

Grega Bremec 7 ay önce
ebeveyn
işleme
3aefaba7b3

+ 7 - 0
payments-producer/src/main/java/com/redhat/training/kafka/coreapi/GeneratedData.java

@@ -54,6 +54,13 @@ public class GeneratedData {
         return accounts[rand.nextInt(accounts.length)];
     }
 
+    public static String getCustomerId(int id) {
+        if (id >= usernames.length) {
+            return null;
+        }
+        return usernames[id];
+    }
+
     public static BankAccount getBankAccount(int id) {
         if (id >= accounts.length) {
             return null;

+ 24 - 0
payments-producer/src/main/java/com/redhat/training/kafka/coreapi/RiskAssessmentProducer.java

@@ -22,6 +22,30 @@ public class RiskAssessmentProducer {
 
         Producer<String,RiskAssessment> rap = new KafkaProducer<>(ProducerSettings.configureRiskProperties());
 
+        // reset everyone's risk score to zero at start
+        LOG.info("Resetting risk status for all customers to 0...");
+        int cid = 0;
+        while (true) {
+            String customerId = GeneratedData.getCustomerId(cid);
+            cid++;
+
+            if (customerId == null) {
+                break;
+            }
+            RiskAssessment ra = new RiskAssessment();
+            ra.setCustomerId(customerId);
+            ra.setAssessmentScore(0);
+            LOG.info(" - " + customerId);
+            try {
+                ProducerRecord<String,RiskAssessment> rapr =
+                        new ProducerRecord<String,RiskAssessment>(topic, ra.getCustomerId(), ra);
+                rap.send(rapr).get();
+            } catch (ExecutionException | InterruptedException e) {
+                LOG.warning(e.getMessage());
+                continue;
+            }
+        }
+
         while (true) {
             RiskAssessment ra = new RiskAssessment();