In the era of big data in games, data plays a crucial role in the smoothness and playability of games. Likewise, this has placed increasing demands on the richness and instantaneity of game data.
At the Apache SeaTunnel (Incubating) & Apache Inlong (Incubating) Meetup in May, Zhang Xiaodong, Senior Big Data Development Engineer of Yoozoo, shared with us the topic of Apache SeaTunnel (Incubating) implementation in Yoozoo Data Access Platform.
The presentation can be divided into four parts.
- Service background introduction
- The introduction of Apache SeaTunnel (Incubating)
- Apache SeaTunnel (Incubating) practice in the data access platform
- Future plans
Zhang Xiaodong
Senior Big Data Development Engineer, Yoozoo
In charge of the data platform construction of data centre, mainly involving development work related to the data access platform.
01 Background
01 Service Background
Yoozoo is a company with game development and operation as its core business, and the platform data is mainly generated by players in the games playing. At present, we internally divide the data into two categories, one is KPI data, including registration, activation, login, logout, recharge, online, upgrade data, etc., characterized by high instantaneity and relatively small data volume, less than 10G/day for a single game.
The other category is the ecological data, including players, activities, guilds, props, gameplay, shop purchases, resource production and sales, missions, chat, etc., characterized by low instantaneity, calculated by hours or days, and relatively large data volume in terabyte-level.
In data collection, the data sources are distributed in multiple regional time zones in China and overseas, with multiple network environments on intranet and extranet, and are mainly saved in the form of files, databases, message queues, etc. Data collection tools through Fluentd include Fluentd, Flume, Filebeat, Scribe, Logstash, Flink, etc. in real-time collection scenarios, with Fluentd and Flume dominating, and offline scenarios with the use of custom scripting tools, Sqoop, DataX, custom Spark tasks, etc. . Data is relayed via Kafka or FTP, etc.
The collected data is exported to various storage engines such as Hive, Clickhouse, HBase, StarRocks, ElasticSearch, etc. Based on these data, after the calculation of the data warehouses, we have developed a one-stop business analysis system, BA System, which provides product panoramic data query and analysis capabilities, as well as report development and display capabilities, including user profiling, game analysis, financial reconciliation, chat monitoring, advertising analysis, and other functional segments.
02 Technical challenges
We use many types of data synchronization and migration tools in this process, but there are still some practical needs that cannot be well supported. For example, in the game player profiling scenario, we need to import the user profile tag-wide table calculated by Data Warehouse from Hive to StarRocks with regular scheduling, to provide BA platform with user query, analysis, and selection functions. To solve this issue, we propose to use DataX, Spark, or SeaTunnel to implement it.
Another issue is to explore real-time data warehousing based on StarRocks, which requires consuming Kafka data to write to StarRocks in real-time. We propose to use Flume, Flink, or SeaTunnel to solve this problem.
As StarRocks is a popular database catching eyes in recent years, it requires some custom development work on our part, regardless of which tool is used to implement it. So we conder comprehensively from the prospects of resource control, development cost, ease of use, scalability, and community activity and Apache SeaTunnel (Incubating) was more suited to our needs, both in terms of short-term requirements and long-term planning.
02 Introducing SeaTunnel
01 Basic introduction
SeaTunnel is an up-and-coming Apache incubation project, a next-generation high-performance, distributed, mass data integration framework with an active community and rapid release updates.
In terms of architecture, SeaTunnel is highly abstracted from the data processing and is designed in a configurable and plug-in way so that users can easily use it out of the box.
In terms of performance, SeaTunnel embraces Spark and Flink engines, which naturally support distributed architecture and provide excellent performance.
In terms of capability, SeaTunnel already supports mainstream data sources and outputs. It can be applied in real-time/offline scenarios for data synchronization and data computation.
02 Customised plug-ins
SeaTunnel provides a complete plug-in extension system, supporting users to customize the development of Spark and Flink Source, TransForm and Sink plug-ins in stream mode and batch mode according to their needs, which is sufficient to meet most of the needs in daily development work.
For example, to support the aforementioned “user profile data migration”, the Spark engine custom StarRocks Sink plugin from SeaTunnel is used. The plugin needs to inherit the SparkBatchSink abstract class, define the specific output logic in the output method, and encapsulate the Stream Load import method provided by StarRocks. On top of this, we have added some additional features such as pre-operation, Label generation, and JSON import support. The diagram below depicts the general flow of data processing by the plugin.
Once the plugin has been customized, it is referenced in the configuration file using the full class name (see below).
03 Submitting the application
The image below shows the official way of configuring and submitting the application provided by SeaTunnel. It is necessary to define the configuration of the plugins env, source, transForm, and sink in the disk configuration file and then submit the application via the command line launch tool provided by SeaTunnel, setting the path to the submission mode and configuration file.
In practice, we found some contradictions in the official way of configuring and submitting applications when combined with our scheduling system. One is that it is not convenient to pass parameters dynamically, and the other is that it is not user-friendly for users to maintain both a plugin configuration file and a startup script during use.
To address this issue, we used Python to customize a new startup script that encapsulates the official startup script, enabling the user to shield the official plugin disk configuration file from the user, allowing the application to be configured and submitted in the following way, maintaining the configuration and submission information within a single script.
When configuring timed scheduling tasks in conjunction with the scheduling system, the custom startup tool receives the application submission and plugin configuration parameters, automatically generates the configuration file in a temporary directory, and calls the official submission tool to submit the SeaTunnel application. The disk configuration file is automatically deleted when the application finishes executing and resolve to the ApplicationId to pull the remote execution log.
03 Data Access Platform
01 Background
Here’s a quick introduction to our data access platform. We provide a DTS platform and an ETL platform internally. Users configure data source acquisition tasks in the DTS platform and ETL output tasks in the ETL platform. Once the task is configured, the platform automatically deploys the task at the corresponding node. The underlying collection link collects disk files via Fluentd, and forwards them to the Server, which sends the data to Kafka. Flume consumes Kafka, does simple ETL processing and outputs them to HDFS, and then compresses them into Hive Orc tables. Some ETL tasks are going to be processed by Flink gradually.
While based on this platform system above, we have faced some challenges.
- Scattered components: There is no complete link, managing different components in the link by multiple platforms is not convenient for problem troubleshooting and data source tracking.
- Scattered platforms: Data access involves the operation of multiple platforms, link management, monitoring, alerting, and other functions are scattered, which is not convenient for maintenance and iteration.
- Performance limitations: Flume, as the main ETL component at the bottom of the platform, is not convenient for data processing, resource control, and function iteration.
Therefore, we plan to integrate the original DTS and ETL platform, unify the access links and introduce SeaTunnel to solve the above problems.
02 Data access platform architecture
The green part of the diagram shows the platform management module, which provides services to users upwards and manages the various components of the collection link downwards. In the management module, we want to provide automated deployment, node management, component management, link visualization, link monitoring, data monitoring, and other functions. We have abstracted the underlying acquisition link into six components.
- Source: data source, describing data source storage rules and parsing rules
- Collect: Data source collection task
- Server: data distribution service, mainly used for data flood bypass and domestic and international data forwarding
- MQ: Message Queue
- ETL: data processing and output
- Storage: the output layer of the external storage engine, providing monitoring of the output layer
The metrics module, which collects and aggregates the metric information generated by each component, dovetails with the management module and Prometheus to provide monitoring and alerting services.
Users configure access link tasks within a unified data access platform, which parses the user configuration, converts it into the configuration information required by each component in the link, and automatically deploys the tasks on the corresponding nodes. Once the link is live, a visualization of the link is provided to facilitate task management and data traceability, and task monitoring and data monitoring capabilities are provided on the link. One of the components, ETL, is implemented using SeaTunnel Flink.
03 SeaTunnel integration and re-development
In the process of integrating SeaTunnel within the platform services, I encountered several issues such as
- When submitting applications relied on Spark and Flink’s command-line submission tools spark-submit, and Flink run respectively, there is no API provided for submitting applications
- The configuration plugin relies closely on the disk configuration file and cannot configure tasks by API.
- No API is provided for application monitoring and management, and tasks cannot be easily stopped, restarted, and monitored.
To address these issues we combed through and investigated the SeaTunnel source code and found that it could be solved by re-modifying the compiled SeaTunnel and developing some external tools.
By wrapping SparkLauncher by Spark, wrapping RestCluster Client tool by Flink, and customizing the API of application management, the functions of application submission, stopping, and status monitoring can be achieved. Alternatively, more specific in-app execution process monitoring can be achieved by wrapping the Restful APIs provided by Spark and Flink.
Plug-in configuration can be done by modifying the SeaTunnel source code to replace the original way of loading the disk configuration file with a Map structure.
Before the modification, users needed to first configure the disk configuration file to use SeaTunnel, call the startup tool and pass in the configuration file disk path. The parseFile method of the ConfigFactory would be used in SeaTunnel to load the disk configuration file corresponding to the parsed file path, generating a Config instance for the subsequent generation of plug-in functionality.
After modification, the user can configure the SeaTunnel application via the web service. The service organizes the configuration information into JSON format and passes it to the SeaTunnel startup class via the encapsulated RestClusterClient API tool, which starts SeaTunnel. In SeaTunnel, a tool class parses JSON into Map structure is added, and the ConfigFactory’s parseFile method is modified to parseMap method, generating Concig instances to submit the applications. And RestClusterClient provides the function of task information and status listening.
The image below depicts some of the more specific modifications to wrap the RestClusterClient tool within the Flink Operator API to submit and manage SeaTunnel tasks. CommandMapArgs were added to the original CommandLineArgs to convert the JSON configuration into a Map configuration. The Map configuration is parsed through ConfigFactory.parseMap to generate Config instances.
In the specific application process of the data access platform, SeaTunnel only needs to consume Kafka to output Hive and StarRocks at present. According to service needs, Schema checking plug-ins and game whitelist filtering plug-ins are customized. Schema checking plug-ins will regularly load updated Schema information in the configuration to enable field parsing and validation of consumed data, and filter data that does not conform to the rules, such as the number of fields and field types. The Whitelist plug-in regularly loads and updates the configured whitelist to filter game data outside the configuration.
04 Platform usage example — creating access tasks
The following shows the process of creating a data access task on the data access platform, taking File real-time collection as an example. The first thing to do is to fill in the basic information about the task, where the data types are the various data categories mentioned at the beginning. Each data type represents a data template and has a basic fixed configuration of the link component, which only needs to be partially modified.
The source component defines the basic description of the data source, including information on the type of data source, parsing method, data volume, etc., as well as the location of nodes and directories where the data is located. The platform will then deploy acquisition tasks on the acquisition nodes according to these configurations, monitor the acquisition files, and parse the acquisition data.
The Collect component is a specific collection process implemented using Flume, which collects the target file, defines the basic filtering conditions of the data, and the Channel type, and sends the data to the port of the Server. The Server is a fixed listening port to send the Kafka process. A Server is a group of nodes, mainly responsible for data flooding bypass and routing, for example, data sent from abroad to China needs a relay Server.
The underlying ETL part is implemented by SeaTunnel Flink, where the parameters for SeaTunnel to consume Kafka can be set. Data can be sampled from Source or Kafka, parsed to generate Schema, and users supplement and modify the Schema, including field names, field types, whether they are nullable, whether they are primary keys and SQL expressions for processing the fields. Once the task is deployed, the data is verified against the Schema, the fields are processed according to SQL functions based on whether they are nullable and primary key statistics.
Finally, the Storage component is configured, which is the output of SeaTunnel. There are options to export to Hive, StarRocks, Clickhouse, etc. The platform will access the internal metadata system to select information such as clusters, library table partitions, etc.
04 Planning for the future
The data access platform is still in a relatively early stage of development, and many areas need further iteration and optimization, mainly including
- Improve the link monitoring module and support Metrics docking to Prometheus
- Adding SeaTunnel plug-in management module, supporting users to upload custom plug-ins
- Enriching SeaTunnel’s data calculation capabilities
Thanks for reading, that’s all I have to share today.
About SeaTunnel
SeaTunnel (formerly Waterdrop) is an easy-to-use, ultra-high-performance distributed data integration platform that supports real-time synchronization of massive amounts of data and can synchronize hundreds of billions of data per day in a stable and efficient manner.
Why do we need SeaTunnel?
SeaTunnel does everything it can to solve the problems you may encounter in synchronizing massive amounts of data.
- Data loss and duplication
- Task buildup and latency
- Low throughput
- Long application-to-production cycle time
- Lack of application status monitoring
SeaTunnel Usage Scenarios
- Massive data synchronization
- Massive data integration
- ETL of large volumes of data
- Massive data aggregation
- Multi-source data processing
Features of SeaTunnel
- Rich components
- High scalability
- Easy to use
- Mature and stable
How to get started with SeaTunnel quickly?
Want to experience SeaTunnel quickly? SeaTunnel 2.1.0 takes 10 seconds to get you up and running.
https://seatunnel.apache.org/docs/2.1.0/developement/setup
How can I contribute?
We invite all partners who are interested in making local open-source global to join the SeaTunnel contributors family and foster open-source together!
Submit an issue:
https://github.com/apache/incubator-seatunnel/issues
Contribute code to:
https://github.com/apache/incubator-seatunnel/pulls
Subscribe to the community development mailing list :
dev-subscribe@seatunnel.apache.org
Development Mailing List :
Join Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-10u1eujlc-g4E~ppbinD0oKpGeoo_dAw
Follow Twitter:
https://twitter.com/ASFSeaTunnel
Come and join us!
Top comments (0)