I want to share my experience in setupping and configuring Quartz Scheduler in Cluster mode with Spring Boot.
Modules description
So we have two application in the project: supervisor and worker.
Supervisor - is the main application there you can manage jobs(schedule/remove/check statuses). Besides, I didn't find how to disable execution of jobs on master, so it also have an a "worker" capabilities.
Worker - simply executes scheduled jobs, nothing interesting.
I use MySQL as database for Quartz cluster. For simplicity I launch it in docker by using docker compose.
The config files…
Spring config file for supervisor contains connection string to the database and credentials and port that will be used for REST API.
endpoints:
jmx:
unique-names: true
server:
port: 8080
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
password: 12345
url: jdbc:mysql://localhost/test?createDatabaseIfNotExist=true&useSSL=false&allowPublicKeyRetrieval=true
username: root
liquibase:
change-log: classpath:db/changelog/db.changelog-master.xml
Also it contains path to the file with rules for initial creation of tables in MySQL. The SQL tables creation scripts are from quartz-2.2.3-distribution.tar.gz/quartz-2.2.3-distribution.tar
Spring config file for worker are almost same — it not contains info for initial table creation and different REST API port.
Quartz config files for supervisor and worker are also almost identical:
#============================================================================
# Configure Main Scheduler Properties
#============================================================================
org.quartz.scheduler.instanceName=spring-boot-quartz-cluster-example
org.quartz.scheduler.instanceId=AUTO
#============================================================================
# Configure ThreadPool
#============================================================================
org.quartz.threadPool.threadCount=1
#============================================================================
# Configure JobStore
#============================================================================
#org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
# In new spring use LocalDataSourceJobStore https://github.com/spring-projects/spring-framework/issues/27709
org.quartz.jobStore.class=org.springframework.scheduling.quartz.LocalDataSourceJobStore
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties=true
org.quartz.jobStore.misfireThreshold=60000
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true
org.quartz.jobStore.clusterCheckinInterval=20000
the difference is only in how many thread pool are — org.quartz.threadPool.threadCount
The code…
So for first we need autowiring support for jobs, and spring still not have support for this: SPR-14471, SPR-9698. Really, vote for this issues!
Fortunately the workaround is available and I say many thanks to this guy and still don’t understand why it’s not in spring yet…
TestJob1
package com.github.hronom.spring.boot.quartz.cluster.example.common.job;
import com.github.hronom.spring.boot.quartz.cluster.example.common.service.TestService;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
@DisallowConcurrentExecution
public class TestJob1 implements Job {
@Autowired
private TestService testService;
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
String id = jobExecutionContext.getJobDetail().getKey().getName();
testService.run(id);
} catch (Exception e) {
throw new JobExecutionException(e);
}
}
}
It just call method of the autowired local class that currently runs inside instance where job executed. Because service can throw exception, I add capturing of all exception and wrap it in JobExecutionException
.
TestServiceImpl
The service are also simple - it emulates long running process and randomly throws exceptions:
package com.github.hronom.spring.boot.quartz.cluster.example.supervisor.services;
import com.github.hronom.spring.boot.quartz.cluster.example.common.service.TestService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Service;
import java.util.Random;
import java.util.concurrent.TimeUnit;
@Service
public class TestServiceImpl implements TestService {
private final Log logger = LogFactory.getLog(getClass());
private final Random random = new Random();
public void run(String id) throws Exception {
logger.info("Running job on supervisor, job id " + id);
if (random.nextInt(3) == 1) {
throw new Exception("Randomly generated test exception on supervisor");
}
try {
Thread.sleep(TimeUnit.MINUTES.toMillis(1));
} catch (InterruptedException e) {
logger.error("Error", e);
}
logger.info("Completed job on supervisor, job id " + id);
}
}
Throwing of exception added to check mechanic how Quartz handle exceptions in jobs.
JobsListenerService
Each instance of supervisor/worker has a jobs listener to listen events raised by local execution of jobs.
package com.github.hronom.spring.boot.quartz.cluster.example.common.service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;
import org.springframework.stereotype.Service;
@Service
public class JobsListenerService implements JobListener {
private final Log logger = LogFactory.getLog(getClass());
@Override
public String getName() {
return "Main Listener";
}
@Override
public void jobToBeExecuted(JobExecutionContext context) {
logger.info("Job to be executed " + context.getJobDetail().getKey().getName());
}
@Override
public void jobExecutionVetoed(JobExecutionContext context) {
logger.info("Job execution vetoed " + context.getJobDetail().getKey().getName());
}
@Override
public void jobWasExecuted(
JobExecutionContext context, JobExecutionException jobException
) {
logger.info(
"Job was executed " +
context.getJobDetail().getKey().getName() +
(jobException != null ? ", with error" : "")
);
}
}
SchedulerConfig
package com.github.hronom.spring.boot.quartz.cluster.example.supervisor.configs;
import com.github.hronom.spring.boot.quartz.cluster.example.common.spring.AutowiringSpringBeanJobFactory;
import com.github.hronom.spring.boot.quartz.cluster.example.common.service.JobsListenerService;
import org.quartz.spi.JobFactory;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import java.io.IOException;
import java.util.Properties;
import javax.sql.DataSource;
import liquibase.integration.spring.SpringLiquibase;
@Configuration
@EnableAsync
@EnableScheduling
public class SchedulerConfig {
@Bean
public JobFactory jobFactory(
ApplicationContext applicationContext,
// Injecting SpringLiquibase to ensure liquibase is already initialized and created the Quartz tables
SpringLiquibase springLiquibase
) {
AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource, JobFactory jobFactory, JobsListenerService jobsListenerService)
throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setDataSource(dataSource);
factory.setJobFactory(jobFactory);
factory.setQuartzProperties(quartzProperties());
factory.setGlobalJobListeners(jobsListenerService);
// https://medium.com/@rudra.ramesh/use-following-code-in-supervisor-app-while-creating-schedulerfactorybean-object-now-supervisor-fd2f95365350
// If you need to disable launching of jobs on supervisor use this:
//factory.setAutoStartup(false);
return factory;
}
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
}
As you can see we create custom JobFactory
— AutowiringSpringBeanJobFactory
for autowiring support in jobs.
Also we create SchedulerFactoryBean
that sets:
- data source(MySQL)
- custom job factory for autowiring
- Quartz properties from config file
- jobs global listener(listener this listen for events of the local scheduler)
JobsService
This service contains methods to manage jobs on the supervisor, worker doesn’t has such service.
package com.github.hronom.spring.boot.quartz.cluster.example.supervisor.services;
import com.github.hronom.spring.boot.quartz.cluster.example.common.job.TestJob1;
import com.github.hronom.spring.boot.quartz.cluster.example.supervisor.controllers.JobStatus;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.impl.matchers.GroupMatcher;
import org.quartz.utils.Key;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;
@Service
public class JobsService {
private final String groupName = "normal-group";
private final Scheduler scheduler;
@Autowired
public JobsService(SchedulerFactoryBean schedulerFactory) {
this.scheduler = schedulerFactory.getScheduler();
}
public List<String> addNewJobs(int jobs) throws SchedulerException {
LinkedList<String> list = new LinkedList<>();
for (int i = 0; i < jobs; i++) {
list.add(addNewJob());
}
return list.stream().sorted(Comparator.naturalOrder()).collect(Collectors.toList());
}
public String addNewJob() throws SchedulerException {
String id = UUID.randomUUID().toString();
JobDetail job =
newJob(TestJob1.class)
.withIdentity(id, groupName)
// http://www.quartz-scheduler.org/documentation/quartz-2.2.x/configuration/ConfigJDBCJobStoreClustering.html
// https://stackoverflow.com/a/19270566/285571
.requestRecovery(true)
.build();
Trigger trigger =
newTrigger()
.withIdentity(id + "-trigger", groupName)
.startNow()
.withSchedule(
simpleSchedule().withIntervalInSeconds(30)
)
.build();
scheduler.scheduleJob(job, trigger);
return id;
}
public boolean deleteJob(String id) throws SchedulerException {
JobKey jobKey = new JobKey(id, groupName);
return scheduler.deleteJob(jobKey);
}
public List<String> getJobs() throws SchedulerException {
return scheduler
.getJobKeys(GroupMatcher.jobGroupEquals(groupName))
.stream()
.map(Key::getName)
.sorted(Comparator.naturalOrder())
.collect(Collectors.toList());
}
/**
* Check realization was inspired by https://stackoverflow.com/a/31479434/285571
*/
public List<JobStatus> getJobsStatuses() throws SchedulerException {
LinkedList<JobStatus> list = new LinkedList<>();
for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))) {
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobDetail.getKey());
for (Trigger trigger : triggers) {
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
if (Trigger.TriggerState.COMPLETE.equals(triggerState)) {
list.add(new JobStatus(jobKey.getName(), true));
} else {
list.add(new JobStatus(jobKey.getName(), false));
}
}
}
list.sort(Comparator.comparing(o -> o.id));
return list;
}
}
For test purposes we use method addNewJobs
to add new jobs in batch mode(by default we add 10 jobs)
The method addNewJob
adds new jobs to scheduler by doing next things:
- Create job detail there we acquire job id and group name. Also we set that job must be recoverable. The recoverability of job means that if one node in cluster fails/crashes while processing job, the job will be processed on other alive node. More description about this here and here.
- Creating trigger with interval 30 sec. We want to fire trigger only once.
Also we have other interesting method getJobsStatuses
. It helps check status of jobs across all cluster. For doing that we retrieve all available jobs details and check the state of trigger for this job details. So if trigger state is COMPLETE
this means that trigger has already fired and job now executing.
This realization was inspired by: https://stackoverflow.com/a/31479434/285571
That’s all, full source code available at GitHub: https://github.com/Hronom/spring-boot-quartz-cluster-example
Also I want to say thanks to post that gives me a good starting point: http://www.opencodez.com/java/quartz-scheduler-with-spring-boot.htm
Top comments (1)
Nice jobs