資安

阿里雲Kafka冪等生產者與事務生產者

原理介紹

所謂的消息交付可靠性保障,是指 Kafka 對 Producer 和 Consumer 要處理的消息提供什麼樣的承諾。常見的承諾有以下三種:

  • 最多一次(at most once):消息可能會丟失,但絕不會被重複發送。
  • 至少一次(at least once):消息不會丟失,但有可能被重複發送。
  • 精確一次(exactly once):消息不會丟失,也不會被重複發送。

其中at most once 和 at least once在發送端通過是否接收發送的結果來實現。

對於exactly once的情況,目前主要通過冪等性和事務實現。

創建Topic

版本要求:開源版本為2.2.0的專業版實例、Local存儲

圖片.png

冪等性 Producer

指定 Producer 冪等性的方法很簡單,僅需要設置一個參數即可,即 props.put(“enable.idempotence”, ture)

 // 冪等參數設置
  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 請求的最長等待時間
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
        // 構造Producer對象,注意,該對象是線程安全的,一般來說,一個進程內一個Producer對象即可;
        // 如果想提高性能,可以多構造幾個對象,但不要太多,最好不要超過5個
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // 設置創建的Kafka
        String topic = "Local_Topic_Demo"; //消息所屬的Topic,請在控制檯申請之後,填寫在這裡
        
        try{
            ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
            Future future1 = producer.send(record1);
            future1.get();//不關心是否發送成功,則不需要這行

            ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
            producer.send(record2);
            Future future2 = producer.send(record1);
            future2.get();//不關心是否發送成功,則不需要這行

            ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
            producer.send(record3);
            Future future3 = producer.send(record1);
            future3.get();//不關心是否發送成功,則不需要這行

        } catch(Exception e) {
            e.printStackTrace();//連接錯誤、No Leader錯誤都可以通過重試解決;消息太大這類錯誤kafkaProducer不會進行任何重試,直接拋出異常
        }

當前的冪等性只能保證單分區上,即一個冪等性 Producer 能夠保證某個主題的一個分區上不出現重複消息,它無法實現多個分區的冪等性。其次,它只能實現單會話上的冪等性,不能實現跨會話的冪等性。這裡的會話,你可以理解為 Producer 進程的一次運行。當你重啟了 Producer 進程之後,這種冪等性保證就喪失了。

事務性Producer

可以通過事務(transaction)或者依賴事務型 Producer,實現多分區以及多會話上的消息無重複,這也是冪等性 Producer 和事務型 Producer 的最大區別!

設置事務型 Producer 的方法:

  • 和冪等性 Producer 一樣,開啟 enable.idempotence = true。
  • 設置 Producer 端參數 transactional. id。最好為其設置一個有意義的名字。
        // 冪等參數設置
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // 事務支持設置
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"taro_transaction_id");
        // Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 請求的最長等待時間
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
        // 構造Producer對象,注意,該對象是線程安全的,一般來說,一個進程內一個Producer對象即可;
        // 如果想提高性能,可以多構造幾個對象,但不要太多,最好不要超過5個
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //消息所屬的Topic,請在控制檯申請之後,填寫在這裡
        String topic = "Local_Topic_Demo"; 

        // 開啟事務
        producer.initTransactions();
        producer.beginTransaction();
        try{
            ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
            producer.send(record1);

            ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
            producer.send(record2);

            ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
            producer.send(record3);

            producer.commitTransaction();
        } catch(Exception e) {
            producer.abortTransaction();
            e.printStackTrace();//連接錯誤、No Leader錯誤都可以通過重試解決;消息太大這類錯誤kafkaProducer不會進行任何重試,直接拋出異常
        }

小結

冪等性 Producer 和事務型 Producer 都是 Kafka 社區力圖為 Kafka 實現精確一次處理語義所提供的工具,只是它們的作用範圍是不同的。冪等性 Producer 只能保證單分區、單會話上的消息冪等性;而事務能夠保證跨分區、跨會話間的冪等性。從交付語義上來看,自然是事務型 Producer 能做的更多。沒有免費的午餐,比起冪等性 Producer,事務型 Producer 的性能要更差,在實際使用過程中,我們需要仔細評估引入事務的開銷,切不可無腦地啟用事務。

參考鏈接

Kafka消息發送的三種模式
冪等生產者和事務生產者是一回事嗎?

Leave a Reply

Your email address will not be published. Required fields are marked *