ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

时序数据库TDengineSQL 写入

2023-12-26 10:25:12  阅读:83  来源: 互联网

标签:


SQL 写入简介​

应用通过连接器执行 INSERT 语句来插入数据,用户还可以通过 TDengine CLI,手动输入 INSERT 语句插入数据。

一次写入一条​

下面这条 INSERT 就将一条记录写入到表 d1001 中:

INSERT INTO d1001 VALUES (ts1, 10.3, 219, 0.31);

这里的ts1为Unix时间戳(Unix timestamp),允许插入的最老记录的时间戳,是相对于当前服务器时间,减去配置的 KEEP 值。时间戳详情规则参考 TDengine SQL数据写入 关于时间戳一节

一次写入多条​

TDengine 支持一次写入多条记录,比如下面这条命令就将两条记录写入到表 d1001 中:

INSERT INTO d1001 VALUES (ts1, 10.2, 220, 0.23) (ts2, 10.3, 218, 0.25);

这里的ts1ts2为Unix时间戳(Unix timestamp),允许插入的最老记录的时间戳,是相对于当前服务器时间,减去配置的 KEEP 值。时间戳详情规则参考 TDengine SQL数据写入 关于时间戳一节

一次写入多表​

TDengine 也支持一次向多个表写入数据,比如下面这条命令就向 d1001 写入两条记录,向 d1002 写入一条记录:

INSERT INTO d1001 VALUES (ts1, 10.3, 219, 0.31) (ts2, 12.6, 218, 0.33) d1002 VALUES (ts3, 12.3, 221, 0.31);

这里的ts1ts2ts3为Unix时间戳(Unix timestamp),允许插入的最老记录的时间戳,是相对于当前服务器时间,减去配置的 KEEP 值。时间戳详情规则参考 TDengine SQL数据写入 关于时间戳一节

详细的 SQL INSERT 语法规则参考 TDengine SQL 的数据写入。

INFO
  • 要提高写入效率,需要批量写入。一般来说一批写入的记录条数越多,插入效率就越高。但一条记录不能超过 48KB,一条 SQL 语句总长度不能超过 1MB。
  • TDengine 支持多线程同时写入,要进一步提高写入速度,一个客户端需要打开多个同时写。但线程数达到一定数量后,无法再提高,甚至还会下降,因为线程频繁切换,会带来额外开销,合适的线程数量与服务端的处理能力,服务端的具体配置,数据库的参数,数据定义的 Schema,写入数据的 Batch Size 等很多因素相关。一般来说,服务端和客户端处理能力越强,所能支持的并发写入的线程可以越多;数据库配置时的 vgroups 参数值越多(但仍然要在服务端的处理能力以内)则所能支持的并发写入越多;数据定义的 Schema 越简单,所能支持的并发写入越多。
WARNING
  • 对同一张表,如果新插入记录的时间戳已经存在,则指定了新值的列会用新值覆盖旧值,而没有指定新值的列则不受影响。
  • 写入的数据的时间戳必须大于当前时间减去数据库配置参数 KEEP 的时间。如果 KEEP 配置为 3650 天,那么无法写入比 3650 天还早的数据。写入数据的时间戳也不能大于当前时间加配置参数 DURATION。如果 DURATION 为 2,那么无法写入比当前时间还晚 2 天的数据。

示例程序​

普通 SQL 写入​

  • Java
  • Python
  • Go
  • Rust
  • Node.js
  • C#
  • C
  • PHP
package com.taos.example;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;


public class RestInsertExample {
    private static Connection getConnection() throws SQLException {
        String jdbcUrl = "jdbc:TAOS-RS://localhost:6041?user=root&password=taosdata";
        return DriverManager.getConnection(jdbcUrl);
    }

    private static List<String> getRawData() {
        return Arrays.asList(
                "d1001,2018-10-03 14:38:05.000,10.30000,219,0.31000,'California.SanFrancisco',2",
                "d1001,2018-10-03 14:38:15.000,12.60000,218,0.33000,'California.SanFrancisco',2",
                "d1001,2018-10-03 14:38:16.800,12.30000,221,0.31000,'California.SanFrancisco',2",
                "d1002,2018-10-03 14:38:16.650,10.30000,218,0.25000,'California.SanFrancisco',3",
                "d1003,2018-10-03 14:38:05.500,11.80000,221,0.28000,'California.LosAngeles',2",
                "d1003,2018-10-03 14:38:16.600,13.40000,223,0.29000,'California.LosAngeles',2",
                "d1004,2018-10-03 14:38:05.000,10.80000,223,0.29000,'California.LosAngeles',3",
                "d1004,2018-10-03 14:38:06.500,11.50000,221,0.35000,'California.LosAngeles',3"
        );
    }


    /**
     * The generated SQL is:
     * INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000)
     * power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000)
     * power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000)
     * power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000)
     * power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000)
     * power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000)
     * power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000)
     * power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000)
     */
    private static String getSQL() {
        StringBuilder sb = new StringBuilder("INSERT INTO ");
        for (String line : getRawData()) {
            String[] ps = line.split(",");
            sb.append("power." + ps[0]).append(" USING power.meters TAGS(")
                    .append(ps[5]).append(", ") // tag: location
                    .append(ps[6]) // tag: groupId
                    .append(") VALUES(")
                    .append('\'').append(ps[1]).append('\'').append(",") // ts
                    .append(ps[2]).append(",") // current
                    .append(ps[3]).append(",") // voltage
                    .append(ps[4]).append(") "); // phase
        }
        return sb.toString();
    }

    public static void insertData() throws SQLException {
        try (Connection conn = getConnection()) {
            try (Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE DATABASE power KEEP 3650");
                stmt.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " +
                        "TAGS (location BINARY(64), groupId INT)");
                String sql = getSQL();
                int rowCount = stmt.executeUpdate(sql);
                System.out.println("rowCount=" + rowCount); // rowCount=8
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        insertData();
    }
}

查看源码

NOTE
  1. 无论 RESTful 方式建立连接还是本地驱动方式建立连接,以上示例代码都能正常工作。
  2. 唯一需要注意的是:由于 RESTful 接口无状态, 不能使用 USE db; 语句来切换数据库, 所以在上面示例中使用了dbName.tbName指定表名。

参数绑定写入​

TDengine 也提供了支持参数绑定的 Prepare API,与 MySQL 类似,这些 API 目前也仅支持用问号 ? 来代表待绑定的参数。在通过参数绑定接口写入数据时,就避免了 SQL 语法解析的资源消耗,从而在绝大多数情况下显著提升写入性能。

需要注意的是,只有使用原生连接的连接器,才能使用参数绑定功能。

  • Java
  • Python
  • Go
  • Rust
  • Node.js
  • C#
  • C
  • PHP
package com.taos.example;

import com.taosdata.jdbc.TSDBPreparedStatement;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

public class StmtInsertExample {
    private static String datePattern = "yyyy-MM-dd HH:mm:ss.SSS";
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern(datePattern);

    private static List<String> getRawData(int size) {
        SimpleDateFormat format = new SimpleDateFormat(datePattern);
        List<String> result = new ArrayList<>();
        long current = System.currentTimeMillis();
        Random random = new Random();
        for (int i = 0; i < size; i++) {
            String time = format.format(current + i);
            int id = random.nextInt(10);
            result.add("d" + id + "," + time + ",10.30000,219,0.31000,California.SanFrancisco,2");
        }
        return result.stream()
                .sorted(Comparator.comparing(s -> s.split(",")[0])).collect(Collectors.toList());
    }

    private static Connection getConnection() throws SQLException {
        String jdbcUrl = "jdbc:TAOS://localhost:6030?user=root&password=taosdata";
        return DriverManager.getConnection(jdbcUrl);
    }

    private static void createTable(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE DATABASE if not exists power KEEP 3650");
            stmt.executeUpdate("use power");
            stmt.execute("CREATE STABLE if not exists meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " +
                    "TAGS (location BINARY(64), groupId INT)");
        }
    }

    private static void insertData() throws SQLException {
        try (Connection conn = getConnection()) {
            createTable(conn);
            String psql = "INSERT INTO ? USING power.meters TAGS(?, ?) VALUES(?, ?, ?, ?)";
            try (TSDBPreparedStatement pst = (TSDBPreparedStatement) conn.prepareStatement(psql)) {
                String tableName = null;
                ArrayList<Long> ts = new ArrayList<>();
                ArrayList<Float> current = new ArrayList<>();
                ArrayList<Integer> voltage = new ArrayList<>();
                ArrayList<Float> phase = new ArrayList<>();
                for (String line : getRawData(100000)) {
                    String[] ps = line.split(",");
                    if (tableName == null) {
                        // bind table name and tags
                        tableName = "power." + ps[0];
                        pst.setTableName(ps[0]);
                        pst.setTagString(0, ps[5]);
                        pst.setTagInt(1, Integer.valueOf(ps[6]));
                    } else {
                        if (!tableName.equals(ps[0])) {
                            pst.setTimestamp(0, ts);
                            pst.setFloat(1, current);
                            pst.setInt(2, voltage);
                            pst.setFloat(3, phase);
                            pst.columnDataAddBatch();
                            pst.columnDataExecuteBatch();

                            // bind table name and tags
                            tableName = ps[0];
                            pst.setTableName(ps[0]);
                            pst.setTagString(0, ps[5]);
                            pst.setTagInt(1, Integer.valueOf(ps[6]));
                            ts.clear();
                            current.clear();
                            voltage.clear();
                            phase.clear();
                        }
                    }
                    // bind values
                    // ps[1] looks like: 2018-10-03 14:38:05.000
                    LocalDateTime localDateTime = LocalDateTime.parse(ps[1], formatter);
                    ts.add(localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli());
                    current.add(Float.valueOf(ps[2]));
                    voltage.add(Integer.valueOf(ps[3]));
                    phase.add(Float.valueOf(ps[4]));
                }
                pst.setTimestamp(0, ts);
                pst.setFloat(1, current);
                pst.setInt(2, voltage);
                pst.setFloat(3, phase);
                pst.columnDataAddBatch();
                pst.columnDataExecuteBatch();
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        insertData();
    }
}

标签:
来源:

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有