首页  ·  知识 ·  大数据
kafka介绍,安装以及简单的java调用kafka代码
网友  CSDN  消息队列  编辑:卡米啦   图片来源:网络
如果将Kafka在zookeeper的默认目录,修改为自定义目录时,在运行过程中会报出java.lang.IllegalArgumentException:Pathlengthmustbe>0”错误

Producer :消息生产者,向broker发消息的客户端。
Consumer :消息消费者,向broker取消息的客户端
Topic :一个队列,主题。

Message:消息是kafka处理的对象,在kafka中,消息是被发布到broker的topic中。而consumer也是从相应的topic中拿数据。也就是说,message是按topic存储的
Consumer Group :将topic消息的广播发给consumer的手段。一个topic可以有多个CG。
Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka


安装过程

(1)下载解压。官网下载kafka,http://kafka.apache.org/  

解压到安装目录下 tar -xcvf 

(2)修改配置文件/usr/local/kafka/config/server.properties,修改如下内容

broker.id=0

host.name=hadoop1

zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181,hadoop4:2181,hadoop5:2181,hadoop6:2181

(3)修改完配置文件即可将整个文件夹传输到其他节点  scp -r 。。。。

(4)传输完之后修改每个节点的broker.id的编号,递增。

(5)启动zookeeper。

这边可以使用kafka自带的zookeeper,也可以使用自己安装的zookeeper。

启动自己安装的zookeeper  :   /app/zookeeper-3.4.6/bin/zkServer.sh start

各个节点均启动完成之后,可以查看zk的状态  /app/zookeeper-3.4.6/bin/zkServer.sh status

(6)启动kafka

/app/kafka_2.9.2-0.8.2.1/bin/kafka-server-start.sh /app/kafka_2.9.2-0.8.2.1/config/server.properties &

尾部加上&的作用是可以启动完之后直接按回车退出,继续下一步操作。也可不加&

(7)创建topic

/app/kafka_2.9.2-0.8.2.1/bin/kafka-topics.sh --create --topic idoall_testTopic --replication-factor 6 --partitions 2 --zookeeper hadoop1:2181


可通过指令查看所有的topic     /app/kafka_2.9.2-0.8.2.1/bin/kafka-topics.sh --list --zookeeper hadoop2:2181

(8)发送消息
/app/kafka_2.9.2-0.8.2.1/bin/kafka-console-producer.sh --broker-list hadoop3:9092 --sync --topic idoall_testTopic

(9)消费消息。重新打开一个终端执行指令

/app/kafka_2.9.2-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic idoall_testTopic --from-beginning
在原终端中输入消息,新终端中会显示出输入的消息。

关闭kafka指令。  /app/kafka_2.9.2-0.8.2.1/bin/kafka-server-stop.sh /app/kafka_2.9.2-0.8.2.1/config/server.properties &

java调用kafka

首先创建一个java project,网上很多说需创建maven工程,本人经过测验,发现maven工程和普通的java project均可调用。

需要注意,工程所采用的jar包,可以在相应版本的kafka安装文件夹的lib目录下引用,不同版本的jar包可能不通用,

如果出现java.lang.NoClassDefFoundError: scala/reflect/ClassManifest的报错,可能是由于jar包不匹配引起的。


Producer端代码:

package com;


import java.util.Date;
import java.util.Properties;
import java.text.SimpleDateFormat;   

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


public class Producertest {
     
     public static void main(String[] args) {
         Properties props = new Properties();
         props.put("zk.connect", "hadoop1:2181/kafka,hadoop2:2181/kafka,hadoop3:2181/kafka,hadoop4:2181/kafka,hadoop5:2181/kafka,hadoop6:2181/kafka");
         // serializer.class为消息的序列化类
         props.put("serializer.class", "kafka.serializer.StringEncoder");
         // 配置metadata.broker.list, 为了高可用, 最好配两个broker实例
         props.put("metadata.broker.list", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
         // 设置Partition类, 对队列进行合理的划分
         //props.put("partitioner.class", "idoall.testkafka.Partitionertest");
         // ACK机制, 消息发送需要kafka服务端确认
         props.put("request.required.acks", "1");

          props.put("num.partitions", "6");
         ProducerConfig config = new ProducerConfig(props);
         Producer<String, String> producer = new Producer<String, String>(config);
         for (int i = 0; i < 10; i++)
         {
           // KeyedMessage<K, V>
           //   K对应Partition Key的类型
           //   V对应消息本身的类型
//   topic: "test", key: "key", message: "message"
           SimpleDateFormat formatter = new SimpleDateFormat   ("yyyy年MM月dd日 HH:mm:ss SSS");      
           Date curDate = new Date(System.currentTimeMillis());//获取当前时间      
           String str = formatter.format(curDate);   
            
           String msg = "idoall.org" + i+"="+str;
           String key = i+"";
           producer.send(new KeyedMessage<String, String>("idoall_testTopic",key, msg));
         }
       }
}

Consumer端代码:

package com;
import java.util.HashMap;
import java.util.List;  
import java.util.Map;  
import java.util.Properties;  
   
import kafka.consumer.ConsumerConfig;  
import kafka.consumer.ConsumerIterator;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector;

public class Consumertest extends Thread{
      
     private final ConsumerConnector consumer;  
    private final String topic;  

    public static void main(String[] args) {  
      Consumertest consumerThread = new Consumertest("idoall_testTopic");  
        consumerThread.start();  
    }  
    public Consumertest(String topic) {  
        consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());  
        this.topic =topic;  
    }  

private static ConsumerConfig createConsumerConfig() {  
    Properties props = new Properties();  
    // 设置zookeeper的链接地址
    props.put("zookeeper.connect","hadoop1,hadoop2,hadoop3,hadoop4,hadoop5,hadoop6:2181");  
    // 设置group id
    props.put("group.id", "1");  
    // kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新
    props.put("auto.commit.interval.ms", "1000");
    props.put("zookeeper.session.timeout.ms","10000");  
    return new ConsumerConfig(props);  
}  

public void run(){  
     //设置Topic=>Thread Num映射关系, 构建具体的流
    Map<String,Integer> topickMap = new HashMap<String, Integer>();  
    topickMap.put(topic, 1);  
    Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap=consumer.createMessageStreams(topickMap);  
 
    KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);  
    ConsumerIterator<byte[],byte[]> it =stream.iterator();  
    System.out.println("*********Results********");  
    while(it.hasNext()){  
        System.err.println("get data:" +new String(it.next().message()));  
        try {  
            Thread.sleep(1000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  
}


之后运行代码。

运行Producer端代码,会发现之前的客户端中会接收到代码中发送的消息。

运行consumer端代码,在终端中输入消息,eclipse中会读取到发送的消息,打印出来。

至此,简单的java调用kafka操作完成。

如果将Kafka在zookeeper的默认目录,修改为自定义目录时,在运行过程中会报出java.lang.IllegalArgumentException: Path length must be > 0”错误

网上找了好久,发现别人说这是一个Bug,由于initZk()方法没有对路径进行处理导致

原代码:

private def initZk(): ZkClient = {
   info("Connecting to zookeeper on " + config.zkConnect)
   val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
   ZkUtils.setupCommonPaths(zkClient)
   zkClient
}

 

解决代码:


private def initZk(): ZkClient = {
   info("Connecting to zookeeper on " + config.zkConnect)
   val chroot = {      if (config.zkConnect.indexOf("/") > 0)
       config.zkConnect.substring(config.zkConnect.indexOf("/"))      else
       ""
   }    if (chroot.length > 1) {
     val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
     val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
     ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
     info("Created zookeeper path " + chroot)
     zkClientForChrootCreation.close()
   }
   val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
   ZkUtils.setupCommonPaths(zkClient)
   zkClient
}


如果无法修改,那么可以将自定义目录修改成原来的默认目录,则不会报错。


本文作者:网友 来源:CSDN
CIO之家 www.ciozj.com 微信公众号:imciow
    >>频道首页  >>网站首页   纠错  >>投诉
版权声明:CIO之家尊重行业规范,每篇文章都注明有明确的作者和来源;CIO之家的原创文章,请转载时务必注明文章作者和来源;
延伸阅读
也许感兴趣的
我们推荐的
主题最新
看看其它的