Kafka Java客戶端代碼示例
Kafka是一種高吞吐量的分布式發布訂閱消息系統。
Kafka是LinkedIn用于日志處理的分布式消息隊列,LinkedIn的日志數據容量大,但對可靠性要求不高,其日志數據主要包括用戶行為(登錄、瀏覽、點擊、分享、喜歡)以及系統運行日志(CPU、內存、磁盤、網絡、系統及進程狀態)。
當前很多的消息隊列服務提供可靠交付保證,并默認是即時消費(不適合離線)。
高可靠交付對LinkedIn的日志不是必須的,故可通過降低可靠性來提高性能,同時通過構建分布式的集群,允許消息在系統中累積,使得Kafka同時支持離線和在線日志處理。
測試環境
kafka_2.10-0.8.1.1:3個節點做的集群
zookeeper-3.4.5:一個實例節點
代碼示例
消息生產者代碼示例
- import java.util.Collections;
 - import java.util.Date;
 - import java.util.Properties;
 - import java.util.Random;
 - import kafka.javaapi.producer.Producer;
 - import kafka.producer.KeyedMessage;
 - import kafka.producer.ProducerConfig;
 - /**
 - * 詳細(xì)可以參考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
 - * @author Fung
 - *
 - */
 - public class ProducerDemo {
 - public static void main(String[] args) {
 - Random rnd = new Random();
 - int events=100;
 - // 設(shè)置配置屬性
 - Properties props = new Properties();
 - props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");
 - props.put("serializer.class", "kafka.serializer.StringEncoder");
 - // key.serializer.class默認(rèn)為serializer.class
 - props.put("key.serializer.class", "kafka.serializer.StringEncoder");
 - // 可選配置,如果不配置,則使用默認(rèn)的partitioner
 - props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");
 - // 觸發(fā)acknowledgement機(jī)制,否則是fire and forget,可能會(huì)引起數(shù)據(jù)丟失
 - // 值為0,1,-1,可以參考
 - // http://kafka.apache.org/08/configuration.html
 - props.put("request.required.acks", "1");
 - ProducerConfig config = new ProducerConfig(props);
 - // 創(chuàng)建producer
 - Producer<String, String> producer = new Producer<String, String>(config);
 - // 產(chǎn)生并發(fā)送消息
 - long start=System.currentTimeMillis();
 - for (long i = 0; i < events; i++) {
 - long runtime = new Date().getTime();
 - String ip = "192.168.2." + i;//rnd.nextInt(255);
 - String msg = runtime + ",www.example.com," + ip;
 - //如果topic不存在,則會(huì)自動(dòng)創(chuàng)建,默認(rèn)replication-factor為1,partitions為0
 - KeyedMessage<String, String> data = new KeyedMessage<String, String>(
 - "page_visits", ip, msg);
 - producer.send(data);
 - }
 - System.out.println("耗時(shí):" + (System.currentTimeMillis() - start));
 - // 關(guān)閉producer
 - producer.close();
 - }
 - }
 
消息消費者代碼示例
- import java.util.HashMap;
 - import java.util.List;
 - import java.util.Map;
 - import java.util.Properties;
 - import java.util.concurrent.ExecutorService;
 - import java.util.concurrent.Executors;
 - import kafka.consumer.Consumer;
 - import kafka.consumer.ConsumerConfig;
 - import kafka.consumer.KafkaStream;
 - import kafka.javaapi.consumer.ConsumerConnector;
 - /**
 - * 詳細(xì)可以參考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 - *
 - * @author Fung
 - *
 - */
 - public class ConsumerDemo {
 - private final ConsumerConnector consumer;
 - private final String topic;
 - private ExecutorService executor;
 - public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
 - consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
 - this.topic = a_topic;
 - }
 - public void shutdown() {
 - if (consumer != null)
 - consumer.shutdown();
 - if (executor != null)
 - executor.shutdown();
 - }
 - public void run(int numThreads) {
 - Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
 - topicCountMap.put(topic, new Integer(numThreads));
 - Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
 - .createMessageStreams(topicCountMap);
 - List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 - // now launch all the threads
 - executor = Executors.newFixedThreadPool(numThreads);
 - // now create an object to consume the messages
 - //
 - int threadNumber = 0;
 - for (final KafkaStream stream : streams) {
 - executor.submit(new ConsumerMsgTask(stream, threadNumber));
 - threadNumber++;
 - }
 - }
 - private static ConsumerConfig createConsumerConfig(String a_zookeeper,
 - String a_groupId) {
 - Properties props = new Properties();
 - props.put("zookeeper.connect", a_zookeeper);
 - props.put("group.id", a_groupId);
 - props.put("zookeeper.session.timeout.ms", "400");
 - props.put("zookeeper.sync.time.ms", "200");
 - props.put("auto.commit.interval.ms", "1000");
 - return new ConsumerConfig(props);
 - }
 - public static void main(String[] arg) {
 - String[] args = { "172.168.63.221:2188", "group-1", "page_visits", "12" };
 - String zooKeeper = args[0];
 - String groupId = args[1];
 - String topic = args[2];
 - int threads = Integer.parseInt(args[3]);
 - ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
 - demo.run(threads);
 - try {
 - Thread.sleep(10000);
 - } catch (InterruptedException ie) {
 - }
 - demo.shutdown();
 - }
 - }
 
消息處理類
- import kafka.consumer.ConsumerIterator;
 - import kafka.consumer.KafkaStream;
 - public class ConsumerMsgTask implements Runnable {
 - private KafkaStream m_stream;
 - private int m_threadNumber;
 - public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
 - m_threadNumber = threadNumber;
 - m_stream = stream;
 - }
 - public void run() {
 - ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
 - while (it.hasNext())
 - System.out.println("Thread " + m_threadNumber + ": "
 - + new String(it.next().message()));
 - System.out.println("Shutting down Thread: " + m_threadNumber);
 - }
 - }
 
Partitioner類示例
- import kafka.producer.Partitioner;
 - import kafka.utils.VerifiableProperties;
 - public class PartitionerDemo implements Partitioner {
 - public PartitionerDemo(VerifiableProperties props) {
 - }
 - @Override
 - public int partition(Object obj, int numPartitions) {
 - int partition = 0;
 - if (obj instanceof String) {
 - String key=(String)obj;
 - int offset = key.lastIndexOf('.');
 - if (offset > 0) {
 - partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
 - }
 - }else{
 - partition = obj.toString().length() % numPartitions;
 - }
 - return partition;
 - }
 - }
 
參考















 
 
 


 
 
 
 