|
@@ -13,7 +13,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
|
-import org.apache.kafka.clients.producer.ProducerConfig;
|
|
|
import org.apache.kafka.common.config.SslConfigs;
|
|
|
import org.eclipse.microprofile.config.Config;
|
|
|
import org.eclipse.microprofile.config.ConfigProvider;
|
|
@@ -28,7 +27,7 @@ public class Consumer {
|
|
|
Properties props = new Properties();
|
|
|
|
|
|
// Standard mandatory configs.
|
|
|
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cf.getValue("bootstrap.server", String.class));
|
|
|
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cf.getValue("bootstrap.server", String.class));
|
|
|
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, cf.getOptionalValue("security.protocol", String.class).orElse("PLAINTEXT"));
|
|
|
if (props.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG).equals("SSL")) {
|
|
|
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, cf.getValue("ssl.truststore.location", String.class));
|