반응형
java로 Producer,Consumer구현하기
package kafka.demo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class Consumer1 {
private final static String BOOTSTRAP_SERVER = "localhost:9092";
private final static String TOPIC_NAME = "topic5";
private final static String GROUP_ID = "group_one";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for(ConsumerRecord<String,String> record:records){
System.out.println(">>>"+record);
}
}
}
}
package kafka.demo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@Slf4j
public class Producer1 {
private final static String BOOTSTRAP_SERVER = "localhost:9092";
private final static String TOPIC_NAME = "topic5";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.ACKS_CONFIG,"all");
configs.put(ProducerConfig.RETRIES_CONFIG, "100");
KafkaProducer<String,String> producer = new KafkaProducer<>(configs);
String message = "First Message";
ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,message);
RecordMetadata metadata= producer.send(record).get();
System.out.printf(">>> %s, %d, %d",message,metadata.partition(),metadata.offset());
producer.flush();
producer.close();
}
}
반응형
'Spring > Kafka' 카테고리의 다른 글
태태개발일지(Kafka) (0) | 2023.08.07 |
---|---|
태태개발일지 (Kafka) (0) | 2023.08.06 |