雲計算

Kafka 多線程消費問題

Step By Step

Kafka實例創建,這裡使用阿里雲Kafka消息隊列。為了方便本地測試,創建公網 + VPC實例,參考鏈接
公網接入

消費端程序

1、參數配置

參考:Kafka消息發送的三種模式 連接參數配置部分。

2、消費端:Code Sample

import com.base.JavaKafkaConfigurer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerDemo {

    //加載kafka.properties
    public static Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

    public static Properties initConfig()
    {
        Properties props = new Properties();
        //消息的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //設置接入點,即控制檯的實例詳情頁顯示的“默認接入點”
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        //兩次poll之間的最大允許間隔
        //請不要改得太大,服務器會掐掉空閒連接,不要超過30000
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 25000);

        //每次poll的最大數量
        //注意該值不要改得太大,如果poll太多數據,而不能在下次poll之前消費完,則會觸發一次負載均衡,產生卡頓
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);

        //當前消費實例所屬的消費組,請在控制檯申請之後填寫
        //屬於同一個組的消費實例,會負載消費消息
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));

        return props;
    }


    public static void main(String[] args) {

        Properties properties = initConfig();
        int consumerThreadNum = 6; // 可以設置和Topic的分區數據一致,這樣一個分區就可以分配一個線程來消費消息。

        String topic  = kafkaProperties.getProperty("topic");
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(properties, topic).start();
        }
    }

    public static class KafkaConsumerThread extends Thread{
        private KafkaConsumer<String, String> kafkaConsumer;

        public KafkaConsumerThread(Properties props, String topic)
        {
            this.kafkaConsumer = new KafkaConsumer<String, String>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }

        public void run()
        {
            try {
                while (true)
                {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record: records) {
                        System.out.println("Thread num: " + this.getName());
                        System.out.println(String.format("Consume partition:%d offset:%d the message body:%s", record.partition(), record.offset(),record.value()));
                    }
                }
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }finally {
                kafkaConsumer.close();
            }
        }
    }
}

3、消費端情況
圖片.png

4、服務端控制檯查看
圖片.png

參考鏈接

Kafka消息發送的三種模式
訂閱者最佳實踐

Leave a Reply

Your email address will not be published. Required fields are marked *