DEV Community

Apache SeaTunnel

Source Code Analysis of Apache SeaTunnel Zeta Engine (Part 1): Server Initialization

This series of articles is based on Apache SeaTunnel version 2.3.6 and introduces the full process of handling a task from submission to execution with the Zeta engine. This document aims to assist newcomers to SeaTunnel by providing some guidance.

The article will be divided into three parts, covering the following aspects:

  1. Initialization of the SeaTunnel Server
  2. Task submission process on the Client side
  3. Task execution process upon receiving the task on the Server side

Because a full source walkthrough would be very long, this series concentrates on the overall task flow rather than on every implementation detail.

Introduction to the Author

Hi everyone, I'm Liu Naijie, a big data developer who has been involved in Apache SeaTunnel development for over a year. I have contributed several PRs to SeaTunnel that add features such as support for the Avro file format, nested structure queries in SQL Transform, and node tags for resource isolation.

Recently, my company adopted SeaTunnel internally, and I needed to introduce its technical architecture to my colleagues and managers and walk them through the detailed execution process so they could better understand development and maintenance.

However, I found that there doesn't seem to be an article that analyzes the entire task execution process in detail, which would help developers more easily locate issues and add features.

So, I took some time to write this article, hoping to inspire other experts to write more source code analysis articles.

Cluster Topology

First, let's get an overview of the SeaTunnel Zeta engine architecture. SeaTunnel is implemented using Hazelcast for distributed cluster communication.

Since version 2.3.6, nodes in the cluster can be assigned as Master or Worker nodes, separating scheduling from execution to prevent excessive load on the Master node and avoid potential issues.

Version 2.3.6 also added a feature to add tag attributes to each node. When submitting a task, tags can be used to select the nodes where the task will run, achieving resource isolation.
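
As a rough illustration of tag-based node selection, the sketch below keeps only the workers whose tags contain every tag requested by a job. The class and method names are hypothetical, and the rule that all requested tags must match is an assumption made for illustration; it is not code taken from SeaTunnel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of SeaTunnel: selects workers by tag.
class TagFilterSketch {

    // Returns true when the worker carries every requested tag with the same value.
    static boolean matches(Map<String, String> workerTags, Map<String, String> requestedTags) {
        return workerTags.entrySet().containsAll(requestedTags.entrySet());
    }

    // Keeps only the workers (name -> tags) that satisfy the requested tags.
    static List<String> select(
            Map<String, Map<String, String>> workers, Map<String, String> requestedTags) {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> worker : workers.entrySet()) {
            if (matches(worker.getValue(), requestedTags)) {
                selected.add(worker.getKey());
            }
        }
        return selected;
    }
}
```

With this rule, a job that requests no tags matches every worker, which corresponds to running without resource isolation.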

The server side of the cluster is divided into Master and Worker nodes. The Master node is responsible for receiving requests, generating logical plans, allocating tasks, and so on. Compared to previous versions, the cluster can now also include additional Backup Master nodes, which is a significant improvement for cluster stability.

The Worker node, on the other hand, is responsible only for task execution, which includes data reading and writing.

When submitting a task, you can create a Hazelcast client connection to the cluster for communication or use the REST API for communication.

Server Startup

After getting a general understanding of the cluster architecture, let's look at the specific process.

First, let's examine the Server startup process.

The command to start the Server is:

sh bin/seatunnel-cluster.sh -d -r <node role type>

Looking into this script, you’ll find that it ultimately executes the following command:

java -cp seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer <other_java_jvm_config_and_args>

Let’s check the code for starter.seatunnel.SeaTunnelServer:

public class SeaTunnelServer {
    public static void main(String[] args) throws CommandException {
        ServerCommandArgs serverCommandArgs =
                CommandLineUtils.parse(
                        args,
                        new ServerCommandArgs(),
                        EngineType.SEATUNNEL.getStarterShellName(),
                        true);
        SeaTunnel.run(serverCommandArgs.buildCommand());
    }
}

This part uses JCommander to parse the user-provided arguments, then builds and runs a command. serverCommandArgs.buildCommand() returns an instance of the following class:

public class ServerExecuteCommand implements Command<ServerCommandArgs> {

    private final ServerCommandArgs serverCommandArgs;

    public ServerExecuteCommand(ServerCommandArgs serverCommandArgs) {
        this.serverCommandArgs = serverCommandArgs;
    }

    @Override
    public void execute() {
        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        String clusterRole = this.serverCommandArgs.getClusterRole();
        if (StringUtils.isNotBlank(clusterRole)) {
            if (EngineConfig.ClusterRole.MASTER.toString().equalsIgnoreCase(clusterRole)) {
                seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.MASTER);
            } else if (EngineConfig.ClusterRole.WORKER.toString().equalsIgnoreCase(clusterRole)) {
                seaTunnelConfig.getEngineConfig().setClusterRole(EngineConfig.ClusterRole.WORKER);

                // In Hazelcast, lite members will not store IMap data.
                seaTunnelConfig.getHazelcastConfig().setLiteMember(true);
            } else {
                throw new SeaTunnelEngineException("Not supported cluster role: " + clusterRole);
            }
        } else {
            seaTunnelConfig
                    .getEngineConfig()
                    .setClusterRole(EngineConfig.ClusterRole.MASTER_AND_WORKER);
        }

        HazelcastInstanceFactory.newHazelcastInstance(
                seaTunnelConfig.getHazelcastConfig(),
                Thread.currentThread().getName(),
                new SeaTunnelNodeContext(seaTunnelConfig));
    }
}

Here, the configuration information is modified based on the role type.

When it is a Worker node, the Hazelcast node type is set to lite member. In Hazelcast, lite members do not store data.
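
The branching above can be condensed into a small, self-contained sketch. RoleSketch and NodeSettingsSketch are hypothetical stand-ins for EngineConfig.ClusterRole and the Hazelcast configuration; the only behavior borrowed from the code above is the mapping from the role argument to a role and a lite-member flag.

```java
import java.util.Locale;

// Simplified stand-in for SeaTunnel's EngineConfig.ClusterRole (hypothetical name).
enum RoleSketch { MASTER_AND_WORKER, MASTER, WORKER }

// Hypothetical value object holding the two settings that execute() decides.
class NodeSettingsSketch {
    final RoleSketch role;
    final boolean liteMember; // lite members do not store IMap data

    NodeSettingsSketch(RoleSketch role, boolean liteMember) {
        this.role = role;
        this.liteMember = liteMember;
    }

    // Blank argument -> both roles; "master" -> master; "worker" -> worker plus
    // lite member; anything else is rejected. Matching is case-insensitive.
    static NodeSettingsSketch fromArg(String roleArg) {
        if (roleArg == null || roleArg.trim().isEmpty()) {
            return new NodeSettingsSketch(RoleSketch.MASTER_AND_WORKER, false);
        }
        switch (roleArg.toUpperCase(Locale.ROOT)) {
            case "MASTER":
                return new NodeSettingsSketch(RoleSketch.MASTER, false);
            case "WORKER":
                return new NodeSettingsSketch(RoleSketch.WORKER, true);
            default:
                throw new IllegalArgumentException("Not supported cluster role: " + roleArg);
        }
    }
}
```

In this sketch the lite-member flag is simply false for non-worker roles; the real code leaves the Hazelcast setting untouched in those branches.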

Then a Hazelcast instance is created from the modified configuration, with a SeaTunnelNodeContext instance passed in.

public class SeaTunnelNodeContext extends DefaultNodeContext {

    private final SeaTunnelConfig seaTunnelConfig;

    public SeaTunnelNodeContext(@NonNull SeaTunnelConfig seaTunnelConfig) {
        this.seaTunnelConfig = seaTunnelConfig;
    }

    @Override
    public NodeExtension createNodeExtension(@NonNull Node node) {
        return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
    }

    @Override
    public Joiner createJoiner(Node node) {
        JoinConfig join =
                getActiveMemberNetworkConfig(seaTunnelConfig.getHazelcastConfig()).getJoin();
        join.verify();

        if (node.shouldUseMulticastJoiner(join) && node.multicastService != null) {
            // Delegate to the default joiner; the result must be returned, not discarded.
            return super.createJoiner(node);
        } else if (join.getTcpIpConfig().isEnabled()) {
            log.info("Using LiteNodeDropOutTcpIpJoiner TCP/IP discovery");
            return new LiteNodeDropOutTcpIpJoiner(node);
        } else if (node.getProperties().getBoolean(DISCOVERY_SPI_ENABLED)
                || isAnyAliasedConfigEnabled(join)
                || join.isAutoDetectionEnabled()) {
            return super.createJoiner(node);
        }
        return null;
    }

    private static boolean isAnyAliasedConfigEnabled(JoinConfig join) {
        return !AliasedDiscoveryConfigUtils.createDiscoveryStrategyConfigs(join).isEmpty();
    }

    private boolean usePublicAddress(JoinConfig join, Node node) {
        return node.getProperties().getBoolean(DISCOVERY_SPI_PUBLIC_IP_ENABLED)
                || allUsePublicAddress(
                        AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
    }
}

In SeaTunnelNodeContext, the createNodeExtension method is overridden to use the engine.server.NodeExtension class.

The code for this class is:

public class NodeExtension extends DefaultNodeExtension {
    private final NodeExtensionCommon extCommon;

    public NodeExtension(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
        super(node);
        extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(seaTunnelConfig));
    }

    @Override
    public void beforeStart() {
        // TODO Get Config from Node here
        super.beforeStart();
    }

    @Override
    public void afterStart() {
        super.afterStart();
        extCommon.afterStart();
    }

    @Override
    public void beforeClusterStateChange(
            ClusterState currState, ClusterState requestedState, boolean isTransient) {
        super.beforeClusterStateChange(currState, requestedState, isTransient);
        extCommon.beforeClusterStateChange(requestedState);
    }

    @Override
    public void onClusterStateChange(ClusterState newState, boolean isTransient) {
        super.onClusterStateChange(newState, isTransient);
        extCommon.onClusterStateChange(newState);
    }

    @Override
    public Map<String, Object> createExtensionServices() {
        return extCommon.createExtensionServices();
    }

    @Override
    public TextCommandService createTextCommandService() {
        return new TextCommandServiceImpl(node) {
            {
                register(HTTP_GET, new Log4j2HttpGetCommandProcessor(this));
                register(HTTP_POST, new Log4j2HttpPostCommandProcessor(this));
                register(HTTP_GET, new RestHttpGetCommandProcessor(this));
                register(HTTP_POST, new RestHttpPostCommandProcessor(this));
            }
        };
    }

    @Override
    public void printNodeInfo() {
        extCommon.printNodeInfo(systemLogger);
    }
}

In this part, we see that the SeaTunnelServer class is initialized in the constructor. This class is the core server-side class, and its full class name is org.apache.seatunnel.engine.server.SeaTunnelServer.

Let's review the code for this class:

public class SeaTunnelServer
        implements ManagedService, MembershipAwareService, LiveOperationsTracker {

    private static final ILogger LOGGER = Logger.getLogger(SeaTunnelServer.class);

    public static final String SERVICE_NAME = "st:impl:seaTunnelServer";

    private NodeEngineImpl nodeEngine;
    private final LiveOperationRegistry liveOperationRegistry;

    private volatile SlotService slotService;
    private TaskExecutionService taskExecutionService;
    private ClassLoaderService classLoaderService;
    private CoordinatorService coordinatorService;
    private ScheduledExecutorService monitorService;

    @Getter private SeaTunnelHealthMonitor seaTunnelHealthMonitor;

    private final SeaTunnelConfig seaTunnelConfig;

    private volatile boolean isRunning = true;

    public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) {
        this.liveOperationRegistry = new LiveOperationRegistry();
        this.seaTunnelConfig = seaTunnelConfig;
        LOGGER.info("SeaTunnel server start...");
    }



    @Override
    public void init(NodeEngine engine, Properties hzProperties) {
         ...
        if (EngineConfig.ClusterRole.MASTER_AND_WORKER.ordinal()
                == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
            startWorker();
            startMaster();

        } else if (EngineConfig.ClusterRole.WORKER.ordinal()
                == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) {
            startWorker();
        } else {
            startMaster();
        }
        ...
    }

    ....
}

This class is the core code on the SeaTunnel Server side, and it starts relevant components based on the role of the node.

A brief summary of the SeaTunnel process:

SeaTunnel utilizes Hazelcast's foundational capabilities to implement cluster networking and invoke core startup code.

For those interested in a deeper understanding of this area, it's worth reading up on Hazelcast itself. Here is a summary of the invocation path.

Classes invoked in sequence:

  • starter.SeaTunnelServer
  • ServerExecuteCommand
  • SeaTunnelNodeContext
  • NodeExtension
  • server.SeaTunnelServer
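
The invocation path listed above follows a common extension-hook pattern: the framework asks a context object for an extension, the extension builds the service in its constructor, and the framework later initializes that service. The sketch below imitates this ordering in plain Java. Every class in it is a hypothetical stand-in and none of it is Hazelcast or SeaTunnel code; it only records the order in which the pieces are touched.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Hazelcast/SeaTunnel types; all names are hypothetical.
class StartupChainSketch {

    // Plays the part of server.SeaTunnelServer: a service the framework initializes later.
    static class Server {
        final List<String> log;

        Server(List<String> log) {
            this.log = log;
            log.add("server constructed");
        }

        void init() {
            log.add("server init");
        }
    }

    // Plays the part of NodeExtension: it builds the server in its constructor.
    static class Extension {
        final Server server;

        Extension(List<String> log) {
            log.add("extension constructed");
            this.server = new Server(log);
        }
    }

    // Plays the part of SeaTunnelNodeContext: the factory hook the framework calls.
    static class Context {
        Extension createExtension(List<String> log) {
            log.add("context asked for extension");
            return new Extension(log);
        }
    }

    // Plays the part of the framework: it drives the hooks in a fixed order.
    static List<String> startNode(Context context) {
        List<String> log = new ArrayList<>();
        Extension extension = context.createExtension(log);
        extension.server.init();
        return log;
    }

    public static void main(String[] args) {
        startNode(new Context()).forEach(System.out::println);
    }
}
```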

Next, let's look in detail at the components created in the Master and Worker nodes.

Worker Node

private void startWorker() {
    taskExecutionService =
            new TaskExecutionService(
                    classLoaderService, nodeEngine, nodeEngine.getProperties());
    nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
    taskExecutionService.start();
    getSlotService();
}

public SlotService getSlotService() {
    if (slotService == null) {
        synchronized (this) {
            if (slotService == null) {
                SlotService service =
                        new DefaultSlotService(
                                nodeEngine,
                                taskExecutionService,
                                seaTunnelConfig.getEngineConfig().getSlotServiceConfig());
                service.init();
                slotService = service;
            }
        }
    }
    return slotService;
}

In the startWorker method, two components are initialized: taskExecutionService and slotService. Both are related to task execution.
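
getSlotService() creates the service lazily using double-checked locking, which is why the slotService field is declared volatile. A minimal sketch of the same idiom follows; LazySlotServiceSketch is a hypothetical class, a plain Object stands in for the real service, and a counter stands in for service.init().

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder illustrating double-checked locking; not SeaTunnel code.
class LazySlotServiceSketch {
    // Counts how many times the "service" was created and initialized.
    final AtomicInteger created = new AtomicInteger();

    // volatile ensures other threads never observe a half-initialized instance.
    private volatile Object slotService;

    Object getSlotService() {
        if (slotService == null) { // fast path, no locking once initialized
            synchronized (this) {
                if (slotService == null) { // re-check under the lock
                    Object service = new Object();
                    created.incrementAndGet(); // stands in for service.init()
                    slotService = service; // publish only after initialization
                }
            }
        }
        return slotService;
    }
}
```

Assigning to a local variable first and publishing it last, as the original code does, keeps callers on the fast path from seeing a service whose init() has not finished.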

SlotService

First, let's look at the initialization of SlotService.

@Override
public void init() {
    initStatus = true;
    slotServiceSequence = UUID.randomUUID().toString();
    contexts = new ConcurrentHashMap<>();
    assignedSlots = new ConcurrentHashMap<>();
    unassignedSlots = new ConcurrentHashMap<>();
    unassignedResource = new AtomicReference<>(new ResourceProfile());
    assignedResource = new AtomicReference<>(new ResourceProfile());
    scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(
                    r ->
                            new Thread(
                                    r,
                                    String.format(
                                            "hz.%s.seaTunnel.slotService.thread",
                                            nodeEngine.getHazelcastInstance().getName())));
    if (!config.isDynamicSlot()) {
        initFixedSlots();
    }
    unassignedResource.set(getNodeResource());
    scheduledExecutorService.scheduleAtFixedRate(
            () -> {
                try {
                    LOGGER.fine(
                            "start send heartbeat to resource manager, this address: "
                                    + nodeEngine.getClusterService().getThisAddress());
                    sendToMaster(new WorkerHeartbeatOperation(getWorkerProfile())).join();
                } catch (Exception e) {
                    LOGGER.warning(
                            "failed send heartbeat to resource manager, will retry later. this address: "
                                    + nodeEngine.getClusterService().getThisAddress());
                }
            },
            0,
            DEFAULT_HEARTBEAT_TIMEOUT,
            TimeUnit.MILLISECONDS);
}

SeaTunnel has a concept of dynamic Slots. When dynamic Slots are enabled, a node has no fixed number of Slots and can accept any number of tasks. When they are disabled, the node is given a fixed number of Slots and can accept only as many tasks as it has Slots.

During initialization, the number of Slots is set based on whether dynamic Slots are enabled or not.

private void initFixedSlots() {
    long maxMemory = Runtime.getRuntime().maxMemory();
    for (int i = 0; i < config.getSlotNum(); i++) {
        unassignedSlots.put(
                i,
                new SlotProfile(
                        nodeEngine.getThisAddress(),
                        i,
                        new ResourceProfile(
                                CPU.of(0), Memory.of(maxMemory / config.getSlotNum())),
                        slotServiceSequence));
    }
}
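
The arithmetic in initFixedSlots() is simple: the JVM's maximum heap is divided evenly across the configured Slots, and CPU is recorded as 0. The following sketch isolates that calculation; FixedSlotMemorySketch and memoryPerSlot are hypothetical names, and the positive-slot check is an addition for the sketch only.

```java
// Hypothetical calculation mirroring initFixedSlots(): heap split evenly across slots.
class FixedSlotMemorySketch {

    // Returns the memory budget in bytes recorded for each of slotNum fixed slots.
    static long[] memoryPerSlot(long maxMemoryBytes, int slotNum) {
        if (slotNum <= 0) {
            throw new IllegalArgumentException("slotNum must be positive");
        }
        long[] budgets = new long[slotNum];
        for (int i = 0; i < slotNum; i++) {
            budgets[i] = maxMemoryBytes / slotNum; // integer division, remainder is dropped
        }
        return budgets;
    }

    public static void main(String[] args) {
        long maxMemory = Runtime.getRuntime().maxMemory();
        long[] budgets = memoryPerSlot(maxMemory, 4);
        System.out.println("per-slot bytes: " + budgets[0]);
    }
}
```

For example, a 4 GiB heap with 4 fixed Slots gives each Slot a 1 GiB memory profile.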

It can also be seen that a thread is started to periodically send heartbeats to the Master node. The heartbeat information includes the current node's information, such as the number of assigned and unassigned Slots. The Worker node updates this information to the Master node periodically through heartbeats.

@Override
public synchronized WorkerProfile getWorkerProfile() {
    WorkerProfile workerProfile = new WorkerProfile(nodeEngine.getThisAddress());
    workerProfile.setProfile(getNodeResource());
    workerProfile.setAssignedSlots(assignedSlots.values().toArray(new SlotProfile[0]));
    workerProfile.setUnassignedSlots(unassignedSlots.values().toArray(new SlotProfile[0]));
    workerProfile.setUnassignedResource(unassignedResource.get());
    workerProfile.setAttributes(nodeEngine.getLocalMember().getAttributes());
    workerProfile.setDynamicSlot(config.isDynamicSlot());
    return workerProfile;
}

private ResourceProfile getNodeResource() {
    return new ResourceProfile(CPU.of(0), Memory.of(Runtime.getRuntime().maxMemory()));
}
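
One detail of the heartbeat loop is worth spelling out: with scheduleAtFixedRate, an exception that escapes the task suppresses all later executions, so the try/catch inside the heartbeat lambda is what keeps the heartbeat alive after a failed send. The sketch below shows the same guard in isolation. HeartbeatSketch and its members are hypothetical, and the flaky task only simulates an unreachable Master.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a fault-tolerant periodic heartbeat; not SeaTunnel code.
class HeartbeatSketch {

    // Wraps a heartbeat so that a failure is counted instead of escaping.
    // An uncaught exception would cancel all later runs of scheduleAtFixedRate.
    static Runnable guarded(Runnable heartbeat, AtomicInteger failures) {
        return () -> {
            try {
                heartbeat.run();
            } catch (Exception e) {
                failures.incrementAndGet(); // a real service would log and retry on the next tick
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger attempts = new AtomicInteger();
        AtomicInteger failures = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Simulated heartbeat that fails on every second attempt.
        Runnable flaky = () -> {
            if (attempts.incrementAndGet() % 2 == 0) {
                throw new IllegalStateException("master unreachable");
            }
        };
        scheduler.scheduleAtFixedRate(guarded(flaky, failures), 0, 50, TimeUnit.MILLISECONDS);
        Thread.sleep(300);
        scheduler.shutdownNow();
        System.out.println("attempts=" + attempts.get() + ", failures=" + failures.get());
    }
}
```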

TaskExecutionService

This component runs the tasks that are submitted to the Worker. Here, we briefly look at the related code and will delve into it further later in the series.

When the Worker node initializes, it creates a TaskExecutionService object and calls its start method.

private final ExecutorService executorService =
        newCachedThreadPool(new BlockingTaskThreadFactory());

public TaskExecutionService(
        ClassLoaderService classLoaderService,
        NodeEngineImpl nodeEngine,
        HazelcastProperties properties) {
    // Load configuration
    seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
    this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
    this.nodeEngine = nodeEngine;
    this.classLoaderService = classLoaderService;
    this.logger = nodeEngine.getLoggingService().getLogger(TaskExecutionService.class);
    // Metrics related
    MetricsRegistry registry = nodeEngine.getMetricsRegistry();
    MetricDescriptor descriptor =
            registry.newMetricDescriptor()
                    .withTag(MetricTags.SERVICE, this.getClass().getSimpleName());
    registry.registerStaticMetrics(descriptor, this);
    // Scheduled task to update metrics in IMAP
    scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    scheduledExecutorService.scheduleAtFixedRate(
            this::updateMetricsContextInImap,
            0,
            seaTunnelConfig.getEngineConfig().getJobMetricsBackupInterval(),
            TimeUnit.SECONDS);

    serverConnectorPackageClient =
            new ServerConnectorPackageClient(nodeEngine, seaTunnelConfig);

    eventBuffer = new ArrayBlockingQueue<>(2048);
    // Event forwarding service
    eventForwardService =
            Executors.newSingleThreadExecutor(
                    new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build());
    eventForwardService.submit(
            () -> {
                List<Event> events = new ArrayList<>();
                RetryUtils.RetryMaterial retryMaterial =
                        new RetryUtils.RetryMaterial(2, true, e -> true);
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        events.clear();

                        Event first = eventBuffer.take();
                        events.add(first);

                        eventBuffer.drainTo(events, 500);
                        JobEventReportOperation operation = new JobEventReportOperation(events);

                        RetryUtils.retryWithException(
                                () ->
                                        NodeEngineUtil.sendOperationToMasterNode(
                                                        nodeEngine, operation)
                                                .join(),
                                retryMaterial);

                        logger.fine("Event forward success, events " + events.size());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        logger.info("Event forward thread interrupted");
                    } catch (Throwable t) {
                        logger.warning(
                                "Event forward failed, discard events " + events.size(), t);
                    }
                }
            });
}

public void start() {
    runBusWorkSupplier.runNewBusWork(false);
}

In this class, a cached thread pool is created as a member variable. The constructor schedules a periodic task that updates job metrics in the IMap, and it starts a background task that sends Event information to the Master node in batches. The Master node then sends these Events to external services.
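
The event forwarder batches events with a take-then-drain idiom: take() blocks until at least one event exists, and drainTo() then collects whatever else is already queued, up to a limit, without blocking. A minimal sketch of that idiom follows; EventBatchSketch and nextBatch are hypothetical names, and strings stand in for Event objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the take-then-drain batching used by the event forwarder.
class EventBatchSketch {

    // Blocks until one event is available, then grabs up to maxExtra more without blocking.
    static <E> List<E> nextBatch(BlockingQueue<E> buffer, int maxExtra)
            throws InterruptedException {
        List<E> batch = new ArrayList<>();
        batch.add(buffer.take());
        buffer.drainTo(batch, maxExtra);
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2048);
        for (int i = 0; i < 5; i++) {
            buffer.offer("event-" + i);
        }
        System.out.println("batch: " + nextBatch(buffer, 3));
        System.out.println("left in buffer: " + buffer);
    }
}
```

A batch therefore holds at most maxExtra + 1 events, so the drainTo(events, 500) call in the original code caps each report at 501 events.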

Master Node

private void startMaster() {
    coordinatorService =
            new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
    monitorService = Executors.newSingleThreadScheduledExecutor();
    monitorService.scheduleAtFixedRate(
            this::printExecutionInfo,
            0,
            seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
            TimeUnit.SECONDS);
}

We can see that two components are started in the Master node: the coordinator component and the monitoring component.

The monitoring component's task is straightforward: it periodically prints cluster information.

CoordinatorService

public CoordinatorService(
        @NonNull NodeEngineImpl nodeEngine,
        @NonNull SeaTunnelServer seaTunnelServer,
        EngineConfig engineConfig) {
    this.nodeEngine = nodeEngine;
    this.logger = nodeEngine.getLogger(getClass());
    this.executorService =
            Executors.newCachedThreadPool(
                    new ThreadFactoryBuilder()
                            .setNameFormat("seatunnel-coordinator-service-%d")
                            .build());
    this.seaTunnelServer = seaTunnelServer;
    this.engineConfig = engineConfig;
    masterActiveListener = Executors.newSingleThreadScheduledExecutor();
    masterActiveListener.scheduleAtFixedRate(
            this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
}


private void checkNewActiveMaster() {
    try {
        if (!isActive && this.seaTunnelServer.isMasterNode()) {
            logger.info(
                    "This node become a new active master node, begin init coordinator service");
            if (this.executorService.isShutdown()) {
                this.executorService =
                        Executors.newCachedThreadPool(
                                new ThreadFactoryBuilder()
                                        .setNameFormat("seatunnel-coordinator-service-%d")
                                        .build());
            }
            initCoordinatorService();
            isActive = true;
        } else if (isActive && !this.seaTunnelServer.isMasterNode()) {
            isActive = false;
            logger.info(
                    "This node become leave active master node, begin clear coordinator service");
            clearCoordinatorService();
        }
    } catch (Exception e) {
        isActive = false;
        logger.severe(ExceptionUtils.getMessage(e));
        throw new SeaTunnelEngineException("check new active master error, stop loop", e);
    }
}

During initialization, a thread is started that checks every 100 ms whether the current node is the active Master. If the node is not yet marked as active but has become the Master in the cluster, it calls initCoordinatorService() to initialize its state and sets isActive to true.

If the node is marked as a Master but is no longer a Master in the cluster, it will perform a state cleanup.
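
The two branches of checkNewActiveMaster() form a small two-state machine. The sketch below reduces it to a pure function of the previous flag and the current leadership status. ActiveMasterSketch and its Action enum are hypothetical names, and the sketch leaves out the exception handling of the original.

```java
// Hypothetical state tracker mirroring the branches of checkNewActiveMaster().
class ActiveMasterSketch {
    enum Action { INIT_COORDINATOR, CLEAR_COORDINATOR, NONE }

    private boolean isActive = false;

    // Called on every tick with the node's current view of cluster leadership.
    Action onTick(boolean isMasterNode) {
        if (!isActive && isMasterNode) {
            isActive = true; // newly elected: initialize coordinator state
            return Action.INIT_COORDINATOR;
        } else if (isActive && !isMasterNode) {
            isActive = false; // lost leadership: clear coordinator state
            return Action.CLEAR_COORDINATOR;
        }
        return Action.NONE; // no change since the previous tick
    }
}
```

Because most ticks see no change, the frequent check is cheap: work only happens on the tick where leadership actually flips.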

private void initCoordinatorService() {
    // Retrieve distributed IMAP from Hazelcast
    runningJobInfoIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
    runningJobStateIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
    runningJobStateTimestampsIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
    ownedSlotProfilesIMap =
            nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
    metricsImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
    // Initialize JobHistoryService
    jobHistoryService =
            new JobHistoryService(
                    runningJobStateIMap,
                    logger,
                    runningJobMasterMap,
                    nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE),
                    nodeEngine
                            .getHazelcastInstance()
                            .getMap(Constant.IMAP_FINISHED_JOB_METRICS),
                    nodeEngine
                            .getHazelcastInstance()
                            .getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO),
                    engineConfig.getHistoryJobExpireMinutes());
    // Initialize EventProcessor for sending events to other services
    eventProcessor =
            createJobEventProcessor(
                    engineConfig.getEventReportHttpApi(),
                    engineConfig.getEventReportHttpHeaders(),
                    nodeEngine);

    // If the user has configured the connector package service, create it on the master node.
    ConnectorJarStorageConfig connectorJarStorageConfig =
            engineConfig.getConnectorJarStorageConfig();
    if (connectorJarStorageConfig.getEnable()) {
        connectorPackageService = new ConnectorPackageService(seaTunnelServer);
    }
    // After cluster recovery, attempt to restore previous historical tasks
    restoreAllJobFromMasterNodeSwitchFuture =
            new PassiveCompletableFuture(
                    CompletableFuture.runAsync(
                            this::restoreAllRunningJobFromMasterNodeSwitch, executorService));
}

In CoordinatorService, references to several distributed maps (IMaps) are obtained from Hazelcast. IMap is a data structure provided by Hazelcast that keeps data consistent across the cluster; SeaTunnel uses it to store task information, slot information, and so on.

An EventProcessor is also created here. This class is used to send event notifications to other services. For example, if a task fails, it can send a message to a configured endpoint to achieve event-driven notifications.

Lastly, since the node startup could be due to a cluster crash or a node switch, historical running tasks need to be restored. It will attempt to restore these tasks by fetching the list of previously running tasks from the IMAP.

The IMAP data can be persisted to file systems like HDFS, allowing task states to be retrieved and restored even after a complete system reboot.
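
The restore step runs asynchronously on the coordinator's executor so that initialization is not blocked while jobs are being recovered. The sketch below shows only that shape. JobRestoreSketch is a hypothetical class, a ConcurrentHashMap stands in for the Hazelcast IMap, and "restoring" a job is reduced to recording its ID.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: a ConcurrentHashMap stands in for the Hazelcast IMap.
class JobRestoreSketch {

    // Starts restoring every job recorded as running and returns a future to wait on.
    static CompletableFuture<Void> restoreAll(
            Map<Long, String> runningJobInfo, List<Long> restored, ExecutorService executor) {
        return CompletableFuture.runAsync(
                () -> runningJobInfo.keySet().forEach(restored::add), executor);
    }

    public static void main(String[] args) {
        Map<Long, String> runningJobInfo = new ConcurrentHashMap<>();
        runningJobInfo.put(1L, "job-a");
        runningJobInfo.put(2L, "job-b");
        List<Long> restored = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Callers that need the restored state can wait on the returned future.
            restoreAll(runningJobInfo, restored, executor).join();
        } finally {
            executor.shutdown();
        }
        System.out.println("restored jobs: " + restored.size());
    }
}
```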

Components running within CoordinatorService include:

  • executorService (available on all nodes that can be elected as Master)
  • jobHistoryService (runs on the Master node)
  • eventProcessor (runs on the Master node)

On both Master and standby nodes:

  • Periodically check if the node is a Master; if it is, perform the corresponding state transition.

On the Master node:

  • Periodically print cluster state information.
  • Start the forwarding service to relay events to external services.

On Worker nodes, after startup:

  • Periodically report state information to the Master node.
  • Periodically update job metrics in the IMap.
  • Forward events generated by the Worker to the Master node to be pushed to external services.

At this point, all server-side service components have been successfully started. This concludes the article!
