Skip to content

Data Stream In ES

本文介绍ES中的Data Stream(数据流)。

1. 什么是Data Stream

Data Stream (数据流)是基于索引的抽象层,对于存储只追加(append-only)的时序数据有优化。Data Stream底层是多个索引,但是对于用户来说,只需要使用一个名称来访问这些索引,这个名称就是数据流名称。Data Stream 非常适合用于日志、事件、指标以及其他不断产生的数据。

对于Data Stream,我们可以发生索引(index)和搜索(search)请求,Data Stream会自动将请求路由到底层的索引。我们可以使用ILM(Index Lifecycle Management)来自动管理底层索引,例如,可以使用ILM来自动将底层的旧索引移动到便宜的硬件中进行存储,并且删除不需要的索引。我们也可以使用Data Stream Lifecycle来自动管理生命周期。

Data Stream底层是多个隐藏的、自动创建的索引:

data streams diagram

Data Stream 需要一个匹配的索引模板,该模板用于定义底层索引的字段映射和索引设置,以及定义使用的ILM策略。

每个在Data Stream中的文档必须包含一个@timestamp字段,数据类型为datedate_nanos。如果索引模板没有定义@timestamp字段,ES会自动添加@timestamp字段,并且设置为date数据类型。

同一个索引模板可以被多个Data Stream使用,如果一个索引模板被Data Stream使用,那么就不能删除该模板。

底层的索引名称是自动产生的,并且是唯一的,索引命名规则如下:

txt
.ds-<data-stream>-<yyyy.MM.dd>-<generation>
  • <data-stream>:数据流的名称;
  • <yyyy.MM.dd>:索引创建时的年月日;
  • <generation>:索引序号,六位数字,从000001开始;

对于Data Stream来说,底层的索引会有一个写索引(write index),该索引也就是最新创建的索引:

data streams index request

当发送写操作(例如index)时,Data Stream会把该请求路由到写索引。

在使用ILM来管理底层索引时,如果写索引(write index)发生了Rollover,那么会创建一个新索引,新索引成为写索引。我们也可以手动触发Rollover,创建新索引。

当发送读操作(例如search)时,Data Stream会把该请求路由到所有的底层索引:

data streams search request

2. 如何创建Data Stream

可以遵循以下步骤,创建Data Stream。

2.1 创建ILM策略

首先使用以下API创建ILM策略,与创建其他ILM策略一样:

json
PUT _ilm/policy/my_data_stream_ilm_policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "set_priority": {
            "priority": 100
          }, 
          "rollover": {
            "max_docs": 2
          }
        }
      },
      "warm":{
        "min_age": "5m", 
        "actions": {
          "readonly":{},
          "allocate": {
            "number_of_replicas": 0
          },
          "shrink": {
            "number_of_shards": 1
          }
        } 
      }
    }
  }
}

2.2 创建索引模板

使用如下API创建索引模板:

json
PUT _index_template/my_data_stream_template
{
  "index_patterns": ["data-stream-test-*"],
  "priority": 500,
  "data_stream": {},
  "template": {
    "mappings":{
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
        },
        "message": {
          "type": "text"
        }
      }
    },
    "settings": {
      "number_of_shards": 2,
      "number_of_replicas": 1,
      "index.lifecycle.name": "my_data_stream_ilm_policy"
    }
  }
}
  • "priority":推荐设置优先级大于200,以避免内置索引模板的冲突;
  • "data_stream":启用Data Stream;
  • @timestamp :设置@timestamp字段,并且数据类型为date
  • index.lifecycle.name:设置索引生命周期管理策略名称;

2.3 创建Data Stream

使用如下API创建Data Stream:

txt
PUT _data_stream/<data-stream-name>
  • <data-stream-name>需要匹配索引模板中模式;

例如:

txt
PUT _data_stream/data-stream-test-01

image-20260101185654152

也可以直接如下API,向Data Stream中添加文档,会自动创建Data Stream:

json
// 批量插入文档
PUT data-stream-test-01/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06 16:21:15", "message": "123" }
{ "create":{ } }
{ "@timestamp": "2099-05-06 16:25:42", "message": "456" }

// 插入一个文档
POST data-stream-test-01/_doc
{
  "@timestamp": "2026-01-01 18:51:15",
  "message": "test,first try of data stream"
}

3. 使用Data Stream

本小节介绍如何使用Data Stream。

3.1 插入文档

向Data Stream中插入单个文档:

json
POST /<data-stream-name>/_doc/
{
  "@timestamp": "2099-03-08 11:06:07",
  "message": "Login successful"
}

向Data Stream中同时插入多个文档:

json
PUT /<data-stream-name>/_bulk
{ "create":{ } }
{ "@timestamp": "2099-05-06 16:21:15", "message": "123" }
{ "create":{ } }
{ "@timestamp": "2099-05-06 16:25:42", "message": "456" }

3.2 搜索文档

搜索Data Stream与搜索Index相同,使用_search API ,如下:

json
POST /<data-stream-name>/_search
{
  "query": {
    "match_all": {}
  }
}

3.3 更新文档

可以使用如下API更新Data Stream中符合条件的文档:

json
POST /my-data-stream/_update_by_query
{
  "query": {
    "match": {
      "message": "test"
    }
  },
  "script": {
    "source": "ctx._source.message = params.new_msg",
    "params": {
      "new_msg": "XgdX0NoX"
    }
  }
}

3.4 删除文档

可以使用如下API删除Data Stream中符合条件的文档:

json
POST /<data-stream-name>/_delete_by_query
{
  "query": {
    "range": {
      "@timestamp": {
        "gt": "2025-10-01 00:00:00",
        "lte": "now",
        "format": "yyyy-MM-dd HH:mm:ss",
        "time_zone": "Asia/Shanghai"
      }
    }
  }
}

3.5 手动Rollover

当Data Stream的写索引(write index)满足ILM策略中Rollover条件时,会自动Rollover,我们也可以通过如下API手动触发Rollover:

txt
POST /<data-stream-name>/_rollover/

上述API会立即触发Rollover。

返回会显示新索引的名称,如下:

json
{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "old_index" : ".ds-data-stream-test-01-2026.01.01-000001",
  "new_index" : ".ds-data-stream-test-01-2026.01.02-000002",
  "rolled_over" : true,
  "dry_run" : false,
  "conditions" : { }
}

注意,即使当前写索引(write index)是空的(0 文档、0 数据),也会创建新的索引,并将写权限切换到新索引。

我们可以在Rollover中添加条件,以避免空的索引被Rollover,如下:

json
POST /data-stream-test-01/_rollover
{
  "conditions": {
    "max_docs": 1
  }
}

详情查看_rollover API:https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-rollover

3.6 获取统计数据

可以使用如下API获取关于数据流的统计数据:

txt
GET /_data_stream/_stats?human=true

如果要获取某一个数据流的统计数据,则可以加上数据流名称:

txt
GET /_data_stream/<data-stream-name>/_stats?human=true

例如:

json
{
  "_shards" : {
    "total" : 8,
    "successful" : 8,
    "failed" : 0
  },
  "data_stream_count" : 2,
  "backing_indices" : 2,
  "total_store_size" : "15.9kb",
  "total_store_size_bytes" : 16373,
  "data_streams" : [
    {
      "data_stream" : "data-stream-test-02",
      "backing_indices" : 1,
      "store_size" : "7.8kb",
      "store_size_bytes" : 8003,
      "maximum_timestamp" : 1767294000000
    },
    {
      "data_stream" : "data-stream-test-01",
      "backing_indices" : 1,
      "store_size" : "8.1kb",
      "store_size_bytes" : 8370,
      "maximum_timestamp" : 1767293475000
    }
  ]
}
  • data_streams:返回所有的Data Stream,每一个元素(数据流)包含以下内容:
    • data_stream:数据流的名称;
    • backing_indices:数据流底层的索引个数;
    • store_sizestore_size_bytes:数据流占据的存储空间;
    • maximum_timestamp:是指数据流中所有文档的 @timestamp 字段的最大值(即最新的时间戳),以毫秒级的 Unix epoch 时间表示

4. TSDS

4.1 TSDS概述

TSDS,全称Time Series Data Stream,是为了存储指标时序数据优化了的Data Stream。

首先明确什么是指标时序数据(Metrics Time Series Data)?

指标时序数据是指随时间变化的数值型指标数据,每条记录都带有时间戳(timestamp)和一个或多个数值指标(metrics),通常用于监控系统性能、业务指标或物理设备状态。例如,每分钟监测某电脑的CPU使用率,每小时某传感器监控某地的温度和湿度等。

在TSDS中,文档字段分为:

  • 维度字段(dimension field):代表数据的属性或标签,回答“这是哪台机器?哪个地区?哪个服务?”等问题。

    例如,假设有传感器测量温度和湿度,那么传感器ID sensor_id和地点location就是维度字段;

    要标识某个字段为维度字段,那么在Mapping中,将该字段的time_series_dimension设置为true,以下数据类型可以作为维度字段:keywordipbyteshortintegerlongunsigned_longboolean

  • 指标字段(metric field):代表需要度量、统计的数值,回答“CPU 用多少?请求多少?延迟多久?”等问题。

    例如,假设有传感器测量温度和湿度,那么温度temperature和湿度humidity就是指标字段;

    要标识某个字段为指标字段,那么在Mapping中,设置该字段的time_series_metric属性,该属性的有效值如下:

    • counter:表示指标是单调递增的或重置为0,例如,统计某个进程抛出的异常,那么异常数就是单调递增的,当该进程重启时,异常数重置为0;
    • gauge:单表一个任意的数值,可以增加或减少,例如温度、湿度等;

    以下数据类型可以作为指标字段:所有的数字类型、aggregate_metric_double(当降采样时内部使用);

  • 元数据字段(metadata field)_tsid元数据字段是由ES自动产生的,通过维度字段产生,如果维度字段值相同,那么元数据字段值也相同;

4.2 创建TSDS

创建TSDS与创建Data Stream步骤一样,只不过设置有些许不同,具体如下。

4.2.1 创建ILM策略

与Data Stream相同,如下:

json
PUT _ilm/policy/my_tsds_ilm_policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "set_priority": {
            "priority": 100
          }, 
          "rollover": {
            "max_docs": 2
          }
        }
      },
      "warm":{
        "min_age": "5m", 
        "actions": {
          "readonly":{},
          "allocate": {
            "number_of_replicas": 0
          },
          "shrink": {
            "number_of_shards": 1
          }
        } 
      }
    }
  }
}

4.2.2 创建索引模板

创建TSDS的索引模板与创建普通的Data Stream的索引模板有些许设置的不同:

json
PUT _index_template/my-weather-sensor-index-template
{
  "index_patterns": ["metrics-weather_sensors-*"],
  "data_stream": { },
  "template": {
    "settings": {
      "index.mode": "time_series",
      "index.lifecycle.name": "my_tsds_ilm_policy"
    },
    "mappings": {
      "properties": {
        "sensor_id": {
          "type": "keyword",
          "time_series_dimension": true
        },
        "location": {
          "type": "keyword",
          "time_series_dimension": true
        },
        "temperature": {
          "type": "half_float",
          "time_series_metric": "gauge"
        },
        "humidity": {
          "type": "half_float",
          "time_series_metric": "gauge"
        },
        "@timestamp": {
          "type": "date"
        }
      }
    }
  },
  "priority": 500
}
  • data_stream:启用Data Stream;
  • index.mode:值设置为time_series,表示启用TSDS;
  • time_series_dimension:设置为true,表示该字段为维度字段;
  • time_series_metric:表示该字段为指标字段;

4.2.3 创建TSDS

与创建普通索引相同,例如:

txt
PUT _data_stream/metrics-weather_sensors-01

创建完成后,就可以像使用Data Stream一样,使用TSDS了。

4.3 降采样

4.3.1 降采样的概念

如果我们以每秒钟的频率采集某地的温度和湿度,那么一天下来,总共会采集到60×60×24=86400 条数据,即86400个文档。

但是,对于数月或一年以前的数据,我们并不关心这个地方每秒的温度和湿度,可能只关系这个地方每个小时的温度和湿度,那么经过降采样,一天的数据量只有24条。从86400到24,显然大大降低了存储空间。这就是降采样。

降采样是针对时间序列数据(尤其是指标类时序数据,如 CPU 使用率、请求量等)的一种数据聚合和压缩技术:将高频率的原始数据按照固定时间间隔(如每 5 秒采集一次)聚合为低频率的数据(如每 1 小时一个值),从而大幅减少存储空间,同时保留长期趋势分析的能力。

在ES中,降采样通过索引生命周期动作实现,原理如下:

  • 降采样是在索引维度进行的;

  • 当ES在索引维度上进行降采样时,会将元数据字段_tdis相同,且落在同一个降采样时间间隔内的原始文档划分为一个组,然后将该组的文档映射为一个文档;

    例如,原始数据每 1 秒采集一次,降采样到 hourly(每小时),那么所有落在同一个小时区间内的原始文档会被汇总成一条新的降采样文档。

  • 对于维度字段,由于同组文档的维度字段相同,所以新的降采样文档的维度字段就是原始文档的维度字段;

  • 对于指标字段,按照time_series_metric不同值,处理如下:

    • counter:分组内最后一条文档的值;
    • gauge:计算聚合值,包括min、max、sum、value_count,这些值被打包存储到一个 aggregate_metric_double 类型的字段中;
  • 对于其他字段,存储分组内最后一条文档的值;

  • 使用降采样后的新索引替换原始索引,之后删除原始索引;

4.3.2 降采样的配置

要配置降采样,在ILM策略中增加downsample动作配置。

参考文档:https://www.elastic.co/docs/reference/elasticsearch/index-lifecycle-actions/ilm-downsample

downsample可用周期:hot、warm、cold

如果在hot周期配置downsample,必须和rollover一起使用。

降采样生成的新索引名称为downsample-<original-index-name>-<random-uuid>

属性如下:

  • fixed_interval:必需,降采样的时间间隔,例如1h1d
  • force_merge_index:降采样生成的新索引是否合并为一个段segment,默认值为true
  • sampling_method:降采样的方法,可选值为aggregate(默认值)和last_value

4.3.3 降采样案例

在ILM策略的hot周期配置降采样动作:

json
PUT _ilm/policy/my_tsds_ilm_policy_2
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "set_priority": {
            "priority": 100
          }, 
          "rollover": {
            "max_docs": 5
          },
          "downsample":{
            "fixed_interval": "1h"
          }
        }
      },
      "warm":{
        "min_age": "2m", 
        "actions": {
          "readonly":{},
          "allocate": {
            "number_of_replicas": 0
          },
          "shrink": {
            "number_of_shards": 1
          }
        } 
      }
    }
  }
}

然后往TSDS中添加文档,以达到Rollover条件。

使用以下API查看索引的生命周期:

txt
GET .ds-downsample-metrics-weather_sensors-01-2026.01.02-000001/_ilm/explain

会发现动作卡在了downsample了,提示消息如下:

txt
The [index.time_series.end_time] setting for index [.ds-downsample-metrics-weather_sensors-01-2026.01.02-000001] is [1767347675000]. Waiting until the index's time series end time lapses before proceeding with action [downsample] as the index can still accept writes.

也就是说,这个索引的index.time_series.end_time值为1767347675000(北京时间2026-01-02 17:54:35),必须要过了这个时间点,downsample才能继续执行,因为该索引还有可能接受新的文档,以防止漏文档,具体查看Time-bound Indices。

等待过了end_time后,会发现降采样完成执行,降采样后的文档如下:

json
{
  "@timestamp": "2026-01-02T07:15:12.000Z",
  "_doc_count": 6,
  "sensor_id": "A01",
  "temperature": {
    "min": 0,
    "max": 39,
    "sum": 82,
    "value_count": 6
  },
  "humidity": 2
}

4.4 Time-bound Indices

在普通的Data Stream中,写操作只会路由到最新的写索引(write index),不同于普通的Data Stream,TSDS使用了Time-bound Indices(时间绑定索引)。在TSDS的底层索引中,会有两个属性:

当新文档插入到TSDS时,TSDS会根据新文档的时间戳@timestamp,路由到符合该时间戳的索引,即使该索引不是写索引(write index),示意图如下:

time bound indices

当创建TSDS时,第一个索引可以接受的时间戳范围计算如下:

  • index.time_series.start_time:公式为now - index.look_back_time,其中now表示当前时间,index.look_back_time为在模板中的设置值,默认为2h(2小时);
  • index.time_series.end_time:公式为now + index.look_ahead_timeindex_back_time也是可以设置的,默认为30m(30分钟);

当发生Rollover后,新生成的索引可以接受的时间戳范围计算如下:

  • index.time_series.start_time:为旧索引的index.time_series.end_time时间,以确保索引之间不出现时间空隙;
  • index.time_series.end_time:公式为now + index.look_ahead_time

如果时间到了index.time_series.end_time,还没有发生Rollover,那么ES 会自动将 end_time 向前延长一个 look_ahead_time 的长度。

5. Data Stream Lifecycle

Data Stream Lifecycle (DSL) 是 Elasticsearch 推出的一种更简单、更自动化的时序数据管理方式。与传统的 ILM(Index Lifecycle Management)相比,它将管理逻辑从“索引级别”提升到了“数据流级别”,大大简化了配置。

虽然配置更简单,但是相比于ILM,DSL的功能也更简单,可配置的主要包含两个功能:

  • 降采样:针对旧数据,可以降低分辨率;
  • 设置数据保留时间:当数据超过了保留时间,ES会把这些数据删除;

5.1 DSL的工作流程

DSL 是一个周期性轮询的任务(由 data_streams.lifecycle.poll_interval参数控制,在配置文件中设置,默认值为5m),它对 Data Stream 中的每个后备索引(Backing Index)执行以下操作:

  1. 检查:检查Data Stream是否配置了DSL,如果没有配置,则跳过;
  2. Rollover:DSL 会根据集群层面的默认规则(cluster.lifecycle.default.rollover)自动判断当前“写入索引”是否过大或过旧。如果满足条件,会自动创建一个新索引,旧索引变为“非写入索引”。

cluster.lifecycle.default.rollover在elasticsearch.yml配置文件中设置,默认值如下:

yaml
cluster.lifecycle.default.rollover: max_age=auto,max_primary_shard_size=50gb,min_docs=1,max_primary_shard_docs=200000000

以上设置表示,只要满足以下任一条件,Data Stream就会发生Rollover:

  • 任一主分片大小达到50gb;

  • 任一主分片包含2亿个文档;

  • 或者根据设置的数据保留时间( data_retention )自动计算Rollover频率;

    并且需要满足最小的文档数为1,即不会Rollover空索引。

  1. 长尾合并 (Tail Merge):这是 DSL 的一大特性。当索引不再接收新数据后,它不会像 ILM 那样进行耗时的 force_merge,而是只针对那些零碎的小分段(Small Segments)进行合并。
  2. 降采样(Downsampling):如果配置了 downsampling,DSL 会在此阶段将历史明细数据聚合为低分辨率的数据;
  3. 保留策略 (Retention):如果索引的“生命周期”超过了设置的数据保留时间,那么该索引将被物理删除;

5.2 配置DSL

如果要配置DSL,那么需要在索引模板中配置,例如:

json
PUT _index_template/my-index-template-dsl
{
  "index_patterns": ["dsl-test"],
  "data_stream": { },
  "priority": 500,
  "template": {
    "mappings": {
      "properties": {
        "@timestamp":{
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
        },
        "sensor":{
          "type": "keyword",
          "time_series_dimension": true
        },
        "temperature":{
          "type": "half_float",
          "time_series_metric": "gauge"
        }
      }
    },
    "settings": {
      "index.mode": "time_series",  // 由于开启了降采样,必须启用时间序列模式
      "index.lifecycle.prefer_ilm": false  // 强制 DSL 优先
    },
    "lifecycle": {
      "enabled": true,
      "data_retention": "30d",
      "downsampling": [
        {
          "after": "1d",          // rollover 后 1 天开始第一轮
          "fixed_interval": "1h"  // 聚合为每 1 小时一条
        },
        {
          "after": "7d",          // 第一轮完成后(或 rollover 后 7 天)
          "fixed_interval": "1d"   // 聚合为每 1 天一条
        }
      ]
    }
  }
}
  • index.lifecycle.prefer_ilm:此处Data Stream是由DSL管理的,所以把prefer_ilm设置为false,表示DSL优先级更高;

  • lifecycle.enabled:启用DSL;

  • lifecycle.data_retention:设置数据的保留时间,这里设置为30天,即发生Rollover(不是创建时间)的30天后,数据就会被删除了;

    索引的生命周期,是从发生Rollover后开始计算的,这样能保证索引中的每个文档都大于设置的数据保留时间;如果以索引的创建时间开始计算,那么可能新插入的文档,就会因为索引创建时间到达了数据保留时间而被删除。

  • lifecycle.downsampling:设置降采样规则,数组元素属性如下:

    • after:Rollover发生多少时间后,触发该条降采样规则;
    • fixed_interval:多长时间间隔的数据聚合成一条数据;

当创建了索引模板后,就可以创建Data Stream:

txt
PUT _data_stream/dsl-test

之后就可以使用该DSL 了。

参考资料

[1] https://www.elastic.co/docs/manage-data/data-store/data-streams

[2] https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-rollover

[3] https://www.elastic.co/docs/reference/elasticsearch/configuration-reference/data-stream-lifecycle-settings#cluster-lifecycle-default-rollover