1500字范文,内容丰富有趣,写作好帮手!
1500字范文 > kafka官方文档学习笔记2--QuickStart

kafka官方文档学习笔记2--QuickStart

时间:2020-05-03 18:51:29

相关推荐

kafka官方文档学习笔记2--QuickStart

下载kafka

/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz

解压安装包

> tar -xzf kafka_2.11-1.0.0.tgz> cd kafka_2.11-1.0.0/bin

查看bin目录下主要几个脚本功能如下:

注:kafka的安装包除了包括kafka自身的工具以外,也包括了一系列简易的zookeeper工具,能够通过zookeeper-server-start.sh脚本启动简易的单点zookeeper实例,供kafka使用。但一般仅限于测试环境使用;

config目录下存放的是kafka服务、自带zk服务以及基于命令行的生产者、消费者工具对应的配置文件,常用如下:

启动zk服务,默认端口:2181

> bin/zookeeper-server-start.sh config/zookeeper.properties[-01-16 20:22:52,327] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)...

启动kafka服务,默认端口:9092

> bin/kafka-server-start.sh config/server.properties[-01-16 20:23:52,758] INFO KafkaConfig values:...

经过如上两步,我们就启动了一个简易的kafka集群(具有1个zookeeper实例和1个kafka实例的集群)

查看zookeeper中存放的kafka信息

> bin/zookeeper-shell.sh localhost:2181Connecting to localhost:2181Welcome to ZooKeeper!JLine support is disabledWATCHER::WatchedEvent state:SyncConnected type:None path:nullls /[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]ls /brokers[ids, topics, seqid]ls /brokers/topics[test]ls /brokers/ids[0]

"ls /"命令列出了zk根节点下的所有元素,可以看到kafka在zk中存放了集群(cluster)、实例(brokers)、消费者(consumers)等信息;zookeeper服务作为kafka的元数据管理服务,因而每次对kafka服务操作都需要指定zookeeper服务的地址,以便于获取kafka的元数据,连接到正确的kafka集群;

创建topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testCreated topic "test".

创建一个名为test的topic,包含1个复本,1个分区;

查看集群中的所有topic

> bin/kafka-topics.sh --list --zookeeper localhost:2181test

启动生产者,并写入测试消息

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test> Hello World1> I'm a programer

启动消费者,接收消息

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginningHello World1I'm a programer

可以看到生产者写入的消息,都能够立刻被消费者接收并打印出来。需要注意的是,生产者和消费者通过topic这个概念来建立联系,只有消费者指定与生产者相同的topic,才能够消费其产生的消费;

删除topic

> bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic testTopic test is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true.

建立多个kafka实例的集群

拷贝配置文件,修改实例ID、日志目录、监听端口:

> cp config/server.properties config/server-1.properties> cp config/server.properties config/server-2.properties

修改配置项如下:

config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2

启动实例:

> bin/kafka-server-start.sh config/server-1.properties &...> bin/kafka-server-start.sh config/server-2.properties &...

新建topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topicCreated topic "my-replicated-topic".

新建一个名为my-replicated-topic的topic,有3个副本和1个分区;

查看topic状态描述

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topicTopic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

topic上有几个partition,就会展示几行记录;字段含义如下:

leader:标识当前partition的leader节点是那个,通过broker.id标识;一个partition只有一个leader节点,负责接收和处理读写请求;replicas:标识当前partition的所有副本所在的节点,无论节点是否是leader节点,也无论节点是否"存活",通过broker.id标识;isr:标识存活且与leader节点同步的节点,即可用的副本(包括leader借点);通过broker.id标识;

查看最初创建的test状态描述:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic:test PartitionCount:1 ReplicationFactor:1 Configs:Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

可以看到,因为test只有1个副本、1个partition,所以只能分布在一个实例上;

模拟leader切换

对于包含多个副本的topic而言,当一个副本所在的实例不可用时,将会从其它可用副本中选择一个作为leader;

在集群节点都正常的情况下,查看topic的状态:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topicTopic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

关掉broker.id=0的实例,再次查看,发现leader节点已经切换,同时isr中不包含"不可用"节点0:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topicTopic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2

重新启动broker.id=0的实例,再次查看,发现isr中包括了节点0,说明可用。

使用kafka connect导入/导出数据

kafka connect是kafka与外部系统交互的工具,通过运行不同的connector,实现与不同外部系统的交互,包括数据的导入/导出。如下模拟从文件导入数据到kafka,以及从kafka导出数据到文件;

首先,创建文件,写入测试数据:

> cd kafka_2.11-1.0.0> echo "Hello World" > test.txt

注:一定是在kafka根目录中创建名为test.txt的文件,否则不会读取;

2.启动2个单点的connector,这两个connector都是kafka自带的,一个用于读取文件写入topic,另一个用于将topic中数据导出到文件;

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties[-01-17 10:37:32,568] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:65)

connect-console-source.properties文件内容:

name=local-console-source# connector入口connector.class=org.apache.kafka.connect.file.FileStreamSourceConnectortasks.max=1# connector关联的topictopic=connect-test

connect-console-sink.properties文件内容:

name=local-console-sink# connector入口connector.class=org.apache.kafka.connect.file.FileStreamSinkConnectortasks.max=1# connector关联的topictopics=connect-test

在kafka根目录可以看到生成了名为test.sink.txt的文件,其中的内容即为test.txt中的内容,持续向test.txt中append内容,test.sink.txt中的内容也随之append;

注:因为同步过程是监听文件的增量变化,如果改变test.txt中旧有内容,则旧数据不发生变化,覆盖同一行旧数据,貌似会产生一个空行;

整个同步过程是:

test.txt -> FileStreamSourceConnector -> connect-test(topic) -> FileStreamSinkConnector -> test.sink.txt

由于是通过topic存放过往数据,因此在topic中也可以看到相应的数据:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning{"schema":{"type":"string","optional":false},"payload":"Hello World"}{"schema":{"type":"string","optional":false},"payload":""}{"schema":{"type":"string","optional":false},"payload":"Hello World1"}{"schema":{"type":"string","optional":false},"payload":"Hello World2"}

使用kafka stream处理数据

参考官方文档:/10/documentation/streams/quickstart

kafka生态

kafka周边包含很多组件,参看wiki:/confluence/display/KAFKA/Ecosystem

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。