前言:

前几天,我通过 Kafka 自带的 kafka-reassign-partitions.sh 脚本工具,完成了对 topic 分区副本数的增加。其实 kafka-reassign-partitions.sh 不仅可以实现分区副本数的增加,它还可以实现对 topic 分区的分配。

所以对于 topic 分区分配以及分区副本数的增加,本篇小文都会讲到,图文实操,讲解详细,看完别忘了点赞哦!

Kafka:2.11-1.1.0

一、准备工作

1、创建一个 8 分区、1 副本的 topic:

已知,Kafka 集群中有两个 kafka broker ,id 分别为 200、201 。

1
2
3
4
5
# 创建名为create17的topic
./bin/kafka-topics.sh --create --zookeeper cdh-worker-1:2181/kafka --replication-factor 1 --partitions 8 --topic create17

# 查看名为create17的topic详情:分区数、副本数、Isr等
./bin/kafka-topics.sh --describe --zookeeper cdh-worker-1:2181/kafka --topic create17

二、分区分配

已知,Kafka 集群中有两个 kafka broker ,id 分别为 200、201 ,现在再加一个 broker 节点,id 为 202 。Kafka 只会负载均衡新创建的 topic 分区。所以现在,我们需要将已存在的 create17 topic 的 8 个分区均匀分布在 3 个 broker 节点上,以便实现尽可能的负载均衡,提高写入和消费速度。

Kafka 不会对已存在的分区进行均衡分配,所以需要我们手动执行分区分配操作。

1、声明要分配分区的 topic 列表

Kafka 的 bin 目录中有一个 kafka-reassign-partitions.sh 脚本工具,我们可以通过它分配分区。在此之前,我们需要先按照要求定义一个文件,里面说明哪些 topic 需要分配分区。文件内容如下:

1
2
3
4
5
6
7
8
9
> cat topic-generate.json
{
"topics": [
{
"topic": "create17"
}
],
"version": 1
}

2、通过 –topics-to-move-json-file 参数,生成分区分配策略 –generate

1
./bin/kafka-reassign-partitions.sh --zookeeper cdh-worker-1:2181/kafka --topics-to-move-json-file topic-generate.json --broker-list "200,201,202" --generate
  • –broker-list:值为要分配的 kafka broker id,以逗号分隔,该参数必不可少。脚本会根据你的 topic-generate.json 文件,获取 topic 列表,为这些 topic 生成分布在 broker list 上面的分区分配策略。

一定要注意 –zookeeper 参数的值啊,确定好 kafka 元数据在 zk 上面的路径。

输出结果中有你当前的分区分配策略,也有 Kafka 期望的分配策略,在期望的分区分配策略里,kafka 已经尽可能的为你分配均衡。

我们先将 Current partition replica assignment 的内容备份,以便回滚到原来的分区分配状态。

然后将 Proposed partition reassignment configuration 的内容拷贝到一个新的文件中(文件名称、格式任意,但要保证内容为json格式)。

1
2
> cat partition-replica-reassignment.json
{"version":1,"partitions":[{"topic":"create17","partition":2,"replicas":[200],"log_dirs":["any"]},{"topic":"create17","partition":7,"replicas":[202],"log_dirs":["any"]},{"topic":"create17","partition":4,"replicas":[202],"log_dirs":["any"]},{"topic":"create17","partition":1,"replicas":[202],"log_dirs":["any"]},{"topic":"create17","partition":6,"replicas":[201],"log_dirs":["any"]},{"topic":"create17","partition":3,"replicas":[201],"log_dirs":["any"]},{"topic":"create17","partition":0,"replicas":[201],"log_dirs":["any"]},{"topic":"create17","partition":5,"replicas":[200],"log_dirs":["any"]}]}

3、通过 –reassignment-json-file 参数,执行分区分配策略 –execute

1
./bin/kafka-reassign-partitions.sh --zookeeper cdh-worker-1:2181/kafka --reassignment-json-file partition-replica-reassignment.json --execute

4、通过 –reassignment-json-file 参数,检查分区分配进度 –verify

1
./bin/kafka-reassign-partitions.sh --zookeeper cdh-worker-1:2181/kafka --reassignment-json-file partition-replica-reassignment.json --verify

我们再来看一下 topic : create17 的详细信息,broker 200 上有两个分区、broker 201、202 上分别有三个分区:

三、增大分区副本数

1、增加各 partition 所属的 replicas broker id

修改 partition-replica-reassignment.json 文件,增加各 partition 所属的 replicas broker id 。

1
{"version":1,"partitions":[{"topic":"create17","partition":6,"replicas":[201,200,202]},{"topic":"create17","partition":2,"replicas":[201,200,202]},{"topic":"create17","partition":4,"replicas":[201,200,202]},{"topic":"create17","partition":5,"replicas":[201,200,202]},{"topic":"create17","partition":0,"replicas":[201,200,202]},{"topic":"create17","partition":3,"replicas":[201,200,202]},{"topic":"create17","partition":1,"replicas":[201,200,202]},{"topic":"create17","partition":7,"replicas":[201,200,202]}]}

2、通过 –reassignment-json-file 参数,执行分区副本分配策略 –execute

1
./bin/kafka-reassign-partitions.sh --zookeeper cdh-worker-1:2181/kafka --reassignment-json-file partition-replica-reassignment.json --execute

然后,查看一下 topic:create17 ,发现:Replicas 列表正如上述 partition-replica-reassignment.json 描述的一样。

每个 partitiion 的所有 replicas 叫做 “assigned replicas” ,”assigned replicas” 中的第一个 replica 叫 “preferred replica”,当 kafka leader replica 挂掉的话,partition 会选择 “preferred replica” 做为 leader replica 。

但根据上述图片示例,很明显,Replicas 列表分配不均。我们可以使用 –generate 参数再生成一份分配相对均匀的分区副本策略,这样就不用我们自己排列组合了。

最后得到的分区副本策略是这样的:

1
{"version":1,"partitions":[{"topic":"create17","partition":2,"replicas":[200,202,201],"log_dirs":["any","any","any"]},{"topic":"create17","partition":7,"replicas":[202,201,200],"log_dirs":["any","any","any"]},{"topic":"create17","partition":4,"replicas":[202,200,201],"log_dirs":["any","any","any"]},{"topic":"create17","partition":1,"replicas":[202,201,200],"log_dirs":["any","any","any"]},{"topic":"create17","partition":6,"replicas":[201,200,202],"log_dirs":["any","any","any"]},{"topic":"create17","partition":3,"replicas":[201,202,200],"log_dirs":["any","any","any"]},{"topic":"create17","partition":0,"replicas":[201,200,202],"log_dirs":["any","any","any"]},{"topic":"create17","partition":5,"replicas":[200,201,202],"log_dirs":["any","any","any"]}]}

再执行一下 –execute 操作,最后的 topic:create17 的详细信息为:

这样的话,假如 broker 202 挂了的话,分区1、7 的 Leader replica 会变为 201;分区4的 Leader replica 会变为 200 ,比上面咱们手动生成的策略要好。

四、小结

1、本文介绍了使用 Kafka 自带的 kafka-reassign-partitions.sh 脚本工具完成对 topic 的分区分配、分区副本增加操作。该脚本有三个参数:

  • –generate:配合着 –topics-to-move-json-file 可以生成分区分配策略,该参数适用于分区多的情况。
  • –execute:配合着 –reassignment-json-file 可以执行分区分配策略。
  • –verify:配合着 –reassignment-json-file 可以检查分区分配进度。

通过以上命令,是既可以分配分区,也可以增加分区副本数,非常方便。

我们大多情况都是在新增 broker 时,选择分配分区,修改的是 Partition Replicas。

2、也简单介绍了 kafka preferred replica ,它是 “assigned replicas” 中的第一个 replica 。当 kafka leader replica 停掉的话, partition 会选择 “preferred replica” 做为 leader replica 。

抛一个小疑问,如果分区 leader 都在同一 broker 上,如何让其负载均衡到每个 broker 上呢?我们在《kafka 如何对 topic 分区 replica leader 进行负载均衡》揭晓。

好了,本篇 kafka 实操就到这里结束啦,感觉有帮助的朋友,希望能点个赞哦!