DEV Community

Apache SeaTunnel

Synchronizing Data from InfluxDB to Doris with SeaTunnel

This article explains how to use SeaTunnel to synchronize data from InfluxDB to Doris. With SeaTunnel's data integration capabilities, time-series data stored in InfluxDB can be transferred efficiently to Doris, where it is readily available for querying and analysis.

Version Information:
SeaTunnel 2.3.3

InfluxDB 2.7.6

Doris 2.1.3 rc09

Preparation

The installation process for SeaTunnel 2.3.3 is omitted here and can be found in the official documentation.

After installing SeaTunnel 2.3.3, delete two connector JAR files, connector-hudi-2.3.3.jar and connector-datahub-2.3.3.jar, as these may cause database synchronization errors.

Add the following JAR files to your SeaTunnel installation: seatunnel-api-2.3.3.jar, seatunnel-transforms-v2-2.3.3.jar, mysql-connector-java-8.0.28.jar, and jersey-client-1.19.4.jar. Without them, the synchronization job may fail with missing-class errors.
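The JAR housekeeping above can be checked with a short script. The Python sketch below compares a list of JAR file names against the two files to remove and the four files to add. The directories it scans, lib/ and connectors/seatunnel/ under the SeaTunnel home, are assumptions about where the JARs live in your installation; adjust JAR_DIRS to match where you actually placed the files.

```python
from pathlib import Path

# JAR names taken from the preparation steps; adjust if your versions differ.
CONFLICTING_JARS = {"connector-hudi-2.3.3.jar", "connector-datahub-2.3.3.jar"}
REQUIRED_JARS = {
    "seatunnel-api-2.3.3.jar",
    "seatunnel-transforms-v2-2.3.3.jar",
    "mysql-connector-java-8.0.28.jar",
    "jersey-client-1.19.4.jar",
}

# Assumed (hypothetical) JAR locations relative to the SeaTunnel home directory.
JAR_DIRS = ("lib", "connectors/seatunnel")


def check_jars(jar_names):
    """Return (missing required JARs, conflicting JARs still present), both sorted."""
    present = set(jar_names)
    missing = sorted(REQUIRED_JARS - present)
    conflicting = sorted(CONFLICTING_JARS & present)
    return missing, conflicting


def scan_install(seatunnel_home):
    """Collect *.jar file names from the assumed JAR directories and check them."""
    home = Path(seatunnel_home)
    names = []
    for sub in JAR_DIRS:
        directory = home / sub
        if directory.is_dir():
            names.extend(path.name for path in directory.glob("*.jar"))
    return check_jars(names)
```

For example, scan_install("/opt/seatunnel"), where /opt/seatunnel is a hypothetical install path, returns the JARs you still need to add and the conflicting JARs you still need to delete.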

InfluxDB 2.7.6 requires some preliminary configuration, described below.

The InfluxDB Studio-0.2.0 client displays each field's type, which makes it easier to define the fields in the synchronization file.

After installing InfluxDB 2.7.6 on Linux, open its UI at ip:8086 and enter your username, password, organization, and bucket.
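Before opening the UI, you can confirm from a script that the server is up. This sketch assumes the InfluxDB 2.x /health endpoint, which returns a JSON body whose status field is "pass" when the server is ready; the helper names are hypothetical.

```python
import json
import urllib.request


def health_url(host, port=8086):
    """URL of the InfluxDB 2.x /health endpoint on the given host."""
    return f"http://{host}:{port}/health"


def is_healthy(body):
    """True if a /health JSON body reports status "pass"."""
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        return False
    return isinstance(payload, dict) and payload.get("status") == "pass"


def check_server(host, port=8086, timeout=5):
    """Fetch /health over the network; urlopen raises HTTPError on non-2xx replies."""
    with urllib.request.urlopen(health_url(host, port), timeout=timeout) as resp:
        return is_healthy(resp.read().decode("utf-8"))
```

check_server("X.X.X.X") returns True when the server is ready; replace X.X.X.X with your InfluxDB host.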

Synchronization Process and Troubleshooting

Configuring the InfluxDB credentials in SeaTunnel 2.3.3 can lead to errors, such as failures when retrieving field information. Tracing through the SeaTunnel code showed that the root cause was a persistent 401 (Unauthorized) error.

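Connecting with the InfluxDB Studio management tool, using the same username and password as in the UI, also returned 401 errors. Further investigation showed that the UI credentials cannot authenticate directly against the database: the UI login is an InfluxDB 2.x user account, whereas clients that query through the InfluxDB 1.x-compatible API, such as InfluxDB Studio, need a separate v1 authorization.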

To create credentials that can access the database, run the following command, which creates a v1 authorization with read permission on the bucket:

influx v1 auth create -o orgName --read-bucket bucketId --username=username

Alternatively, grant both read and write access and set the password directly in the command:

influx v1 auth create -o "Organization" --write-bucket bucketId --read-bucket bucketId --username=username --password=password

To delete an authorization, use:

influx v1 auth delete --id 'id'

where id is the authorization ID shown in the output of influx v1 auth list.
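If you script these steps, building each command as an argument list avoids shell-quoting problems, for example with organization names that contain spaces. This sketch uses only the influx flags shown in the commands above; the function names are hypothetical.

```python
import shlex


def v1_auth_create_args(org, username, read_bucket, write_bucket=None, password=None):
    """argv for `influx v1 auth create`, mirroring the two command forms above."""
    args = ["influx", "v1", "auth", "create", "-o", org]
    if write_bucket is not None:
        args += ["--write-bucket", write_bucket]
    args += ["--read-bucket", read_bucket, f"--username={username}"]
    if password is not None:
        args.append(f"--password={password}")
    return args


def v1_auth_delete_args(auth_id):
    """argv for `influx v1 auth delete`; auth_id comes from `influx v1 auth list`."""
    return ["influx", "v1", "auth", "delete", "--id", auth_id]


def v1_auth_list_args():
    """argv for `influx v1 auth list`."""
    return ["influx", "v1", "auth", "list"]


# shlex.join renders an argv list as a copy-pasteable shell command.
print(shlex.join(v1_auth_create_args("orgName", "username", "bucketId")))
```

Pass a list to subprocess.run(args, check=True) to execute it without a shell. A password given with --password ends up in the process list and shell history, so the first command form, which omits --password, keeps it off the command line.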


After running these commands and setting the password, you should be able to log in with the new credentials, and SeaTunnel can then synchronize the data.
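To confirm that the new credentials work before running SeaTunnel, you can send a query to InfluxDB's 1.x-compatible /query endpoint with HTTP Basic authentication. This sketch assumes that endpoint, its db and q query parameters, and that the bucket is addressable by name (as the database = "your_bucket" setting in the configuration implies); SHOW MEASUREMENTS is used as a lightweight test query, and the function names are hypothetical.

```python
import base64
import urllib.error
import urllib.parse
import urllib.request


def build_v1_query_request(base_url, database, username, password, query="SHOW MEASUREMENTS"):
    """GET request to the 1.x-compatible /query endpoint with HTTP Basic auth."""
    params = urllib.parse.urlencode({"db": database, "q": query})
    credentials = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/query?{params}",
        headers={"Authorization": f"Basic {credentials}"},
    )


def check_credentials(base_url, database, username, password, timeout=10):
    """Return the HTTP status: 200 suggests success, 401 means the login was rejected."""
    request = build_v1_query_request(base_url, database, username, password)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status
    except urllib.error.HTTPError as err:
        return err.code
```

A 401 from check_credentials reproduces the authorization error described earlier; a 200 indicates the v1 authorization is in place.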

Data Synchronization Configuration File:

The sample SeaTunnel configuration file below, v1.batch.config_tmp.template, synchronizes data from InfluxDB to Doris.

env {
  execution.parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  influxdb {
    result_table_name = "prometheus_remote_write" # name read by the FieldMapper's source_table_name
    url = "http://X.X.X.X:8086"
    token = "your_token" # optional
    org = "your_organization"
    bucket = "your_bucket" # optional
    database = "your_bucket"
    username = "your_influxdb_username"
    password = "your_influxdb_password"
    epoch = "H" # precision of returned timestamps; "H" returns hours
    query_timeout_sec = 600
    measurement = "prometheus_remote_write" # measurement (InfluxDB's equivalent of a table)
    fields = ["node_cpu_seconds_total", "node_memory_MemTotal_bytes"] # optional
    sql = """SELECT node_cpu_seconds_total as system_cpu_usage, cpu as process_occupy_physical_memory_size, job as create_dept, node_memory_MemTotal_bytes as process_read_written_file_system_total_bytes, node_memory_MemAvailable_bytes as process_open_file_describe_quantity, time as create_time FROM "prometheus_remote_write" where time > now() - 1h"""
    where = " where time > now() - 1h"

    schema {
      fields {
        system_cpu_usage = FLOAT
        process_occupy_physical_memory_size = INT
        create_dept = STRING
        process_read_written_file_system_total_bytes = FLOAT
        process_open_file_describe_quantity = FLOAT
        create_time = BIGINT
      }
    }
  }
}

sink {
  Doris {
    fenodes = "X.X.X.X:8030"
    username = "username"
    password = "password"
    table.identifier = "sbyw_data_acquisition.sbyw_application_process_type_tmp"
    sink.label-prefix = "test-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    sink.max-retries = 3
    batch_size = 10000
    source_table_name = "sbyw_application_process_type_tmp" # sinks read their input via source_table_name
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

transform {
  FieldMapper {
    source_table_name = "prometheus_remote_write"
    result_table_name = "sbyw_application_process_type_tmp"
    field_mapper = {
      system_cpu_usage = system_cpu_usage
      process_occupy_physical_memory_size = process_occupy_physical_memory_size
      process_read_written_file_system_total_bytes = process_read_written_file_system_total_bytes
      process_open_file_describe_quantity = process_open_file_describe_quantity
      create_time = create_time
      create_dept = create_dept
    }
  }
}
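The schema block presumably has to describe the columns that the sql statement returns, and the FieldMapper keys refer to those same columns, so a small consistency check can catch typos before a job is submitted. This Python sketch extracts the column aliases from the SELECT list with a regular expression and compares them with the schema and FieldMapper names copied from the configuration above. It is a heuristic for simple statements like this one: it splits the SELECT list on every comma, so it does not handle function arguments that contain commas.

```python
import re

# Copied from the sample configuration above.
SQL = (
    "SELECT node_cpu_seconds_total as system_cpu_usage, "
    "cpu as process_occupy_physical_memory_size, job as create_dept, "
    "node_memory_MemTotal_bytes as process_read_written_file_system_total_bytes, "
    "node_memory_MemAvailable_bytes as process_open_file_describe_quantity, "
    'time as create_time FROM "prometheus_remote_write" where time > now() - 1h'
)
SCHEMA_FIELDS = [
    "system_cpu_usage",
    "process_occupy_physical_memory_size",
    "create_dept",
    "process_read_written_file_system_total_bytes",
    "process_open_file_describe_quantity",
    "create_time",
]
FIELD_MAPPER_KEYS = [
    "system_cpu_usage",
    "process_occupy_physical_memory_size",
    "process_read_written_file_system_total_bytes",
    "process_open_file_describe_quantity",
    "create_time",
    "create_dept",
]


def select_aliases(sql):
    """Output column names of a simple `SELECT ... FROM` statement."""
    match = re.search(r"select\s+(.*?)\s+from\s", sql, re.IGNORECASE | re.DOTALL)
    if match is None:
        raise ValueError("expected a SELECT ... FROM statement")
    names = []
    for item in match.group(1).split(","):
        # "expr as alias" yields alias; a bare column keeps its own name.
        name = re.split(r"\s+as\s+", item.strip(), flags=re.IGNORECASE)[-1]
        names.append(name.strip().strip('"'))
    return names


def check_consistency(sql, schema_fields, mapper_keys):
    """List mismatches between SELECT aliases, schema fields, and FieldMapper keys."""
    selected, schema = set(select_aliases(sql)), set(schema_fields)
    problems = [f"selected but not in schema: {n}" for n in sorted(selected - schema)]
    problems += [f"in schema but not selected: {n}" for n in sorted(schema - selected)]
    problems += [f"mapped but not in schema: {n}" for n in sorted(set(mapper_keys) - schema)]
    return problems
```

An empty list from check_consistency(SQL, SCHEMA_FIELDS, FIELD_MAPPER_KEYS) means the three name lists agree.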

Run the data synchronization script using:

./bin/seatunnel.sh -c ./config/v1.batch.config_tmp.template

Doris Test Table and InfluxDB Data

Here is my Doris test table:

[Screenshot: Doris test table]

Below is a snapshot of the InfluxDB 2.7.6 data as viewed in the InfluxDB Studio-0.2.0 client, together with the synchronization results in Doris.

[Screenshot: InfluxDB 2.7.6 data in InfluxDB Studio-0.2.0 and synchronization results in Doris]
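Because the source sets epoch = "H", the time column is presumably returned as a whole number of hours since the Unix epoch, so create_time in Doris is a BIGINT count of hours with one-hour resolution rather than a millisecond timestamp. When comparing the two sides, a conversion like this Python sketch can help. It assumes the precision codes H, m, s, MS, u and n listed for the SeaTunnel InfluxDB source's epoch option; the function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Microseconds per unit for the assumed epoch codes; "n" is handled separately.
MICROS_PER_UNIT = {"H": 3_600_000_000, "m": 60_000_000, "s": 1_000_000, "MS": 1_000, "u": 1}


def epoch_to_datetime(value, epoch="H"):
    """Convert an integer timestamp in the given epoch unit to an aware UTC datetime."""
    if epoch == "n":
        micros = value // 1_000  # datetime resolution is 1 microsecond
    else:
        micros = value * MICROS_PER_UNIT[epoch]
    return UNIX_EPOCH + timedelta(microseconds=micros)


def datetime_to_epoch(moment, epoch="H"):
    """Convert an aware datetime to an integer count of epoch units, truncating."""
    micros = (moment - UNIX_EPOCH) // timedelta(microseconds=1)
    if epoch == "n":
        return micros * 1_000
    return micros // MICROS_PER_UNIT[epoch]
```

For example, datetime_to_epoch gives the create_time value to filter on in Doris when checking the rows synchronized for a particular hour.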

Note: InfluxDB 2.7.6 supports only a limited subset of SQL-style queries (InfluxQL). Simple queries work, but a query that combines aggregation functions with individual, non-aggregated fields fails because that query structure is not supported.

[Screenshot: error returned by an unsupported query]

Queries of the following type, however, are supported. The difference comes from the design of InfluxDB 2, which does not allow an aggregation to be combined with individual field queries in the same statement.

[Screenshot: a supported query]
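A rough pre-check for this limitation can flag statements whose SELECT list combines an aggregation function with a bare field. This Python sketch is a heuristic, not an InfluxQL parser: its set of aggregation names follows the InfluxQL function reference, selector functions such as max() and min() are deliberately left out because InfluxQL treats them differently, and the time column is skipped. The function names are hypothetical.

```python
import re

# InfluxQL aggregation functions. Selectors (max, min, first, last, top, ...)
# are left out on purpose because InfluxQL treats them differently.
AGGREGATIONS = {"count", "distinct", "integral", "mean", "median", "mode", "spread", "stddev", "sum"}
FUNCTION_CALL = re.compile(r"^\s*(\w+)\s*\(")


def split_select_list(select_list):
    """Split a SELECT list on commas that are not inside parentheses."""
    items, current, depth = [], [], 0
    for ch in select_list:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == "," and depth == 0:
            items.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    if current:
        items.append("".join(current).strip())
    return items


def mixes_aggregate_and_raw(sql):
    """Heuristic: True if the SELECT list has an aggregation and a bare field."""
    match = re.search(r"select\s+(.*?)\s+from\s", sql, re.IGNORECASE | re.DOTALL)
    if match is None:
        raise ValueError("expected a SELECT ... FROM statement")
    has_aggregate = has_raw = False
    for item in split_select_list(match.group(1)):
        expr = re.split(r"\s+as\s+", item, flags=re.IGNORECASE)[0].strip()
        call = FUNCTION_CALL.match(expr)
        if call:
            # Only aggregations count; other functions are ignored by this heuristic.
            has_aggregate = has_aggregate or call.group(1).lower() in AGGREGATIONS
        elif expr.strip('"').lower() != "time":
            # A bare field, tag, or wildcard; the time column is skipped.
            has_raw = True
    return has_aggregate and has_raw
```

Running the check on a candidate sql value before putting it into the SeaTunnel configuration can save a failed job.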

Final Results:

[Screenshot: final results]

My Data Synchronized to Doris:

[Screenshot: data synchronized to Doris]

The data has been successfully synchronized from InfluxDB to Doris with SeaTunnel!
