--- title: "数据流" date: 2026-01-01 lastmod: 2026-01-01 description: "管理数据流,用于时间序列数据的自动索引管理" tags: ["数据流", "索引管理", "时间序列"] summary: "数据流是一种用于时间序列数据的索引管理机制,自动处理索引的创建和滚动。 API # 创建数据流 # PUT /_data_stream/{name} 获取数据流 # GET /_data_stream GET /_data_stream/{name} 删除数据流 # DELETE /_data_stream/{name} 获取数据流统计 # GET /_data_stream/_stats GET /_data_stream/{name}/_stats API 的作用 # 数据流是专为时间序列数据设计的索引管理方案。 数据流的特点 # 特点 描述 自动索引管理 自动创建和滚动后备索引 简化查询 通过单一名称查询多个时间分片 优化存储 自动管理数据生命周期 Rollover 根据大小、文档数或时间自动滚动 数据流 vs 普通索引 # 特性 数据流 普通索引 时间序列优化 是 否 自动滚动 是 否 查询简化 是 否 只读 是(后备索引) 否 API 的参数 # PUT /_data_stream/{name} # 路由参数 # 参数 类型 是否必填 描述 {name} 字符串 必需 数据流名称 Query String 参数 # 参数 类型 是否必填 默认值 描述 master_timeout 时间值 否 - 等待主节点响应的超时时间 GET /_data_stream # 路由参数 # 参数 类型 是否必填 描述 {name} 字符串 否 数据流名称,支持通配符 * Query String 参数 # 参数 类型 是否必填 默认值 描述 expand_wildcards 枚举值 否 all 通配符展开方式:all、open、closed、hidden、none DELETE /_data_stream/{name} # 路由参数 # 参数 类型 是否必填 描述 {name} 字符串 必需 数据流名称,支持逗号分隔或 * Query String 参数 # 参数 类型 是否必填 默认值 描述 master_timeout 时间值 否 - 等待主节点响应的超时时间 GET /_data_stream/_stats # 路由参数 # 参数 类型 是否必填 描述 {name} 字符串 否 数据流名称 Query String 参数 # 参数 类型 是否必填 默认值 描述 forbid_closed_indices 布尔值 否 true 是否禁止关闭的索引 示例 # 创建数据流 # PUT /_data_stream/logs 响应:" --- 数据流是一种用于时间序列数据的索引管理机制,自动处理索引的创建和滚动。 ## API ### 创建数据流 ``` PUT /_data_stream/{name} ``` ### 获取数据流 ``` GET /_data_stream GET /_data_stream/{name} ``` ### 删除数据流 ``` DELETE /_data_stream/{name} ``` ### 获取数据流统计 ``` GET /_data_stream/_stats GET /_data_stream/{name}/_stats ``` ## API 的作用 数据流是专为时间序列数据设计的索引管理方案。 ### 数据流的特点 | 特点 | 描述 | |------|------| | **自动索引管理** | 自动创建和滚动后备索引 | | **简化查询** | 通过单一名称查询多个时间分片 | | **优化存储** | 自动管理数据生命周期 | | **Rollover** | 根据大小、文档数或时间自动滚动 | ### 数据流 vs 普通索引 | 特性 | 数据流 | 普通索引 | |------|--------|----------| | 时间序列优化 | 是 | 否 | | 自动滚动 | 是 | 否 | | 查询简化 | 是 | 否 | | 只读 | 是(后备索引) | 否 | ## API 的参数 ### PUT /_data_stream/{name} #### 路由参数 | 参数 | 类型 | 是否必填 | 描述 | |------|------|----------|------| | `{name}` | 字符串 | 必需 | 数据流名称 | #### Query String 参数 | 参数 | 类型 | 是否必填 | 默认值 | 描述 | |------|------|----------|--------|------| | `master_timeout` | 时间值 | 否 | - | 等待主节点响应的超时时间 | ### GET /_data_stream #### 路由参数 | 参数 | 类型 | 是否必填 | 描述 | |------|------|----------|------| | `{name}` | 字符串 | 否 | 数据流名称,支持通配符 `*` | #### Query String 参数 | 参数 | 类型 | 是否必填 | 默认值 | 描述 | |------|------|----------|--------|------| | `expand_wildcards` | 枚举值 | 否 | all | 通配符展开方式:`all`、`open`、`closed`、`hidden`、`none` | ### DELETE /_data_stream/{name} #### 路由参数 | 参数 | 类型 | 是否必填 | 描述 | |------|------|----------|------| | `{name}` | 字符串 | 必需 | 数据流名称,支持逗号分隔或 `*` | #### Query String 参数 | 参数 | 类型 | 是否必填 | 默认值 | 描述 | |------|------|----------|--------|------| | `master_timeout` | 时间值 | 否 | - | 等待主节点响应的超时时间 | ### GET /_data_stream/_stats #### 路由参数 | 参数 | 类型 | 是否必填 | 描述 | |------|------|----------|------| | `{name}` | 字符串 | 否 | 数据流名称 | #### Query String 参数 | 参数 | 类型 | 是否必填 | 默认值 | 描述 | |------|------|----------|--------|------| | `forbid_closed_indices` | 布尔值 | 否 | true | 是否禁止关闭的索引 | ## 示例 ### 创建数据流 ```bash PUT /_data_stream/logs ``` **响应:** ```json { "acknowledged": true } ``` **注意**:创建数据流前需要有匹配的索引模板,模板中需设置 `data_stream: true`。 ### 创建带模板的数据流 ```bash # 1. 创建索引模板 PUT /_index_template/logs_template { "index_patterns": ["logs*"], "data_stream": {}, "priority": 100, "template": { "settings": { "number_of_shards": 1, "number_of_replicas": 1 } } } # 2. 创建数据流 PUT /_data_stream/logs ``` ### 获取所有数据流 ```bash GET /_data_stream ``` **响应示例:** ```json { "data_streams": [ { "name": "logs", "timestamp_field": { "name": "@timestamp" }, "indices": ["logs-000001"], "generation": 1, "status": "GREEN", "template": "logs_template" } ] } ``` ### 获取特定数据流 ```bash GET /_data_stream/logs ``` ### 使用通配符获取数据流 ```bash GET /_data_stream/logs* ``` ### 删除数据流 ```bash DELETE /_data_stream/logs ``` **响应:** ```json { "acknowledged": true } ``` ### 删除多个数据流 ```bash DELETE /_data_stream/logs,metrics ``` ### 删除所有数据流 ```bash DELETE /_data_stream/* ``` ### 获取数据流统计 ```bash GET /_data_stream/_stats ``` **响应示例:** ```json { "data_stream_count": 2, "backing_indices": 4, "total_store_size": "10mb", "data_streams": [ { "data_stream": "logs", "backing_indices": 2, "store_size": "5mb", "maximum_timestamp": 1707043200000 }, { "data_stream": "metrics", "backing_indices": 2, "store_size": "5mb", "maximum_timestamp": 1707043200000 } ] } ``` ### 获取特定数据流统计 ```bash GET /_data_stream/logs/_stats ``` ### 向数据流写入数据 ```bash POST /logs/_doc { "@timestamp": "2026-02-04T10:00:00Z", "message": "Hello, data stream!", "level": "INFO" } ``` ### 查询数据流 ```bash GET /logs/_search { "query": { "match_all": {} } } ``` ## 响应字段说明 ### 数据流信息 | 字段 | 描述 | |------|------| | `name` | 数据流名称 | | `timestamp_field` | 时间戳字段配置 | | `indices` | 后备索引列表 | | `generation` | 代数(滚动次数) | | `status` | 健康状态 | | `template` | 关联的索引模板 | ### 数据流统计 | 字段 | 描述 | |------|------| | `data_stream_count` | 数据流总数 | | `backing_indices` | 后备索引总数 | | `total_store_size` | 总存储大小 | | `data_streams` | 各数据流统计详情 | | `maximum_timestamp` | 最大时间戳 | ## 使用场景 ### 场景 1:日志管理 ```bash # 创建日志数据流 PUT /_index_template/logs_template { "index_patterns": ["logs-*"], "data_stream": {}, "template": { "settings": { "lifecycle.name": "logs-policy" } } } PUT /_data_stream/logs # 写入日志 POST /logs/_doc { "@timestamp": "2026-02-04T10:00:00Z", "level": "INFO", "message": "Application started" } ``` ### 场景 2:指标监控 ```bash # 创建指标数据流 PUT /_index_template/metrics_template { "index_patterns": ["metrics-*"], "data_stream": {}, "template": { "settings": { "lifecycle.name": "hot-warm-cold" } } } PUT /_data_stream/metrics ``` ### 场景 3:审计日志 ```bash # 审计日志需要长期保存 PUT /_index_template/audit_template { "index_patterns": ["audit-*"], "data_stream": {}, "template": { "settings": { "lifecycle.name": "audit-retention" } } } PUT /_data_stream/audit ``` ### 场景 4:事件追踪 ```bash # 用户行为事件 PUT /_index_template/events_template { "index_patterns": ["events-*"], "data_stream": {}, "template": { "settings": { "lifecycle.name": "30-days-retention" } } } PUT /_data_stream/events ``` ## 数据流生命周期 ### 1. 创建阶段 ```bash # 创建模板 PUT /_index_template/my_template { "index_patterns": ["my-data-*"], "data_stream": {}, "priority": 100 } # 创建数据流 PUT /_data_stream/my-data ``` ### 2. 写入阶段 ```bash # 自动创建第一个后备索引 POST /my-data/_doc { "@timestamp": "2026-02-04T10:00:00Z", "value": 100 } ``` ### 3. 滚动阶段 ```bash # 手动滚动(或配置自动滚动) POST /my-data/_rollover ``` ### 4. 删除阶段 ```bash # 删除数据流及其所有后备索引 DELETE /_data_stream/my-data ``` ## 注意事项 1. **模板要求**:创建数据流前必须有匹配的索引模板 2. **时间戳字段**:必须包含 `@timestamp` 字段或配置的时间戳字段 3. **只读索引**:后备索引是隐藏且只读的 4. **不可变性**:后备索引创建后不能修改 5. **命名规范**:数据流名称必须小写,不能以 `.` 开头 6. **删除操作**:删除数据流会删除所有后备索引 ## 常见错误 ### 缺少索引模板 ```json { "error": { "type": "illegal_argument_exception", "reason": "no matching index template found for data stream [logs]" } } ``` **解决**:先创建带 `data_stream: {}` 的索引模板 ### 数据流已存在 ```json { "error": { "type": "resource_already_exists_exception", "reason": "data stream [logs] already exists" } } ``` ### 无效的数据流名称 ```json { "error": { "type": "illegal_argument_exception", "reason": "data stream name may not start with '.'" } } ``` ### 缺少时间戳字段 ```json { "error": { "type": "documentParsingException", "reason": "data stream timestamps must be specified" } } ``` ## 最佳实践 1. **命名规范**:使用描述性名称,如 `logs-nginx`、`metrics-system` 2. **模板管理**:为每个数据流创建专用模板 3. **生命周期策略**:配置 ILM 策略自动管理数据 4. **监控统计**:定期检查数据流统计信息 5. **容量规划**:根据数据量规划滚动策略 6. **备份策略**:配置快照策略保护重要数据