fetch.max.wait.ms默认500ms

发布时间:2025-06-24 19:02:58  作者:北方职教升学中心  阅读量:466


log.flush.interval.ms

每隔多久刷数据到磁盘,默认是null。

2)state:每一个主题topic下面的分区partition对应的leader和isr是谁。

——以每一个broker节点为key,后面跟上请求request,放到一个队列里面进行发送。controller争先抢占注册节点,谁先抢到,谁负责leader选举。fetch.max.wait.ms默认500ms。消费者组下所有的消费者提交offset的时候就往这个分区提交offset。

none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。

4)leader制定消费方案。

num.io.threads

默认是8

  • 数据切分后,从消费者Consumer的角度,也可以按照分区一块一块进行处理,能够提高处理的并发度。

            也就是说,当一个节点挂掉之后,其余两个分区策略不发生变化,而是增加了另外的几个。消费者获取服务器端一批消息的最大字节数。

    6)coordinator把消费方案下发给各个consumer。默认重试的次数是输入的最大值。

  • 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
    否则在重试此失败消息的时候,其他的消息可能发送成功了
    retry.backoff.ms
    两次重试之间的时间间隔,默认是 100ms。生产环境建议该值大小为 5-100ms 之间。

    2)消息队列的两种模式

            Kafka的消费模式主要有两种:一种是一对一的消费,也即点对点的通信,即一个发送一个接收。

    该参数占总核数的50%的1/3

    log.flush.interval.messages
    强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。

     消费者详细消费流程:

    (1)消费者组想要进行工作,首先需要创建一个消费者网络连接客户端  (ConsumerNetworkClient),主要用来和Kafka集群进行交互。

    2)注册完成之后选择controller节点,每个broker上都有一个对应的controller。

    (4)Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。Producer生产者

    2.1 生产者消息发送流程

    2.1.1 发送原理

    2.2 异步发送API

    2.2.1 普通异步发送

    2.2.2 带回调函数的异步发送

    2.3 同步发送API

    2.4 生产者分区

    2.4.1 分区好处

    2.4.2 默认分区规则

    2.4.3 自定义分区

    2.5 生产者如何提高吞吐量

    2.6 生产者提高数据可靠性

    2.6.1 ack应答原理

     2.6.2 可靠性分析

    2.6.3 数据重复分析

    2.7 数据去重

    2.7.1 数据传递语义

    2.7.2 幂等性

    2.7.3 生产者事务

    2.8 数据有序

    2.9 数据乱序

    2.10 生产者核心参数配置

    三、

    max.in.flight.requests.per.connection
    允许最多没有返回 ack 的次数, 默认为 5,开启幂等性要保证该值是 1-5 的数字。也可能丢失数据

    -1(all): 生产者发送过来的数据Leader和ISR队列里面的所有节点收齐数据后应答。

    log.cleanup.policy

    默认是delete,表示所有数据启用删除策略。

    retries 表示重试次数。

    三、

    2.10 生产者核心参数配置

    参数名称描述
    bootstrap.servers

    生产者连接集群所需的broker地址清单。

    2.4.2 默认分区规则

    1)指定分区,按分区进行划分

    2)不指定分区,指定key,按key的hashcode值%分区数

    3)不指定分区,不指定key,粘性

            第一次随机,一旦粘上一个,直到该分区挂掉(批次大小到了,或者响应时间到了)再切换分区。

    二、最多可以缓存五个请求。RabbitMQ、

    ——为什么Kafka不用Java的序列化器?

            Java的序列化传输的数据比较重。

    注意:这种方式只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。可能会丢失数据

     1:生产者发送过来的数据,Leader收到数据后应答。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。数据传输线程数。Kafka2.8.0以后也可以配置不采用ZK

            主要功能分为三块:生产者Producer、

    0.9版本之前,offset会存储在zk中。那么消费者组是如何形成的呢?由coordinator组件辅助实现消费者组的初始化和分区的分配。因此这不是一个绝对最大值。默认打开

    2.7.3 生产者事务

    说明:开启事务,必须开启幂等性

    (2)CNC前期进行一些准备工作,首先调用 sendFetchs方法,用来抓取数据的初始化。但如果所有的offset都存储在zk中,那么消费者customer会和zk进行大量的交互,导致网络上数据传输非常频繁,传输压力过大。

    ——发送数据的条件:

            1. 只有数据累积到batch.size之后,sender才会发送数据,默认16k

            2. 如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。发送完请求之后,会通过回调方法onSuccess将对应的结果拉取过来。ActiveMQ、

    目录

    一、

    注意2:消费者组之间互不影响,消费者组只是逻辑上的一个订阅者。

    (3)Kafka 中副本分为:Leader 和 Follower。

    2)如果是下游数据处理不及时:提高每批次拉取的数量

    注意:消费者能看到的最大的offset是4,Kafka是只有主副本全部将该数据落磁盘之后才对消费者进行可见。

    漏消费:先提交offset后消费,有可能会造成数据的漏消费。gzip、容易产生数据倾斜。

    • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

              假如现在有7个分区,3个消费者,排序后的分区将会是0,1,...,6,消费者排序完之后将会是C0,C1,C2。

    3.2 Broker工作流程

    1)每台broker节点启动之后,都会向zk注册, 增加对应节点。

    1)所有的消费者都会主动向coordinator发送请求,加入到组当中。Interceptors(拦截器)后,才会处理数据。topic越多,C0消费的分区会比其他消费者明显多消费N个分区。

    enable.auto.commit默认值为true

            通过Kafka可以进行缓冲,我们可以将海量的数据先放到Kafka中,Kafka集群按照Hadoop的上传速度进行文件的传输。

    c. 流量削峰

            有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

    log.index.interval.bytes
    默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的减少分配的变动,可以节省大量的开销。

    a. 点对点模式

    • 一个生产者,一个消费者,一个topic,会删除数据(使用不多)

    b. 发布/订阅模式

    • 多个生产者,多个消费者,而且相互独立
    • 多个topic
    • 不会删除数据

     1.4 基础架构

    1)生产者:100T数据

    2)broker

            (1)broker即服务器,如hadoop101/hadoop102/hadoop103

            (2)topic主题,对数据分类

            (3)分区

            (4)可靠性:副本

            (5)leader、副本拉去线程数。

    partition是物理上的概念,每个partition对应于一个log文件

    log.retention.mskafka中数据保存的时间,毫秒级别,默认关闭。但是如果某 些broker宕机会导致Leader Partition过于集中在其他少部分broker,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。

    heartbeat.interval.ms

    Kafka消费者和coordinator之间的心跳时间,默认3s。-1与all等价。

    (4)拉去过来的数据会放在消息队列queue中

    (5)数据拉过来之后,消费者FetchedRecords从队列中抓取数据。Kafka 生产者只会把数据发往 Leader,

    然后 Follower 找 Leader 进行同步数据。通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。

    5)leader把消费方法发送给coordinator。

    4.3.1 Range + 再平衡

    1)Range分区策略原理

            Range是对每个topic而言的。

    4.1 customer工作流程

    4.1.1 消费者总体工作流程

    1)生产者producer向每一个分区的leader发送数据

    2)follower主动和leader同步数据,保证数据的可靠性

    3)consumer可以消费某一个分区的数据,也可以消费多个分区的数据。 默认值是-1,-1 和 all是等价的。自动提交

    auto.commit.interval.ms如果enable.auto.commit为true,则该值定义了消费者偏移量向Kafka提交的频率。
    auto.offset.reset
    earliest:自动重置偏移量到最早的偏移量。

    fetch.min.bytes默认1字节

    public static void main(String[] args) {// 1. 配置信息    Properties properties = new Properties();    // 连接    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFFIG, "hadoop02:9092,hadoop03:9092");    // 反序列化    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserizlizer.class);    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserizlizer.class);    // 组id    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");// 2. 创建消费者    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);    // 指定位置进行消费    Set<TopicPartition> assignment = kafkaConsumer.assignment;    // 保证分区分配方案已经制定完毕    while(assignment.size() == 0){        kafkaConsumer.poll(Duration.ofSeconds(1));        assignment = kafkaConsumer.assignment();    }    // 指定消费的offset    for(TopicPartition topicPartition : assignment){        kafkaConsumer.seek(topicPartition, 100);    }// 3.订阅主题     ArrayList<String> topics = new ArrayList<>();    topics.add("first");    kafkaConsumer.subscribe(topics);// 4. 消费数据    while(true){        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));        for(ConsumerRecord<String, String> consumerRecord : consumerRecords){            System.out.println(consumerRecord);        }    }}

    4.4.5 指定时间消费

            需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。 

     4.1.3 消费者组的初始化流程

            生产者把数据发送到Kafka集群,由消费者组中的消费者进行数据消费。于是引入了Kafka。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。因此Kafka提供了手动提交offset的API。RoundRobin、

    batch.size
    缓冲区一批数据最大值, 默认 16k

     2)index文件中保存的offset为相对offset,可以确保offset的值所占空间不会过大。

    数据可靠性分析

            如果分区副本设置为1个,或者ISR里应答的最小副本数量(默认为1)设置为1,和ack=1的效果是一样的,仍有丢数的风险。

            粘性分区是Kafka从0.11.x版本开始引入这种分配策略。

    (2)基于大小:默认关闭

            生产环境中,acks=0很少使用;acks=1一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

    a. 解耦合

            耦合:当实现某个功能的时候,直接接入当前接口

            解耦合:利用消息队列,将相应的消息发送到消息队列。默认策略是Range+CooperativeSticky 。

    2)使用 

    开启参数enable.idempotence默认为true,false关闭。单位 ms, 默认值是 0ms,表示没有延迟。期间会准备一些参数:

            Fetch.min.bytes        每批次最小抓取的字节数,默认1字节

            fetch.max.wait.ms     一批数据最小值未达到的超时时间,默认500ms

            Fetch.max.bytes        每批次最大抓取大小,默认50m

    (3)准备完毕之后调用send方法,发送请求。

    fetch.max.bytes

    默认default:50M

    2)异步发送

            将外部数据发送到队列DQueue中,不管数据是否发送到Kafka集群中。

    -1(all):生产者发送过来的数据,Leader+和 isr 队列里面所有节点收齐数据后应答。

    整个参数值要占总核数的50%

    num.replica.fetchers

    默认是1

    session.timeout.ms

    Kafka消费者和coordinate之间连接超时时间,默认45s。但是,如果Kafka中没有数据,消费者会陷入循环,一直返回空数据。

    leader.imbalance.check.interval.seconds
    默认值 300 秒

    5)controller不存储数据,会将leader信息和isr信息上传到ZK。

    支持压缩类型:none、

    max.poll.records一次poll拉去数据参会消息的最大条数,默认500条一般不建议修改,交给系统自己管理。

    ——应答成功,清除掉所有的请求request,同时清理分区的数据。

            Max.poll.records        一次拉去数据返回消息的最大条数,默认500条

    (6)数据拉去过来之后,经过parseRecord(反序列化)、

    一般不建议修改解耦

    发布/订阅:消息的发布者不会将消息直接发送给特点的订阅者,而是将发布的消息(数据)分为不同的类型,订阅者只接收感兴趣的消息,根据需求选择性订阅。

    生产者Producer:对接外部设备(外部数据)

    消费者Consumer:处理数据

    Topic:存储数据。

    超过该值,该消费者被移除,消费者组执行再平衡。如消息的key是用户ID,value是用户的资料。为了查看该系统主题,将配置文件 config / consumer.properties 中添加配置 exclude. internal.topics=false 

    3.3 Kafka副本Follower

    3.3.1 Kafka副本基本信息

    (1)Kafka 副本作用:提高数据可靠性。

    2.5 生产者如何提高吞吐量

    • batch.size:批次大小,默认16k
    • linger.ms:等待时间默认为0,修改为5-100ms
    • compression.type:压缩snappy
    • RecordAccumulator:缓冲区大小默认为32M,修改为64m

    2.6 生产者提高数据可靠性

    2.6.1 ack应答原理

    0:生产者发送过来的数据,不需要等数据落盘应答。

    1)消息队列的应用场景

            主要应用场景包括:缓存/消峰、Topic。

    该值比如小于session.timeout.ms,也不应高于session.timeout.ms的1/3。

    4.4.2 自动提交offset

    自动提交offset的相关参数:

            enable.auto.commit:是否开启自动提交offset功能,默认是true

            auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

    // 是否自动提交offset    propertoes.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 提交offset的时间周期为1000ms,默认为5000ms    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

    4.4.3 手动提交offset

            虽然自动提交offset十分便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。

    offset.topic.num.partitions

    __consumer_offsets的分区数。单位ms,默认为0ms,表示没有延迟。合理控制分区的任务,可以实现负载均衡的效果。

    4)controller决定leader选举:在isr中存活为前提,按照AR中(AR启动的时候会有固定的顺序)排在前面的优先。两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会失败重试;而异步提交则没有失败重试机制,可能提交失败。指定消息队列名字 ArrayList<String> topics = new ArrayList<>(); topics.add("first"); consumer.subscribe(topics);// 5. 消费数据 while(true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record : records) { // 如果数据过程过程中失败,可以将相关位置记录下来 System.out.println( record.topic() + "\t" + record.offset() + "\t" + record.partition() + "\t" + "key:" + record.key() + "\t" + "value:" + record.value() + "\t" + record.timestamp() ); consumer.commitAsync(); } } }}

    4.2.2 独立消费者案例(订阅分区)

    public class MyConsumer {    public static void main(String[] args) {// 1. 配置        Properties pro = new Properties();    // 连接        pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.139:9092");    // 反序列化        pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);    // 组id        pro.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP1");// 2. 创建一个消费者        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(pro);// 3. 订阅主题对应的分区        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();        topicPartitions.add(new TopicPartition("bigdata",0));        consumer.assign(topicPartitions);// 4. 消费数据        while(true){            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));            for(ConsumerRecords<String, String> record : records){                System.out.println(record);            }        }    }}

    4.3 生产经验——分区的分配以及再平衡

            一个consumer group中有多个consumer组成,一个topic有多个partition组成。自动Leader Partition平衡。 Kafka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。每个segment包含:

            .log                  日志文件

            .index              偏移量索引文件

            .timeindex        时间戳索引文件

    每个segment大小为1G

    log文件和 index文件:

     1)index为稀疏索引

    4.3.2 RoundRobin + 再平衡

    1)RoundRobin分区策略原理

            RoundRobin 针对集群中所有Topic而言