開發與維運

Kafka消息發送的三種模式

Step by Step

實例及接入準備

Kafka實例創建,這裡使用阿里雲Kafka消息隊列。為了方便本地測試,創建公網 + VPC實例,參考鏈接

公網接入

三種消息發送方式

1、發後即忘(fire-and-forget)

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;

public class KafkaProducerDemo {

    public static void main(String args[]) {
        //設置sasl文件的路徑
        JavaKafkaConfigurer.configureSasl();
        //加載kafka.properties
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        //  設置接入點,請通過控制檯獲取對應Topic的接入點
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        //  設置SSL根證書的路徑,請記得將XXX修改為自己的路徑
        //  與sasl路徑類似,該文件也不能被打包到jar中
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        //  根證書store的密碼,保持不變
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        //  接入協議,目前支持使用SASL_SSL協議接入
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        //  SASL鑑權方式,保持不變
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        //  Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //  請求的最長等待時間
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        //  設置客戶端內部重試次數
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        //  設置客戶端內部重試間隔
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);

        //  hostname校驗改成空
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        //  構造Producer對象,注意,該對象是線程安全的,一般來說,一個進程內一個Producer對象即可;
        //  如果想提高性能,可以多構造幾個對象,但不要太多,最好不要超過5個
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //  構造一個Kafka消息
        String topic = kafkaProperties.getProperty("topic"); //消息所屬的Topic,請在控制檯申請之後,填寫在這裡
        String value = "this is the message's value"; //消息的內容

        long t1 = System.currentTimeMillis();

        try {
            for (int i =0; i < 10000; i++) {
                //  發送消息:發後即忘
                ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<String, String>(topic, value + ": " + i);
                producer.send(kafkaMessage);
            }
         // 將緩衝區的全部消息push到broker當中
         producer.flush();
         producer.close();

         long t2 = System.currentTimeMillis();
         System.out.println("發送消息耗時:" + (t2-t1));
        } catch (Exception e) {
            //  客戶端內部重試之後,仍然發送失敗,業務要應對此類錯誤
            //  參考常見報錯: https://help.aliyun.com/document_detail/68168.html?spm=a2c4g.11186623.6.567.2OMgCB
            System.out.println("error occurred");
            e.printStackTrace();
        }
        System.out.println("消息發送完成!");
    }
}

2、同步(sync)

long t1=System.currentTimeMillis();

        try {
            //批量獲取 futures 可以加快速度, 但注意,批量不要太大
            List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
            for (int i =0; i < 10000; i++) {
                //發送消息,並獲得一個Future對象
                ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<String, String>(topic, value + ": " + i);
                Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                futures.add(metadataFuture);
            }
            producer.flush();
            for (Future<RecordMetadata> future: futures) {
                //同步獲得Future對象的結果
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }

            producer.close(); // 關閉producer

            long t2=System.currentTimeMillis();
            System.out.println("同步發送耗時:" + (t2-t1));
        } catch (Exception e) {
            //客戶端內部重試之後,仍然發送失敗,業務要應對此類錯誤
            //參考常見報錯: https://help.aliyun.com/document_detail/68168.html?spm=a2c4g.11186623.6.567.2OMgCB
            System.out.println("error occurred");
            e.printStackTrace();
        }

3、異步(async)

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;

public class KafkaProducerDemoAsync {

    public static void main(String args[]) {
        //設置sasl文件的路徑
        JavaKafkaConfigurer.configureSasl();

        //加載kafka.properties
        Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();

        Properties props = new Properties();
        //設置接入點,請通過控制檯獲取對應Topic的接入點
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
        //設置SSL根證書的路徑,請記得將XXX修改為自己的路徑
        //與sasl路徑類似,該文件也不能被打包到jar中
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
        //根證書store的密碼,保持不變
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        //接入協議,目前支持使用SASL_SSL協議接入
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        //SASL鑑權方式,保持不變
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        //Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //請求的最長等待時間
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        //設置客戶端內部重試次數
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        //設置客戶端內部重試間隔
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);

        //hostname校驗改成空
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

        //構造Producer對象,注意,該對象是線程安全的,一般來說,一個進程內一個Producer對象即可;
        //如果想提高性能,可以多構造幾個對象,但不要太多,最好不要超過5個
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //構造一個Kafka消息
        String topic = kafkaProperties.getProperty("topic"); //消息所屬的Topic,請在控制檯申請之後,填寫在這裡
        String value = "this is the message's value"; //消息的內容

        long t1=System.currentTimeMillis();

        try {
            for (int i =0; i < 10000; i++) {
                //發送消息,並獲得一個Future對象
                ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<String, String>(topic, value + ": " + i);
                producer.send(kafkaMessage,new MyProducerCallback());
            }
         //   將緩衝區的全部消息push到broker當中
         producer.flush();
         producer.close();
         long t2=System.currentTimeMillis();

         System.out.println("發送消息耗時:" + (t2-t1));

        } catch (Exception e) {
            //客戶端內部重試之後,仍然發送失敗,業務要應對此類錯誤
            //參考常見報錯: https://help.aliyun.com/document_detail/68168.html?spm=a2c4g.11186623.6.567.2OMgCB
            System.out.println("error occurred");
            e.printStackTrace();
        }

        System.out.println("消息發送完成!");
    }

    /**
     * callback 類實現
     */
    private static class MyProducerCallback implements Callback {

        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
                return;
            }
            System.out.println(recordMetadata.topic());
            System.out.println(recordMetadata.partition());
            System.out.println(recordMetadata.offset());
            System.out.println("Coming in MyProducerCallback");
        }
    }
}
時間對比
fire-and-forget:發送消息耗時:456
sync:同步發送耗時:613
async:發送消息耗時:713

更多參考

發佈者最佳實踐

Leave a Reply

Your email address will not be published. Required fields are marked *