kafka分布式搭建+java程序開發


回顧過去200多年,第一次工業革命蒸汽時代,第二次工業革命電石油能源時代,第三次工業革命原子能、電子計算機時代,第四次工業革命我認為是互聯網數據時代,誰掌握了數據,誰就掌握了財富與權力。在當今世界如何做到對大量數據收集分析成為世界話題,kafka插件應用而生,kafka做為流處理,被各個著名互聯網公司廣泛應用,列如今日頭條的廣告日志收集采用kafka插件

運行kafka,需要依賴 zookeeper,我們可以單獨下載安裝zookeeper集群,也可以利用kafka自帶集群zookeeper集群

從kafka官網下載二進制編碼包,下載地址:http://kafka.apache.org/downloads,我安裝的是kafka_2.11-0.10.1.0.tgz版本

第一步:准備工作

1.准備三台機器192.168.138.130、192.168.138.131、192.168.138.132

2.安裝jdk(我安裝的是jdk-10_linux-x64_bin.tar.gz)

3.查看各個服務器防火牆是否關閉,如果沒有關閉,要及時關閉防火牆

第二步:啟動zookeeper需要修改zookeeper.properties配置文件

1.新增

tickTime=2000(心跳和超時)
initLimit=10(初始化連接時最長能忍受多少個心跳時間間隔數)
syncLimit=5(Leader與Follower之間發送消息,請求和應答時間長度,最長不能超過多少個tickTime 的時間長度)

server.1=192.168.138.130:2888:3888

server.2=192.168.138.131:2888:3888

server.3=192.168.138.132:2888:3888

2.修改

dataDir=/home/jiasd/datas/zookeeper(zookeeper數據存儲路徑)

3.執行命令啟動zookeeper集群

bin/zookeeper-server-start.sh config/zookeeper.properties

出現錯誤日志

Unrecognized VM option ‘UseCompressedOops’

Error: Clould not create the Java Vritual Machine.

Error: A fatal exception has occurres . Program will exit.

解決方法:
找到bin/kafka-run-class.sh 文件,使用vim打開,我的這個版本是在115行
113 # JVM performance options
14 if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
115  KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC     -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRe    mark -XX:+DisableExplicitGC -Djava.awt.headless=true"
116 fi
去掉-XX:+UseCompressedOops這個設置

第三步:啟動kafka需要修改server.properties配置文件

1.修改

broker.id=0(每個broker保持唯一,可依次遞增,另外兩個機器我分別配置為broker.id=1,broker.id=1)

log.dirs=/home/jiasd/datas/kafka/logs(kafka數據存放地址)

num.partitions=3(每個topic的分區個數,大於等於broke數量)

zookeeper.connect=192.168.138.130:2181,192.168.138.131:2181,192.168.138.132:2181(連接zookeeper集群)

2.執行命令啟動kafka集群

bin/kafka-server-start.sh config/server.properties

第四步:接下來我們驗證一下kafka集群是否可以正常工作

1.創建一個topic

cd到192.168.138.130機器kafka的安裝目錄下,執行bin/kafka-topics.sh --create --zookeeper 192.168.138.130:2181 --replication-factor 3 --partitions 3 --topic test

2.查看topic

再執行bin/kafka-topics.sh --list --zookeeper 192.168.138.130:2181

3.創建一個生產者

再執行bin/kafka-console-producer.sh -broker-list 192.168.138.130:9092 -topic test

4.創建一個消費者

cd到192.168.138.131機器kafka的安裝目錄下,執行bin/kafka-console-consumer.sh -zookeeper 192.168.138.131:2181 -from-beginning -topic test

5.發送消息與接收消息演示

在第3步驟執行完后,輸入數據i say hello world,我們192.168.138.131機器上收到i produce hello world


以上步驟完成了kafka集群搭建與測試,接下來我們通過java程序完成消息發送與接收,這里我用的是IDEA開發工具

1.建立一個maven項目

2.在pom.xml里配置kafka依賴版本包,如下:

<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.0</version>

        </dependency>

</dependencies>

3.編寫producer與consumer程序

生產者

package kafka;
/**
 * Created by lenovo on 2018/5/13.
 */
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class ProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("zk.connect", "192.168.138.131:2181,192.168.138.130:2181,192.168.138.132:2181");
        props.put("metadata.broker.list","192.168.138.131:9092,192.168.138.130:9092,192.168.138.132:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for (int i = 1; i <= 100; i++) {
            Thread.sleep(2000);
            producer.send(new KeyedMessage<String, String>("test", "i say hello world"));
        }
    }
}


消費者

package kafka;


/**
 * Created by lenovo on 2018/5/13.
 */
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class ConsumerDemo {
    private static final String topic = "test";
    private static final Integer threads = 1;
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.138.131:2181,192.168.138.130:2181,192.168.138.132:2181");
        props.put("group.id", "1111");
        props.put("auto.offset.reset", "smallest");
        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        topicCountMap.put("testtopics", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("test");
        for(final KafkaStream<byte[], byte[]> kafkaStream : streams){
            new Thread(new Runnable() {
                public void run() {
                    for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){
                        String msg = new String(mm.message());
                        System.out.println(msg);

                    }


                }


            }).start();


        }


    }


}



注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com