influxdb数据库
            十月 20, 2024
            
  
        
        概念
- 时间序列数据库: InfluxDB专注于存储和查询时间序列数据,这使得它非常适合处理按时间排序的数据流,如传感器数据、日志事件等。
 - 标签和字段: InfluxDB使用标签(tags)和字段(fields)来组织和查询数据。标签是用于标识和过滤数据的键值对,而字段包含实际的数值数据。
 - 测量(Measurements): 数据在InfluxDB中被组织成测量,每个测量包含一组相关的数据点。一个数据点由时间戳、标签和字段组成。
 - 保留策略(Retention Policies): InfluxDB允许你为数据指定保留策略,定义数据在数据库中的保存时间。这对于自动删除旧数据以释放空间很有用。
 - 查询语言: InfluxDB使用类似SQL的查询语言(InfluxQL)来进行数据查询和分析。你可以执行范围查询、聚合和过滤以提取所需的信息。
 - 连续查询(Continuous Queries): 允许你在数据库中预定义查询,并自动定期执行这些查询以生成新的聚合数据,这对于实时分析非常有用。
 - 插件和集成: InfluxDB支持多种插件和集成,包括Grafana、Telegraf等,以便更好地与其他工具协同工作。
 - 高性能: InfluxDB被设计成能够高效地处理大量的写入和查询操作,以满足高度动态的数据需求。
 
下表列出了 InfluxDB 和 MySQL 中一些常见概念的对比:
| InfluxDB | MySQL | 描述 | 
|---|---|---|
| Bucket | Database | 存储时间序列数据的容器,类似于 MySQL 中的数据库 | 
| Measurement | Table | 存储时间序列数据的表,类似于 MySQL 中的表 | 
| Tag | Index | 包含元数据或标签的键值对,用于标识和过滤数据 | 
| Field | Column | 存储实际数据值的字段,类似于 MySQL 中的列 | 
| Point | Row | 单个时间序列数据点,包括时间戳、标签和字段值 | 
| Timestamp | DATETIME 或 TIMESTAMP | 数据点的时间戳,表示数据点的时间 | 
| Retention Policy | N/A | 确定数据在系统中保存的时间范围,类似于 MySQL 中的数据保留策略 | 
| Continuous Query | View | 预先定义的查询,自动在后台计算并写入新的数据 | 
| Kapacitor | Stored Procedures or Triggers | 用于处理和处理数据流的实时数据处理引擎 | 
启动
1  | docker run -d \  | 
使用web
- 访问:xxxx:8086
 
使用手册
1. 界面一览

2. Bucket
ⅰ. 创建数据库


a. 插入数据

Tips:
- 在InfluxDB中,如果你插入数据时没有指定时间戳,系统将使用插入数据的当前时间戳。这是InfluxDB的默认行为。
 - 时间戳用于计算数据是否过期
 - 数据的过期时间通常是通过设置保留策略(Retention Policy)来实现的,可以在创建Bucket时选择关联一个保留策略
 - InfluxDB执行数据清理的频率是由配置文件中的compaction-interval参数决定的,默认值是每隔10分钟。
 
3. telegraf
- 安装telegraf(乌班图)
 
1  | 添加 Telegraf 存储库  | 
- 配置Telegraf的输入插件
 

- 选择插件
 

- 确认
 

- 服务器上运行这两条指令(可以使用nohup后台启动)
 

- 运行
 

- 查看数据收集情况
 

4. go语言交互influxdb

5. influxdb的查询语法
- 关键字及其使用的顺序:
 
- from: 指定数据源,通常是一个桶(bucket)。
 - range: 定义查询的时间范围。
 - filter: 过滤数据,类似于 SQL 中的 WHERE 子句。
 - group: 对数据进行分组,可以按标签、时间等进行分组。
 - aggregateWindow: 在时间窗口内对数据进行聚合。
 - pivot: 透视操作,将表格数据重新排列。
 - map: 对数据进行转换或添加新的列。
 - sort: 对数据进行排序。
 - limit: 限制返回的结果数量。
 - keep: 选择要保留的列。
 - yield: 生成结果集。
 
- 示例
 
1  | from(bucket: "your-bucket-name")  | 
6. 详细分析(✅常用,❎不常用)
- from()
 
- ✅bucket(必需): 参数,用于指定数据存储的桶(Bucket)名称。
 - ❎start 和 stop(可选): 可选参数,用于指定查询的时间范围。它们分别表示查询的开始和结束时间。如果不提供这两个参数,将查询所有时间范围内的数据。
 - ❎range(可选): 可选参数,提供了更灵活的时间范围设置,可以代替 start 和 stop 参数。它接受一个对象,包含 start 和 stop 属性。
 - ❎measurement(可选): 可选参数,用于过滤指定测量(Measurement)的数据。
 - ❎organization(可选): 可选参数,用于指定组织(Organization)的名称。如果 InfluxDB 实例启用了多组织支持,可以使用此参数选择特定组织的数据。
 
1  | from(  | 
- range()
 
- ✅start(必需): 查询的开始时间。可以是相对时间(例如 -1h 表示过去一小时)或绝对时间(例如 “2023-12-05T12:00:00Z”)。
 - ❎stop(可选): 查询的结束时间。可以是相对时间或绝对时间。如果不指定,表示查询直到当前时间。
 
1  | range(start:1h,stop:now())  | 
- filter()
 
✅基本过滤条件
1  | filter(fn: (r) => r["_measurement"] == "temperature")  | 
✅复合过滤条件
1  | filter(fn: (r) => r["_measurement"] == "temperature" and r["_field"] == "value")  | 
✅比较运算符
1  | filter(fn: (r) => r["_value"] > 25.0)  | 
❎正则表达式匹配
1  | filter(fn: (r) => r["_measurement"] =~ /temperature*/)  | 
❎时间条件
1  | filter(fn: (r) => r["_time"] > 2023-12-05T12:00:00Z)  | 
- group()
 
✅columns****(必需): 定义用于分组的列。可以是标签名、时间列或字段名。多个列之间使用逗号分隔。
❎mode****(可选): 定义分组模式。有两种模式可用:
**”by”**:按列值进行分组。
**”except”**:按照除指定列之外的所有其他列进行分组。
❎include****(可选): 仅在 mode: “except” 时有效,用于指定在 except 模式下仍然包括在分组中的列。
1  | group(columns: ["_tagKey", "_field"], mode: "by")  | 
- aggregateWindow()
 
- ✅every****(必需): 定义时间窗口的大小,表示在多长时间范围内进行一次聚合。可以使用相对时间(例如 5m 表示每 5 分钟聚合一次)或绝对时间。
 - ✅fn****(必需): 定义在每个时间窗口内执行的聚合函数。常见的聚合函数包括 mean(平均值)、sum(总和)、count(计数)等。
 - ❎createEmpty****(可选): 如果设置为 true,则在窗口内没有数据时生成一个空行,而不是忽略该窗口。默认为 false。
 - ❎timeSrc****(可选): 指定用于时间的源列,默认为 _time。
 
1  | |> aggregateWindow(every:1h,fn:mean,createEmpty:false,timeSrc:"_time")  | 
- pivot()
 
rowKey(必需): 定义表格的行键,即原数据中的列,这些列的值将变成新表格的行。
colKey(必需): 定义表格的列键,即原数据中的列,这些列的值将变成新表格的列。
valueColumn(必需): 定义新表格中的值来自于原数据中的哪一列。
groupBy(可选): 定义表格数据在进行聚合操作之前按照哪些列进行分组。这个参数是一个字符串数组。
举个例子
1  | _time | _field | _value  | 
- 使用函数
 
1  | from(bucket: "your-bucket-name")  | 
- 得到新的表格
 
1  | _time | temperature | humidity  | 
- map()
 
- 函数的参数是一个函数,该函数用于对每一行数据进行转换。
 
1  | |> map(fn: (r) => ({  | 
- 在这个例子中,map() 函数接收每一行数据 r,然后返回一个新对象,包含了原始数据的所有字段以及新增的 newColumn 列和修改过的 modifiedColumn 列。你可以根据具体的需求自定义转换逻辑。
 
- sort
 
- ✅columns(必需): 定义一个包含列名的字符串数组,表示按照哪些列进行排序。可以包含多个列名,排序将按照列名数组的顺序进行。(一百是根据时间来排序,desc不填就是时间升序)
 - ❎desc(可选): 定义排序方式,是升序还是降序。默认为 false,表示升序;如果设置为 true,则表示降序。
 
1  | |> sort(columns: ["_time"], desc: true)  | 
- limit
 
- ✅n(必需): 定义返回的行数,即限制查询结果返回的行数。
 - ❎offset(可选): 定义返回结果的起始位置。默认为 0,表示从查询结果的第一行开始。
 
1  | |> limit(n: 5, offset: 0)  | 
- keep
 
- ✅columns(必需): 定义一个包含要保留的列名的字符串数组,表示保留哪些列。
 - ❎fn(可选): 定义一个匿名函数,该函数接受一个列名作为输入,并返回一个新的列名。可以用于对保留的列进行重命名。
 
1  | keep(columns: ["column1", "column2"], fn: (column) => "new_name")  | 
- yield(写不写)
 
- name(可选): 定义返回结果的名称。如果没有提供 name 参数,则使用默认的名称。
 - yieldTag(可选): 将指定的标签添加到返回结果中。通常用于标识特定的查询结果。
 - yieldKey(可选): 将指定的键添加到返回结果中。通常用于标识特定的查询结果。
 - yieldValue(可选): 将指定的值添加到返回结果中。通常用于标识特定的查询结果。
 
1  | yield(name: "result_name", yieldTag: "tag_value", yieldKey: "key_value", yieldValue: "value_value")  | 
小demo
1  | from(bucket: "test_cpu") // 从名为 "test_cpu" 的 bucket 中获取数据  | 
查看评论