Skip to content

Logstash介绍

本文主要介绍如何使用Logstash。

1. Logstash介绍

1.1 概述

Logstash是开源的数据处理管道工具,主要负责从多个来源获取数据,处理数据,然后把处理后的数据输出到指定目的地。

因此,Logstash的数据处理三大阶段如下:

  • 输入(input):用于接收数据,例如读取文件、监听端口、消费 Kafka、接收 syslog 等;
  • 处理/过滤(filter):用于处理数据,例如解析日志、提取字段、改名、加时间戳、GeoIP、脱敏等;
  • 输出(output):用于输出数据,例如写入 Elasticsearch、Kafka、文件、Redis、MongoDB 等;

我们可以在管道(pipeline)中定义以上的流程,如下图所示:

basic logstash pipeline

1.2 执行模式

Logstash事件(event)处理管道协调输入(input)、过滤器(filter)、输出(output)三者的执行,具体如下:

  • 输入阶段 (Input Stage): 每个输入源都在自己的独立线程中运行。它们负责接收数据并将事件写入到一个中央队列 (Central Queue)

  • 中央队列: 这是一个中转站。默认情况下,它存在于内存中,但也可以配置在磁盘上。

  • 工作线程 (Worker Threads): Logstash 会运行多个管道工作线程(Pipeline Worker Threads)。

    • 每个工作线程从中央队列中取出一批 (Batch) 事件。

    • 线程带着这批事件依次经过“过滤器(filter)”进行处理,最后通过“输出(output)”发送出去。

经过上面的了解,队列的位置(内存或磁盘)和大小、批次大小 (Batch size)和工作线程数量都是可以手动配置的,这直接影响 Logstash 的处理性能

2. 基础案例

本小节主要介绍在MacOS系统上,Logstash的安装以及基础案例。

2.1 安装Logstash

下载地址:https://www.elastic.co/downloads/logstash

下载完成后,将压缩包解压,注意,解压后的目录路径不要包含冒号:。Logstash主目录包含的文件和文件夹如下:

名称描述
bin存放二进制脚本,包括logstash用于启动Logstash、logstash-plugin用于安装插件
config存放配置文件,包括:
1. logstash.yml:Logstash配置文件
2. pipelines.yml:用于定义多个管道
3. jvm.options:用于调整JVM参数
4. log4j.properties:用于调整Logstash的日志参数;
data由Logstash和插件使用的数据文件,例如队列持久化
logs默认的日志目录,刚下载时没有
vendor插件目录
jdk.app用于运行Logstash的JDK

Logstash是运行在JRE上的,因此,在下载时会自带JDK,并且在运行时默认使用自带的JDK。

如果要使用其他JDK,需要设置LS_JAVA_HOME环境变量。

2.2 第一个案例

当下载解压Logstash后,进入Logstash主目录,执行以下命令:

bash
bin/logstash -e 'input { stdin { } } output { stdout {} }'

-e参数表示在命令行中定义管道(用于快速测试),上述命令表示定义一个输入,从标准输入(键盘)获取,然后输出到标准输出(屏幕)。

待Logstash启动完成后,在命令行中随意输入内容,Logstash会获取该输入,并输出处理后的内容,结果如下:

image-20260110180911747

2.3 定义pipeline

我们可以将pipeline保存在文件中,例如pipelines/first_pipeline.conf

yaml
input{
    stdin { }
}

output{
    stdout { }
}

之后,使用如下命令运行Logstash:

bash
bin/logstash -f pipelines/first_pipeline.conf

-f参数使用指定的管道配置文件来运行Logstash。

3. 插件

本小节介绍Logstash中的插件。

3.1 input插件

input插件用于获取数据,插件列表:https://www.elastic.co/docs/reference/logstash/plugins/input-plugins。本小节以jdbc为例,介绍如何从MySQL数据库获取数据。

3.1.1 jdbc

jdbc输入插件用于从任意数据库获取数据到Logstash,我们可以使用cron表达式定时获取数据,或者只获取一次数据。SQL查询结果集中的每一行成为一个事件(event),结果集中的列成为事件中的字段(field)。

下面是jdbc的基础配置案例:

yaml
input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    jdbc_password => "xxx"
    
    parameters => { "favorite_artist" => "Beethoven" }
    statement => "SELECT * from songs where artist = :favorite_artist"
    schedule => "* * * * *"
  }
}
  • jdbc_driver_library:指定JDBC驱动所在的路径;
  • jdbc_driver_class:JDBC驱动类;
  • jdbc_connection_string:JDBC链接串;
  • jdbc_user:数据库用户;
  • jdbc_password:数据库用户密码;
  • parameters:参数,可用于SQL;
  • statement:SQL 语句,其中可以使用:param的方式来引用参数值;
  • schedule:cron表达式,表示SQL语句的执行频率;

除了以上基本参数,还有一些基本参数:

  • interval:与schedule功能相同,定义SQL语句的执行频率,例如10s,表示下一次执行是在上一次执行完成后10s才开始,可以理解为固定延迟

  • period:同样是定义SQL语句的执行频率,例如10s,表示下一次执行是在上一次执行开始后10s才开始,可以理解为固定频率

    关于interval与period的区别,假设有个SQL需要执行12s,interval和period都设置为10s

    interval:0s(SQL-1执行) -> 12s(SQL-1执行结束) -> 等待10s -> 22s(SQL2开始执行) -> 34s(SQL-2执行结束)

    period: 0s(SQL-1执行) -> 10s(SQL-2开始执行) -> 12s(SQL-1执行结束) -> 22s(SQL-2执行结束)

    可以发现,如果SQL执行事件过长或period设置的间隔过短,会造成上一个SQL还没结束,又继续执行下一个SQL,对数据库造成压力。

    但是,JDBC input 在调度层面是单 worker 线程的,也就是说,同一个 JDBC input 实例(同一个插件配置)永远只能同时运行一个查询执行。如果上一个查询还没结束,下一次触发(无论 period、interval 还是 schedule)都会被忽略/跳过(不会排队,也不会并发执行)。

  • statement_filepath:如果SQL太长或者过于繁琐,那么可以把SQL写在文件中,通过statement_filepath指定文件路径;

除了以上的基本配置,还有一些特定场景下的配置。

处理大结果集:即一条SQL返回的行数很大,例如几十万行

jdbc_fetch_size:在jdbc插件中,设置该值来限制一次将结果集中的多少数据拉回,该配置没有默认值,如果没有设置,那么使用JDBC驱动的默认值。

假设我们使用的驱动是:

xml
<dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <version>9.4.0</version>
</dependency>

JDBC驱动默认的fetch size为0,并且,JDBC驱动默认是不开启流式返回的!

以下代码判断是否开启流式返回:

java
private boolean useServerFetch() throws SQLException {
    Lock connectionLock = checkClosed().getConnectionLock();
    connectionLock.lock();
    try {
        return this.session.getPropertySet().getBooleanProperty(PropertyKey.useCursorFetch).getValue() && this.query.getResultFetchSize() > 0
                && this.query.getResultType() == Type.FORWARD_ONLY;
    } finally {
        connectionLock.unlock();
    }
}

需要同时满足以下三个条件:

  • useCursorFetch 属性为 true: 这是在连接数据库的 URL 中配置的参数(例如 jdbc:mysql://host/db?useCursorFetch=true)。如果全局没开启这个开关,这里直接返回 false
  • fetchSize > 0 在 Java 代码中必须显式调用了 statement.setFetchSize(n),并且 n 大于 0。这告诉驱动程序每次“抓取”多少行。我们通过Logstash获取,显然不能设置Java代码,因此可以在数据库连接串中设置默认的fetchSize,例如jdbc:mysql://host/db?useCursorFetch=true&defaultFetchSize=1000
  • ResultSet 类型为 FORWARD_ONLY 查询结果必须是只读且只能向前滚动的。如果你设置了结果集可以随机跳转(比如 SCROLL_INSENSITIVE),则无法使用游标抓取。在Logstash的jdbc插件中,默认使用PreparedStatement,因此,结果集类型默认就是FORWARD_ONLY的;

综合以上条件,如果要在jdbc输入插件中设置fetch size,需要同时设置数据库连接串如下:

jdbc:mysql://host/db?useCursorFetch=true&defaultFetchSize=1000&useServerPrepStmts=true

除了使用fetch size方案来处理大结果集,我们也可以使用分页的方式,jdbc输入插件提供以下参数来分页:

  • jdbc_paging_enabled:是否启用分页功能,默认为false

  • jdbc_page_size:页大小,默认值为100000;

  • jdbc_paging_mode:分页模式,可选值为auto, explicit,默认值为auto

    • auto:当设置分页模式为auto时,我们的SQL语句在执行前,会先执行一个count查询,用于统计结果集行数,如果行数大于0,那么会自动分为多个分页查询(通过LIMIT指定),其中分页大小由jdbc_page_size指定;

    • explicit:当设置分页模式为explicit时,需要我们在SQL语句中手动设置分页,使用:size表示jdbc_page_size的值,:offset会根据:size的值自动计算。当查询返回的结果集行数不等于:size的值时,查询结束。例如:

      json
      input {
        jdbc {
          statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value LIMIT :size OFFSET :offset",
          jdbc_paging_enabled => true,
          jdbc_paging_mode => "explicit",
          jdbc_page_size => 100000
        }
      }

      什么时候需要使用explicit分页模式?

      1. auto分页模式下,存在性能问题;

      2. SQL 很复杂,分页不能简单使用count来计算结果集行数;

      3. Statement不是SQL,而是调用存储过程:

        json
        input {
          jdbc {
            statement => "CALL fetch_my_data(:sql_last_value, :offset, :size)",
            jdbc_paging_enabled => true,
            jdbc_paging_mode => "explicit",
            jdbc_page_size => 100000
          }
        }

使用PreparedStatement的场景

PreparedStatement = “提前准备好”的 SQL 语句 + 占位符(?) + 参数安全绑定

它的工作流程:

  1. 先把 SQL 语句“准备”(prepare)好,发送给数据库,例如:select * from stu where name = ?;
  2. 数据库会对这条 SQL 进行语法解析、语义分析、生成执行计划(这一步最耗时);
  3. 之后每次执行时,只需要传递不同的参数值,不需要重新解析 SQL;
  4. 参数通过 ? 占位符来标记,驱动程序会负责安全地把参数值绑定进去;

在MySQL命令行工具中,实现如下:

sql
-- 准备PreparedStatement, select_emp_by_no为名称
PREPARE select_emp_by_no from 'select * from employees where emp_no = ?';

-- 设置参数与执行
SET @empNo = 10001;
EXECUTE select_emp_by_no USING @empNo;

-- 清理PreparedStatement
DEALLOCATE PREPARE select_emp_by_no;

在 MySQL 驱动中,PreparedStatement 有两种底层实现:

  • ClientPreparedStatement(默认):客户端模拟预编译,参数替换发生在 JVM 内,实际上,发送给MySQL的语句还是完整的拼接后SQL;
  • ServerPreparedStatement(推荐生产):真正把 PREPARE 发到 MySQL 服务器执行(在数据库连接串中加 useServerPrepStmts=true参数开启);

在jdbc输入插件中,也支持PreparedStatement场景,具体配置如下:

  • use_prepared_statements:是否使用PreparedStatement,默认值为false
  • prepared_statement_bind_values:绑定的参数值;
  • prepared_statement_name:发送给数据库的PreparedStatement名称;

案例如下:

json
input {
  jdbc {
    statement => "SELECT * FROM mgd.seq_sequence WHERE _sequence_key > ? AND _sequence_key < ? + ? ORDER BY _sequence_key ASC"
    prepared_statement_bind_values => [1, 2, 4]
    prepared_statement_name => "foobar"
    use_prepared_statements => true
    // ...
  }
}

增量更新场景:增量更新是指只同步自上次运行以来新增或变更的数据。主要由以下配置实现:

  • record_last_run:是否记录上次运行的追踪列最大值,默认值为true,当开启后,追踪值保存在last_run_metadata_path指定的文件中;
  • last_run_metadata_path:用于保存追踪列的值,默认值为"<path.data>/plugins/inputs/jdbc/logstash_jdbc_last_run"
  • use_column_value:是否开启“基于某一列的值”进行增量追踪,默认值为false
  • tracking_column:指定追踪列;
  • tracking_column_type:追踪列的类型,可选值为 numeric, timestamp,默认值为numeric

如果开启了record_last_run,那么可以在Statement中使用:sql_last_value来表示追踪列的值。

  • 如果没有开启use_column_value,那么追踪列默认为SQL开始执行的时间。当SQL开始执行时,会从last_run_metadata_path中获取上次的执行时间(默认为1970-01-01 00:00:00),赋值给:sql_last_value,在SQL中使用;SQL执行结束后,会将本次SQL开始执行的时间点写入last_run_metadata_path中,供下次使用;
  • 如果开启了use_column_value,那么写入的值就不是SQL开始执行的时间,而是自己执行的追踪列;

下面的案例演示了基于修改时间的增量同步:

json
input {
  jdbc {
    statement => """
      SELECT * FROM users 
      WHERE updated_at > :sql_last_value 
      AND updated_at < NOW()  -- 防止未来时间
      ORDER BY updated_at ASC
    """

    # === 增量核心配置 ===
    use_column_value => true
    tracking_column => "updated_at"   # 用时间戳列
		tracking_column_type => "timestamp"
    record_last_run => true
    last_run_metadata_path => "/xxx/users_last_time.yml"
  }
}

时区转换问题:Elasticsearch和Logstash是用UTC时间来存储时间的,如果数据库时间不是UTC时间,那么就需要进行时区转换。

  • jdbc_default_timezone:设置数据库的时区,默认没有值,那么数据库时间会被当作 JVM 的默认时区(即 Logstash 进程运行所在机器的时区)来解释,除了设置该项,也可以在数据库连接串中使用serverTimezone=Asia/Shanghai设置;
  • plugin_timezone:jdbc插件时间,可选值为utc, local,默认值为utc

当设置以上参数时,时区转换会发生在两个地方:

  1. 当从数据库查询时间数据后,认为该时间是jdbc_default_timezone时区的时间,会将其转换为UTC时区的时间;
  2. 当SQL中使用了:sql_last_value并且类型为时间戳时,由于存储:sql_last_value都是以UTC时间存储的,如果设置了plugin_timezone,那么会将存储的:sql_last_value转换为plugin_timezone指定的时区,再将其转换为jdbc_default_timezone指定的时区进行查询;综上所诉,plugin_timezone没鸟用!

例如,现在数据库中存在以下数据:

image-20260111154825187

然后配置如下:

json
input{
    jdbc { 
        jdbc_driver_library => "/Applications/apache-maven-3.8.8/repo/com/mysql/mysql-connector-j/9.4.0/mysql-connector-j-9.4.0.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/demo?useSSL=false&useCurlFetch=true&defaultFetchSize=1000"
        jdbc_user => "root"
        jdbc_password => "xxx"
        statement => "SELECT * FROM stu where created_at < :sql_last_value"
        jdbc_default_timezone => "Asia/Shanghai"
        plugin_timezone => "utc"
        # schedule => "* * * * *"
    }
}

output{
    stdout { }
}

结果如下:

json
{
            "id" => 1,
    "created_at" => 2026-01-11T07:10:38.000Z,
    "@timestamp" => 2026-01-11T07:49:53.823236Z,
      "@version" => "1",
          "name" => "zs"
}

可以发现在屏幕显示时created_at被转换为UTC时间了。

并且查看/Users/xxx/projects/logstash-9.2.3/data/plugins/inputs/jdbc/logstash_jdbc_last_run路径中保存的sql_last_value的值,结果是UTC时间,如下:

txt
--- !ruby/object:DateTime '2026-01-11 07:58:24.022610000 Z'

在日志中,可以看到SQL如下:

txt
[2026-01-11T15:58:24,038][INFO ][logstash.inputs.jdbc     ][main][15ede8e7c160889a4fc8a17d11d96d3c88321d1484314654d4e4f04aae104c20] (0.002305s) SELECT * FROM stu where created_at < '2026-01-11 15:57:46'

3.1.2 input公共配置

每个input插件都有其独特的配置,也有一些公共配置,如下:

配置类型是否必需
add_fieldhashNo
codeccodecNo
enable_metricbooleanNo
idstringNo
tagsarrayNo
typestringNo
  • add_field:向事件中添加字段,例如:

    json
    add_field => {
      "field1" => "value1"
      "field2" => "value2"
    }
  • codec:在数据到达 input 插件之前(或从 input 插件读取出来之前),直接对原始输入数据进行解码/编码,而不需要在 pipeline 中额外加一个 filter 插件来处理。

  • enable_metric:在 Logstash 中,input 插件默认会记录所有可用的 metrics(如 events_in, events_out, duration_in_millis 等),这些指标会通过 Logstash 的监控 API、Prometheus exporter 或 xpack monitoring 暴露出来。如果想针对某个特定 input 实例禁用 metrics 收集(例如性能敏感或减少日志噪声),可以设置 enable_metrics => false

  • id:为input插件实例设置ID,如果没有手动设置,Logstash会自动产生一个;推荐手动设置,有利于监控和日志查找;

  • tags:为事件添加任意数量的标签;

  • type:为事件添加type字段;

3.2 filter插件

3.2.1 mutate

mutate用来对数据字段执行一系列操作,例如重命名、替换、更新等。

操作命令作用描述示例配置常见用途
add_field添加新字段
如果事件中有同名的字段,那么新添加的值和原值会成为数组
add_field => { "env" => "prod" }添加标签、环境变量、组合字段
remove_field删除字段(支持数组删除多个)remove_field => [ "tmp_field", "debug" ]清理无用字段、减少存储大小
rename重命名字段
如果目标字段名称已存在,那么目标字段旧值会被替换
rename => { "old_name" => "new_name" }规范化字段名(如 src_ip → source_ip)
replace替换字段值(覆盖原有值)
如果目标字段不存在,那么会添加新字段
replace => { "status" => "%{http_status}" }合并字段、格式化
update只在字段存在时更新值(不存在不创建)update => { "count" => "%{count} + 1" }安全更新,避免意外新增字段
convert转换字段类型(string → integer/float/boolean/date 等)convert => { "age" => "integer" "price" => "float" "active" => "boolean" }类型规范化(ES 映射要求)
gsub对字段进行正则替换(类似 Ruby 的 gsub)gsub => [ "message", "[^a-zA-Z0-9]", "_" ]清理特殊字符、脱敏
split把字符串按分隔符拆分成数组split => { "tags" => "," }把逗号分隔的标签拆成数组
join把数组字段拼接成字符串join => { "tags" => "," }反向操作 split
lowercase / uppercase转小写/大写lowercase => [ "username" ]统一大小写
strip去除字段首尾空格strip => [ "description" ]清理脏数据
coerce设置默认值
如果一个字段存在,但是值为null,设置为默认值
coerce => {"field1" => "default_value" }

TIP

注意,add_fieldremove_field是公共配置,不仅在mutate插件中可以使用,在其他filter插件中也可以使用。

mutate 插件内部会按配置的顺序逐个执行,但同一个 mutate 块内的操作顺序不一定严格保证(例如 add_fieldremove_field 可能并行)。

官方推荐:如果多个操作之间有依赖关系(比如先 renameconvert,再 add_field),建议拆成多个独立的 mutate 块

filter {
  mutate { rename => { "old_ip" => "new_ip" } }      # 第一步:重命名
  mutate { convert => { "new_ip" => "string" } }     # 第二步:转换类型
  mutate { add_field => { "ip_type" => "ipv4" } }    # 第三步:添加新字段
}

这样能严格保证顺序,避免意外。

3.2.2 ruby

ruby插件用于执行ruby代码,主要有两种方式定义ruby代码:

  • code:直接在插件里通过code属性定义ruby代码,例如:

    json
    filter {
      ruby {
      	code => "具体代码"
    	}
    }
  • path:通过path属性定义ruby脚本文件,在文件中编写代码,例如:

    json
    filter {
      ruby {
        # Cancel 90% of events
        path => "/etc/logstash/drop_percentage.rb"
        script_params => { "percentage" => 0.9 }
      }
    }

    script_params用于传递参数;

    在脚本文件.rb中,需要编写以下两个方法:

    • register(params):可选的,用于接收script_params传递的参数;
    • filter(event):必须的,用于接收事件(event),并且返回事件数组;

    例如:

    ruby
    # 接收参数
    def register(params)
    	@drop_percentage = params["percentage"]
    end
    
    # 定义处理方法
    def filter(event)
    	if rand >= @drop_percentage
    		return [event]
    	else
        # 如果返回空数组,表示丢弃该事件
    		return []
    	end
    end

例如,下面的ruby插件用于将_分割的字段名称改为小驼峰命名:

json
filter{
    mutate {
        add_field => { 
            "my_id" => "test" 
            "[content][type_id]" => "example"
            "[content][user_name]" => "sample_user"
        }
    }

    ruby {
        code => '
            def snake_to_camel(snake)
                snake.split("_").map.with_index { |w, i| i == 0 ? w : w.capitalize }.join("")
            end

            def deep_convert(obj)
                case obj
                when Hash
                  obj.transform_keys { |k|
                      (k.is_a?(String) && k =~ /_/ && !k.start_with?("@")) ? snake_to_camel(k) : k
                  }.transform_values { |v| deep_convert(v) }
                  when Array
                  obj.map { |v| deep_convert(v) }
                else
                	obj
                end
            end

            # 主逻辑
            original = event.to_hash.dup
            converted = deep_convert(original)

            # 清空旧字段(保留 @ 开头的元数据)
            original.keys.each do |k|
                event.remove(k) unless k.start_with?("@")
            end

            # 写入新字段
            converted.each do |k, v|
                event.set(k, v) unless k.start_with?("@")
            end
        '
  }
}

TIP

如果要在add_field中添加嵌套对象,字段命名应为[content][type_id],而不是content.type_id

也可以在ruby中添加字段和嵌套对象:

txt
ruby {
    code => '
        event.set("content", {
            "type_id"   => "example",
            "user_name" => "sample_user"
        })
        event.set("my_id", "test")
    '
}

3.2.3 jdbc_streaming

jdbc_streaming插件会执行一段SQL,并且把结果保存在事件里以target指定的字段中(以数组形式)。并且,jdbc_streaming会把结果集保存在LRU缓存中,并带有过期时间。

jdbc_streamingjdbc input插件工作原理类似,不同的配置如下:

  • target:将结果集保存在哪个字段(以数组形式),该配置没有默认值,必须设置,如果没有设置,会报错!

  • parameters:指定参数,类型为hash,在SQL中,可以使用:param_name的形式使用参数,例如:

    json
    jdbc_streaming {
      statement => "select * from demo where name = :name or id = :id"
      parameters => {
      	"name" => "zs"
      	"id" => "[id]"
    	}
    }

    在SQL中:name会被替换为zs[id]表示字段中id字段值。

  • default_hash:当 SQL 查询没有返回任何结果时,为事件提供一组默认的“保底”字段。default_hash的类型为hash, Key(键名) 必须与 SQL 语句查出来的 Column Name(列名) 完全一致。例如:

    json
    jdbc_streaming {
        statement => "
            select c.`name` as course, g.grade as grade
            from grade g
            left join course c on g.course_id = c.id
            where g.stu_id= 2
        "
        target => "grades"
        default_hash => {
            "course" => "unknown"
            "grade" => 0
        }
    }

    结果为:

    json
    {
                "id" => 1,
        "@timestamp" => 2026-01-13T13:02:30.208725Z,
              "tags" => [
            [0] "_jdbcstreamingdefaultsused"
        ],
              "name" => "zs",
          "@version" => "1",
        "created_at" => 2026-01-11T07:10:38.000Z,
            "grades" => [
            [0] {
                "course" => "unknown",
                 "grade" => 0
            }
        ]
    }
  • use_cache:是否使用LRU缓存,默认值为true

  • cache_size:缓存条目数,默认值为500,当达到缓存条目数后,最近最少使用的条目会被替换;

  • cache_expiration:缓存过期时间,默认值为5,时间单位为秒(s),也可以使用小数,例如0.25,代表250毫秒。

    假设某次SQL发生错误时,没有返回数据,那么会导致缓存中保存着空数据,此时会从default_hash中获取值填充到事件中,直至缓存过期,否则之后的同样参数的SQL查询,都会使用default_hash中的默认数据。

  • jdbc_streaming中,同样支持PreparedStatement,以MySQL为例,连接串设置如下:

    txt
    jdbc:mysql://localhost:3306/mydatabase?cachePrepStmts=true&prepStmtCacheSize=250&prepStmtCacheSqlLimit=2048&useServerPrepStmts=true

例如,原始事件来源于jdbc查询学生表,然后根据学生ID查询成绩:

json
filter {
    jdbc_streaming {
        jdbc_driver_library => "/Applications/apache-maven-3.8.8/repo/com/mysql/mysql-connector-j/9.4.0/mysql-connector-j-9.4.0.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/demo?useSSL=false&useCurlFetch=true&defaultFetchSize=1000&cachePrepStmts=true&prepStmtCacheSize=250&prepStmtCacheSqlLimit=2048&useServerPrepStmts=true"
        jdbc_user => "root"
        jdbc_password => "xxx"
        statement => "
            select c.`name` as course, g.grade as grade
            from grade g
            left join course c on g.course_id = c.id
            where g.stu_id= ?
        "
        target => "grades"
        use_prepared_statements => true
        prepared_statement_name => "select_grades_by_stu_id"
        prepared_statement_bind_values => [ "[id]" ]  # [id]表示事件中的id字段
        default_hash => {
            "course" => "unknown"
            "grade" => 0
        }
    }
}

3.2.4 grok

grok插件用于将任意的文本按照正则表达式解析为结构化的数据。

dissect也可以将文本解析为结构化的数据,两者的区别在于:dissect没有使用正则表达式,更快。

grok基础

grok使用了Oniguruma作为正则表达式实现库,在grok中,如果要使用正则表达式匹配模式并且命名,语法如下:

txt
(?<field_name>pattern)

例如,下面的正则表达式将匹配10-11位包含0-9和A-F的文本,并且命名为queue_id

txt
(?<queue_id>[0-9A-F]{10,11})

例如,如果我们的事件中包含message字段,并且内容如下:

txt
"message": "190239889A 990239889B"

grok插件中实现如下:

json
grok {
    match => { "message" => "(?<queue_id>[0-9A-F]{10,11})" }
}

match表示要匹配的字段及其模式,后续详解

经过解析后,事件内容如下:

json
{
  "message": "190239889A 990239889B",
  "queue_id": "190239889A"
}

message文本中第一个匹配正则表达式[0-9A-F]{10,11}的文本被提取出来,作为queue_id字段的值。

假设message的值为123456789ABC,那么queue_id的值为123456789AB

除了直接在match中使用正则表达式,我们可以把正则表达式命名,单独放在模式文件中,并且在grok插件中引用。

首先,在运行bin/logstash时,终端所在的当前工作目录下创建名称为patterns的目录。

然后,在该目录下创建一个名为my_pattern的文件(文件名称不重要),内容如下:

txt
# contents of ./patterns/my_pattern:
QUEUEID [0-9A-F]{10,11}

自定义正则表达式名称语法规则如下:

txt
模式名称 正则表达式

之后,在grok中引用该模式,并且使用命名模式的方式匹配文本:

json
grok {
    patterns_dir => ["./patterns"]
    match => { "message" => "%{QUEUEID:queue_id}" }
}
  • pattern_dir:表示自定义模式文件所在目录;

    .表示在运行bin/logstash时,终端所在的当前工作目录;

  • %{QUEUEID:queue_id}:表示匹配规则,语法如下

    txt
    %{SYNTAX:SEMANTIC:DATA_TYPE}
    • SYNTAX:正则表达式名称,可以是自己定义的(例如QUEUEID),也可以是grok内置的名称,完整列表:https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/legacy/grok-patterns
    • SEMANTIC:表示匹配的内容应该放到哪个字段,就是目标字段名称,例如queue_id
    • DATA_TYPE:默认情况下,grok匹配的内容保存为字符串,如果向将匹配的内容转换为其他数据类型,可以指明DATA_TYPE,现在只支持intfloat数据类型;

以上就是grok的基础使用,现在讲解grok的配置:

  • match:用于定义要匹配的字段以及匹配模式,格式为hash,例如:

    txt
    grok {
      match => {
        "message" => "Duration: %{NUMBER:duration} Speed: %{NUMBER:speed}"
        "extra" => "%{WORD:other_info}"
      }
    }

    也可以针对一个字段,分多次进行匹配:

    txt
    filter {
      grok {
        match => { 
          "message" => [
              "%{IP:client_ip} %{WORD:method}",  # 模式 A
              "%{IP:client_ip}"                  # 模式 B
          ]
        }
        # 默认 break_on_match => true
        break_on_match => false
      }
    }
  • break_on_match:默认情况下,break_on_match 是开启的。 如果在 match 中定义了一个包含多个模式(Patterns)的数组,Logstash 会按顺序从上往下匹配。只要其中一个模式匹配成功,Logstash 就会直接跳出该 grok 块,不再尝试后面的模式。

    如果将此参数设为 false,Logstash 会尝试运行数组中的每一个模式,无论前面的有没有匹配成功。

    例如,需要从同一行日志中提取多组互不相关的结构:

    txt
    filter {
      grok {
        match => { "message" => [
            "ID: %{NUMBER:id}",
            "User: %{WORD:user}"
        ]}
        break_on_match => false
      }
    }
  • pattern_definitions:用于定义模式名称和正则表达式,类型为hash,使用这种方式定义的命名正则表达式只能在该grok插件中使用,例如:

    json
    grok {
        pattern_definitions => { 
            "QUEUEID" => "[0-9A-Z]{10,11}" 
        }
        match => { "message" => [
            "%{QUEUEID:queue_id}", 
            "%{QUEUEID:queue_id_2}" 
            ]
        }
        break_on_match => false
    }
  • patterns_dir:用于定义命名的正则表达式所在目录,建议使用绝对路径

  • target:提取后的内容字段放在哪里,没有默认值,表示直接放在事件根路径下;

  • overwrite:表示要覆盖的字段,类型为数组,例如:

    txt
    grok {
        pattern_definitions => { 
            "QUEUEID" => "[0-9A-Z]{10,11}" 
        }
        match => { "message" => [
            "%{QUEUEID:message}"
            ]
        }
        break_on_match => false
        overwrite => [ "message" ]
    }

    解析完成后,原message字段内容会被解析后的值覆盖重写。

    如果要覆盖嵌套对象里的字段,需要使用对象引用语法:

    txt
    grok {
      match => { "somefield" => "%{NUMBER} %{GREEDYDATA:[nested][field][test]}" }
      overwrite => [ "[nested][field][test]" ]
    }

3.3 output插件

output插件用于将事件(event)写出到指定的目的地。

3.3.1 elasticsearch

3.3.1.1 基本设置

elasticsearch插件用于将事件存入ES中,基本属性如下:

  • hosts:指定ES的连接地址,默认值为[//127.0.0.1],可以指定多个ES节点地址,插件将会使用负载均衡策略来发送请求;
  • user:连接ES的用户名;
  • password:连接ES的用户密码;
  • index:指定写入索引名称;
  • document_id:指定文档ID,默认会自动生成,如果想使用自己的ID,可以获取事件中的字段,例如%{[id]}
  • action:指定对文档执行什么操作,可选值如下:
    • index:索引文档(如果文档 ID 不存在,就新建;如果文档 ID 已经存在,就删除旧文档并存入新文档);
    • create:新建文档(如果文档 ID 已经存在,操作会报错);
    • update:通过文档ID更新文档;
    • delete:通过文档ID删除文档;
  • doc_as_upsert:是否启用upsert,即文档ID存在就部分更新(而不是覆盖整个文档),不存在就插入,默认值为false,如果要启用upsert,应该将action设置为update

下面的例子说明了最基本的插件使用,将事件保存到指定的索引中,并使用业务主键ID,启用upsert:

json
elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    password => "your_strong_password123"
    index => "stu_info"
    document_id => "%{id}"
    action => "update"
    doc_as_upsert => true

    data_stream => false
    manage_template => false
    ilm_enabled => false
}

使用以上的配置,可以发现我们禁用了索引模板管理,如果在ES中没有自定义索引模板,那么ES将使用字段自动映射,结果可能不是我们想要的。

3.3.1.2 索引模板

elasticsearch插件中可以启用索引模板管理,自定义索引模板,相关配置如下:

  • manage_template:是否启用索引模板管理;
  • template_name:索引模板名称;
  • template:定义了索引模板内容文件所在路径;
  • template_overwrite:是否使用Logstash的索引模板覆盖ES中同名的索引模板,默认值为false,如果启用该项配置,那么每次 Logstash 启动并建立连接时,它会无视 ES 中是否已存在该模板,直接用它自己加载的模板文件(通过 template 参数指定的,或者内置的)去覆盖 ES 里的旧模板。

下面是自定义索引模板的例子:

json
elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    password => "your_strong_password123"
    index => "stu_info-01"
    document_id => "%{id}"
    action => "update"
    doc_as_upsert => true

    manage_template => true
    template_name => "stu_info_template"
    template => "/Users/xxx/projects/logstash-9.2.3/config/my_stu_index_template.json"
    template_overwrite => true

    data_stream => false
    ilm_enabled => false
}
json
{
    "index_patterns": ["stu_info-*"],
    "template": {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 1
        },
        "mappings": {
            "properties": {
                "queue_id": {
                    "type": "keyword"
                },
                "id": {
                    "type": "integer"
                },
                "name": {
                    "type": "keyword"
                },
                "message": {
                    "type": "text"
                },
                "created_at": {
                    "type": "date"
                },
                "@timestamp": {
                    "type": "date"
                }
            }
        },
        "aliases": {
            "stu": {}
        }
    }
}

可以发现在ES中,成功按照配置创建了索引模板,并且索引也成功应用了模板。

3.3.1.3 ILM

elasticsearch插件中,也可以启用ILM管理,相关设置如下:

  • ilm_enabled:是否启用ILM管理;
  • ilm_policy:ILM策略的名称,需要我们提前在ES中自定义ILM策略,然后在Logstash中引用;
  • ilm_pattern:ILM管理的索引名称模式,默认值为{now/d}-000001,模式必须以数字结尾;
  • ilm_rollover_alias:用于ILM管理的别名,当指定了ilm_rollover_alias,那么index的值将被忽略,最终,生成的索引名称为ilm_rollover_alias+-+ilm_pattern`;

例如,假设在ES中已经有名为my-filebeat-test-ilm-policy的ILM策略(当索引文档数超过2时,会发生Rollover),配置如下:

json
elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    password => "your_strong_password123"
    index => "stu_info-01"
    document_id => "%{id}"
    action => "update"
    doc_as_upsert => true

    manage_template => true
    template_name => "stu_info_template"
    template => "/Users/xxx/projects/logstash-9.2.3/pipelines/my_stu_index_template.json"
    template_overwrite => true

    ilm_enabled => true
    ilm_policy => "my-filebeat-test-ilm-policy"
    ilm_rollover_alias => "stu_info_ilm"
    ilm_pattern => "000001"

    data_stream => false

}

会发现索引模板stu_info_template的配置中添加了ILM相关配置:

image-20260117140503675

并且生成了stu_info_ilm-000001的索引:

image-20260117140629565

向索引中写入更多数据,会发现ILM正常发挥作用:

image-20260117141200790

3.3.1.4 Data Stream

elasticsearch插件中也可以使用Data Stream,主要配置如下:

  • data_stream:是否启用Data Stream,如果设置为true,那么必须启用ecs_compatibility

  • ecs_compatibility:是否启用ECS,可选值为disabledv1v8

  • data_stream_type:数据流类型,可选值为 logs(日志)、metrics(指标)、synthetics(探测)或 traces(追踪),默认值为logs

  • data_stream_dataset:数据集,默认值为generic

  • data_stream_namespace:命名空间,默认值为default

    IMPORTANT

    最终数据流的名称会由data_stream_type+data_stream_dataset+data_stream_namespace构成;

  • data_stream_auto_routing:是否通过事件中指定的字段来将事件路由到相应的数据流中,即会根据事件中字段%{[data_stream][type]}-%{[data_stream][dataset]}-%{[data_stream][namespace]}的值,来生成数据流名称,如果事件中缺少相关字段,将会使用对应的配置:data_stream_type, data_stream_datasetdata_stream_namespace;默认值为true

  • data_stream_sync_fields:如果事件中缺少相关字段(%{[data_stream][type]}-%{[data_stream][dataset]}-%{[data_stream][namespace]}),是否自动添加,默认值为true

TIP

如果要启用Data Stream,需要在索引模板中启用Data Stream,并且匹配的模式需要满足数据流名称。

并且,需要在ES侧管理索引模板,在Logstash中不能再管理索引模板。

在索引模板中引用ILM策略名称。

索引模板不能再加别名。

索引模板调整如下:

json
{
  "name": "stu_info_template",
  "index_template": {
    "index_patterns": [
      "logs-stu_info-test-*"
    ],
    "template": {
      "settings": {
        "index": {
          "number_of_shards": "1",
          "number_of_replicas": "1",
          "index.lifecycle.name": "my-filebeat-test-ilm-policy"
        }
      },
      "mappings": {
        "properties": {
          "@timestamp": {
            "type": "date"
          },
          "name": {
            "type": "keyword"
          },
          "created_at": {
            "type": "date"
          },
          "id": {
            "type": "integer"
          },
          "message": {
            "type": "text"
          },
          "queue_id": {
            "type": "keyword"
          }
        }
      }
    },
    "composed_of": [],
    "priority": 500,
    "data_stream": {
      "hidden": false,
      "allow_custom_routing": false
    }
  }
}

TIP

如果启用了Data Stream,那么需要把action设置为create,并且不能有ILM和索引模板的配置:

json
elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    password => "your_strong_password123"
    document_id => "%{id}"
    action => "create"

    data_stream => true
    ecs_compatibility => "v8"
    data_stream_type => "logs"
    data_stream_dataset => "stu_info"
    data_stream_namespace => "test-data-stream"

}

结果如下:

image-20260117151400028

image-20260117151307849

image-20260117151319742

3.3.1.5 其他设置

除了以上的应用场景,elasticsearch还有一些其他设置。

HTTP压缩

  • compression_level:发送给ES的HTTP压缩等级,默认值为1,可选值为0-9的整数,值越大表示压缩率越高,但CPU耗时也更高,值为0表示禁用压缩;

重试与死信队列

elasticsearch插件使用bulk API 来导入数据,当整个 bulk 请求因为网络问题HTTP 状态码不是 200 时(例如 502、503、504、429、413 等),插件会无限重试(indefinitely retry)。没有最大重试次数限制,这是为了尽量保证数据不丢失。重试间隔采用指数退避 + 封顶策略:

  • retry_initial_interval 开始(默认 2 秒),可修改;
  • 每次失败后等待时间翻倍;
  • 最高等待时间不超过 retry_max_interval(默认 64 秒),可修改;

对于单条文档级别(Bulk Response 中的每条 item)的重试 / 处理策略,会根据返回码不同,采取不同的处理方式:

Elasticsearch 返回的 item 状态码当前处理方式(8.x 主流版本)是否可配置备注
网络异常、超时放入重试队列 → 指数退避重试是(retry_initial_interval 等)通常会无限重试
429 (Too Many Requests)放入重试队列 → 指数退避重试集群过载最常见场景
503放入重试队列 → 指数退避重试集群临时不可用
409 (Version Conflict)直接丢弃 + 打印 warning可通过 retry_on_conflict 加大 ES 端重试次数插件不再负责重试 409,推荐在 ES 端重试(性能更好)
400 (Bad Request)送入 Dead Letter Queue(如果开启)可通过 dlq_custom_codes 扩展通常是 mapping 冲突、非法字段等不可恢复错误
404 (Not Found)送入 Dead Letter Queue(如果开启)可通过 dlq_custom_codes 扩展常见于 update/delete 操作目标文档不存在
其他非预期错误(如 500)大多数情况下 → 无限重试整个 bulk除非明确配置进 DLQ

为了确保消息不丢失,可以配置死信队列(Dead Letter Queue,DLQ),指定哪些状态码的事件需要放入死信队列,配置如下:

txt
dlq_custom_codes => [400, 404, 409, 413]

4. 高级设置

本小节介绍有关Logstash的相关概念与配置。

4.1 keystore

在之前的配置文件中,MySQL、ES的用户密码是直接明文写在文件中的,显然这是不安全的,因此,Logstash提供了keystore来存储敏感信息,即以键值对的形式存储敏感信息(敏感信息会加密存储在keystore中),然后在配置文件中通过键的方式获取值,这样就避免了敏感信息的泄露。

为了使用keystore,首先需要创建,在bin目录下有logstash-keystore的脚本,使用如下命令创建keystore:

bash
bin/logstash-keystore create

允许keystore无密码保护,之后就成功创建了,会提示在以下路径成功创建了keystore:

txt
Created Logstash keystore at /Users/xxx/projects/logstash-9.2.3/config/logstash.keystore

之后,我们就可以向keystore中添加键值对了,语法如下:

bash
bin/logstash-keystore add MYSQL_PWD ES_PWD

运行上述命令后,会依次提示输入对应键的值。

之后,在配置文件中,就可以使用如下语法访问对应键的敏感信息了:

txt
${KEY}

例如:

json
input{
    jdbc { 
        // ...
        jdbc_password => "${MYSQL_PWD}"
    }
}

如果要查看keystore中的所有键,使用如下命令:

bash
bin/logstash-keystore list

如果要删除keystore中的对应键值对,使用如下命令:

bash
bin/logstash-keystore remove ES_PWD

4.2 访问事件字段

在Logstash管道中,通常包含三个阶段:输入(input)阶段产生事件,过滤(filter)阶段修改事件,输出(output)阶段传送事件。

在过滤和输出阶段,我们会有访问事件字段的需求,Logstash也提供了相应的方法来访问对应的字段,基本语法为:

txt
[fieldname]

如果访问的是根部字段,可以省略外面的中括号,但是不建议。

如果访问嵌套字段,那么可以使用如下方法:

txt
[fieldname][sub_fieldname]

例如,假设现在有如下事件:

json
{
  "agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
  "ip": "192.168.24.44",
  "request": "/index.html",
  "response": {
    "status": 200,
    "bytes": 52353
  },
  "ua": {
    "os": "Windows 7"
  }
}

要想访问ua.os字段,使用[ua][os]语法,要想访问ip字段,使用[ip]语法(虽然也可以直接使用ip,但是不推荐)。

sprintf 格式

在 Logstash 中,sprintf 格式是一种非常有用的语法,它的核心作用是:把字段的内容提取出来,并插入到另一个字符串中。

其基本语法格式是:

txt
%{[field_name]}

基于上面的事件,可以使用%{[response][status]}方式获取到值:200。

sprintf格式强大之处在于可以使用java时间格式的方式访问当前时间,语法如下:

txt
%{{yyyy-MM-dd HH:mm:ss}}

上述语法会输出当前日期:2026-01-17。

也可以使用如下特殊值,来输出当前时间:

txt
%{{TIME_NOW}}

sprintf还支持废弃的joda时间格式,语法如下:

txt
%{+FORMAT}

例如,在filter阶段添加以下字段:

json
mutate {
    add_field => { 
        # java 格式
        "now1" => "%{{yyyy-MM-dd HH:mm:ss}}"
        # 特殊值 TIME_NOW
        "now2" => "%{{TIME_NOW}}" 
        # joda 格式
        "now3" => "%{+YYYY-MM-dd HH:mm:ss}"
    }
}

输出如下:

json
{
    "now3" => "2026-01-17 12:52:00",
    "now2" => "2026-01-17T12:52:00.689616Z",
    "now1" => "2026-01-17 12:52:00"
}

4.3 条件逻辑

在有些时候,需要根据不同条件执行不同的动作,例如,当事件中的日志等级为debug时,需要丢弃该事件。

Logstash也提供的条件逻辑,语法如下:

ruby
if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

EXPRESSION是布尔表达式,返回truefalse,可以使用的操作符如下:

  • 比较运算符:==, !=, <, >, <=, >=

  • 正则表达式:=~, !~

  • 包含运算符:in,not in

  • 逻辑运算符:and, or, nand, xor,!

例如,当日志等级为debug时,丢弃该事件:

json
filter {
  if [loglevel] == "debug" {
    drop { }
  }
}

在以下情况中,if [foo]返回false

  • [foo]不存在事件中;
  • [foo]存在,但是值为nullfalse

@metadata字段

@metadata是一个特殊的字段,并不会出现在最终的事件中,因此,我们可以将其作为一个临时存放数据的地方,而不用清除字段:

json
input { stdin { } }

filter {
  grok { match => [ "message", "%{HTTPDATE:[@metadata][timestamp]}" ] }
  date { match => [ "[@metadata][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ] }
}

output {
  stdout { codec => rubydebug }
}

4.4 多管道配置

假设现在我们需要读取程序日志以及从MySQL同步数据到ES,如果按照现有的知识,有以下两种方案:

  • 开启两个Logstash实例,一个读取程序日志,一个同步MySQL数据;
  • 开启一个Logstash实例,在一个管道文件中写入多个输入源,并且在过滤和输出阶段使用条件逻辑,执行不同的动作;

以上两种方案都有缺点,Logstash提供了第三种方案:多管道配置。

编写多个管道文件(例如,一个管道用于读取程序日志,一个管道用于同步MySQL数据),然后,在pipelines.yml中配置管道,格式如下:

yaml
- pipeline.id: my-pipeline_1
  path.config: "/etc/path/to/p1.config"
  pipeline.workers: 3
- pipeline.id: my-other-pipeline
  path.config: "/etc/different/path/p2.cfg"
  queue.type: persisted
  • pipeline.id:管道ID,需唯一;
  • path.config:管道配置文件地址;
  • pipeline.workers:用于该管道的处理线程数;
  • queue.type:用于该管道的队列类型;

上面两个配置是可选的,如果没有配置,则会使用logstash.yml中的配置。

当启动Logstash实例(没有参数)时,会从pipelines.yml中读取管道配置,并且启动所有管道。

如果启动Logstash实例时有参数(-e或-f),那么Logstash会忽略pipelines.yml文件

TIP

当使用多管道时,需要注意,Logstash 默认会为每个管道分配等同于 CPU 核心数 的 Worker 线程。

如果有 8 核 CPU,开了 4 个管道,如果不手动限制,Logstash 会尝试启动 4×8=32 个线程。这会导致 CPU 频繁进行上下文切换,性能反而下降。因此,需要在 pipelines.yml 中手动指定每个管道的 pipeline.workers

4.5 队列与数据持久化

当输入(input)插件获取事件后,会把事件发送到队列中,供worker获取进行后续的过滤和输出。Logstash提供两种队列:内存队列和持久化队列。

4.5.1 内存队列

默认情况下,Logstash使用内存有限队列在输入和过滤阶段之间缓存事件。

内存队列优点在于易于配置、管理,更高的吞吐量。

但是,内存队列具有容易丢失数据的缺点,以及难以应对数据爆发增长。

内存队列的大小由两个参数决定:

  • pipeline.batch.size:输入插件一次性发送到队列的事件数量,默认值为125;
  • pipelines.workers:用于从队列中获取事件进行过滤、输出处理的worker线程数,默认值为主机CPU核心数;

内存队列的大小是以上两个参数的乘积。

4.5.2 持久化队列

由于内存队列具有容易丢失数据的缺点,Logstash提供了持久化队列(Persistent Queue,简称PQ),会把事件保存到磁盘空间中。

默认情况下,持久化队列是关闭状态的,如果要启用持久化队列,在logstash.yml中配置如下:

yaml
queue.type: persisted
# 默认值为内存队列
# queue.type: memory

如果是想针对单个管道配置持久化队列,在pipelines.yml中配置。

当启用后,会在path.queue配置中指定的目录下创建持久化队列相关的文件,默认值为path.data/queue

持久化队列的工作流程如下:

  1. 入队 (Ingest):输入插件接收到数据,立即将其写入磁盘上的“头页”(Head Page)。
  2. 落盘同步 (Fsync):根据配置(如 queue.checkpoint.writes),Logstash 强制将数据刷入物理磁盘。此时,即便掉电,数据也已安全。
  3. 处理 (Processing):Filter 和 Output 线程从磁盘读取数据进行加工。此时磁盘上的数据并不会立即删除。
  4. 确认与清理 (ACK & Checkpoint):只有当 Output 确认数据已成功发送(如 ES 返回 200),Logstash 才会标记该数据为“已处理”。当一个页面文件内的所有数据都被确认后,该页面文件才会被删除。

持久化队列在磁盘上由三类文件组成:

  • 数据页 (Page Files): 实际存储数据的地方。文件名类似 page.1, page.2。每个文件大小固定(默认 64MB)。

    数据页分为两种:头页(Head Page)和尾页(Tail Page)。

    头页是当前唯一活跃的可写文件,所有新进来的事件都会按顺序排在文件末尾(Append Only)。

    当头页达到 queue.page_capacity(默认 64MB)时,它就会被“封存”,身份转变为尾页(此时会创建新的头页)。一旦变成尾页,里面的内容就再也不会被修改。Filter 和 Output 线程会从最早的尾页开始读取数据。只有当一个尾页里所有的事件都被 Output 确认(ACK)后,这个文件才会被整体删除。

  • 检查点文件 (Checkpoint Files): 这是持久化队列的灵魂。它记录了:

    • 哪些数据已经写入了。
    • 哪些数据已经被 Output 确认处理完了。
    • 当前的读写位置在哪里。
  • 锁文件 (.lock): 确保只有一个 Logstash 实例能操作这个队列目录。

根据以上关于持久化队列的原理,现在理解相关配置就变得容易了(注意,以下配置是针对单个管道的):

  • queue.page_capacity:页大小,默认值为64MB;
  • queue.max_bytes:队列的总大小限制,默认值为1GB;
  • queue.checkpoint.writes:当写入多少事件后,调用一次fsnyc进行落盘同步,默认值为1024,即写入1024个事件后,调用fsync,这些事件才会真正保存到磁盘中,此时就算突然断电也不会丢失数据。换句话说,如果此时写了500个事件,如果突然发生断电,那么这500个事件没有落盘,仍然会丢失。如果要确保每个事件都不会丢失,设置queue.checkpoint.writes: 1
  • queue.checkpoint.acks:当输出端完成输出后,会向队列发送确认(ACK)信号,队列在接收到多少个确认信号后,才会更新检查点文件(checkpoint),以更新哪些事件被处理了。默认值为1024。如果队列接收到了500个事件的确认信号,此时突然断电,由于这500个事件的确认信号还没有被写入检查点文件,Logstash重启后,worker线程仍然会读取这500个事件进行过滤、输出处理,所以,有可能会造成事件的重复发送,需要输出端进行去重。如果要保证没有事件被重复发送,设置queue.checkpoint.acks: 1

所以,持久化队列仍然有一些限制:

  1. 如果突然发生断电,那么没有被checkpoint的数据仍然会丢失或重复发送;
  2. 如果数据发送端不遵循请求-响应(request-response)模式,那么无法使用持久化队列。因为发送端发完就认为成功(无 ACK 机制),Logstash 即使收到但崩溃前未处理完,发送端不会重发;

4.5.3 死信队列

默认情况下,如果一个事件由于映射错误(Mapping Error)、条件逻辑错误等问题,导致这些事件处理失败,无法发送到输出端,Logstash会把这些事件丢弃,如果发生这种情况,仍然有丢失数据的问题。

Logstash提供了死信队列机制,用于处理这种情况。

死信队列目前只适用于elasticsearch 输出插件以及条件逻辑错误(if ... else if ... else中的比较错误,例如将字符串与数字进行比较)。

死信队列工作图如下:

Diagram showing pipeline reading from the dead letter queue

当出现上述问题时,Logstash会发送一条死信消息到死信队列,该死信消息包括以下内容:

  • 原始事件;
  • 描述死信产生原因的元数据;
  • 插件信息;
  • 消息进入死信队列的时间戳;

在Logstash中也提供了dead_letter_queue输入插件,用于读取死信队列中的事件,参考:https://www.elastic.co/docs/reference/logstash/dead-letter-queues#processing-dlq-events

默认情况下,死信队列处于关闭状态,如果要启用,在logstash.yml配置文件中使用如下配置:

yaml
dead_letter_queue.enable: true

默认情况下,死信队列相关文件会保存在path.data/dead_letter_queue目录,并且按管道分组,如果要修改该目录,使用如下配置:

yaml
path.dead_letter_queue: "path/to/data/dead_letter_queue"