


After the configuration is complete, verify again:

[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf

The output is shown below:

# It takes a moment before the following output appears:
[2024-03-14T11:49:56,399][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2024-03-14T11:49:56,443][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
{
    "@timestamp" => 2024-03-14T03:49:56.438442963Z,
      "@version" => "1",
    "user_agent" => { "original" => "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36" },
          "host" => { "name" => "logstash-server" },
           "log" => { "file" => { "path" => "/var/log/httpd.log" } },
          "http" => {
         "request" => {
              "method" => "GET",
            "referrer" => "http://semicomplete.com/presentations/logstash-monitorama-2013/"
        },
         "version" => "1.1",
        "response" => {
            "scode" => 200,
             "body" => { "bytes" => 203023 }
        }
    },
     "timestamp" => "04/Jan/2015:05:13:42 +0000",
           "url" => { "original" => "/presentations/logstash-monitorama-2013/imageskibana-search.png" },
       "message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
         "event" => { "original" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"" },
        "source" => { "address" => "83.149.9.216" }
}

You will see that the previously unstructured data has become structured data. In this example, the clientip field contains the IP address.

The pattern used here is %{COMBINEDAPACHELOG}.

%{COMBINEDAPACHELOG} is a predefined grok pattern for parsing the "combined" log format of the Apache HTTP Server.
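For reference, here is a rough sketch of what this pattern expands to in logstash-patterns-core (simplified; the field names match the mapping table later in this article, but check the patterns files of your Logstash version for the exact definition):

COMMONAPACHELOG   %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}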

Input plugins consume data from a source, filter plugins modify the data as you specify, and output plugins write the data to a destination.

• graphite: sends event data to Graphite, a popular open-source tool for storing and graphing metrics.

How does grok know which content you are interested in? It recognizes the fields of interest through its own predefined patterns.

• Logstash monitoring is available through the Logstash integration in Elastic Observability on Elastic Cloud Serverless.

2. How It Works

The Logstash event processing pipeline has three stages: inputs → filters → outputs.

• filters: filters is an optional module that can format, filter, and perform simple processing on the data before it is synced to its destination.

The geoip plugin configuration requires you to specify the name of the source field that contains the IP address to look up.

• Message queues such as kafka, rabbitmq, etc.: support reading data from various message queues.

So, file is used here. Create the sample log file:

[root@logstash-server ~]# vim /var/log/httpd.log
83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

Make sure there is no cached data:

[root@logstash file]# pwd
/usr/local/logstash/data/plugins/inputs/file
[root@logstash file]# ls -a
.  ..  .sincedb_aff270f7990dabcdbd0044eac08398ef
[root@logstash file]# rm -rf .sincedb_aff270f7990dabcdbd0044eac08398ef
# On the first run this file will not exist yet, and the data directory will not contain a plugins directory either

The modified pipeline configuration file is as follows:

[root@logstash-server logstash]# vim /usr/local/logstash/config/first-pipeline.conf
# Lines starting with # are comments
input {
    file {
        path => ["/var/log/httpd.log"]
        start_position => "beginning"
    }
}
filter {
    grok {
        # Filter and process the web log, outputting structured data
        # Look for a COMBINEDAPACHELOG match in the value of the message field
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
}
output {
    stdout {}
}

The meaning of match => { "message" => "%{COMBINEDAPACHELOG}" } is:
match against the "message" field and map its contents into fields using the COMBINEDAPACHELOG pattern. Any type of event can be enriched and transformed with a broad array of input, filter, and output plugins; many native codecs further simplify the ingestion process.

• grok is a tool that combines multiple predefined regular expressions to match and split text and map the pieces to keywords. The logstash-output-elasticsearch setting defaults to port 9200.

Known issues for Logstash to Elasticsearch Serverless.

The completed pipeline configuration file is as follows:

[root@logstash-server logstash]# vim config/first-pipeline.conf
input {
    file {
        path => ["/var/log/httpd.log"]
        start_position => "beginning"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip {
        source => "clientip"
    }
}
output {
    stdout {}
}

Enter the previous log line again, and you will see output like the following:

# Remember to delete the cache first
[root@logstash-server logstash]# rm -rf data/plugins
[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf
[2023-05-04T11:30:41,667][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
{
           "host" => "logstash-server",
           "verb" => "GET",
          "geoip" => {
          "country_name" => "Russia",
         "country_code2" => "RU",
              "location" => { "lat" => 55.7527, "lon" => 37.6172 },
             "longitude" => 37.6172,
           "region_name" => "Moscow",
           "region_code" => "MOW",
              "timezone" => "Europe/Moscow",
         "country_code3" => "RU",
        "continent_code" => "EU",
                    "ip" => "83.149.9.216",
             "city_name" => "Moscow",
              "latitude" => 55.7527,
           "postal_code" => "129223"
    },
          "ident" => "-",
       "clientip" => "83.149.9.216",
           "auth" => "-",
     "@timestamp" => 2023-05-04T03:30:42.063Z,
        "message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
      "timestamp" => "04/Jan/2015:05:13:42 +0000",
       "@version" => "1",
           "path" => "/var/log/httpd.log",
        "request" => "/presentations/logstash-monitorama-2013/imageskibana-search.png",
          "bytes" => "203023",
          "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
    "httpversion" => "1.1",
       "response" => "200",
       "referrer" => "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\""
}

For details, see grok and geoip; for more on using filter plugins, see: filter plugins.

# List plugins
[root@logstash-server logstash]# ./bin/logstash-plugin list
Using bundled JDK: /usr/local/logstash/jdk
logstash-codec-avro
logstash-codec-cef
logstash-codec-collectd
logstash-codec-dots
logstash-codec-edn
logstash-codec-edn_lines
logstash-codec-es_bulk
logstash-codec-fluent
logstash-codec-graphite
logstash-codec-json
logstash-codec-json_lines
logstash-codec-line
logstash-codec-msgpack
logstash-codec-multiline
logstash-codec-netflow
logstash-codec-plain
logstash-codec-rubydebug
logstash-filter-aggregate
logstash-filter-anonymize
logstash-filter-cidr
logstash-filter-clone
logstash-filter-csv
logstash-filter-date
logstash-filter-de_dot
logstash-filter-dissect
logstash-filter-dns
logstash-filter-drop
logstash-filter-elasticsearch
logstash-filter-fingerprint
logstash-filter-geoip
logstash-filter-grok
logstash-filter-http
logstash-filter-json
logstash-filter-kv
logstash-filter-memcached
logstash-filter-metrics
logstash-filter-mutate
logstash-filter-prune
logstash-filter-ruby
logstash-filter-sleep
logstash-filter-split
logstash-filter-syslog_pri
logstash-filter-throttle
logstash-filter-translate
logstash-filter-truncate
logstash-filter-urldecode
logstash-filter-useragent
logstash-filter-uuid
logstash-filter-xml
logstash-input-azure_event_hubs
logstash-input-beats
└── logstash-input-elastic_agent (alias)
logstash-input-couchdb_changes
logstash-input-dead_letter_queue
logstash-input-elastic_serverless_forwarder
logstash-input-elasticsearch
logstash-input-exec
logstash-input-file
logstash-input-ganglia
logstash-input-gelf
logstash-input-generator
logstash-input-graphite
logstash-input-heartbeat
logstash-input-http
logstash-input-http_poller
logstash-input-imap
logstash-input-jms
logstash-input-pipe
logstash-input-redis
logstash-input-snmp
logstash-input-snmptrap
logstash-input-stdin
logstash-input-syslog
logstash-input-tcp
logstash-input-twitter
logstash-input-udp
logstash-input-unix
logstash-integration-aws
 ├── logstash-codec-cloudfront
 ├── logstash-codec-cloudtrail
 ├── logstash-input-cloudwatch
 ├── logstash-input-s3
 ├── logstash-input-sqs
 ├── logstash-output-cloudwatch
 ├── logstash-output-s3
 ├── logstash-output-sns
 └── logstash-output-sqs
logstash-integration-elastic_enterprise_search
 ├── logstash-output-elastic_app_search
 └── logstash-output-elastic_workplace_search
logstash-integration-jdbc
 ├── logstash-input-jdbc
 ├── logstash-filter-jdbc_streaming
 └── logstash-filter-jdbc_static
logstash-integration-kafka
 ├── logstash-input-kafka
 └── logstash-output-kafka
logstash-integration-logstash
 ├── logstash-input-logstash
 └── logstash-output-logstash
logstash-integration-rabbitmq
 ├── logstash-input-rabbitmq
 └── logstash-output-rabbitmq
logstash-output-csv
logstash-output-elasticsearch
logstash-output-email
logstash-output-file
logstash-output-graphite
logstash-output-http
logstash-output-lumberjack
logstash-output-nagios
logstash-output-null
logstash-output-pipe
logstash-output-redis
logstash-output-stdout
logstash-output-tcp
logstash-output-udp
logstash-output-webhdfs
logstash-patterns-core

6. Configuring an Input to Receive Beats

# Listen on port 5044 to receive input from filebeat; run on the logstash server
[root@logstash-server logstash]# vim config/first-pipeline.conf
input {
    beats {
        port => 5044
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    # geoip { source => "clientip" }
}
output {
    stdout {}
}

After running logstash, modify the output target in filebeat's yml file as follows:

# On the filebeat server:
[root@filebeat-server filebeat]# vim filebeat.yml
...
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.221.140:5044"]    # the IP is logstash's IP
...
# Delete output.elasticsearch and put output.logstash here instead

Clear the cache directory on the filebeat machine:

    [root@filebeat-server filebeat]# rm -rf /usr/local/filebeat/data/

Run filebeat:

[root@filebeat-server filebeat]# systemctl restart filebeat.service
[root@filebeat-server filebeat]# systemctl status filebeat.service
● filebeat.service - Filebeat sends log files to Logstash or directly to Elasticsearch.
   Loaded: loaded (/usr/lib/systemd/system/filebeat.service; enabled; vendor preset: disabled)
   Active: active (running) since Thu 2024-03-14 15:29:16 CST; 6s ago
 Main PID: 1418 (filebeat)
   CGroup: /system.slice/filebeat.service
           └─1418 /usr/local/filebeat/filebeat -c /usr/local/filebeat/filebea...
Mar 14 15:29:16 filebeat-server systemd[1]: Stopped Filebeat sends log file....
Mar 14 15:29:16 filebeat-server systemd[1]: Started Filebeat sends log file....
Hint: Some lines were ellipsized, use -l to show in full.

Run logstash:

[root@logstash-server logstash]# rm -rf data/plugins
[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf
Using bundled JDK: /usr/local/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release
...............
# You can see that logstash is receiving the log information collected by filebeat
{
         "input" => { "type" => "log" },
        "source" => { "address" => "123.127.39.50" },
          "http" => {
         "request" => {
            "referrer" => "http://81.68.233.173/",
              "method" => "GET"
        },
         "version" => "1.1",
        "response" => {
                    "body" => { "bytes" => 14137 },
             "status_code" => 200
        }
    },
           "ecs" => { "version" => "1.12.0" },
           "log" => { "offset" => 0, "file" => { "path" => "/opt/nginx/log/nginx/access.log" } },
         "agent" => {
                  "id" => "afbbf9f5-d7f7-4057-a70d-fa4e3a4741fc",
             "version" => "8.12.2",
                "type" => "filebeat",
        "ephemeral_id" => "28cf958a-d735-43d4-88c0-19d4460a39f2",
                "name" => "filebeat-server"
    },
      "@version" => "1",
          "host" => {
        "containerized" => false,
         "architecture" => "x86_64",
                 "name" => "filebeat-server",
                  "mac" => [ [0] "00-0C-29-40-59-B2" ],
                   "id" => "4746d2ecb7c945cdbc93de5d156817a0",
                   "ip" => [ [0] "192.168.221.139", [1] "fe80::4ee8:bb9d:ef6c:9934" ],
             "hostname" => "filebeat-server",
                   "os" => { "codename" => "Core", "platform" => "centos", "name" => "CentOS Linux", "type" => "linux", "version" => "7 (Core)", "kernel" => "3.10.0-1062.el7.x86_64", "family" => "redhat" }
    },
    "user_agent" => { "original" => "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36" },
       "service" => { "type" => "nginx" },
    "@timestamp" => 2024-03-14T07:30:51.531Z,
          "tags" => [ [0] "beats_input_codec_plain_applied" ],
           "url" => { "original" => "/logo.jpg" },
       "fileset" => { "name" => "access" },
       "message" => "123.127.39.50 - - [04/Mar/2021:10:50:28 +0800] \"GET /logo.jpg HTTP/1.1\" 200 14137 \"http://81.68.233.173/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36\" \"-\"",
     "timestamp" => "04/Mar/2021:10:50:28 +0800",
         "event" => {
          "module" => "nginx",
        "original" => "123.127.39.50 - - [04/Mar/2021:10:50:28 +0800] \"GET /logo.jpg HTTP/1.1\" 200 14137 \"http://81.68.233.173/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36\" \"-\"",
        "timezone" => "+08:00",
         "dataset" => "nginx.access"
    }
}
{
         "input" => { "type" => "log" },
           "ecs" => { "version" => "1.12.0" },
           "log" => { "offset" => 0, "file" => { "path" => "/opt/nginx/log/nginx/error.log" } },
         "agent" => {
                  "id" => "afbbf9f5-d7f7-4057-a70d-fa4e3a4741fc",
                "type" => "filebeat",
             "version" => "8.12.2",
        "ephemeral_id" => "28cf958a-d735-43d4-88c0-19d4460a39f2",
                "name" => "filebeat-server"
    },
      "@version" => "1",
          "host" => {
        "containerized" => false,
         "architecture" => "x86_64",
                 "name" => "filebeat-server",
                  "mac" => [ [0] "00-0C-29-40-59-B2" ],
                   "id" => "4746d2ecb7c945cdbc93de5d156817a0",
                   "ip" => [ [0] "192.168.221.139", [1] "fe80::4ee8:bb9d:ef6c:9934" ],
             "hostname" => "filebeat-server",
                   "os" => { "codename" => "Core", "family" => "redhat", "name" => "CentOS Linux", "type" => "linux", "version" => "7 (Core)", "kernel" => "3.10.0-1062.el7.x86_64", "platform" => "centos" }
    },
       "service" => { "type" => "nginx" },
    "@timestamp" => 2024-03-14T07:30:51.531Z,
          "tags" => [ [0] "beats_input_codec_plain_applied", [1] "_grokparsefailure" ],
       "fileset" => { "name" => "error" },
       "message" => "2021/03/04 10:50:28 [error] 11396#0: *5 open() \"/farm/bg.jpg\" failed (2: No such file or directory), client: 123.127.39.50, server: localhost, request: \"GET /bg.jpg HTTP/1.1\", host: \"81.68.233.173\", referrer: \"http://81.68.233.173/\"",
         "event" => {
          "module" => "nginx",
        "original" => "2021/03/04 10:50:28 [error] 11396#0: *5 open() \"/farm/bg.jpg\" failed (2: No such file or directory), client: 123.127.39.50, server: localhost, request: \"GET /bg.jpg HTTP/1.1\", host: \"81.68.233.173\", referrer: \"http://81.68.233.173/\"",
         "dataset" => "nginx.error",
        "timezone" => "+08:00"
    }
}
• Note the following differences between Elastic Cloud Serverless, the Elasticsearch Service, and self-managed Elasticsearch:

• Use an API key to access Elastic Cloud Serverless from Logstash.

• file: writes event data to a file on disk, syncing the data into a single file.
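As a minimal sketch (the destination path here is hypothetical), a file output might look like:

output {
  file {
    # Hypothetical path; %{+YYYY-MM-dd} interpolates the event's date
    path  => "/tmp/httpd-%{+YYYY-MM-dd}.log"
    codec => json_lines   # write one JSON document per line
  }
}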

rename can rename fields

Mutate filter configuration options

| Configuration option | Purpose |
| --- | --- |
| add_field | Add a new field to the event |
| remove_field | Remove an arbitrary field from the event |
| add_tag | Add an arbitrary tag to the event |
| remove_tag | Remove a tag from the event, if present |
| convert | Convert a field's value to another data type |
| id | Add a unique ID to the event |
| lowercase | Convert a string field to its lowercase form |
| replace | Replace a field with a new value |
| strip | Remove leading and trailing whitespace |
| uppercase | Convert a string field to its uppercase equivalent |
| update | Update an existing field with a new value |
| rename | Rename a field in the event |
| gsub | Find and replace text within a string |
| merge | Merge arrays or hashes in events |
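A minimal sketch combining several of these options (the field names assume the Apache log fields parsed earlier in this article):

filter {
  mutate {
    convert   => { "bytes" => "integer" }   # "203023" -> 203023
    lowercase => [ "verb" ]                 # "GET" -> "get"
    strip     => [ "request" ]              # trim leading/trailing whitespace
    add_tag   => [ "mutated" ]              # mark the event as processed
  }
}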

The modified pipeline configuration file is as follows:

[root@logstash-server logstash]# vim config/first-pipeline.conf
input {
    file {
        path => ["/var/log/httpd.log"]
        start_position => "beginning"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    mutate {
        # Rename the field
        rename => { "status_code" => "scode" }
    }
    mutate {
        # Remove unneeded fields
        remove_field => ["message", "input_type", "@version", "fields"]
    }
}
output {
    stdout {}
}

Test again, and you will find that message is gone and status_code has been renamed to scode:

[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf
Using bundled JDK: /usr/local/logstash/jdk
......
[2024-03-14T11:54:43,296][INFO ][filewatch.observingtail  ][main][17f9be0d29f2eb1e2fd3e943d4672f5fc989db530509b86b731852814b0e0a46] START, creating Discoverer, Watch with file and sincedb collections
[2024-03-14T11:54:43,304][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2024-03-14T11:54:43,315][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}

# If you see Pipeline started but no data is ever output, there is a cache under the data directory; delete the cache and then repeat this step
# Delete the cache
[root@logstash-server logstash]# cd data/
[root@logstash-server data]# ls
dead_letter_queue  plugins  queue  uuid
[root@logstash-server data]# ll
total 4
drwxr-xr-x 2 root root  6 Mar 14 11:19 dead_letter_queue
drwxr-xr-x 3 root root 20 Mar 14 11:49 plugins
drwxr-xr-x 2 root root  6 Mar 14 11:19 queue
-rw-r--r-- 1 root root 36 Mar 14 11:19 uuid
[root@logstash-server data]# rm -rf plugins/
[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf
...
{
           "log" => { "file" => { "path" => "/var/log/httpd.log" } },
          "http" => {
         "version" => "1.1",
         "request" => {
            "referrer" => "http://semicomplete.com/presentations/logstash-monitorama-2013/",
              "method" => "GET"
        },
        "response" => {
             "body" => { "bytes" => 203023 },
            "scode" => 200
        }
    },
    "user_agent" => { "original" => "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36" },
     "timestamp" => "04/Jan/2015:05:13:42 +0000",
          "host" => { "name" => "logstash-server" },
    "@timestamp" => 2024-03-14T03:58:41.236243588Z,
           "url" => { "original" => "/presentations/logstash-monitorama-2013/imageskibana-search.png" },
         "event" => { "original" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"" },
        "source" => { "address" => "83.149.9.216" }
}
# Notice that message is gone, and status_code has been renamed to scode

5. Enriching Data with the Geoip Filter Plugin

Note: this plugin is temporarily unavailable after 8.1; the version used in the experiment below is logstash-7.13.2.

geoip: short for geographic IP, an IP geolocation database.

Besides parsing log data for better searching, filter plugins can derive supplementary information from existing data. Below are common destinations:

• elasticsearch: sends event data to Elasticsearch.

remove_field can remove arbitrary fields; the value it accepts is an array.

Go into the Logstash installation home directory and run:

[root@logstash-server logstash]# bin/logstash -e ''
Using bundled JDK: /usr/local/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
• The -e option is used to set the input and output with which Logstash processes data

• -e '' is equivalent to -e input { stdin { type => stdin } } output { stdout { codec => rubydebug } }

• input { stdin { type => stdin } } means that the data Logstash processes comes from the standard input device

• output { stdout { codec => rubydebug } } means that Logstash writes the processed data to the standard output device

After a moment, when the following appears on screen, try typing hello on the keyboard:

[2024-03-14T11:21:21,651][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}

Typing hello will immediately output the formatted data:

The stdin plugin is now waiting for input:
hello
{
         "event" => { "original" => "hello" },
          "host" => { "hostname" => "logstash-server" },
       "message" => "hello",
      "@version" => "1",
    "@timestamp" => 2024-03-14T03:21:43.519434493Z,
          "type" => "stdin"
}


• The value of the message field is one complete line of data received by Logstash
• @version is version information, which can be used when building indexes
• @timestamp is the timestamp at which this data was processed; it can be used for building indexes and for searching
• type is the value set earlier in input; the value itself can be changed at will, but type is a built-in field name and cannot be renamed; it is used for building indexes, conditional logic, and so on (see the sketch after this list)
• host indicates which host the data came from
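As an illustration of using type in conditional logic, a minimal sketch (the elasticsearch destination and index name are hypothetical, not part of this article's setup):

output {
  if [type] == "nginx" {
    elasticsearch {
      hosts => ["http://localhost:9200"]   # hypothetical destination
      index => "nginx-%{+YYYY.MM.dd}"      # separate index per type and per day
    }
  } else {
    stdout { codec => rubydebug }
  }
}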

An example of changing the value of type to nginx (mainly useful for distinguishing indexes; changing it here has no practical effect):

[root@logstash-server logstash]# ./bin/logstash -e "input { stdin { type => nginx } } output { stdout { codec => rubydebug } }"
# Wait a moment; when you see Pipeline started, startup is normal
[2024-03-14T11:24:27,247][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2024-03-14T11:24:27,261][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
# Type hello
{
      "@version" => "1",
    "@timestamp" => 2024-03-14T03:24:33.458038810Z,
       "message" => "hello",
          "host" => { "hostname" => "logstash-server" },
         "event" => { "original" => "hello" },
          "type" => "nginx"
}


3. Configuring Input and Output

In production, Logstash pipelines are more complex: they typically have one or more input, filter, and output plugins.

• syslog: listens on port 514 for syslog messages and parses them according to the RFC 3164 format.

Using the grok filter plugin, you can parse unstructured log data into something structured and queryable.

Tip: inputs/filters/outputs gain their various capabilities through a plugin mechanism.

Create a file with the following content to serve as Logstash's pipeline configuration file:

[root@logstash-server logstash]# vim /usr/local/logstash/config/first-pipeline.conf
input {
    stdin {}
}
output {
    stdout {}
}

Test the configuration file syntax:

[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf --config.test_and_exit
Using bundled JDK: /usr/local/logstash/jdk
/usr/local/logstash/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_int
/usr/local/logstash/vendor/bundle/jruby/3.1.0/gems/concurrent-ruby-1.1.9/lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb:13: warning: method redefined; discarding old to_f
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2024-03-14T11:39:36,651][INFO ][logstash.runner          ] Log4j configuration path used is: /usr/local/logstash/config/log4j2.properties
[2024-03-14T11:39:36,653][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"8.12.2", "jruby.version"=>"jruby 9.4.5.0 (3.1.4) 2023-11-02 1abae2700f OpenJDK 64-Bit Server VM 17.0.10+7 on 17.0.10+7 +indy +jit [x86_64-linux]"}
[2024-03-14T11:39:36,655][INFO ][logstash.runner          ] JVM bootstrap flags: [-XX:+HeapDumpOnOutOfMemoryError, -Dlogstash.jackson.stream-read-constraints.max-number-length=10000, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, -Djruby.regexp.interruptible=true, --add-opens=java.base/java.security=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, -Dio.netty.allocator.maxOrder=11, -Dlog4j2.isThreadContextMapInheritable=true, -Xms1g, -Dlogstash.jackson.stream-read-constraints.max-string-length=200000000, -Djdk.io.File.enableADS=true, -Dfile.encoding=UTF-8, --add-opens=java.base/java.io=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, -Djruby.compile.invokedynamic=true, -Xmx1g, -Djava.security.egd=file:/dev/urandom, -Djava.awt.headless=true, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED]
[2024-03-14T11:39:36,656][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-string-length` configured to `200000000`
[2024-03-14T11:39:36,657][INFO ][logstash.runner          ] Jackson default value override `logstash.jackson.stream-read-constraints.max-number-length` configured to `10000`
[2024-03-14T11:39:36,823][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2024-03-14T11:39:37,020][INFO ][org.reflections.Reflections] Reflections took 105 ms to scan 1 urls, producing 132 keys and 468 values
/usr/local/logstash/vendor/bundle/jruby/3.1.0/gems/amazing_print-1.5.0/lib/amazing_print/formatter.rb:37: warning: previous definition of cast was here
[2024-03-14T11:39:37,148][INFO ][logstash.javapipeline    ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
Configuration OK
[2024-03-14T11:39:37,148][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
# Seeing OK means the test passed

The -f option is used to specify the pipeline configuration file.

The inputs module is responsible for collecting data, and the filters module can format, filter, and perform simple processing on the collected data. You want to parse the log messages so that specific named fields can be created from the logs; for this, you should use the grok filter plugin. Logstash accelerates your insights by harnessing a greater volume and variety of data.

geoip {
    source => "clientip"
}

Because filters are evaluated in order, make sure the geoip section is placed after the grok section in the configuration file, and that both the grok and geoip sections are nested inside the filter section. Logstash can dynamically unify data from disparate sources and normalize it into the destinations of your choice.

inputs: inputs can collect data from many kinds of data sources. Common sources include:

• file: scans file data on disk, for example scanning log files.

• Redis is often used as a "broker" in centralized Logstash installations, queueing Logstash events from remote Logstash "shippers".

%{COMBINEDAPACHELOG} structures lines from Apache logs using the following mapping:

| Original information | New field name |
| --- | --- |
| IP address | clientip |
| User ID | ident |
| User authentication info | auth |
| Timestamp | timestamp |
| HTTP request method | verb |
| Requested URL | request |
| HTTP version | httpversion |
| Response code | response |
| Response body size | bytes |
| Referrer (like the anti-hotlinking referer in nginx) | referrer |
| Client agent (browser) | agent |

For more grok usage, see the grok reference documentation.

Also, if you want the configuration file to be reloaded automatically after it is modified, you cannot configure a stdin input.
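For reference, automatic reload is switched on with Logstash's config.reload.automatic option (also available as -r); a minimal sketch using this article's pipeline file:

# Reload the pipeline automatically whenever first-pipeline.conf changes
# (this does not work with the stdin input, as noted above)
[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf --config.reload.automatic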

As the observant will have noticed, the original message field is still there. If you do not need it, you can use one of grok's common options, remove_field, to remove this field.

Although Logstash originally drove innovation in log collection, its capabilities extend well beyond that use case.

3. Installation and Configuration

• See the official Logstash website

• Download address for each version: https://www.elastic.co/cn/downloads/past-releases#logstash

| Hostname | CPU/Disk | IP |
| --- | --- | --- |
| logstash-server | 2c2g/20GB | 192.168.221.140 |

1. Installation (two methods)

• Method 1:
[root@logstash-server ~]# curl -OL https://artifacts.elastic.co/downloads/logstash/logstash-8.12.2-linux-x86_64.tar.gz
# May take a long time, since it downloads from a site outside China
[root@logstash-server ~]# tar -xzf logstash-8.12.2-linux-x86_64.tar.gz -C /usr/local/
[root@logstash-server ~]# mv /usr/local/logstash-8.12.2/ /usr/local/logstash
• Method 2: yum install
# Download and install the public signing key:
[root@logstash-server ~]# rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

# In the yum directory, add the following to a file with the .repo suffix
[root@logstash-server ~]# vim /etc/yum.repos.d/logstash.repo
[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

# Reload the yum repositories
[root@logstash-server ~]# yum clean all && yum makecache
Loaded plugins: fastestmirror
Cleaning repos: epel extras logstash-8.x os updates
Cleaning up list of fastest mirrors
Other repos take up 36M of disk space (use --verbose for details)
Loaded plugins: fastestmirror
Determining fastest mirrors
epel                                                     | 4.7 kB  00:00
extras                                                   | 2.9 kB  00:00
logstash-8.x                                             | 1.3 kB  00:00
os                                                       | 3.6 kB  00:00
updates                                                  | 2.9 kB  00:00
(1/19): epel/7/x86_64/group_gz                           | 100 kB  00:00
(2/19): epel/7/x86_64/updateinfo                         | 1.0 MB  00:00
(3/19): epel/7/x86_64/prestodelta                        | 2.5 kB  00:00
(4/19): epel/7/x86_64/filelists_db                       |  12 MB  00:00
(5/19): epel/7/x86_64/primary_db                         | 7.0 MB  00:00
(6/19): epel/7/x86_64/other_db                           | 3.4 MB  00:00
(7/19): extras/7/x86_64/filelists_db                     | 303 kB  00:00
(8/19): extras/7/x86_64/primary_db                       | 250 kB  00:00
(9/19): extras/7/x86_64/other_db                         | 150 kB  00:00
(10/19): logstash-8.x/primary                            | 369 kB  00:01
(11/19): os/7/x86_64/group_gz                            | 153 kB  00:00
(12/19): os/7/x86_64/primary_db                          | 6.1 MB  00:00
(13/19): logstash-8.x/other                              |  47 kB  00:00
(14/19): os/7/x86_64/filelists_db                        | 7.2 MB  00:00
(15/19): os/7/x86_64/other_db                            | 2.6 MB  00:00
(16/19): updates/7/x86_64/primary_db                     |  25 MB  00:00
(17/19): updates/7/x86_64/other_db                       | 1.5 MB  00:00
(18/19): updates/7/x86_64/filelists_db                   |  14 MB  00:00
(19/19): logstash-8.x/filelists                          |  53 MB  00:04
logstash-8.x                                                        1086/1086
logstash-8.x                                                        1086/1086
logstash-8.x                                                        1086/1086
Metadata Cache Created

# Install
[root@logstash-server ~]# yum -y install logstash
Loaded plugins: fastestmirror
Loading mirror speeds from cached hostfile
Resolving Dependencies
--> Running transaction check
---> Package logstash.x86_64 1:8.12.2-1 will be installed
--> Finished Dependency Resolution

Dependencies Resolved
================================================================================
 Package          Arch           Version             Repository           Size
================================================================================
Installing:
 logstash         x86_64         1:8.12.2-1          logstash-8.x        333 M

Transaction Summary
================================================================================
Install  1 Package

Total download size: 333 M
Installed size: 579 M
Downloading packages:
logstash-8.12.2-x86_64.rpm                                 | 333 MB  00:12
Running transaction check
Running transaction test
Transaction test succeeded
Running transaction
  Installing : 1:logstash-8.12.2-1.x86_64                                  1/1
  Verifying  : 1:logstash-8.12.2-1.x86_64                                  1/1

Installed:
  logstash.x86_64 1:8.12.2-1

Complete!
# Note: the repositories do not work with older rpm-based distributions that still use RPM v3, such as CentOS 5.

grok assigns field names based on the content you are interested in and binds that content to the corresponding field names.
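For example, a hand-written pattern binds matched text to names in the same way (the input line format and the field names client, method, and request here are illustrative, not from this article's logs):

filter {
  grok {
    # For an input line like "55.3.244.1 GET /index.html"
    # this produces the fields client, method, and request
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request}" }
  }
}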

inputs and outputs support codecs, which let you encode or decode data as it enters or leaves the pipeline without having to use a separate filter.

• Filebeat: a lightweight file data collector that can replace the capabilities of file.

A Logstash pipeline has two required elements, input and output, and one optional element, filter.
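In skeleton form:

input {
  # required: at least one input plugin
}
filter {
  # optional: zero or more filter plugins
}
output {
  # required: at least one output plugin
}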

• mutate: supports transformations of events, such as renaming, replacing, and modifying fields. Cleanse and democratize all your data for diverse advanced downstream analytics and visualization use cases. The outputs module is responsible for syncing data to its destination; Logstash's processing flow works like a pipeline, with data flowing in at one end and out at the other.

Table of Contents

• 1. Logstash Overview
• 2. How It Works
• 3. Installation and Configuration
  • 1. Installation (two methods)
  • 2. Test Run
  • 3. Configuring Input and Output
• 4. Parsing Web Logs with the Grok Filter Plugin
• 5. Enriching Data with the Geoip Filter Plugin
• 6. Configuring an Input to Receive Beats

1. Logstash Overview

Logstash is an open-source data collection engine with real-time pipelining capabilities.

Codecs: codecs are coder-decoders responsible for serializing and deserializing data; the main two are the json and plain-text codecs. This is achieved by configuring different codecs for a plugin.
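A minimal sketch of attaching codecs to an input and an output (stdin/stdout are used here purely for illustration):

input {
  stdin {
    codec => json        # decode each incoming line as JSON
  }
}
output {
  stdout {
    codec => rubydebug   # pretty-print events for debugging
  }
}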

• Elastic Cloud Serverless uses data streams and data lifecycle management (DLM) rather than index lifecycle management (ILM).

Logstash to Elastic Cloud Serverless
Use the Logstash Elasticsearch output plugin to send data to Elastic Cloud Serverless.
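A minimal sketch of such an output, assuming a hypothetical serverless endpoint and API key (hosts and api_key are standard options of the plugin; remember that serverless listens on port 443, not 9200):

output {
  elasticsearch {
    # Hypothetical endpoint; replace with your project's URL
    hosts   => ["https://my-project.es.us-east-1.aws.elastic.cloud:443"]
    api_key => "<id>:<api key>"   # authenticate with an API key, not a user/password
  }
}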

And this time, instead of defining the pipeline configuration on the command line, define the pipeline in a configuration file.

Run the following command to start Logstash:

[root@logstash-server logstash]# bin/logstash -f config/first-pipeline.conf
Using bundled JDK: /usr/local/logstash/jdk
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2023-05-04T10:40:09,455][INFO ][logstash.runner          ] Log4j .......
The stdin plugin is now waiting for input:

After startup, copy the following content into the command line and press Enter:

# After startup, copy the following content into the command line and press Enter
83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

You will see output like the following:

      {"@version"=>"1","message"=>"83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"","@timestamp"=>2024-03-14T03:41:48.984091377Z,"host"=>{"hostname"=>"logstash-server"},"event"=>{"original"=>"83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/imageskibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36""}}

4. Parsing Web Logs with the Grok Filter Plugin

You now have a working pipeline, but the format of the log messages is not ideal.

• statsd: sends event data to Statsd, a service that "listens for statistics, like counters and timers, sent over UDP, and sends aggregates to one or more pluggable backend services".

In this section, you will create a Logstash pipeline that takes Apache web logs from standard input, parses those logs to create specific named fields, and then writes the parsed data to standard output (the screen). Ignore any ILM settings in the Elasticsearch output plugin configuration; they may cause errors. For example, the geoip plugin can look up an IP address, find the corresponding geographic location in its bundled database, and add that location information to the log. Set the port value to 443.

2. Test Run

Run the most basic Logstash pipeline to test the Logstash installation.

• drop: discards an event completely

• clone: clones an event

• geoip: adds geolocation information about an IP address

outputs: the final processing node in Logstash; outputs is responsible for syncing data to its destination.

• mysql: scans table data from MySQL

• redis: reads data from a redis server, using redis channels and redis lists. Ignore any user-based security settings in the Elasticsearch output plugin configuration; they may cause errors. Commonly used filters:

• grok: the most commonly used plugin in logstash for interpreting and structuring logs.