

Deletion always removes the oldest log segments first: messages are kept in Kafka for a retention period (for example, logs older than 168 hours can be deleted), and the data is handled according to the configured policy.

Cleaning Up Kafka Topic Messages

Reference: https://cloud.tencent.com/developer/article/1590094

Quick deletion via configuration

  1. Before starting Kafka, set delete.topic.enable=true in server.properties (a verification sketch follows this list).

  2. Run bin/kafka-topics.sh --delete --topic test --zookeeper zk:2181, or delete the topic with a cluster-management tool such as kafka-manager.
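A quick way to confirm that the flag is actually active on a broker is to dump its effective configuration. This is a minimal sketch, assuming Kafka 2.5 or newer (where kafka-configs.sh supports --all); the broker address and broker id 0 are examples for the environment used in this article.

    # Show the broker's effective config and look for delete.topic.enable
    kafka-configs.sh --bootstrap-server ape-kafka-0.ape-kafka-headless:9092 --entity-type brokers --entity-name 0 --describe --all | grep delete.topic.enable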

    Deleting data by offset

    # Generate data
    # 1. Create a new topic "test" with 3 partitions and 1 replica
    I have no name!@ape-kafka-0:/$ kafka-topics.sh --create --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic test --partitions 3 --replication-factor 1
    Created topic test.
    # 2. Produce 100 random messages
    kafka-verifiable-producer.sh --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic test --max-messages 100
    # 3. Check how many messages the topic holds
    I have no name!@ape-kafka-0:/$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic test --time -1
    test:0:0
    test:1:100
    test:2:0
    # 4. Write the offset file below; it deletes records from offset 0 through 49, offset 50 and later are kept
    cat <<EOF > offset.json
    {"partitions":[{"topic":"test", "partition":1, "offset":50}], "version":1}
    EOF
    # 5. Run the deletion
    kafka-delete-records.sh --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --offset-json-file offset.json
    # 6. Consume the messages to check the result; in this test, offsets 0-49 were indeed deleted
    kafka-console-consumer.sh --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic test --from-beginning
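If the goal is to empty one or more partitions completely, the same tool can be pointed at every partition. This is a minimal sketch, assuming the "test" topic above and assuming that an offset of -1 is accepted as "delete everything up to the current high watermark" (worth confirming on your Kafka version):

    # Hypothetical offset file that truncates all three partitions of "test"
    cat <<EOF > purge-all.json
    {"partitions":[
      {"topic":"test", "partition":0, "offset":-1},
      {"topic":"test", "partition":1, "offset":-1},
      {"topic":"test", "partition":2, "offset":-1}
    ], "version":1}
    EOF
    kafka-delete-records.sh --bootstrap-server ape-kafka-0.ape-kafka-headless:9092 --offset-json-file purge-all.json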
If delete.topic.enable=true was not configured before Kafka started, the topic is only marked for deletion and a corresponding child node is created under /admin/delete_topics in Zookeeper. Add the setting and restart Kafka, and the previously marked topics are then actually deleted.
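To see which topics are currently only marked for deletion, the marker nodes can be listed with zkCli.sh, as used elsewhere in this article (the Zookeeper address is a placeholder):

    $ zkCli.sh -server localhost:2181
    # Topics that have been marked for deletion but not yet removed show up here
    [zk: localhost:2181(CONNECTED) 0] ls /admin/delete_topics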

  • Pros: Kafka itself performs the topic deletion; you only need to set delete.topic.enable to true in the server.properties configuration file.

  • Cons: Kafka has to be restarted for the configuration change to take effect.

    # Default is false. Note: there must be no spaces around the equals sign, otherwise the setting does not take effect
    delete.topic.enable=true
    # Environment-variable form for the Bitnami chart (this involves a restart)
    KAFKA_CFG_DELETE_TOPIC_ENABLE=true

    # Create a new topic logstash_test (with 3 replicas)
    kafka-topics.sh --create --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test --partitions 1 --replication-factor 3

    # Describe topic logstash_test: the leader is broker 0 (broker.id=0) and there are three replicas on brokers 0, 1 and 2
    I have no name!@ape-kafka-0:/$ kafka-topics.sh --describe --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
    Topic: logstash_test    TopicId: 1j9d-WGVTzKTpGdTtO0YFQ    PartitionCount: 1    ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: logstash_test    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1

    # Check the topics in Zookeeper
    $ zkCli.sh -server localhost:2181
    [zk: localhost:2181(CONNECTED) 0] ls /brokers/topics
    [__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog, logstash_test]
    [zk: localhost:2181(CONNECTED) 1] ls /config/topics
    [__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog, logstash_test]

    # Check the log.dirs directory configured in Kafka's server.properties
    I have no name!@ape-kafka-0:/$ ls /bitnami/kafka/data/logstash_test-0/
    00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint  partition.metadata

    # Delete topic logstash_test
    I have no name!@ape-kafka-0:/$ kafka-topics.sh --delete --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test

    # Describe topic logstash_test again: the error shows the topic has been deleted
    I have no name!@ape-kafka-0:/$ kafka-topics.sh --describe --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
    Error while executing topic command : Topic 'logstash_test' does not exist as expected
    [2024-06-26 03:13:45,323] ERROR java.lang.IllegalArgumentException: Topic 'logstash_test' does not exist as expected
            at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:399)
            at kafka.admin.TopicCommand$TopicService.describeTopic(TopicCommand.scala:311)
            at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
            at kafka.admin.TopicCommand.main(TopicCommand.scala)
     (kafka.admin.TopicCommand$)

    # Check Zookeeper again: logstash_test has been removed there as well
    [zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
    [__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog]
    [zk: localhost:2181(CONNECTED) 3] ls /config/topics
    [__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog]

    # Check the log.dirs directory again: the logstash_test log files are gone too
    I have no name!@ape-kafka-0:/$ ls /bitnami/kafka/data/logstash_test*
    ls: cannot access '/bitnami/kafka/data/logstash_test*': No such file or directory

    Manually deleting data

    1. Pros: no need to restart the Kafka service; delete the topic's log files on disk directly and then remove the corresponding nodes in Zookeeper.
    2. Cons: the deletion is entirely manual, and re-creating a topic with the same name afterwards runs into problems (method one does not have this issue).
    3. This method is not recommended: it is crude and brute-force, and if some program is still consuming these messages, Kafka is effectively game over.
    # Create a new topic logstash_test (with 3 replicas)
    I have no name!@ape-kafka-0:/$ kafka-topics.sh --create --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --replication-factor 3 --partitions 1 --topic logstash_test
    WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
    Created topic logstash_test.

    # Describe topic logstash_test: the leader is broker 1 (broker.id=1) and there are three replicas on brokers 0, 1 and 2
    I have no name!@ape-kafka-0:/$ kafka-topics.sh --describe --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
    Topic: logstash_test    TopicId: S7bPYklqRXy6GB8Qwq67_A    PartitionCount: 1    ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: logstash_test    Partition: 0    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2

    # Check the topics in Zookeeper
    [zk: localhost:2181(CONNECTED) 0] ls /brokers/topics
    [__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog, logstash_test]
    [zk: localhost:2181(CONNECTED) 1] ls /config/topics
    [__consumer_offsets, frontend_invoke_queue, frontend_invoke_result_log, lake_add_namelist, lake_entrylog, logstash_test]

    # Check the log.dirs directory configured in Kafka's server.properties
    I have no name!@ape-kafka-0:/$ ls /bitnami/kafka/data/logstash_test-0/
    00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint  partition.metadata

    # Delete the topic from Zookeeper
    [zk: localhost:2181(CONNECTED) 5] deleteall /brokers/topics/logstash_test
    [zk: localhost:2181(CONNECTED) 6] deleteall /config/topics/logstash_test

    # Delete the log files of topic logstash_test (this must be done on every node of the Kafka cluster)
    rm -rf /bitnami/kafka/data/logstash_test*

    # List the remaining topics
    I have no name!@ape-kafka-0:/$ kafka-topics.sh --list --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092
    __consumer_offsets
    frontend_invoke_queue
    frontend_invoke_result_log
    lake_add_namelist
    lake_entrylog
    logstash_test

    # Describe topic logstash_test again: the topic still exists, so it has to be deleted by hand even though its data is gone
    I have no name!@ape-kafka-1:/$ kafka-topics.sh --describe --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
    Topic: logstash_test    TopicId: S7bPYklqRXy6GB8Qwq67_A    PartitionCount: 1    ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: logstash_test    Partition: 0    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2

    # Creating a topic with the same name afterwards fails
    I have no name!@ape-kafka-1:/$ kafka-topics.sh --create --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --replication-factor 3 --partitions 1 --topic logstash_test
    WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
    Error while executing topic command : Topic 'logstash_test' already exists.
    [2024-06-26 03:38:34,038] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'logstash_test' already exists.
     (kafka.admin.TopicCommand$)

    # Deleting the topic also fails (it recovers after Kafka is restarted)
    I have no name!@ape-kafka-1:/$ kafka-topics.sh --delete --bootstrap-server ape-kafka-0.ape-kafka-headless:9092,ape-kafka-1.ape-kafka-headless:9092,ape-kafka-2.ape-kafka-headless:9092 --topic logstash_test
    Error while executing topic command : This server does not host this topic-partition.
    [2024-06-26 03:40:30,871] ERROR org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.admin.TopicCommand$)
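In this environment the recovery was a broker restart. A minimal sketch of that step for a Bitnami-style Kubernetes deployment, assuming the brokers run as a StatefulSet named ape-kafka (the name is inferred from the pod names above and may differ in your cluster):

    # Rolling-restart the brokers so they drop the stale topic metadata
    kubectl rollout restart statefulset/ape-kafka
    kubectl rollout status statefulset/ape-kafka
    # Afterwards the leftover topic can be deleted normally
    kafka-topics.sh --delete --bootstrap-server ape-kafka-0.ape-kafka-headless:9092 --topic logstash_test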

    Setting a deletion policy

    1. This is also blunt: if some program is still consuming these messages when they are deleted, Kafka is effectively game over.
    2. The relevant parameters are listed below; configure them in server.properties before starting Kafka.
    # Log cleanup policy: delete or compact. It applies to expired data, or to logs that have reached their size limit, and can be overridden by parameters specified when the topic is created
    log.cleanup.policy=delete
    # Note: there are two kinds of settings below, one time-based and one size-based. If both are configured, meeting either one triggers log deletion.
    # Time-based: how many hours messages are retained before their segments can be deleted
    log.retention.hours=4
    # Size-based: once a partition's log grows beyond log.retention.bytes bytes, the oldest segments are deleted
    log.retention.bytes=37580963840
    # Interval at which the log cleaner thread checks whether any messages can be deleted under the retention policies above
    log.retention.check.interval.ms=1000
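The broker-level settings above require a restart. As a complement, retention can also be tightened for a single topic at runtime with kafka-configs.sh, which is a common way to let Kafka purge a topic's data without restarting anything. A minimal sketch, assuming a topic named test and the broker address used in this article; the temporary retention.ms value is arbitrary:

    # Temporarily shrink the topic's retention so the cleaner deletes old segments
    kafka-configs.sh --bootstrap-server ape-kafka-0.ape-kafka-headless:9092 --alter --entity-type topics --entity-name test --add-config retention.ms=60000
    # Wait at least log.retention.check.interval.ms (plus segment roll time), then remove the override to restore the default retention
    kafka-configs.sh --bootstrap-server ape-kafka-0.ape-kafka-headless:9092 --alter --entity-type topics --entity-name test --delete-config retention.ms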