时间戳和可选的 元数据标头

发布时间:2025-06-24 17:55:54  作者:北方职教升学中心  阅读量:907


这是一个示例事件:

事件键:白富美
事件的值:向土豪 支付了520
事件时间戳:“yyyy 0520 13:14”
生产者是那些向Kafka 发布(写入)事件的客户端应用程序。
topic:topicA,partition:1
topic:topicA,partition:1
topic:topicA,partition:0
topic:topicA,partition:0
topic:topicA,partition:0

18.生产者_拦截器

19.生产者_拦截器二

20.生产者_消息序列化一

21.生产者_消息序列化二

添加依赖
  }
   
public void setName(String name) {
       
this.name = name;
  }
   
public int getAge() {
       
return age;
  }
   
public void setAge(int age) {
       
this.age = age;
  }
   
public String getAddress() {
       
return address;
  }
   
public void setAddress(String address)
{
       
this.address = address;
  }
}
393
编写自定义序列化类
<dependency>
<groupId>org.codehaus.jackson</groupId>
   
<artifactId>jackson-mapper
asl</artifactId>
   
<version>1.9.13</version>
</dependency>
package com.itbaizhan.kafka.producer;
import
org.apache.kafka.common.serialization.Seri
alizer;
import
org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
public class UserSerializer implements
Serializer<UserVo>{
   
private ObjectMapper objectMapper;
   
@Override
   
public void configure(Map<String, ?>
configs, boolean isKey) {
       
objectMapper = new ObjectMapper();
     
//Serializer.super.configure(configs,
isKey);
  }
404
编写生产者程序
   
@Override
   
public byte[] serialize(String topic,
UserVo data) {
       
byte[] ret = null;
       
try{
           
ret =
objectMapper.writeValueAsString(data)
                 
.getBytes(StandardCharsets.UTF_8);
      } catch(IOException e) {
           
throw new
SerializationException("Error when
serializing UserVo to byte[],exception is
" + e.getMessage());
      }
       
return ret;
  }
   
@Override
   
public void close() {
       
objectMapper = null;
       
//Serializer.super.close();
  }
}
package com.itbaizhan.kafka.producer;
import
org.apache.kafka.clients.producer.*;
41import
org.apache.kafka.common.serialization.Stri
ngSerializer;
import java.util.Properties;
import
java.util.concurrent.ExecutionException;
public class UserSerProducer{
   
public static void main(String[] args)
throws ExecutionException,
InterruptedException{
       
//TODO 1.声明并实例化Kafka Producer
配置文件对象
       
Properties prop = new
Properties();
       
//TODO 2.为配置文件对象设置参数
       
// 2.1 配置bootstrap_servers
     
prop.put(ProducerConfig.BOOTSTRAP_SERVERS
_CONFIG,"node2:9092,node3:9092,node4:9092"
);
       
// 2.2 配置keyvalue的序列化类
     
prop.put(ProducerConfig.KEY_SERIALIZER_CL
ASS_CONFIG,
StringSerializer.class.getName());
     
prop.put(ProducerConfig.VALUE_SERIALIZER_
CLASS_CONFIG,
UserSerializer.class.getName());
42       
//TODO 3.声明并实例化生产者对象 注意
value的泛型类型
       
KafkaProducer<String,UserVo>
producer = new KafkaProducer<String,
UserVo>(prop);
       
//TODO 4.发送消息
       
UserVo userVo = new
UserVo("tuhao",18,"北京");
       
producer.send(new
ProducerRecord<String,UserVo>("topicA",
userVo),
           
new Callback() {
               
//如下方法在生产者收到acks确认
时异步调用
               
@Override
               
public void
onCompletion(RecordMetadata
recordMetadata, Exception e) {
                   
if(e == null){
                       
//无异常信息,输出主题
和分区信息到控制台
                     
System.out.println("topic:"+recordMetadat
a.topic()
                             
+",partition:"+recordMetadata.partition()
);
                  }else{ //打印异常信息
                     
System.out.println(e.getMessage());
                  }
              }
435
node2上开启Kafka消费者进行消费
6
运行UserSerProducer
7
观察node2Kafka消费者消费消息的情况
实时效果反馈
1. 关于Kafka生产者消息序列化的描述,正确的是:
A
默认提供了序列化类,如BytesSerializerKafka 中的主题始
终是多生产者和多订阅者:一个主题可以N(N>=0)个向其写入事件
的生产者,以及订阅这些事件的N(N>=0)个消费者。
B
自定义序列化类需要实现
org.apache.kafka.common.serialization.Serializer
主题:事件被组织并持久地存储在主题中。值、例如,生产者
永远不需要等待消费者。
关闭测试:kafka.sh stop

10.kafka入门_Topic命令行操作

11.kafka入门_消息发送和接收

章节二.生产者

12.生产者_发送数据原理剖析一

13.生产者_发送数据原理剖析二

14.生产者_同步发送数据一

15.生产者_同步发送数据二

node2上开启Kafka消费者进行消费
7
运行SyncCustomProducer
     
prop.put(ProducerConfig.KEY_SERIALIZER_CL
ASS_CONFIG,
StringSerializer.class.getName());
     
prop.put(ProducerConfig.VALUE_SERIALIZER_
CLASS_CONFIG,
StringSerializer.class.getName());
       
//TODO 3.声明并实例化生产者对象
       
KafkaProducer<String,String>
producer =
           
new KafkaProducer<String,
String>(prop);
       
//TODO 4.发送消息
       
for(int i = 0;i<5;i++){
           
//同步发送消息
           
producer.send(new
ProducerRecord<>
("topicA","sync_msg"+i)).get();
      }
       
//TODO 5.关闭生产者
       
producer.close();
  }
}
[root@node2 ~]# kafka-console-consumer.sh
--bootstrap-server node2:9092 --topic
topicA
228
观察node2Kafka消费者消费消息的情况
生产者_异步发送数据
代码实现
1
创建类UnSyncCustomProducer
2
编写代码
[root@node2 ~]# kafka-console-consumer.sh
--bootstrap-server node2:9092 --topic
topicA
sync_msg0
sync_msg1
sync_msg2
sync_msg3
sync_msg4

16.生产者_异步发送数据

17.生产者_异步回调发送数据

代码实现
1
创建类UnSyncCallBackCustomProducer
2
编写代码
[root@node2 ~]# kafka-console-consumer.sh
--bootstrap-server node2:9092 --topic
topicA
unsync_msg0
unsync_msg1
unsync_msg2
unsync_msg3
unsync_msg4
package com.itbaizhan.kafka.producer;
26import
org.apache.kafka.clients.producer.*;
import
org.apache.kafka.common.serialization.Stri
ngSerializer;
import java.util.Properties;
import
java.util.concurrent.ExecutionException;
public class UnSyncCallBackCustomProducer
{
   
public static void main(String[] args)
throws ExecutionException,
InterruptedException{
       
//TODO 1.声明并实例化Kafka Producer
配置文件对象
       
Properties prop = new
Properties();
       
//TODO 2.为配置文件对象设置参数
       
// 2.1 配置bootstrap_servers
     
prop.put(ProducerConfig.BOOTSTRAP_SERVERS
_CONFIG,"node2:9092,node3:9092,node4:9092"
);
       
// 2.2 配置keyvalue的序列化类
     
prop.put(ProducerConfig.KEY_SERIALIZER_CL
ASS_CONFIG,
StringSerializer.class.getName());
27     
prop.put(ProducerConfig.VALUE_SERIALIZER_
CLASS_CONFIG,
StringSerializer.class.getName());
       
//TODO 3.声明并实例化生产者对象
       
KafkaProducer<String,String>
producer = new KafkaProducer<String,
String>(prop);
       
//TODO 4.发送消息
       
for(int i = 0;i<5;i++){
           
//异步发送消息 不调用get()方法
           
producer.send(new
ProducerRecord<>("topicA", "unsync_msg" +
i),
               
new Callback() {
                   
//如下方法在生产者收到acks
确认时异步调用
                   
@Override
                   
public void
onCompletion(RecordMetadata
recordMetadata, Exception e) {
                       
if(e == null){
                           
//无异常信息,输
出主题和分区信息到控制台
                         
System.out.println("topic:"+recordMetadat
a.topic()
                                 
+",partition:"+recordMetadata.partition()
);
                      }else{ //打印异常信息
283
node2上开启Kafka消费者进行消费
4
运行UnSyncCallBackCustomProducer
5
观察node2Kafka消费者消费消息的情况
6
控制台输出信息
                         
System.out.println(e.getMessage());
                      }
                  }
              });
           
Thread.sleep(5);
      }
       
//TODO 5.关闭生产者
       
producer.close();
  }
}
[root@node2 ~]# kafka-console-consumer.sh
--bootstrap-server node2:9092 --topic
topicA
[root@node2 ~]# kafka-console-consumer.sh
--bootstrap-server node2:9092 --topic
topicA
unsync_msg0
unsync_msg1
unsync_msg2
unsync_msg3
unsync_msg4
29生产者_拦截器
拦截器(Interceptor)kafka0.10.0.0版本中引入的新功能,主
要用于实现clients端的定制化控制逻辑。Kafka 提供了各种保证,例如一次性处理
事件的能力。主题中的事件
可以根据需要随时读取——与传统的消息传递系统不同,事件在消
费后不会被删除。在Kafka 中,生产者和消费者完全解耦并且彼此不可知,这是
实现Kafka 众所周知的高可扩展性的关键设计元素。
C
生产者序列化机制使用起来比较简单,需要在构造producer
对象之前指定参数key.serializervalue.serializer
IntegerSerializer

Kafka消息队列

章节一.kafka入门

4.kafka入门_消息队列两种模式

5.kafka入门_架构相关名词

Kafka入门_架构相关名词
事件记录了世界或您的业务中发生了某事的事实。时间戳和可选的
元数据标头。Kafka
的性能在数据大小方面实际上是恒定的,因此长时间存储数据是非
常好的

6.kafka入门_基础架构

7.kafka入门_下载安装一

8.kafka入门_下载安装二

9.kafka入门_集群启停脚本

Kafka入门_集群启停脚本
[root@node2 opt]# vim /etc/profile
# kafka的环境变量
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[root@node2 opt]# source /etc/profile
[root@node2 ~]# kafka-topics.sh --version
3.0.1 (Commit:8e30984f43e64d8b)
kafka-server-start.sh -daemon
/opt/kafka/config/server.properties
[root@node2 opt]# jps
3248QuorumPeerMain
3761Jps
3736Kafka
kafka-server-stop.sh
[root@node2 opt]# cd /root/
11[root@node2 ~]# mkdir bin/
[root@node2 ~]# cd bin/
[root@node2 bin]# vim kafka.sh
#!/bin/bash
if[ $# -lt 1]
then
echo "Please input arg:[start/stop]"
exit
fi
case $1 in
start)
fori innode2 node3 node4
do
   
echo "--------start $i's kafka--------"
   
ssh $i/opt/kafka/bin/kafka-server-start.sh
-daemon/opt/kafka/config/server.properties
done
;;
stop)
fori innode2 node3 node4
do
   
echo "--------stop $i's kafka--------"
   
ssh $i/opt/kafka/bin/kafka-server-stop.sh
done
;;
*)
echo "Arg ErrorPlease input arg:
[start/stop]"
exit
;;  
esac

参数 描述

--bootstrap-server
node3:9092
连接的Kafka Broker 主机名称和端口号
--topic
<String: topic> 比如:topicA
操作的topic 名称
--list
查看所有主题
--create
创建主题
--delete
删除主题
--alter
修改主题
--describe
查看主题详细描述
--partitions
<Integer: # of partitions>
设置分区数
--replication-factor
<Integer: replication factor>
设置分区副本
--config
<String: name=value>
更新系统默认的配置
--version
查看当前系统kafka的版本
添加可执行权限:[root@node2 bin]# chmod +x kafka.sh
启动测试:kafka.sh start 注意:提前启动zk集群。
          });
       
Thread.sleep(50);
       
//TODO 5.关闭生产者
       
producer.close();
  }
}

22.生产者_分区的优势

23.生产者_分区策略

24.生产者_分区实战一

25.生产者_分区实战二

26.生产者_自定义分区机制一

27.生产者_自定义分区机制二

28.生产者_消息无丢失

29.生产者_数据去重

30.生产者_数据去重_幂等性

31.生产者_数据去重_事务原理分析

32.生产者_数据去重_事务代码实现

章节三.BROKER

33.BROKER_ZOOKEEPER存储信息

34.BROKER_工作流程

35.BROKER_服役新节点

36.BROKER_退役节点

37.BROKER_replica

章节四.消费者

38.消费者_消费方式

39.消费者_消费规则

40.消费者_独立消费主题实战

41.消费者_独立消费主题实战一

package com.itbaizhan.kafka.consumer;
import
org.apache.kafka.clients.consumer.Consumer
Config;
import
org.apache.kafka.clients.consumer.Consumer
Record;
import
org.apache.kafka.clients.consumer.Consumer
Records;
99import
org.apache.kafka.clients.consumer.KafkaCon
sumer;
import
org.apache.kafka.common.serialization.Stri
ngDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
//创建一个独立消费者,消费topicA主题下的数据
public class CustomTopicConsumer{
   
public static void main(String[] args)
{
       
//1.创建消费者属性文件对象
       
Properties prop = new
Properties();
       
//2.为属性对象设置相关参数
       
//设置kafka服务器
     
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS
_CONFIG,"node2:9092");
       
//设置keyvalue的序列化类
     
prop.put(ConsumerConfig.KEY_DESERIALIZER_
CLASS_CONFIG,
             
StringDeserializer.class.getName());
100     
prop.put(ConsumerConfig.VALUE_DESERIALIZE
R_CLASS_CONFIG,
             
StringDeserializer.class.getName());
       
//设置消费者的消费者组的名称
     
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"
testCg");
       
//3.创建消费者对象
       
KafkaConsumer<String,String>
kafkaConsumer =
               
new KafkaConsumer<String,
String>(prop);
       
//4.注册要消费的主题
       
/*ArrayList<String> topics = new
ArrayList<>();
       
topics.add("topicA");
       
kafkaConsumer.subscribe(topics);*/
     
kafkaConsumer.subscribe(Arrays.asList("to
picA"));
       
//5.拉取数据并打印输出
       
while(true){
           
//6.设置1s消费一批数据
           
ConsumerRecords<String,
String> consumerRecords =
                 
kafkaConsumer.poll(Duration.ofSeconds(1))
;
           
//7.打印输出消费到的数据
1016
运行CustomTopicConsumer
7
Kafka 集群控制台,创建Kafka 生产者,并输入数据。在文档中
也称为记录或消息。
自定义拦截器需要实现
org.apache.kafka.clients.producer.ProducerInterceptor接口。
9
Ctrl+C关闭生产者
10
消费者程序

42.消费者_独立消费主题实战二

43.消费者_消费者组概述

44.消费者_消费者组实战

45.消费者_offset剖析

46.消费者_offset自动提交

47.消费者_offset手动提交

48.消费者_offset手动提交实战

同步提交:
package com.itbaizhan.kafka.consumer;
import
org.apache.kafka.clients.consumer.ConsumerConfi
g;
import
org.apache.kafka.clients.consumer.ConsumerRecor
d;
import
org.apache.kafka.clients.consumer.ConsumerRecor
ds;
118import
org.apache.kafka.clients.consumer.KafkaConsumer
;
import
org.apache.kafka.common.serialization.StringDes
erializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerHandSyncCommit{
   
public static void main(String[] args) {
       
//1.创建属性对象
       
Properties prop = new Properties();
       
//2.设置相关参数
     
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONF
IG,
             
"node2:9092,node3:9092,node4:9092");
     
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS
_CONFIG,
             
StringDeserializer.class.getName());
     
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLA
SS_CONFIG,
             
StringDeserializer.class.getName());
       
//配置消费者组
119异步提交:
     
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"cghan
dSyncCommit");
       
//设置为非自动提交
     
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CON
FIG,false);
       
//3.创建消费者对象
       
KafkaConsumer<String,String> consumer=
               
new KafkaConsumer<String,
String>(prop);
       
//4.注册消费主题
     
consumer.subscribe(Arrays.asList("topicA"));
       
//5.消费数据
       
while(true){
           
ConsumerRecords<String, String>
records =
                 
consumer.poll(Duration.ofSeconds(1));
           
for(ConsumerRecord record:records){
             
System.out.println(record.value());
          }
           
//6.同步提交offset
           
consumer.commitSync();
      }
  }
}
package com.itbaizhan.kafka.consumer;
120import
org.apache.kafka.clients.consumer.ConsumerConfi
g;
import
org.apache.kafka.clients.consumer.ConsumerRecor
d;
import
org.apache.kafka.clients.consumer.ConsumerRecor
ds;
import
org.apache.kafka.clients.consumer.KafkaConsumer
;
import
org.apache.kafka.common.serialization.StringDes
erializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerHandASyncCommit{
   
public static void main(String[] args) {
       
//1.创建属性对象
       
Properties prop = new Properties();
       
//2.设置相关参数
     
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONF
IG,
             
"node2:9092,node3:9092,node4:9092");
121     
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS
_CONFIG,
             
StringDeserializer.class.getName());
     
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLA
SS_CONFIG,
             
StringDeserializer.class.getName());
       
//配置消费者组
     
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"cghan
dAsyncCommit");
       
//设置为非自动提交
     
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CON
FIG,false);
       
//3.创建消费者对象
       
KafkaConsumer<String,String> consumer=
               
new KafkaConsumer<String,
String>(prop);
       
//4.注册消费主题
     
consumer.subscribe(Arrays.asList("topicA"));
       
//5.消费数据
       
while(true){
           
ConsumerRecords<String, String>
records =
                 
consumer.poll(Duration.ofSeconds(1));
           
for(ConsumerRecord record:records){
122             
System.out.println(record.value());
          }
           
//6.同步提交offset
           
consumer.commitAsync();
      }
  }
}

Spark分布式计算框架一

章节一.概述

1.课程介绍

2.概述_什么是Spark?

3.概述_Spark主要功能

4.概述_SPARK与hadoop

5.概述_spark技术栈

6.概述_PYSPARK VS SPARK

章节二.运行模式

7.运行模式_概述

8.运行模式_WORDCOUNT一

9.运行模式_WORDCOUNT二

10.运行模式_local模式安装

11.运行模式_local模式webui

12.运行模式_Spark目录介绍

13.运行模式_spark源码解析

14.运行模式_spark-submit

15.运行模式_standalone架构分析

16.运行模式_standalone模式安装一

17.运行模式_standalone模式安装二

18.运行模式_standalone启动测试

19.运行模式_standalone执行任务

20.运行模式_查看历史日志webui

21.运行模式_standaloneHA安装

22.运行模式_standalone测试

23.运行模式_YARN模式概述

​​​​​​​

24.运行模式_YARN模式安装

25.运行模式_yarn client

26.运行模式_yarn cluster

27.运行模式_spark submit参数

8
IDEA 控制台观察接收到的数据。相反,您可以通过每个主题的配置设置来定义
Kafka 应该将您的事件保留多长时间,之后旧事件将被丢弃。它可以使得用户在消息发
送前以及producer回调逻辑前有机会对消息做一些定制化需求,比
如修改消息等。同时允许指定多个Interceptor按序作用于同一条消
息从而形成一个拦截器链(Interceptor Chain)。当您向Kafka 读取或写入数据时,您以事件的
形式执行此操作。
消费者是订阅(读取和处理)这些事件的那些客户端应用程
序。从概念上讲,事件具有键、StringSerializer等。