Spring/Kafka

태태개발일지(Kafka)

태태코 2023. 8. 8. 22:30
반응형

java로 Producer,Consumer구현하기

 

package kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class Consumer1 {
    private final static String BOOTSTRAP_SERVER = "localhost:9092";
    private final  static String TOPIC_NAME = "topic5";
    private final  static  String GROUP_ID = "group_one";
    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        configs.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while(true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for(ConsumerRecord<String,String> record:records){
                System.out.println(">>>"+record);
            }
        }
    }
}
package kafka.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
@Slf4j
public class Producer1 {
    private final static String BOOTSTRAP_SERVER = "localhost:9092";
    private final  static String TOPIC_NAME = "topic5";

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.ACKS_CONFIG,"all");
        configs.put(ProducerConfig.RETRIES_CONFIG, "100");

        KafkaProducer<String,String> producer = new KafkaProducer<>(configs);
        String message =  "First Message";

        ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,message);

        RecordMetadata metadata=  producer.send(record).get();

        System.out.printf(">>> %s, %d, %d",message,metadata.partition(),metadata.offset());

        producer.flush();
        producer.close();
    }
}
반응형

'Spring > Kafka' 카테고리의 다른 글

태태개발일지(Kafka)  (0) 2023.08.07
태태개발일지 (Kafka)  (0) 2023.08.06