发布时间:2025-06-24 19:00:58  作者:北方职教升学中心  阅读量:574


而Zookeeper可以提供分布式锁和选举的功能,因此Kafka可以利用Zookeeper来实现leader选举。

综上所述,Kafka依赖Zookeeper主要是为了协调分布式系统、因此,在安装kafka之前需要先安装JDK

Kafka 为什么依赖 Zookeeper

  • 1.协调分布式系统:Kafka是一个分布式系统,各个节点之间需要进行协调和同步,而Zookeeper正是为分布式系统提供协调和同步的服务的。

    自定义字段
    当我们的元数据没办法支撑我们的业务时,我们还可以自定义添加一些字段
    filebeat.inputs:- type: log  enabled: truepaths:    - /opt/elk/logs/*.log  tags: ["web", "test"]#添加自定义tag,便于后续的处理fields:  #添加自定义字段from: web-test  fields_under_root: true#true为添加到根节点,false为添加到子节点中setup.template.settings:  index.number_of_shards: 3output.console:  pretty: trueenable: true

    添加完成后,重启 filebeat

    ./filebeat -e-cshengxia-log.yml
    filebeat.inputs:-type:log  enabled:truepaths:-/opt/elk/logs/*.logtags:["web","test"]fields:from:web-test  fields_under_root:falsesetup.template.settings:index.number_of_shards:1output.elasticsearch:hosts:["192.168.40.150:9200","192.168.40.137:9200","192.168.40.138:9200"]
    Logstash 配置

    1.修改Logstash 配置文件(下面 output 将日志打印到本地,观察日志是否采集到,日志格式是否正确)

    cat>/usr/local/logstash-7.16.1/config/logstash.conf <<"EOF"input {  kafka {    bootstrap_servers => "10.0.5.163:9092"    topics => ["hosts_10-0-5-163"]    group_id => "test"    codec => "json"  }} filter {  if [type] == "access" {    json {      source => "message"      remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]    }  }} output {  stdout {    codec=>rubydebug  }}EOF

    2.执行前台启动命令

    #查看是否已存在进程,将其停止ps-ef|greplogstash |grep-vgrep|awk'{print $2}'|xargskill-9#启动 Logstashlogstash -f/usr/local/logstash-7.16.1/config/logstash.conf 

    3.查看kafka Group 和队列信息

    #进入kafka 安装目录cd/usr/local/kafka_2.12-3.4.0/bin#查看所有topic./kafka-topics.sh  --bootstrap-server 10.0.5.163:9092 --lis#查看Group./kafka-consumer-groups.sh  --bootstrap-server 10.0.5.163:9092 --list#查看队列./kafka-consumer-groups.sh  --bootstrap-server 10.0.5.163:9092 --grouptest--describe

    在这里插入图片描述
    4.修改配置文件,将output 将日志写入elasticsearch

    cat>/usr/local/logstash-7.16.1/config/logstash.conf <<"EOF"input {  kafka {    bootstrap_servers => "10.0.5.163:9092"    topics => ["hosts_10-0-5-163"]    group_id => "test"    codec => "json"  }}filter {  if [type] == "access" {    json {      source => "message"      remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]    }  }} output{  if [type] == "access" {    elasticsearch {      hosts => ["http://127.0.0.1:9200"]      user => "elastic"      password => "elk@2023"      index => "access-%{+YYYY.MM.dd}"    }  }  else if [type] == "messages" {    elasticsearch {      hosts => ["http://127.0.0.1:9200"]      user => "elastic"      password => "elk@2023"      index => "messages-%{+YYYY.MM.dd}"    }  }}EOF

    4.后台启动 Logstash

    #查看是否已存在进程,将其停止ps-ef|greplogstash |grep-vgrep|awk'{print $2}'|xargskill-9#启动 Logstashnohuplogstash -f/usr/local/logstash-7.16.1/config/logstash.conf  >/tmp/logstash.log 2>&1&

    查看服务日志是否正常

    查看日志是否有 ERROR 持续输出tailf /tmp/logstash.log #查看logstash 端口是否监听netstat-lntp|grep9600
元数据管理和领导选举。

Elastic 中国社区官方博客
https://blog.csdn.net/ubuntutouch/category_9209092.html

Kafka

kafka的源代码是基于Scala语言编写的,运行在Java虚拟机(即:JVM)上。

  • 3.领导选举:Kafka的一个分区只会分配给一个broker进行读写,而这个broker就是该分区的leader。
  • 2.元数据管理:Kafka的元数据包括了集群的配置、Zookeeper提供了一个分布式的文件系统,可以方便地存储和管理这些元数据信息。broker的状态等信息,而这些信息需要被所有的Kafka节点共享和维护。


    ZK安装
    来源于apache

    1.下载
    下载地址:https://zookeeper.apache.org/releases.html

    2.解压安装包

    tar-zxfapache-zookeeper-3.7.1-bin.tar.gz -C/usr/local//usr/local/apache-zookeeper-3.7.1-bin/ /usr/local/zookeeper-3.7.1/

    3.拷贝配置文件,

    cp/usr/local/zookeeper-3.7.1/conf/zoo_sample.cfg /usr/local/zookeeper-3.7.1/conf/zoo.cfg

    4.修改配置文件

    #在配置文件中加一行监听本机 IP 即可clientPortAddress=10.0.5.163

    zookeeper默认会占用8080端口,如果你本机已有服务在使用8080,可以把下面参数添加到zoo.cfg 文件里,自定义端口
    admin.serverPort=8001

    5.启动zk

    /usr/local/zookeeper-3.7.1/bin/zkServer.sh start

    6.查看端口是否监听

    netstat-lntp|grep2181

    如果服务未监听,请查看日志排查问题
    more zookeeper-root-server-VM-5-163-centos.out


    kafka 部署

    1.下载

    下载地址:https://kafka.apache.org/downloads

    2.解压安装包

    tar-zxfkafka_2.12-3.4.0.tgz -C/usr/local/

    3.修改kafka配置

    vim/usr/local/kafka_2.12-3.4.0/config/server.properties #修改 zk 的IPzookeeper.connect=10.0.5.163:2181 #修改监听地址listeners=PLAINTEXT://10.0.5.163:9092

    4.启动kafka

    nohup/usr/local/kafka_2.12-3.4.0/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.4.0/config/server.properties >/tmp/kafka.log 2>&1&

    5.查看端口是否监听

    netstat-lntp|grep9092

    Flebeat部署

    原理流程如下:
    首先是input输入,可以指定多个数据输入源,然后通过通配符进行日志文件的匹配
    匹配到日志后,就会使用Harvester(收割机),将日志源源不断的读取到来
    然后收割机收割到的日志,就传递到Spooler(卷轴),然后卷轴就在将他们传到对应的地方

    1.下载

    wgethttps://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.16.1-linux-x86_64.tar.gz

    2.解压二进制包

    tarzxf filebeat-7.16.1-linux-x86_64.tar.gz -C/usr/local/mv/usr/local/filebeat-7.16.1-linux-x86_64/ /usr/local/filebeat-7.16.1

    3.创建 Filebeat 配置文件

    #备份模板文件mv/usr/local/filebeat-7.16.1/filebeat.yml /usr/local/filebeat-7.16.1/filebeat.yml.bak#创建配置文件cat>/usr/local/filebeat-7.16.1/filebeat.yml <<"EOF"filebeat.inputs:- type: log  tail_files: true  backoff: "1s"  paths:      - /var/log/nginx/access.json.log  fields:    type: access  fields_under_root: true- type: log  tail_files: true  backoff: "1s"  paths:      - /var/log/messages  fields:    type: messages  fields_under_root: trueoutput:  kafka:    hosts: ["10.0.5.163:9092"]    topic: hosts_10-0-5-163EOF

    4.启动Fielbeat

    #查看是否已存在进程,将其停止ps-ef|grepfilebeat |grep-vgrep|awk'{print $2}'|xargskill-9#启动Filebeatnohup/usr/local/filebeat-7.16.1/filebeat  -e-c/usr/local/filebeat-7.16.1/filebeat.yml >/tmp/filebeat.log 2>&1&#查看进程ps-ef|grepfilebeat #查看是否与ZK建立连接netstat-ntp|egrep-w'9092|filebeat'

    Fielbeat使用

    启动

    ./filebeat -e-cshengxia.yml

    yaml文件介绍

    filebeat.inputs:# filebeat input输入-type:stdin    # 标准输入enabled:true# 启用标准输入setup.template.settings:index.number_of_shards:3# 指定下载数output.console:# 控制台输出pretty:true# 启用美化功能enable:true

    输送至ElasticSearch或者Logstash,在Kibana中实现可视化
    然后我们在控制台输入hello,就能看到我们会有一个json的输出,是通过读取到我们控制台的内容后输出的,内容如下

    {"@timestamp":"2023-05-31T22:57:58.700Z",  "@metadata":{#元数据信息    "beat":"filebeat",    "type":"_doc",    "version":"8.8.1"},  "log":{"offset":0,    "file":{"path":""}},  "message":"hello",#元数据信息"input":{#控制台标准输入    "type":"stdin"#元数据信息  },  "ecs":{"version":"8.0.0"},  "host":{"name":"elk-node1"},  "agent":{#版本以及主机信息    "id":"5d5e4b99-8ee3-42f5-aae3-b0492d723730",    "name":"elk-node1",    "type":"filebeat",    "version":"8.8.1",    "ephemeral_id":"24b4fd16-5466-4d7e-b4b8-b73d41f77de0"}}参考文档:https://blog.csdn.net/qq_52589631/article/details/131216188

    再次创建一个文件,叫 shengxia-log.yml,然后在文件里添加如下内容

    filebeat.inputs:-type:log  enabled:truepaths:-/opt/elk/logs/*.logsetup.template.settings:index.number_of_shards:3output.console:pretty:trueenable:true

    添加完成后,我们在到下面目录创建一个日志文件

    # 创建文件夹mkdir-p/opt/elk/logs# 进入文件夹cd/opt/elk/logs# 追加内容echo"hello world">>test.log

    然后再次启动filebeat

    ./filebeat -e-cshengxia-log.yml

    能够发现,它已经成功加载到了我们的日志文件 test.log
    同时我们还可以继续往文件中追加内容
    追加后,我们再次查看filebeat,也能看到刚刚我们追加的内容
    检测到日志文件有更新,立刻就会读取到更新的内容,并且输出到控制台。当leader宕机后,需要从剩余的broker中选举一个新的leader。