版本

HDP:3.0.1.0

Kafka:2.11-1.1.1

本文章部分内容摘自 朱忠华老师的《深入理解Kafka:核心设计与实践原理》,也特别推荐广大读者购买阅读。

一、生产者概述

《Kafka基础(一):基本概念及生产者、消费者示例》中,我们介绍了Kafka的架构,基本概念及生产者、消费者示例,本章主要介绍 Kafka 的生产者相关知识。

1、生产流程

生产者用于生产数据,比如将用户的活动记录、度量指标、日志信息等存储到 Kafka 中,供消费者消费。生产者 发送消息的主要流程如下图所示:

  • 首先要构造一个 ProducerRecord 对象,该对象可以声明主题Topic、分区Partition、键Key 以及 值Value,其中 Topic 和 Value 是必须要声明的,Partition 和 Key 可以不用指定。
  • 调用 send() 方法进行消息发送。
  • 因为消息要到网络上进行传输,所以必须进行序列化,序列化器的作用就是把消息的 key 和 value 对象序列化成字节数组。
  • 接下来数据传到分区器,如果之间的 ProducerRecord 对象指定了分区,那么分区器将不再做任何事,直接把指定的分区返回;如果没有,那么分区器会根据 Key 来选择一个分区,选择好分区之后,生产者就知道该往哪个主题和分区发送记录了。
  • 接着这条记录会被添加到一个记录批次里面,这个批次里所有的消息会被发送到相同的主题和分区。会有一个独立的线程来把这些记录批次发送到相应的 broker 上。
  • broker 成功接收到消息,表示发送成功,返回消息的元数据(包括主题和分区信息以及记录在分区里的偏移量)。如果发送失败,可以选择重试或者直接抛出异常。

2、ProducerRecord

再着重说一下 ProducerRecord 构造方法,该方法的参数 topic 和 value 属性是必填项,其余属性(比如:分区号、时间戳、key、headers)是选填项。对应的 ProducerRecord 的构造方法也有多种:

使用者可根据场景来选择合适的 ProducerRecord 。

3、生产者属性配置

关于生产者的属性有很多,其中有三个属性是必要要配置的,分别为:bootstrap.servers、key.serializer、value.serializer 。代码示例如下所示:

1
2
3
4
5
6
7
8
9
10
private static final String BROKERLIST = "node71.xdata:6667,node72.xdata:6667,node73.xdata:6667";
private static final String TOPIC = "test";

private static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", BROKERLIST);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
  • bootstrap.servers:该属性指定 brokers 的地址清单,格式为 host:port。清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其它 broker 的信息。建议至少提供两个 broker 的信息,因为一旦其中一个宕机,生产者仍然能够连接到集群上。

  • key.serializer:将 key 转换为字节数组的配置,必须设定为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,生产者会用这个类把 key 序列化为字节数组。

  • value.serializer:和 key.serializer 一样,用于 value 的序列化。

以上三个属性是必须要配置的,下面还有一些别的属性可以不用配置,默认。

  • acks:此配置指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的,这个参数保障了消息发送的可靠性。默认值为 1。

    • acks=0。生产者不会等待服务器的反馈,该消息会被立刻添加到 socket buffer 中并认为已经发送完成。也就是说,如果发送过程中发生了问题,导致服务器没有接收到消息,那么生产者也无法知道。在这种情况下,服务器是否收到请求是没法保证的,并且参数retries也不会生效(因为客户端无法获得失败信息)。每个记录返回的 offset 总是被设置为-1。好处就是由于生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
    • acks=1。只要集群leader收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达leader节点(比如leader节点崩溃,新leader还没有被选举出来),生产者会收到一个错误的响应,为了避免丢失消息,生产者会重发消息(根据配置的retries参数确定重发次数)。不过如果一个没有收到消息的节点成为首领,消息还是会丢失,这个时候的吞吐量取决于使用的是同步发送还是异步发送。
    • acks=all。只有当集群中参与复制的所有节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,但是延迟最高。
  • buffer.memory:该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。默认值为33554432 字节。如果应用程序发送消息的速度超过发送到服务器的速度,那么会导致生产者内存不足。这个时候,send() 方法会被阻塞,如果阻塞的时间超过了max.block.ms (在kafka0.9版本之前为block.on.buffer.full 参数)配置的时长,则会抛出一个异常。

  • compression.type:该参数用于配置生产者生成数据时可以压缩的类型,默认值为 none(不压缩)。还可以指定snappy、gzip或lz4等类型,snappy 压缩算法占用较少的 CPU,gzip 压缩算法占用较多的 CPU,但是压缩比最高,如果网络带宽比较有限,可以使用该算法,使用压缩可以降低网络传输开销和存储开销,这往往是 kafka 发送消息的瓶颈所在。

  • retries:该参数用于配置当生产者发送消息到服务器失败,服务器返回错误响应时,生产者可以重发消息的次数,如果达到了这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,可以通过 retry.backoff.on 参数来改变这个时间间隔。

更多的属性配置可参考官网:http://kafka.apachecn.org/documentation.html#configuration

二、生产者发送消息的三种方式

Kafka 生产者发送消息有三种方式,分别为:普通发送(发后即忘)、同步发送、异步发送。

1、普通发送(发后即忘)

性能高,可靠性差,易发生信息丢失。如果我们不关心发送结果,那么就可以使用此种方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @description: 方式一:发后即忘,性能高,可靠性差,易发生信息丢失。
* 如果没有指定分区号,在ProducerRecord里面指定每条消息的key值,会根据key值来判断发往哪个分区。
* 如果指定分区号,会忽略对key值得判断,直接将消息发送到指定分区。
* @param: props
* @return: void
*/
private static void fireAndForgetSend(Properties props) {
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = null;
List<PartitionInfo> partitions = producer.partitionsFor(TOPIC);
// 得到主题的分区数
int numPartitions = partitions.size();
System.out.println(numPartitions);
for (int i = 1; i <= 10; i++) {
String messageStr = "睡觉了,这是第" + i + "条数据";
// 当指定发送消息的分区时,程序就不会根据key值再判断发往哪个分区了。
record = new ProducerRecord<>(TOPIC, 0, String.valueOf(i), messageStr);
//生产者发布消息到KAFKA_TEST,若Topic不存在则自动创建。
producer.send(record);
try {
// 时间间隔1s
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
producer.close();
}

2、同步发送

和上面普通发送消息一样,只不过这里我们调用了 Future 对象的 get() 方法来等待 kafka 的响应,程序运行到这里会产生阻塞,直到获取 kafka 集群的响应。而这个响应有两种情况:

 (1)正常响应:返回一个 RecordMetadata 对象,通过该对象我们能够获取消息的偏移量、分区等信息。

 (2)异常响应:基本上来说会发生两种异常:

  • 一类是可重试异常,该错误可以通过重发消息来解决。比如连接错误,可以通过再次连接后继续发送上一条未发送的消息;再比如集群没有首领(no leader),因为我们知道集群首领宕机之后,会有一个时间来进行首领的选举,如果这时候发送消息,肯定是无法发送的。

  • 一类是无法重试异常,比如消息太大异常,对于这类异常,KafkaProducer 不会进行任何重试,直接抛出异常。

同步发送消息适合需要保证每条消息的发送结果,优点是能够精确的知道什么消息发送成功,什么消息发送失败,而对于失败的消息我们也可以采取措施进行重新发送。缺点则是增加了每条消息发送的时间,当发送消息频率很高时,此种方式便不适合了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @description: 方式二:同步发送消息,可靠性高,要么消息被发送成功,要么发生异常。如果发生异常,可以捕获并进行相应的处理。
* 性能较"发后即忘"的方式差,需要阻塞等待一条消息发送完再发送下一条信息。
* @param: props
* @return: void
*/
private static void syncSend(Properties props) {
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "sync send!");
try {
// 方式一
// producer.send(record).get();
// 方式二
// future代表一个任务的声明周期。
Future<RecordMetadata> future = producer.send(record);
// 获取消息的元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。
// 如果在应用代码中需要这些信息,可以使用这种方式。如果不需要,可采用方式一的写法。
RecordMetadata metadata = future.get();
System.out.println(metadata.topic() + " - " + metadata.partition() + " - " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
producer.close();
}

3、异步发送

单纯的send()方法就是异步请求,不过与 ”发后即忘“ 方式不同的是,我们需要对发送失败的消息进行异常日志记录,方便日后分析。异步发送也可以获取每条记录的详细信息。

为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @description: 方式三:异步发送消息,增加一个回调函数。单纯的send()方法也是异步请求。
* @param: props
* @return: void
*/
private static void asyncSend(Properties props) {
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "async send!");
producer.send(record, (recordMetadata, e) -> {
if (e != null) {
e.printStackTrace();
} else {
System.out.println(recordMetadata.topic() + " - " + recordMetadata.partition() + " - " + recordMetadata.offset());
}
});
producer.close();
}

三、生产者拦截器

1、拦截器概述

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。

生产者拦截器的使用也很方便,主要是自定义实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口。ProducerInterceptor 接口中包含 3 个方法:

  • ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
  • void onAcknowledgement(RecordMetadata var1, Exception var2);

  • void close();

KafkaProducer 在将消息序列化和计算分区之前,会调用生产者拦截器的 onSend() 方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic 、key 和 partition 等信息。如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改 key 不仅会影响分区的计算,同样会影响 broker 端日志压缩(Log Compaction)的功能。

KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement() 方法,优先于用户设定的 Callback 之前执行。这个方法运行在 Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

close() 方法主要用于在关闭拦截器时执行一些资源的清理工作。在这 3 个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。

2、自定义拦截器

下面通过一个示例来演示生产者拦截器的具体用法,ProducerInterceptorPrefix 中通过 onSend() 方法来为每条信息添加一个前缀 “prefix1 - ”,并且通过 onAcknowledgement() 方法来计算消息发送的成功率。ProducerInterceptorPrefix 的具体实现如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.hdp.project.kafka.producer;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
* @author CREATE_17
* @description: 生产者的拦截器
* @date 2019/6/19 13:08
*/
public class ProducerInterceptorPrefix implements ProducerInterceptor {

private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;

@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
String modifiedValue = "prefix1 - " + producerRecord.value();
ProducerRecord<String, String> record = new ProducerRecord(producerRecord.topic(), producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(), modifiedValue, producerRecord.headers());
return record;
}

@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
sendSuccess++;
} else {
sendFailure++;
}
}

@Override
public void close() {
double successRatio = (double) sendSuccess / (sendSuccess + sendFailure);
System.out.println("发送成功率:" + String.format("%f", successRatio * 100) + "%");
}

@Override
public void configure(Map<String, ?> map) {

}
}

实现自定义的 ProducerInterceptorPrefix 之后,需要在 KafkaProducer 的配置参数 interceptor.classes 中指定这个拦截器,此参数的默认值为“”,示例如下:

1
2
// 指定拦截器
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());

消息发送成功之后,如果消费的话,就可以发现每条消息的前缀都加上了 “prefix1 - ”。

KafkaProducer 可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行。配置的时候,各拦截器之间使用逗号隔开,示例如下:

1
2
// 指定多个拦截器
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName() + "," + xxx.class.getName());

四、序列化器

1、序列化器概述

生产者需要用序列化器(Serializer)把对象转化为字节数组才能通过网络发送给 Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换为相应的对象。上面对应程序中的序列化器也使用了客户端自带的 org.apache.kafka.common.serialization.StringSerializer,除了用于 String 类型的序列化器,还有 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 这几种类型,它们都实现了 org.apache.kafka.common.serialization.Serializer 接口,该接口有三个方法:

  • void configure(Map<String, ?> var1, boolean var2);
  • byte[] serialize(String var1, T var2);

  • void close();

configure() 方法用来配置当前类,serialize() 方法用来执行序列化操作,close() 方法用来关闭当前的序列化器,一般情况下,close() 是一个空方法。生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的。

2、StringSerializer

接下来,我们看一下 Kafka 自带的 StringSerializer ,将 String 类型转为 byte[] 类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package org.apache.kafka.common.serialization;

import java.io.UnsupportedEncodingException;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;

public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";

public StringSerializer() {
}

public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null) {
encodingValue = configs.get("serializer.encoding");
}

if (encodingValue instanceof String) {
this.encoding = (String)encodingValue;
}

}

public byte[] serialize(String topic, String data) {
try {
return data == null ? null : data.getBytes(this.encoding);
} catch (UnsupportedEncodingException var4) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
}
}

public void close() {
}
}

3、自定义序列化器

下面通过一个示例来演示生产者自定义序列化器的具体用法。

假设我们要发送的消息都是Company对象,这个Company的定义很简单,只有name和address,示例代码参考如下(为了构建方便,示例中使用了 lombok 工具):

idea 安装 lombok 插件,并在 pom.xml 文件内添加:

1
2
3
4
5
<!-- lombok工具 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

Company 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.hdp.project.kafka.producer;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* @author CREATE_17
* @description: 定义Company对象
* @date 2019/6/20
*/
@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Company {
private String name;
private String address;
}

Company 类对应的序列化器 CompanySerializer :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.hdp.project.kafka.producer;

import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

/**
* @author CREATE_17
* @description: 自定义序列化器
* @date 2019/6/20
*/
public class CompanySerializer implements Serializer<Company> {

@Override
public void configure(Map<String, ?> map, boolean b) {

}

@Override
public byte[] serialize(String s, Company company) {
if (company == null) {
return null;
}
byte[] name, address;
try {
if (company.getName() != null) {
name = company.getName().getBytes("UTF-8");
} else {
name = new byte[0];
}

if (company.getAddress() != null) {
address = company.getAddress().getBytes("UTF-8");
} else {
address = new byte[0];
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
buffer.putInt(name.length);
buffer.put(name);
buffer.putInt(address.length);
buffer.put(address);
return buffer.array();

} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new byte[0];
}


@Override
public void close() {

}
}

实现自定义的 CompanySerializer 之后,需要在 KafkaProducer 的配置参数 value.serializer 中指定这个序列化器。假如我们要发送一个 Company 对象到Kafka,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.hdp.project.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
* @author CREATE_17
* @description: 自定义生产者,结合自定义序列化器。
* @date 2019/6/21 0021
*/
public class ProducerSelfSerializer {

private static final String BROKERLIST = "node71.xdata:6667,node72.xdata:6667,node73.xdata:6667";
private static final String TOPIC = "test";

private static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", BROKERLIST);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 自定义序列化器
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());
props.put("client.id", "producer.client.id.demo");
props.put("retries", 3);
// acks有三个匹配项,均为字符串类型,分别为:"1","0","all或-1"。
props.put(ProducerConfig.ACKS_CONFIG, "1");
return props;
}

public static void main(String[] args) {
Properties properties = initConfig();
KafkaProducer<String, Company> producer = new KafkaProducer<>(properties);
// 数据消息
Company company = Company.builder().name("xiaoliang").address("shandongqingdao").build();
ProducerRecord<String, Company> record = new ProducerRecord<>(TOPIC, company);
try {
// 经过尝试,必须指定.get(),不指定的话,虽然程序不报错,但数据生产不成功。
producer.send(record).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}

执行 ProducerSelfSerializer 类,发送一条 Company 对象到 Kafka ,“xiaoliangshandongqingdao” 记录就被存储到了 test 主题中。

4、序列化框架

我们还可以使用已有的序列化框架:比如 JSON、Avro、Thrift 或者 Protobuf。

五、分区器

1、默认分区器

如果 ProducerRecord 中没有指定 partition 字段,那么就需要依赖分区器。其原理是根据 key 这个字段来计算 partition 的值。其作用就是为消息分配分区。

Kafka 提供的默认分区器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了 org.apache.kafka.clients.producer.Producer 接口,这个接口定义了 2 个方法,具体如下所示:

  • int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);
  • void close();

其中 partition() 方法用来计算分区号,返回值为 int 类型。partition() 方法中的参数分别为:主题、key、序列化后的 key、value、序列化后的 value,以及集群的元数据信息。通过这些可以实现功能丰富的分区器。close() 方法在关闭分区器的时候用来回收一些资源。

在默认分区器 DefaultPartitioner 的实现中,如果 key 不为 null,那么默认的分区器会对 key 进行哈希(采用 MurmurHash2 算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同 key 的消息会被写入到同一分区。如果 key 为 null,那么消息将会以轮询的方式发往主题内的各个可用分区中。

注意:如果 key 不为 null,那么计算得到的分区号会是所有分区中的任意一个;如果 key 为 null,那么计算得到的分区号仅为可用分区中的任意一个。请注意两者之间的差别。

2、自定义分区器

如果想实现自定义分区器,需要实现 org.apache.kafka.clients.producer.Producer 接口,重写 partition() 方法。在实现了自定义分区器之后,需要通过配置参数 partitioner.class 来指定这个分区器,示例如下:

1
2
// 假设自定义分区器的名字叫DemoPartitioner
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DemoPartitioner.class.getName());

六、总结

本文主要介绍了 Kafka 生产者的相关知识,先了解了 Kafka 发送数据的流程,又介绍了生产者发送消息的三种方式,最后概述了拦截器、序列化器、分区器及其自定义写法。

关于本文中 Kafka 生产者代码已上传至 github