Step By Step
消費端程序
1、參數配置
參考:Kafka消息發送的三種模式 連接參數配置部分。
2、消費端:Code Sample
import com.base.JavaKafkaConfigurer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerDemo {
//加載kafka.properties
public static Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
public static Properties initConfig()
{
Properties props = new Properties();
//消息的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//設置接入點,即控制檯的實例詳情頁顯示的“默認接入點”
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
//兩次poll之間的最大允許間隔
//請不要改得太大,服務器會掐掉空閒連接,不要超過30000
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 25000);
//每次poll的最大數量
//注意該值不要改得太大,如果poll太多數據,而不能在下次poll之前消費完,則會觸發一次負載均衡,產生卡頓
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//當前消費實例所屬的消費組,請在控制檯申請之後填寫
//屬於同一個組的消費實例,會負載消費消息
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
return props;
}
public static void main(String[] args) {
Properties properties = initConfig();
int consumerThreadNum = 6; // 可以設置和Topic的分區數據一致,這樣一個分區就可以分配一個線程來消費消息。
String topic = kafkaProperties.getProperty("topic");
for (int i = 0; i < consumerThreadNum; i++) {
new KafkaConsumerThread(properties, topic).start();
}
}
public static class KafkaConsumerThread extends Thread{
private KafkaConsumer<String, String> kafkaConsumer;
public KafkaConsumerThread(Properties props, String topic)
{
this.kafkaConsumer = new KafkaConsumer<String, String>(props);
this.kafkaConsumer.subscribe(Arrays.asList(topic));
}
public void run()
{
try {
while (true)
{
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record: records) {
System.out.println("Thread num: " + this.getName());
System.out.println(String.format("Consume partition:%d offset:%d the message body:%s", record.partition(), record.offset(),record.value()));
}
}
}
catch (Exception e)
{
e.printStackTrace();
}finally {
kafkaConsumer.close();
}
}
}
}
3、消費端情況
4、服務端控制檯查看