
Akmal Chaudhri for SingleStore


Quick tip: Using Apache Beam with SingleStoreDB

Abstract

An Apache Beam SingleStoreDB I/O connector is now available and, in this short article, we'll see how to connect to SingleStoreDB to read from a table and write to a table using Java.

The Java code files used in this article are available on GitHub.

Introduction

Previous articles have shown examples of SingleStoreDB use cases and integrations with various connectors. A new addition to the list of connectors is the Apache Beam SingleStoreDB I/O connector. This short article will test the connector with basic read-and-write operations using Java.

Create a SingleStoreDB Cloud account

A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use Beam Demo Group as our Workspace Group Name and beam-demo as our Workspace Name.

Once we've created our database in the following steps, we'll make a note of our password and host name.

Create a Database and Tables

In our SingleStoreDB Cloud account, we'll use the SQL Editor to create a new database, as follows:

CREATE DATABASE IF NOT EXISTS adtech;

We'll also create two tables, derived from the previous AdTech example, as follows:

USE adtech;

CREATE TABLE campaigns_read (
    campaign_id SMALLINT(6),
    campaign_name VARCHAR(255)
);

CREATE TABLE campaigns_write (
    campaign_id SMALLINT(6),
    campaign_name VARCHAR(255)
);

We'll populate the campaigns_read table, as follows:

INSERT INTO campaigns_read VALUES
(1,'demand great'),
(2,'blackout'),
(3,'flame broiled'),
(4,'take it from a fish'),
(5,'thank you'),
(6,'designed by you'),
(7,'virtual launch'),
(8,'ultra light'),
(9,'warmth'),
(10,'run healthy'),
(11,'virtual city'),
(12,'online lifestyle'),
(13,'dream burger'),
(14,'super bowl tweet');

The complete SQL code is listed in Appendix A.

Create a Maven project

For quick testing, we'll use Maven to build our code and run it from the command line.

All the project code files are listed in Appendix B.

pom.xml

The pom.xml file is straightforward: it specifies the Java version, declares the three main dependencies, and builds a single jar file that bundles all the dependencies.

S2ReadTable class

Our Java code will provide the connection details and read the data from the campaigns_read table as key-value pairs. To make the example more interesting, we'll pass a parameter to our SQL query.

        Pipeline pipeline = Pipeline.create();

        PCollection<KV<Integer, String>> data = pipeline.apply(SingleStoreIO.<KV<Integer, String>>read()
            .withDataSourceConfiguration(DataSourceConfiguration
                .create(s2_host)
                .withUsername("admin")
                .withPassword(s2_password)
                .withDatabase("adtech"))
            .withQuery("SELECT * FROM campaigns_read WHERE campaign_id > ?")
            .withStatementPreparator(new StatementPreparator() {
                public void setParameters(PreparedStatement preparedStatement) throws Exception {
                    preparedStatement.setInt(1, 7);
                }
            })
            .withRowMapper(new RowMapper<KV<Integer, String>>() {
                public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                    return KV.of(resultSet.getInt(1), resultSet.getString(2));
                }
            })
        );

        data
            .apply(MapElements
                .into(TypeDescriptors.strings())
                .via((KV<Integer, String> kv) -> kv.getKey() + "," + kv.getValue()))
            .apply(TextIO
                .write().to("/path/to/s2").withNumShards(1).withSuffix(".csv")
            );

Once the data are in our PCollection, we'll convert the key-value pairs to strings and write them to a file. We'll replace /path/to/ with the actual path where we want to write the file.
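The name of the output file follows from the TextIO settings used above. To the best of our knowledge, Beam's default shard template is -SSSSS-of-NNNNN, so a prefix of s2, a single shard and a .csv suffix produce s2-00000-of-00001.csv. The following plain-Java sketch reproduces that naming so we can predict the file name in advance; the ShardNameSketch class and the shardName method are illustrative names and are not part of Beam.

```java
import java.util.Locale;

class ShardNameSketch {

    // Builds a name in the style of the "-SSSSS-of-NNNNN" shard template:
    // prefix, zero-padded shard index, zero-padded shard count, then suffix.
    static String shardName(String prefix, int shardIndex, int numShards, String suffix) {
        return String.format(Locale.ROOT, "%s-%05d-of-%05d%s",
                prefix, shardIndex, numShards, suffix);
    }

    public static void main(String[] args) {
        // Same prefix, shard count and suffix as in S2ReadTable.
        System.out.println(shardName("/path/to/s2", 0, 1, ".csv"));
        // prints /path/to/s2-00000-of-00001.csv
    }
}
```

With more than one shard, we would get one file per shard index, which is why withNumShards(1) is convenient for a small test like this one.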

S2WriteTable class

We can also write data into a SingleStoreDB table. Our Java code will provide the connection details, read the data from a file as key-value pairs, and write them into the campaigns_write table.

        Pipeline pipeline = Pipeline.create();

        PCollection<String> lines = pipeline.apply(
            TextIO.read().from("/path/to/s2-00000-of-00001.csv"));

        PCollection<KV<Integer, String>> keyValues = lines.apply(
            MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
                .via((String line) -> {
                    // Split on the first comma only, so a name that contains commas stays intact.
                    String[] fields = line.split(",", 2);
                    return KV.of(Integer.parseInt(fields[0]), fields[1]);
                })
        );

        keyValues.apply(SingleStoreIO.<KV<Integer, String>>write()
            .withDataSourceConfiguration(DataSourceConfiguration
                .create(s2_host)
                .withUsername("admin")
                .withPassword(s2_password)
                .withDatabase("adtech"))
            .withTable("campaigns_write")
            .withUserDataMapper(new UserDataMapper<KV<Integer, String>>() {
                public List<String> mapRow(KV<Integer, String> element) {
                    List<String> result = new ArrayList<>();
                    result.add(element.getKey().toString());
                    result.add(element.getValue());
                    return result;
                }
            })
        );

We'll replace /path/to/ with the actual path to the file we want to read. We'll use the same file created by S2ReadTable.
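Since S2WriteTable reads back exactly what S2ReadTable wrote, the two classes must agree on the line format. The following sketch shows that round trip in plain Java, without Beam, so it can be run on its own. It makes two assumptions: Map.Entry stands in for Beam's KV type, and the parser splits on the first comma only, a defensive variation that the sample data does not strictly need. The CsvLineSketch class and its method names are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

class CsvLineSketch {

    // Mirrors the formatting lambda in S2ReadTable: id and name joined by a comma.
    static String toLine(int id, String name) {
        return id + "," + name;
    }

    // Parses a line back into an (id, name) pair.
    // The limit of 2 keeps any further commas inside the name.
    static Map.Entry<Integer, String> parseLine(String line) {
        String[] fields = line.split(",", 2);
        if (fields.length != 2) {
            throw new IllegalArgumentException("Expected id,name but got: " + line);
        }
        return new SimpleEntry<>(Integer.parseInt(fields[0].trim()), fields[1]);
    }

    public static void main(String[] args) {
        Map.Entry<Integer, String> row = parseLine(toLine(9, "warmth"));
        System.out.println(row.getKey() + " -> " + row.getValue());
    }
}
```

A line without a comma, or with a non-numeric id, fails fast with an exception instead of silently producing a bad row.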

Build and Run the Code

Just for testing purposes, we'll declare two environment variables:

export S2_HOST="<host>"
export S2_PASSWORD="<password>"

We'll replace <host> and <password> with the values from our SingleStoreDB Cloud account.
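If either variable is missing, System.getenv returns null and the failure only surfaces later, when the connection is attempted. As a minimal sketch, and not something the project code does, we could check the variables up front. The EnvSketch class and its require and requireEnv methods are hypothetical helpers.

```java
class EnvSketch {

    // Returns the value, or fails with a clear message when it is missing or blank.
    static String require(String name, String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    // Looks the variable up in the process environment and validates it.
    static String requireEnv(String name) {
        return require(name, System.getenv(name));
    }

    public static void main(String[] args) {
        try {
            String host = requireEnv("S2_HOST");
            requireEnv("S2_PASSWORD"); // validated, but never printed
            System.out.println("Using host " + host);
        } catch (IllegalStateException e) {
            // Report which variable is missing instead of failing deep inside the pipeline.
            System.err.println(e.getMessage());
        }
    }
}
```

In S2ReadTable and S2WriteTable, the two System.getenv calls could be swapped for requireEnv if we wanted this behaviour.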

Next, we'll build the code, as follows:

mvn clean compile assembly:single

First, we'll run the S2ReadTable Java code, as follows:

java -cp target/s2-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.s2.beam.S2ReadTable

The CSV file should be written to the location we specified. If we look at the contents, we should see results similar to the following:

9,warmth
11,virtual city
8,ultra light
13,dream burger
10,run healthy
12,online lifestyle
14,super bowl tweet
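The file contains seven rows because the query parameter was set to 7, so only campaigns 8 to 14 match campaign_id > 7. The rows can appear in any order, since a PCollection is unordered. The following plain-Java sketch computes the expected lines from the same sample data as an order-independent set, which we could compare against the file contents. The FilterSketch class and its methods are illustrative names only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

class FilterSketch {

    // The same rows that were inserted into campaigns_read.
    static Map<Integer, String> campaigns() {
        Map<Integer, String> rows = new LinkedHashMap<>();
        rows.put(1, "demand great");
        rows.put(2, "blackout");
        rows.put(3, "flame broiled");
        rows.put(4, "take it from a fish");
        rows.put(5, "thank you");
        rows.put(6, "designed by you");
        rows.put(7, "virtual launch");
        rows.put(8, "ultra light");
        rows.put(9, "warmth");
        rows.put(10, "run healthy");
        rows.put(11, "virtual city");
        rows.put(12, "online lifestyle");
        rows.put(13, "dream burger");
        rows.put(14, "super bowl tweet");
        return rows;
    }

    // Applies the same predicate as the SQL query (campaign_id > threshold)
    // and formats each match the way S2ReadTable does.
    static Set<String> expectedLines(int threshold) {
        return campaigns().entrySet().stream()
                .filter(e -> e.getKey() > threshold)
                .map(e -> e.getKey() + "," + e.getValue())
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        Set<String> expected = expectedLines(7);
        System.out.println(expected.size() + " rows expected");
        expected.forEach(System.out::println);
    }
}
```

Comparing sets rather than lists avoids false mismatches caused by row order.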

Second, we'll run the S2WriteTable Java code as follows:

java -cp target/s2-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.s2.beam.S2WriteTable

If we switch to the SQL Editor in SingleStoreDB Cloud, we can check the contents of the campaigns_write table as follows:

SELECT * FROM campaigns_write;

The output should be similar to the following:

+-------------+------------------+
| campaign_id | campaign_name    |
+-------------+------------------+
|          12 | online lifestyle |
|          10 | run healthy      |
|          13 | dream burger     |
|           9 | warmth           |
|           8 | ultra light      |
|          14 | super bowl tweet |
|          11 | virtual city     |
+-------------+------------------+

Summary

This short article has shown examples of how to read from and write to SingleStoreDB using Apache Beam. Further information is available in the documentation.

Appendix A — SQL Code

CREATE DATABASE IF NOT EXISTS adtech;

USE adtech;

CREATE TABLE campaigns_read (
    campaign_id SMALLINT(6),
    campaign_name VARCHAR(255)
);

CREATE TABLE campaigns_write (
    campaign_id SMALLINT(6),
    campaign_name VARCHAR(255)
);

INSERT INTO campaigns_read VALUES
(1,'demand great'),
(2,'blackout'),
(3,'flame broiled'),
(4,'take it from a fish'),
(5,'thank you'),
(6,'designed by you'),
(7,'virtual launch'),
(8,'ultra light'),
(9,'warmth'),
(10,'run healthy'),
(11,'virtual city'),
(12,'online lifestyle'),
(13,'dream burger'),
(14,'super bowl tweet');

SELECT * FROM campaigns_write;

Appendix B — Java Project Code

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.s2</groupId>
  <artifactId>s2-app</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>2.44.0</version>
      <scope>runtime</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.singlestore/singlestore-jdbc-client -->
    <dependency>
      <groupId>com.singlestore</groupId>
      <artifactId>singlestore-jdbc-client</artifactId>
      <version>1.1.4</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-singlestore -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-singlestore</artifactId>
      <version>2.44.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>fully.qualified.MainClass</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>

S2ReadTable.java

package com.s2.beam;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.DataSourceConfiguration;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.RowMapper;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.StatementPreparator;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class S2ReadTable {
    public static void main(String[] args) {

        String s2_host = System.getenv("S2_HOST");
        String s2_password = System.getenv("S2_PASSWORD");

        Pipeline pipeline = Pipeline.create();

        PCollection<KV<Integer, String>> data = pipeline.apply(SingleStoreIO.<KV<Integer, String>>read()
            .withDataSourceConfiguration(DataSourceConfiguration
                .create(s2_host)
                .withUsername("admin")
                .withPassword(s2_password)
                .withDatabase("adtech"))
            .withQuery("SELECT * FROM campaigns_read WHERE campaign_id > ?")
            .withStatementPreparator(new StatementPreparator() {
                public void setParameters(PreparedStatement preparedStatement) throws Exception {
                    preparedStatement.setInt(1, 7);
                }
            })
            .withRowMapper(new RowMapper<KV<Integer, String>>() {
                public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                    return KV.of(resultSet.getInt(1), resultSet.getString(2));
                }
            })
        );

        data
            .apply(MapElements
                .into(TypeDescriptors.strings())
                .via((KV<Integer, String> kv) -> kv.getKey() + "," + kv.getValue()))
            .apply(TextIO
                .write().to("/path/to/s2").withNumShards(1).withSuffix(".csv")
            );

        pipeline.run().waitUntilFinish();

    }
}

S2WriteTable.java

package com.s2.beam;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.DataSourceConfiguration;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.UserDataMapper;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.util.ArrayList;
import java.util.List;

public class S2WriteTable {
    public static void main(String[] args) {

        String s2_host = System.getenv("S2_HOST");
        String s2_password = System.getenv("S2_PASSWORD");

        Pipeline pipeline = Pipeline.create();

        PCollection<String> lines = pipeline.apply(
            TextIO.read().from("/path/to/s2-00000-of-00001.csv"));

        PCollection<KV<Integer, String>> keyValues = lines.apply(
            MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
                .via((String line) -> {
                    // Split on the first comma only, so a name that contains commas stays intact.
                    String[] fields = line.split(",", 2);
                    return KV.of(Integer.parseInt(fields[0]), fields[1]);
                })
        );

        keyValues.apply(SingleStoreIO.<KV<Integer, String>>write()
            .withDataSourceConfiguration(DataSourceConfiguration
                .create(s2_host)
                .withUsername("admin")
                .withPassword(s2_password)
                .withDatabase("adtech"))
            .withTable("campaigns_write")
            .withUserDataMapper(new UserDataMapper<KV<Integer, String>>() {
                public List<String> mapRow(KV<Integer, String> element) {
                    List<String> result = new ArrayList<>();
                    result.add(element.getKey().toString());
                    result.add(element.getValue());
                    return result;
                }
            })
        );

        pipeline.run().waitUntilFinish();

    }
}
