In the previous blog post Streaming data into Kafka S01/E01- Loading CSV file, I've illustrated how it can be easy to integrate data into Apache Kafka using the Kafka Connect framework.
In particular, we saw how to parse and transform CSV files to produce clean records into Kafka by using the Kafka Connect FilePulse connector.
XML(Extensible Markup Language) is another well-known data format. Usually, the XML format is not very appreciated by most developers because of its heaviness (or complexity). However, it's still used by many organizations to make systems interact with each other.
In this second article, we will see how to read records from XML files and load them into Kafka. To do this, we will once again use the Kafka Connect FilePulse connector, which offers native support for reading XML files.
Kafka Connect File Pulse connector
The Kafka Connect FilePulse connector is a powerful source connector that makes it easy to parse, transform, and load data from the local file system into Apache Kafka. It offers built-in support for various file formats (e.g: CSV, XML, JSON, LOG4J, AVRO).
For a broad overview of FilePulse, I suggest you read this article :
- Kafka Connect FilePulse - One Connector to Ingest them All!
- Streaming data into Kafka S01/E01- Loading CSV file
For more information, you can check-out the documentation here.
How to use the connector
The easiest and fastest way to get started with the Kafka Connect FilePulse connector is to use the Docker image available on Docker Hub.
$ docker pull streamthoughts/kafka-connect-file-pulse:1.6.3
You can download the docker-compose.yml
file available on the GitHub project repository to quickly start a Confluent Platform with Kafka Connect and the FilePulse connector pre-installed.
$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
$ docker-compose up -d
Once all Docker containers are started, you can check that the connector is installed on the Kafka Connect worker accessible on http://localhost:8083
.
$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse
"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
Note : You can also install the connector either from GitHub Releases Page or from Confluent Hub.
Ingesting Data
We will start by creating a first connector with the following configuration. It specifies that the connector's tasks must use the XMLFileInputReader
to read the files that will be scheduled by the connector.
The connector will periodically scan the input directory that we set using the property fs.scan.directory.path
. Then, it will lookup for files matching the pattern .*\\.xml$
. Each file is uniquely identified and tracked depending on the value of the offset.strategy
. Here, the configuration specifies that a file is identified by its name
.
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-xml-filepulse-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"offset.strategy":"name",
"topic":"playlists-filepulse-xml-00",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
Once the connector is created, you can check that it is properly started by executing:
$ curl -s \
localhost:8083/connectors/source-xml-filepulse-00/status \
| jq '.connector.state'
"RUNNING"
Now, let's create an XML files called playlists.xml
with the following content :
$ cat <<EOF > playlists.xml
<?xml version="1.0" encoding="UTF-8"?>
<playlists>
<playlist name="BestOfStarWars">
<track>
<title>Duel of the Fates</title>
<artist>John Williams, London Symphony Orchestra</artist>
<album>Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)</album>
<duration>4:14</duration>
</track>
<track>
<title>Star Wars (Main Theme)</title>
<artist>John Williams, London Symphony Orchestra</artist>
<album>Star Wars: The Empire Strikes Back (Original Motion Picture Soundtrack)</album>
<duration>10:52</duration>
</track>
</playlist>
</playlists>
EOF
Then, copy this file from your host to the Docker container which runs the connector. You can run the following commands:
// Copy CSV file to docker-container
$ docker exec -it connect mkdir -p /tmp/kafka-connect/examples
$ docker cp playlists.xml connect://tmp/kafka-connect/examples/playlists-00.xml
Finally, you can consume the playlists-filepulse-xml-00
topic to verify that the connector has detected and processed the XML file:
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t playlists-filepulse-xml-00 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
{
"playlists": {
"Playlists": {
"playlist": {
"Playlist": {
"name": {
"string": "BestOfStarWars"
},
"track": {
"array": [
{
"Track": {
"title": {
"string": "Duel of the Fates"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)"
},
"duration": {
"string": "4:14"
}
}
},
{
"Track": {
"title": {
"string": "Star Wars (Main Theme)"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Star Wars: The Empire Strikes Back (Original Motion Picture Soundtrack)"
},
"duration": {
"string": "10:52"
}
}
}
]
}
}
}
}
}
}
Note: In the example above, we are using kafkacat to consume messages. The option -o-1 is used to only consume the latest message.
As you have noticed from the above example, the Connect FilePulse has automatically inferred the schema from the input XML file.
So, let's check the produced Avro schema by using the HTTP SchemaRegistry endpoint :
$ curl -X GET \
http://localhost:8081/subjects/playlists-filepulse-xml-00-value/versions/latest/schema \
| jq
Forcing Array-type
Let's create a second XML file named single-track-playlist.xml
with the following content :
$ cat <<EOF > single-track-playlists.xml
<?xml version="1.0" encoding="UTF-8"?>
<playlists>
<playlist name="BestOfJWilliams">
<track>
<title>Theme From Jurassic Park</title>
<artist>John Williams, London Symphony Orchestra</artist>
<album>Jurassic Park</album>
<duration>3:27</duration>
</track>
</playlist>
</playlists>
EOF
Then copy it to the Docker container as previously.
$ docker cp single-track-playlists.xml \
connect://tmp/kafka-connect/examples/playlists-01.xml
Now, let's consume the topic playlists-filepulse-xml-00
. You will see that the field named track
is of type Track
, whereas in the first example it was of type array.
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t playlists-filepulse-xml-00 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
{
"playlists": {
"Playlists": {
"playlist": {
"Playlist": {
"name": {
"string": "BestOfJWilliams"
},
"track": {
"Track": {
"title": {
"string": "Theme From Jurassic Park"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Jurassic Park"
},
"duration": {
"string": "3:27"
}
}
}
}
}
}
}
}
This is a common problem when trying to infer a data schema from an XML file. It's a bit difficult to identify that a field must be an array when only one element is present.
To solve this problem, it's possible to configure the XMLFileInputReader
to force some fields to be of type array.
Let's update the connector with the given configuration :
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-xml-filepulse-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"force.array.on.fields": "track",
"offset.strategy":"name",
"topic":"playlists-filepulse-xml-00",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
Then recopy the XML file with a new name, so that it will be pickup by the connector.
$ docker cp single-track-playlists.xml connect://tmp/kafka-connect/examples/playlist-03.xml
Finally, consume the output topic to observe the effect of the new configuration.
docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t playlists-filepulse-xml-00 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"playlists": {
"Playlists": {
"playlist": {
"Playlist": {
"name": {
"string": "BestOfJWilliams"
},
"track": {
"array": [
{
"Track": {
"title": {
"string": "Theme From Jurassic Park"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Jurassic Park"
},
"duration": {
"string": "3:27"
}
}
}
]
}
}
}
}
}
}
The track
field is now an array field.
Splitting XML using XPath expression
So far we have only used the "XMLFileInputReader" to read a single record from each XML file.
But we could just as well produce one record for each track
element present in the input XML files. To do this, we will simply override the property xpath.expression
, whose default value is /
.
So, let's create a new connector with the following configuration :
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-xml-filepulse-01/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"xpath.expression": "//playlist/track",
"offset.strategy":"name",
"topic":"tracks-filepulse-xml-00",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
Then, consume the messages from the output topic tracks-filepulse-xml-00
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-xml-00 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"title": {
"string": "Theme From Jurassic Park"
},
"artist": {
"string": "John Williams, London Symphony Orchestra"
},
"album": {
"string": "Jurassic Park"
},
"duration": {
"string": "3:27"
}
}
You should get one record per track
.
Extracting string values
By default, the XMLFileInputReader
expects the XPath expression to return a result of type NodeSet
.
If your goal is to extract a single text element from an input XML files you will have to set the property xpath.result.type=STRING
.
For example, if we configure a connector with the following XPath expression //playlist/track/title/text()
then the connector will produce messages in the form :
{
"message": {
"string": "Theme From Jurassic Park"
}
}
Then, you can use Single Message Transformations (SMT) such as ExtractField to replace the entire value with the extracted field.
Renaming fields
It's sometimes useful to be able to change the name of a field. This can be because you need to further contextualize a field or because you are not satisfied with the field names present in the XML input file.
For example, we can rename the field artist
because the field contains a comma-separated list of artists
(not a single one).
The File Pulse connector allows us to define complex pipelines to parse, transform, and enrich data through the use of processing Filters.
Thus, to rename the field artist
into artists
we will use the RenameFilter.
Let's create a new connector with this new configuration :
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-xml-filepulse-02/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"force.array.on.fields": "track",
"xpath.expression": "//playlist/track",
"filters": "RenameArtist",
"filters.RenameArtist.type": "io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter",
"filters.RenameArtist.field": "artist",
"filters.RenameArtist.target": "artists",
"offset.strategy":"name",
"topic":"tracks-filepulse-xml-02",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
Next, consume the produced messages from the output topic tracks-filepulse-xml-02
docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-xml-02 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"title": {
"string": "Theme From Jurassic Park"
},
"album": {
"string": "Jurassic Park"
},
"duration": {
"string": "3:27"
},
"artists": {
"string": "John Williams, London Symphony Orchestra"
}
}
Splitting fields into an array
Finally, I'm going to show you how you can combine the filters to build a complete processing pipeline.
For this last example we will split the "artists" field into a table using the SplitFilter
.
Let's create a final connector with the following configuration :
$ curl \
-i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-xml-filepulse-03/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/tmp/kafka-connect/examples/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"force.array.on.fields": "track",
"xpath.expression": "//playlist/track",
"filters": "RenameArtists, SplitArtists",
"filters.RenameArtists.type": "io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter",
"filters.RenameArtists.field": "artist",
"filters.RenameArtists.target": "artists",
"filters.SplitArtists.type": "io.streamthoughts.kafka.connect.filepulse.filter.SplitFilter",
"filters.SplitArtists.split": "artists",
"filters.SplitArtists.separator": ",",
"offset.strategy":"name",
"topic":"tracks-filepulse-xml-03",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
Finally, consume the output topic tracks-filepulse-xml-03
to observe the result of our filter-chain.
$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t tracks-filepulse-xml-03 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
(output)
{
"title": {
"string": "Theme From Jurassic Park"
},
"album": {
"string": "Jurassic Park"
},
"duration": {
"string": "3:27"
},
"artists": {
"array": [
{
"string": "John Williams"
},
{
"string": " London Symphony Orchestra"
}
]
}
}
And voila!
Conclusion
We have seen in this article that it can be fairly easy to load records from XML files into Apache Kafka without writing a single line of code using Kafka Connect. Also, the Connect File Pulse connector is a powerful solution that allows you to easily manipulate your data before loading it into Apache Kafka.
Please, share this article if you like this project. You can even add a ⭐ to the GitHub repository to support us.
Thank you very much.
Top comments (2)
Hi Florian, Is there a way to start the FilePulse Connector using the old way. i.e.
bin/connectStandalone worker.properties connector1.properties [connector2.properties ...]
Hi, Yes FilePulse is a standard Kafka Connect so you can deploy it with the standalone or distributed mode.