原理介紹
所謂的消息交付可靠性保障,是指 Kafka 對 Producer 和 Consumer 要處理的消息提供什麼樣的承諾。常見的承諾有以下三種:
- 最多一次(at most once):消息可能會丟失,但絕不會被重複發送。
- 至少一次(at least once):消息不會丟失,但有可能被重複發送。
- 精確一次(exactly once):消息不會丟失,也不會被重複發送。
其中at most once 和 at least once在發送端通過是否接收發送的結果來實現。
對於exactly once的情況,目前主要通過冪等性和事務實現。
創建Topic
版本要求:開源版本為2.2.0的專業版實例、Local存儲
冪等性 Producer
指定 Producer 冪等性的方法很簡單,僅需要設置一個參數即可,即 props.put(“enable.idempotence”, ture)
// 冪等參數設置
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Kafka消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 請求的最長等待時間
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
// 構造Producer對象,注意,該對象是線程安全的,一般來說,一個進程內一個Producer對象即可;
// 如果想提高性能,可以多構造幾個對象,但不要太多,最好不要超過5個
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
// 設置創建的Kafka
String topic = "Local_Topic_Demo"; //消息所屬的Topic,請在控制檯申請之後,填寫在這裡
try{
ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
Future future1 = producer.send(record1);
future1.get();//不關心是否發送成功,則不需要這行
ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
producer.send(record2);
Future future2 = producer.send(record1);
future2.get();//不關心是否發送成功,則不需要這行
ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
producer.send(record3);
Future future3 = producer.send(record1);
future3.get();//不關心是否發送成功,則不需要這行
} catch(Exception e) {
e.printStackTrace();//連接錯誤、No Leader錯誤都可以通過重試解決;消息太大這類錯誤kafkaProducer不會進行任何重試,直接拋出異常
}
當前的冪等性只能保證單分區上,即一個冪等性 Producer 能夠保證某個主題的一個分區上不出現重複消息,它無法實現多個分區的冪等性。其次,它只能實現單會話上的冪等性,不能實現跨會話的冪等性。這裡的會話,你可以理解為 Producer 進程的一次運行。當你重啟了 Producer 進程之後,這種冪等性保證就喪失了。
事務性Producer
可以通過事務(transaction)或者依賴事務型 Producer,實現多分區以及多會話上的消息無重複,這也是冪等性 Producer 和事務型 Producer 的最大區別!
設置事務型 Producer 的方法:
- 和冪等性 Producer 一樣,開啟 enable.idempotence = true。
- 設置 Producer 端參數 transactional. id。最好為其設置一個有意義的名字。
// 冪等參數設置
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 事務支持設置
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"taro_transaction_id");
// Kafka消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 請求的最長等待時間
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3 * 1000);
// 構造Producer對象,注意,該對象是線程安全的,一般來說,一個進程內一個Producer對象即可;
// 如果想提高性能,可以多構造幾個對象,但不要太多,最好不要超過5個
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
//消息所屬的Topic,請在控制檯申請之後,填寫在這裡
String topic = "Local_Topic_Demo";
// 開啟事務
producer.initTransactions();
producer.beginTransaction();
try{
ProducerRecord<String,String> record1 = new ProducerRecord<>(topic,"msg1");
producer.send(record1);
ProducerRecord<String,String> record2 = new ProducerRecord<>(topic,"msg2");
producer.send(record2);
ProducerRecord<String,String> record3 = new ProducerRecord<>(topic,"msg3");
producer.send(record3);
producer.commitTransaction();
} catch(Exception e) {
producer.abortTransaction();
e.printStackTrace();//連接錯誤、No Leader錯誤都可以通過重試解決;消息太大這類錯誤kafkaProducer不會進行任何重試,直接拋出異常
}
小結
冪等性 Producer 和事務型 Producer 都是 Kafka 社區力圖為 Kafka 實現精確一次處理語義所提供的工具,只是它們的作用範圍是不同的。冪等性 Producer 只能保證單分區、單會話上的消息冪等性;而事務能夠保證跨分區、跨會話間的冪等性。從交付語義上來看,自然是事務型 Producer 能做的更多。沒有免費的午餐,比起冪等性 Producer,事務型 Producer 的性能要更差,在實際使用過程中,我們需要仔細評估引入事務的開銷,切不可無腦地啟用事務。