Kafka Installation and Usage
1. Introduction
The foundation for learning Kafka is to first get a Kafka system deployed, use it in a simple way, and build an intuitive feel for it before gradually digging deeper.
This article describes how to deploy Kafka, covering configuration, installation, and basic usage.
2. Kafka Download and Installation
Kafka is updated frequently, and each release brings significant changes, including changes to the configuration files. The jump from Kafka 0.7 to 0.8.1 was especially large: it added intra-cluster replication, support for multiple data directories, asynchronous request handling, dynamic partition management, and time-based log segment deletion.
2.1 Download address:
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
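If you prefer to fetch the release from the command line, a sketch follows; it assumes the Apache archive still hosts this version at the usual path:
# download the 0.8.1.1 release built for Scala 2.10
wget https://archive.apache.org/dist/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz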
Kafka directory structure
Directory | Description |
bin | Executable scripts for operating Kafka, including scripts for Windows |
config | Configuration files |
libs | Dependency libraries |
logs | Log data directory; Kafka splits its server-side logs into 5 types: server, request, state, log-cleaner, and controller |
2.2 Installing and Starting Kafka
Step 1:
lizhitao@localhost:~$ tar -xzf kafka_2.10-0.8.1.1.tgz
lizhitao@localhost:~$ cd kafka_2.10-0.8.1.1
Step 2:
Configure ZooKeeper. This assumes you already have a ZooKeeper installation; if not, you can either install one (instructions are easy to find online) or use the single-node convenience script that ships with Kafka, as sketched below.
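For quick local testing, the Kafka distribution includes a script and a sample config for running a single-node ZooKeeper; a minimal sketch (the packaged zookeeper.properties listens on port 2181 by default):
lizhitao@localhost:~$ bin/zookeeper-server-start.sh config/zookeeper.properties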
Go to the Kafka installation root directory and edit the configuration with vim config/server.properties, modifying the property zookeeper.connect=ip1:8081,ip2:8082 to point at your ZooKeeper ensemble (use its client port; ZooKeeper's default is 2181).
Step 3:
The three most important Kafka configuration properties are, in order: broker.id, log.dir, and zookeeper.connect.
The parameters in Kafka's server-side config/server.properties file are explained in the server.properties property descriptions; complete the configuration accordingly, for example:
broker.id = 1
port = 9092
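Putting the key properties together, a minimal sketch of config/server.properties for a single broker follows; the log directory path and ZooKeeper address are assumptions for a local test setup, so adjust them to your environment:
# unique id of this broker within the cluster
broker.id=1
# port the broker listens on for client connections
port=9092
# directory where the broker stores its log (message) data
log.dir=/tmp/kafka-logs
# ZooKeeper connection string (host:port[,host:port,...])
zookeeper.connect=localhost:2181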
Step 4: Start the server
cd kafka_2.10-0.8.1.1
lizhitao@localhost:~$ bin/kafka-server-start.sh config/server.properties
[2014-04-16 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-04-16 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
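The command above keeps the broker in the foreground. A sketch of running it in the background instead (the log file name kafka-server.out is my own choice):
# start the broker in the background and redirect its console output to a file
nohup bin/kafka-server-start.sh config/server.properties > kafka-server.out 2>&1 &
# confirm the broker is listening on its configured port (9092 here)
netstat -an | grep 9092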
Step 5: Create a topic
lizhitao@localhost:~$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Step 6: Verify that the topic was created successfully
lizhitao@localhost:~$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
Alternatively, instead of creating topics manually, you can configure your brokers to auto-create topics when a message is published to a non-existent topic, as sketched below.
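A sketch of the relevant broker settings in config/server.properties; the values shown are assumptions for a test setup:
# create topics automatically on first use
auto.create.topics.enable=true
# defaults applied to auto-created topics
num.partitions=1
default.replication.factor=1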
Step 7: Send some messages to verify the setup. In console mode, start the producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
Step 8: Start a consumer
lizhitao@localhost:~$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
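The console producer reads from standard input, so instead of typing messages you can also pipe in a file; a sketch using a hypothetical messages.txt:
# each line of the file is sent as a separate message to the test topic
cat messages.txt | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test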
3. Configuring Kafka in cluster mode (a cluster consists of multiple brokers)
Step 1:
Since several server.properties files need to live in the same directory (config), copy the existing one as follows:
lizhitao@localhost:~$ cp config/server.properties config/server-1.properties
lizhitao@localhost:~$ cp config/server.properties config/server-2.properties
Step 2:
Edit the copied files and set the following properties (a scripted alternative is sketched after the listings):
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
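The same edits can also be scripted; a sketch assuming GNU sed and that the copied files still contain the original broker.id, port, and log directory lines (otherwise edit them by hand as shown above):
# adjust broker id, listening port and log directory for the second broker
sed -i 's/^broker.id=.*/broker.id=1/; s/^port=.*/port=9093/; s|^log.dirs\?=.*|log.dir=/tmp/kafka-logs-1|' config/server-1.properties
# and for the third broker
sed -i 's/^broker.id=.*/broker.id=2/; s/^port=.*/port=9094/; s|^log.dirs\?=.*|log.dir=/tmp/kafka-logs-2|' config/server-2.properties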
Start the services:
lizhitao@localhost:~$ bin/kafka-server-start.sh config/server-1.properties &
...
lizhitao@localhost:~$ bin/kafka-server-start.sh config/server-2.properties &
...
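There should now be three brokers running (the original one plus the two just started); a sketch of a quick check, assuming the JDK's jps tool is on the PATH:
# kafka.Kafka is the broker's main class, so each running broker shows up once
jps -l | grep kafka.Kafka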
Step 3:
Create a topic with a replication factor of 3:
lizhitao@localhost:~$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
.....
topic created successfully ...
lizhitao@localhost:~$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
In this output, Leader is the broker currently responsible for all reads and writes on the partition, Replicas lists the brokers that replicate the partition's log, and Isr is the subset of replicas currently in sync with the leader. For comparison, describe the partition and replica status of the original test topic:
lizhitao@localhost:~$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
Step 4: Send messages as a producer
lizhitao@localhost:~$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
Step 5: Consume the topic's data
lizhitao@localhost:~$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
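With a replication factor of 3, the topic survives the loss of a broker. A sketch of a fault-tolerance check, assuming broker 1 is the current leader as in the describe output above (the pkill pattern is my own way of stopping that broker; any method of stopping it works):
# stop the broker started from server-1.properties
pkill -f 'server-1.properties'
# describe the topic again: a surviving replica should have taken over as leader,
# and the messages should still be consumable from the remaining brokers
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic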
Step 6:
Check the consumer offset positions. Lag is the difference between logSize (the latest offset in the partition) and Offset (the consumer's current position):
lizhitao@localhost:~$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
Group      Topic      Pid  Offset  logSize  Lag  Owner
my-group   my-topic   0    0       0        0    test_jkreps-mn-1394154511599-60744496-0
my-group   my-topic   1    0       0        0    test_jkreps-mn-1394154521217-1a0be913-0
Author: 李志涛 (Li Zhitao). Source: CSDN blog.