智能科技

当前位置:澳门威斯尼人平台登陆 > 智能科技 > 技术头条,quartz定时任务框架调度机制解析

技术头条,quartz定时任务框架调度机制解析

来源:http://www.tessiz.com 作者:澳门威斯尼人平台登陆 时间:2019-11-21 09:48

原标题:如何高效排查日均调度量超两百万次的重复调度问题? | 技术头条

 

前言

  开心一刻

    晚上回家,爸妈正在吵架,见我回来就都不说话了,看见我妈坐在那里瞪着我爸,我就问老爸“你干什么了惹我妈生这么大气?” 我爸说“没有什么啊,倒是你,这么大了还没有媳妇,要是你有媳妇给我们生一个孙子玩,我们致于吵架吗?”我一听就感觉要坏,老爸你这是来了一招调虎离山啊,实力坑儿子啊,果然我妈改瞪我了,然后完全不理我爸,直接指着我开骂了……图片 1

  路漫漫其修远兮,吾将上下而求索!

  github:

  码云:

  (将Quartz持久化到数据库的做法)

图片 2

转自集群调度机制调研及源码分析

java定时任务调度的实现方式 

  Timer

    这个相信大家都有用过,我也用过,但用的不多;

    特点是:简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务;能实现简单的定时任务,稍微复杂点的定时任务却不好实现。
  ScheduledExecutor

    这个我相信大家也都用过,而且用的比Timer多;正是鉴于Timer的缺陷,Java 5推出了基于线程池设计的ScheduledExecutor;

    特点:每一个被调度的任务都会由线程池中一个线程去执行,因此任务是并发执行的,相互之间不会受到干扰。需要注意的是,只有当任务的执行时间到来时,ScheduedExecutor 才会真正启动一个线程,其余时间 ScheduledExecutor 都是在轮询任务的状态。

    虽然用ScheduledExecutor和Calendar能够实现复杂任务调度,但实现起来还是比较麻烦,对开发还是不够友善。

  Spring Scheduler

    spring对任务调度的实现支持,可以指定任务的执行时间,但对任务队列和线程池的管控较弱;一般集成于项目中,小任务很方便。
  JCronTab

    JCronTab则是一款完全按照crontab语法编写的java任务调度工具。

    特点:

      可指定任务的执行时间;

      提供完全按照Unix的UNIX-POSIX crontab的格式来规定时间;

      支持多种任务调度的持久化方法,包括普通文件、数据库以及 XML 文件进行持久化;

      JCronTab内置了发邮件功能,可以将任务执行结果方便地发送给需要被通知的人;

      设计和部署是高性能并可扩展。

  Quartz

    本文主角,请往下看
  当然还有XXL-JOB、Elastic-Job、Saturn等等

 

作者 | 余慧娟

quartz2.2.1集群调度机制调研及源码分析
引言
quartz集群架构
调度器实例化
调度过程
触发器的获取
触发trigger:
Job执行过程:
总结:
附:

quartz相关概念

  Scheduler:调度器,进行任务调度;quartz的大脑
  Job:业务job,亦可称业务组件;定时任务的具体执行业务需要实现此接口,调度器会调用此接口的execute方法完成我们的定时业务
  JobDetail:用来定义业务Job的实例,我们可以称之为quartz job,很多时候我们谈到的job指的是JobDetail
  Trigger:触发器,用来定义一个指定的Job何时被执行
  JobBuilder:Job构建器,用来定义或创建JobDetail的实例;JobDetail限定了只能是Job的实例
  TriggerBuilder:触发器构建器,用来定义或创建触发器的实例

  具体为什么要分这么细,大家可以去查阅下相关资料,你会发现很多东西

QRTZ_CALENDARS 以 Blob 类型存储 Quartz 的 Calendar 信息 

责编 | 郭芮

 

工程实现

  pom.xml

图片 3图片 4

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <groupId>com.lee</groupId>    <artifactId>spring-boot-quartz</artifactId>    <version>1.0-SNAPSHOT</version>    <properties>        <java.version>1.8</java.version>        <maven.compiler.source>1.8</maven.compiler.source>        <maven.compiler.target>1.8</maven.compiler.target>        <druid.version>1.1.10</druid.version>        <pagehelper.version>1.2.5</pagehelper.version>        <druid.version>1.1.10</druid.version>    </properties>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.0.3.RELEASE</version>    </parent>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-thymeleaf</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-quartz</artifactId>        </dependency>        <dependency>            <groupId>com.alibaba</groupId>            <artifactId>druid-spring-boot-starter</artifactId>            <version>${druid.version}</version>        </dependency>        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>        </dependency>        <dependency>            <groupId>com.github.pagehelper</groupId>            <artifactId>pagehelper-spring-boot-starter</artifactId>            <version>${pagehelper.version}</version>        </dependency>        <!-- 日志 -->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-logging</artifactId>            <exclusions>            <!-- 排除spring-boot-starter-logging中的全部依赖 -->                <exclusion>                    <groupId>*</groupId>                    <artifactId>*</artifactId>                </exclusion>            </exclusions>            <scope>test</scope>     <!-- 打包的时候不打spring-boot-starter-logging.jar -->        </dependency>        <dependency>            <groupId>ch.qos.logback</groupId>            <artifactId>logback-classic</artifactId>        </dependency>        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>            <optional>true</optional>        </dependency>    </dependencies>    <build>        <finalName>spring-boot-quartz</finalName>        <plugins>            <!-- 打包项目 mvn clean package -->            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-maven-plugin</artifactId>            </plugin>        </plugins>    </build></project>

View Code

  application.xml

图片 5图片 6

server:  port: 9001  servlet:    context-path: /quartzspring:  thymeleaf:    mode: HTML    cache: false  #连接池配置  datasource:    type: com.alibaba.druid.pool.DruidDataSource    druid:      driver-class-name: com.mysql.jdbc.Driver      url: jdbc:mysql://localhost:3306/spring-boot-quartz?useSSL=false&useUnicode=true      username: root      password: 123456      initial-size: 1                     #连接池初始大小      max-active: 20                      #连接池中最大的活跃连接数      min-idle: 1                         #连接池中最小的活跃连接数      max-wait: 60000                     #配置获取连接等待超时的时间      pool-prepared-statements: true    #打开PSCache,并且指定每个连接上PSCache的大小      max-pool-prepared-statement-per-connection-size: 20      validation-query: SELECT 1 FROM DUAL      validation-query-timeout: 30000      test-on-borrow: false             #是否在获得连接后检测其可用性      test-on-return: false             #是否在连接放回连接池后检测其可用性      test-while-idle: true             #是否在连接空闲一段时间后检测其可用性  quartz:    #相关属性配置    properties:      org:        quartz:          scheduler:            instanceName: quartzScheduler            instanceId: AUTO          jobStore:            class: org.quartz.impl.jdbcjobstore.JobStoreTX            driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate            tablePrefix: QRTZ_            isClustered: false            clusterCheckinInterval: 10000            useProperties: false          threadPool:            class: org.quartz.simpl.SimpleThreadPool            threadCount: 10            threadPriority: 5            threadsInheritContextClassLoaderOfInitializingThread: true    #数据库方式    job-store-type: JDBC    #初始化表结构    jdbc:      initialize-schema: NEVER#mybatis配置mybatis:  type-aliases-package: com.lee.quartz.entity  mapper-locations: classpath:mybatis/mapper/*.xml#分页配置, pageHelper是物理分页插件pagehelper:  #4.0.0以后版本可以不设置该参数,该示例中是5.1.4  helper-dialect: mysql  #启用合理化,如果pageNum<1会查询第一页,如果pageNum>pages会查询最后一页  reasonable: truelogging:  level:    com.lee.quartz.mapper: debug

View Code

  这样,quartz就配置好了,应用里面直接用即可

  JobController.java

图片 7图片 8

package com.lee.quartz.web;import com.github.pagehelper.PageInfo;import com.lee.quartz.common.Result;import com.lee.quartz.entity.QuartzJob;import com.lee.quartz.service.IJobService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/job")public class JobController {    private final static Logger LOGGER = LoggerFactory.getLogger(JobController.class);    @Autowired    private IJobService jobService;        @SuppressWarnings({ "unchecked", "rawtypes" })    @PostMapping("/add")    public Result save(QuartzJob quartz){        LOGGER.info("新增任务");        Result result = jobService.saveJob;        return result;    }    @PostMapping("/list")    public PageInfo list(String jobName,Integer pageNo,Integer pageSize){        LOGGER.info("任务列表");        PageInfo pageInfo = jobService.listQuartzJob(jobName, pageNo, pageSize);        return pageInfo;    }    @PostMapping("/trigger")    public  Result trigger(String jobName, String jobGroup) {        LOGGER.info("触发任务");        Result result = jobService.triggerJob(jobName, jobGroup);        return result;    }    @PostMapping("/pause")    public  Result pause(String jobName, String jobGroup) {        LOGGER.info("停止任务");        Result result = jobService.pauseJob(jobName, jobGroup);        return result;    }    @PostMapping("/resume")    public  Result resume(String jobName, String jobGroup) {        LOGGER.info("恢复任务");        Result result = jobService.resumeJob(jobName, jobGroup);        return result;    }    @PostMapping("/remove")    public  Result remove(String jobName, String jobGroup) {        LOGGER.info("移除任务");        Result result = jobService.removeJob(jobName, jobGroup);        return result;    }}

View Code

  JobServiceImpl.java

图片 9图片 10

package com.lee.quartz.service.impl;import com.github.pagehelper.PageHelper;import com.github.pagehelper.PageInfo;import com.lee.quartz.common.Result;import com.lee.quartz.entity.QuartzJob;import com.lee.quartz.mapper.JobMapper;import com.lee.quartz.service.IJobService;import org.quartz.*;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.util.List;@Servicepublic class JobServiceImpl implements IJobService {    @Autowired    private Scheduler scheduler;    @Autowired    private JobMapper jobMapper;    @Override    public PageInfo listQuartzJob(String jobName, Integer pageNum, Integer pageSize) {        PageHelper.startPage(pageNum, pageSize);        List<QuartzJob> jobList = jobMapper.listJob;        PageInfo pageInfo = new PageInfo;        return pageInfo;    }    @Override    public Result saveJob(QuartzJob quartz){        try {            //如果是修改  展示旧的 任务            if(quartz.getOldJobGroup() != null && !"".equals(quartz.getOldJobGroup{                JobKey key = new JobKey(quartz.getOldJobName(),quartz.getOldJobGroup;                scheduler.deleteJob;            }            //构建job信息            Class cls = Class.forName(quartz.getJobClassName ;            cls.newInstance();            JobDetail job = JobBuilder.newJob.withIdentity(quartz.getJobName(),                    quartz.getJobGroup                    .withDescription(quartz.getDescription.build();            // 触发时间点            CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(quartz.getCronExpression;            Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger"+quartz.getJobName(), quartz.getJobGroup                    .startNow().withSchedule(cronScheduleBuilder).build();            //交由Scheduler安排触发            scheduler.scheduleJob(job, trigger);        } catch (Exception e) {            e.printStackTrace();            return Result.error();        }        return Result.ok();    }    @Override    public Result triggerJob(String jobName, String jobGroup) {        JobKey key = new JobKey(jobName,jobGroup);        try {            scheduler.triggerJob;        } catch (SchedulerException e) {            e.printStackTrace();            return Result.error();        }        return Result.ok();    }    @Override    public Result pauseJob(String jobName, String jobGroup) {        JobKey key = new JobKey(jobName,jobGroup);        try {            scheduler.pauseJob;        } catch (SchedulerException e) {            e.printStackTrace();            return Result.error();        }        return Result.ok();    }    @Override    public Result resumeJob(String jobName, String jobGroup) {        JobKey key = new JobKey(jobName,jobGroup);        try {            scheduler.resumeJob;        } catch (SchedulerException e) {            e.printStackTrace();            return Result.error();        }        return Result.ok();    }    @Override    public Result removeJob(String jobName, String jobGroup) {        try {            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);            // 停止触发器            scheduler.pauseTrigger(triggerKey);            // 移除触发器            scheduler.unscheduleJob(triggerKey);            // 删除任务            scheduler.deleteJob(JobKey.jobKey(jobName, jobGroup));            System.out.println("removeJob:"+JobKey.jobKey;        } catch (Exception e) {            e.printStackTrace();            return Result.error();        }        return Result.ok();    }}

View Code

  主要就是以上文件,详情请查看spring-boot-quartz

  工程里面数据源用的druid,springboot默认也会将该数据源应用到quartz,如果想给quartz单独配置数据源,可配合@QuartzDataSource来实现(更多quarz数据源问题,请查看spring-boot-2.0.3之quartz集成,数据源问题,源码探究)

  最终效果如下

图片 11

QRTZ_CRON_TRIGGERS 存储 Cron Trigger,包括 Cron表达式和时区信息 

系统自从改用Quartz做任务调度后,一日的调度量均在两百万次以上。随着调度量的增加,突然开始出现job重复调度的情况,且没有规律可循。网上也没有说得较为清楚的解决办法,于是我们开始调试Quartz源码,并最终找到了问题所在。

引言

quratz是目前最为成熟,使用最广泛的java任务调度框架,功能强大配置灵活.在企业应用中占重要地位.quratz在集群环境中的使用方式是每个企业级系统都要考虑的问题.早在2006年,在ITeye上就有一篇关于quratz集群方案的讨论:http://www.iteye.com/topic/40970 ITeye创始人@Robbin在8楼给出了自己对quartz集群应用方案的意见.

后来有人总结了三种quratz集群方案:http://www.iteye.com/topic/114965

1.单独启动一个Job Server来跑job,不部署在web容器中.其他web节点当需要启动异步任务的时候,可以通过种种方式(DB, JMS, Web Service, etc)通知Job Server,而Job Server收到这个通知之后,把异步任务加载到自己的任务队列中去。

2.独立出一个job server,这个server上跑一个spring+quartz的应用,这个应用专门用来启动任务。在jobserver上加上hessain,得到业务接口,这样jobserver就可以调用web container中的业务操作,也就是正真执行任务的还是在cluster中的tomcat。在jobserver启动定时任务之后,轮流调用各地址上的业务操作(类似apache分发tomcat一样),这样可以让不同的定时任务在不同的节点上运行,减低了一台某个node的压力

3.quartz本身事实上也是支持集群的。在这种方案下,cluster上的每一个node都在跑quartz,然后也是通过数据中记录的状态来判断这个操作是否正在执行,这就要求cluster上所有的node的时间应该是一样的。而且每一个node都跑应用就意味着每一个node都需要有自己的线程池来跑quartz.

总的来说,第一种方法,在单独的server上执行任务,对任务的适用范围有很大的限制,要访问在web环境中的各种资源非常麻烦.但是集中式的管理容易从架构上规避了分布式环境的种种同步问题.第二种方法在在第一种方法的基础上减轻了jobserver的重量,只发送调用请求,不直接执行任务,这样解决了独立server无法访问web环境的问题,而且可以做到节点的轮询.可以有效地均衡负载.第三种方案是quartz自身支持的集群方案,在架构上完全是分布式的,没有集中的管理,quratz通过数据库锁以及标识字段保证多个节点对任务不重复获取,并且有负载平衡机制和容错机制,用少量的冗余,换取了高可用性(high avilable HA)和高可靠性.(个人认为和git的机制有异曲同工之处,分布式的冗余设计,换取可靠性和速度).

本文旨在研究quratz为解决分布式任务调度中存在的防止重复执行和负载均衡等问题而建立的机制.以调度流程作为顺序,配合源码理解其中原理.

quratz的配置,及具体应用请参考CRM项目组的另一篇文章:CRM使用Quartz集群总结分享

trigger状态

  org.quartz.impl.jdbcjobstore.Constants中存放了一些列的常量,源代码如下

图片 12图片 13

/*  * All content copyright Terracotta, Inc., unless otherwise indicated. All rights reserved. *  * Licensed under the Apache License, Version 2.0 (the "License"); you may not  * use this file except in compliance with the License. You may obtain a copy  * of the License at  *  *   http://www.apache.org/licenses/LICENSE-2.0  *    * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the  * License for the specific language governing permissions and limitations  * under the License. *  */package org.quartz.impl.jdbcjobstore;/** * <p> * This interface can be implemented by any <code>{@link * org.quartz.impl.jdbcjobstore.DriverDelegate}</code> * class that needs to use the constants contained herein. * </p> *  * @author <a href="mailto:jeff@binaryfeed.org">Jeffrey Wescott</a> * @author James House */public interface Constants {    /*     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~     *      * Constants.     *      * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~     */    // Table names    String TABLE_JOB_DETAILS = "JOB_DETAILS";    String TABLE_TRIGGERS = "TRIGGERS";    String TABLE_SIMPLE_TRIGGERS = "SIMPLE_TRIGGERS";    String TABLE_CRON_TRIGGERS = "CRON_TRIGGERS";    String TABLE_BLOB_TRIGGERS = "BLOB_TRIGGERS";    String TABLE_FIRED_TRIGGERS = "FIRED_TRIGGERS";    String TABLE_CALENDARS = "CALENDARS";    String TABLE_PAUSED_TRIGGERS = "PAUSED_TRIGGER_GRPS";    String TABLE_LOCKS = "LOCKS";    String TABLE_SCHEDULER_STATE = "SCHEDULER_STATE";    // TABLE_JOB_DETAILS columns names        String COL_SCHEDULER_NAME = "SCHED_NAME";        String COL_JOB_NAME = "JOB_NAME";    String COL_JOB_GROUP = "JOB_GROUP";    String COL_IS_DURABLE = "IS_DURABLE";    String COL_IS_VOLATILE = "IS_VOLATILE";    String COL_IS_NONCONCURRENT = "IS_NONCONCURRENT";    String COL_IS_UPDATE_DATA = "IS_UPDATE_DATA";    String COL_REQUESTS_RECOVERY = "REQUESTS_RECOVERY";    String COL_JOB_DATAMAP = "JOB_DATA";    String COL_JOB_CLASS = "JOB_CLASS_NAME";    String COL_DESCRIPTION = "DESCRIPTION";    // TABLE_TRIGGERS columns names    String COL_TRIGGER_NAME = "TRIGGER_NAME";    String COL_TRIGGER_GROUP = "TRIGGER_GROUP";    String COL_NEXT_FIRE_TIME = "NEXT_FIRE_TIME";    String COL_PREV_FIRE_TIME = "PREV_FIRE_TIME";    String COL_TRIGGER_STATE = "TRIGGER_STATE";    String COL_TRIGGER_TYPE = "TRIGGER_TYPE";    String COL_START_TIME = "START_TIME";    String COL_END_TIME = "END_TIME";    String COL_PRIORITY = "PRIORITY";    String COL_MISFIRE_INSTRUCTION = "MISFIRE_INSTR";    String ALIAS_COL_NEXT_FIRE_TIME = "ALIAS_NXT_FR_TM";    // TABLE_SIMPLE_TRIGGERS columns names    String COL_REPEAT_COUNT = "REPEAT_COUNT";    String COL_REPEAT_INTERVAL = "REPEAT_INTERVAL";    String COL_TIMES_TRIGGERED = "TIMES_TRIGGERED";    // TABLE_CRON_TRIGGERS columns names    String COL_CRON_EXPRESSION = "CRON_EXPRESSION";    // TABLE_BLOB_TRIGGERS columns names    String COL_BLOB = "BLOB_DATA";    String COL_TIME_ZONE_ID = "TIME_ZONE_ID";    // TABLE_FIRED_TRIGGERS columns names    String COL_INSTANCE_NAME = "INSTANCE_NAME";    String COL_FIRED_TIME = "FIRED_TIME";    String COL_SCHED_TIME = "SCHED_TIME";        String COL_ENTRY_ID = "ENTRY_ID";    String COL_ENTRY_STATE = "STATE";    // TABLE_CALENDARS columns names    String COL_CALENDAR_NAME = "CALENDAR_NAME";    String COL_CALENDAR = "CALENDAR";    // TABLE_LOCKS columns names    String COL_LOCK_NAME = "LOCK_NAME";    // TABLE_LOCKS columns names    String COL_LAST_CHECKIN_TIME = "LAST_CHECKIN_TIME";    String COL_CHECKIN_INTERVAL = "CHECKIN_INTERVAL";    // MISC CONSTANTS    String DEFAULT_TABLE_PREFIX = "QRTZ_";    // STATES    String STATE_WAITING = "WAITING";    String STATE_ACQUIRED = "ACQUIRED";    String STATE_EXECUTING = "EXECUTING";    String STATE_COMPLETE = "COMPLETE";    String STATE_BLOCKED = "BLOCKED";    String STATE_ERROR = "ERROR";    String STATE_PAUSED = "PAUSED";    String STATE_PAUSED_BLOCKED = "PAUSED_BLOCKED";    String STATE_DELETED = "DELETED";    /**     * @deprecated Whether a trigger has misfired is no longer a state, but      * rather now identified dynamically by whether the trigger's next fire      * time is more than the misfire threshold time in the past.     */    String STATE_MISFIRED = "MISFIRED";    String ALL_GROUPS_PAUSED = "_$_ALL_GROUPS_PAUSED_$_";    // TRIGGER TYPES    /** Simple Trigger type. */    String TTYPE_SIMPLE = "SIMPLE";    /** Cron Trigger type. */    String TTYPE_CRON = "CRON";    /** Calendar Interval Trigger type. */    String TTYPE_CAL_INT = "CAL_INT";    /** Daily Time Interval Trigger type. */    String TTYPE_DAILY_TIME_INT = "DAILY_I";    /** A general blob Trigger type. */    String TTYPE_BLOB = "BLOB";}// EOF

View Code

  里面有quartz的表名、各个表包含的列名、trigger状态、trigger类型等内容

  状态包括

    WAITING:等待中
    ACQUIRED:将触发,此时还未到trigger真正的触发时刻
    EXECUTING:触发,亦可理解成执行中,trigger真正的触发时刻
    COMPLETE:完成,不再触发
    BLOCKED:受阻,不允许并发执行job时会出现(@DisallowConcurrentExecution)
    ERROR:出错
    PAUSED:暂停中
    PAUSED_BLOCKED:暂停受阻,不允许并发执行job时会出现(@DisallowConcurrentExecution)
    DELETED:已删除
    MISFIRED:触发失败,已弃用,有另外的替代方式

  状态变化流程图如下所示

图片 14

  trigger的初始状态是WAITING,处于WAITING状态的trigger等待被触发。调度线程会不停地扫triggers表,根据NEXT_FIRE_TIME提前拉取即将触发的trigger,如果这个trigger被该调度线程拉取到,它的状态就会变为ACQUIRED。因为是提前拉取trigger,并未到达trigger真正的触发时刻,所以调度线程会等到真正触发的时刻,再将trigger状态由ACQUIRED改为EXECUTING。如果这个trigger不再执行,就将状态改为COMPLETE,否则为WAITING,开始新的周期。如果这个周期中的任何环节抛出异常,trigger的状态会变成ERROR。如果手动暂停这个trigger,状态会变成PAUSED。

QRTZ_FIRED_TRIGGERS 存储与已触发的 Trigger 相关的状态信息,以及相联 Job的执行信息 QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的 Trigger 组的信息 

如果没有耐性看完源码解析,可以直接拉到文章最末,有直接简单的解决办法。本文中使用的Quartz版本为2.3.0,且使用JDBC模式存储Job。

quartz集群架构

图片 15

quartz的分布式架构如上图,可以看到数据库是各节点上调度器的枢纽.各个节点并不感知其他节点的存在,只是通过数据库来进行间接的沟通.

实际上,quartz的分布式策略就是一种以数据库作为边界资源的并发策略.每个节点都遵守相同的操作规范,使得对数据库的操作可以串行执行.而不同名称的调度器又可以互不影响的并行运行.

组件间的通讯图如下:(*注:主要的sql语句附在文章最后)

图片 16

quartz运行时由QuartzSchedulerThread类作为主体,循环执行调度流程。JobStore作为中间层,按照quartz的并发策略执行数据库操作,完成主要的调度逻辑。JobRunShellFactory负责实例化JobDetail对象,将其放入线程池运行。LockHandler负责获取LOCKS表中的数据库锁。

整个quartz对任务调度的时序大致如下:

图片 17

梳理一下其中的流程,可以表示为:

0.调度器线程run()

1.获取待触发trigger

    1.1数据库LOCKS表TRIGGER_ACCESS行加锁

    1.2读取JobDetail信息

    1.3读取trigger表中触发器信息并标记为"已获取"

    1.4commit事务,释放锁

2.触发trigger

    2.1数据库LOCKS表STATE_ACCESS行加锁

    2.2确认trigger的状态

    2.3读取trigger的JobDetail信息

    2.4读取trigger的Calendar信息

    2.3更新trigger信息

    2.3commit事务,释放锁

3实例化并执行Job

    3.1从线程池获取线程执行JobRunShell的run方法

可以看到,这个过程中有两个相似的过程:同样是对数据表的更新操作,同样是在执行操作前获取锁 操作完成后释放锁.这一规则可以看做是quartz解决集群问题的核心思想.

规则流程图:

图片 18

进一步解释这条规则就是:一个调度器实例在执行涉及到分布式问题的数据库操作前,首先要获取QUARTZ2_LOCKS表中对应当前调度器的行级锁,获取锁后即可执行其他表中的数据库操作,随着操作事务的提交,行级锁被释放,供其他调度器实例获取.

集群中的每一个调度器实例都遵循这样一种严格的操作规程,那么对于同一类调度器来说,每个实例对数据库的操作只能是串行的.而不同名的调度器之间却可以并行执行.

下面我们深入源码,从微观上观察quartz集群调度的细节

总结

  Quartz作为一个开源的作业调度框架,提供了巨大的灵活性而不牺牲简单性。我们能够用它来为执行一个作业而创建简单的或复杂的调度。它有很多特征,如:数据库、集群、插件、JavaMail支持,EJB作业预构建,支持cron-like表达式等等;

  springboot集成quartz非常简单,最简单的情况下只需要引入依赖我们就可以享受quartz提供的功能,springboot默认会帮我们配置好quartz;当然我们也可以自定义配置来实现quartz的定制;

QRTZ_SCHEDULER_STATE 存储少量的有关 Scheduler 的状态信息,和别的 Scheduler实例(假如是用于一个集群中) 

准备

调度器实例化

一个最简单的quartz helloworld应用如下:

图片 19

public class HelloWorldMain {
    Log log = LogFactory.getLog(HelloWorldMain.class);

    public void run() {
        try {
            //取得Schedule对象
            SchedulerFactory sf = new StdSchedulerFactory();
            Scheduler sch = sf.getScheduler(); 

            JobDetail jd = new JobDetail("HelloWorldJobDetail",Scheduler.DEFAULT_GROUP,HelloWorldJob.class);
            Trigger tg = TriggerUtils.makeMinutelyTrigger(1);
            tg.setName("HelloWorldTrigger");

            sch.scheduleJob(jd, tg);
            sch.start();
        } catch ( Exception e ) {
            e.printStackTrace();

        }
    }
    public static void main(String[] args) {
        HelloWorldMain hw = new HelloWorldMain();
        hw.run();
    }
}

图片 20

我们看到初始化一个调度器需要用工厂类获取实例:

SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sch = sf.getScheduler(); 

然后启动:

sch.start();
下面跟进StdSchedulerFactory的getScheduler()方法:

图片 21

public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }
        SchedulerRepository schedRep = SchedulerRepository.getInstance();
        //从"调度器仓库"中根据properties的SchedulerName配置获取一个调度器实例
        Scheduler sched = schedRep.lookup(getSchedulerName());
        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }
        //初始化调度器
        sched = instantiate();
        return sched;
    }

图片 22

跟进初始化调度器方法sched = instantiate();发现是一个700多行的初始化方法,涉及到

  • 读取配置资源,
  • 生成QuartzScheduler对象,
  • 创建该对象的运行线程,并启动线程;
  • 初始化JobStore,QuartzScheduler,DBConnectionManager等重要组件,
    至此,调度器的初始化工作已完成,初始化工作中quratz读取了数据库中存放的对应当前调度器的锁信息,对应CRM中的表QRTZ2_LOCKS,中的STATE_ACCESS,TRIGGER_ACCESS两个LOCK_NAME.

图片 23

public void initialize(ClassLoadHelper loadHelper,
            SchedulerSignaler signaler) throws SchedulerConfigException {
        if (dsName == null) {
            throw new SchedulerConfigException("DataSource name not set.");
        }
        classLoadHelper = loadHelper;
        if(isThreadsInheritInitializersClassLoadContext()) {
            log.info("JDBCJobStore threads will inherit ContextClassLoader of thread: " + Thread.currentThread().getName());
            initializersLoader = Thread.currentThread().getContextClassLoader();
        }

        this.schedSignaler = signaler;
        // If the user hasn't specified an explicit lock handler, then
        // choose one based on CMT/Clustered/UseDBLocks.
        if (getLockHandler() == null) {

            // If the user hasn't specified an explicit lock handler,
            // then we *must* use DB locks with clustering
            if (isClustered()) {
                setUseDBLocks(true);
            }

            if (getUseDBLocks()) {
                if(getDriverDelegateClass() != null && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) {
                    if(getSelectWithLockSQL() == null) {
                        //读取数据库LOCKS表中对应当前调度器的锁信息
                        String msSqlDflt = "SELECT * FROM {0}LOCKS WITH (UPDLOCK,ROWLOCK) WHERE " + COL_SCHEDULER_NAME + " = {1} AND LOCK_NAME = ?";
                        getLog().info("Detected usage of MSSQLDelegate class - defaulting 'selectWithLockSQL' to '" + msSqlDflt + "'.");
                        setSelectWithLockSQL(msSqlDflt);
                    }
                }
                getLog().info("Using db table-based data access locking (synchronization).");
                setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));
            } else {
                getLog().info(
                    "Using thread monitor-based data access locking (synchronization).");
                setLockHandler(new SimpleSemaphore());
            }
        }
    }

图片 24

当调用sch.start();方法时,scheduler做了如下工作:

1.通知listener开始启动

2.启动调度器线程

3.启动plugin

4.通知listener启动完成

图片 25

public void start() throws SchedulerException {
        if (shuttingDown|| closed) {
            throw new SchedulerException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }
        // QTZ-212 : calling new schedulerStarting() method on the listeners
        // right after entering start()
        //通知该调度器的listener启动开始
        notifySchedulerListenersStarting();
        if (initialStart == null) {
            initialStart = new Date();
            //启动调度器的线程
            this.resources.getJobStore().schedulerStarted();            
            //启动plugins
            startPlugins();
        } else {
            resources.getJobStore().schedulerResumed();
        }
        schedThread.togglePause(false);
        getLog().info(
                "Scheduler " + resources.getUniqueIdentifier() + " started.");
        //通知该调度器的listener启动完成
        notifySchedulerListenersStarted();
    }

图片 26

参考

  几种任务调度的Java实现方法与比较

  小柒2012 / spring-boot-quartz

  boot-features-quartz

  作业调度系统—Quartz

  记一次Quartz重复调度的问题排查

  Quartz FAQ

QRTZ_LOCKS 存储程序的悲观锁的信息(假如使用了悲观锁) 

首先,因为本文是代码级别的分析文章,因而需要提前了解Quartz的用途和用法,网上有很多不错的文章,可以提前自行了解。

调度过程

调度器启动后,调度器的线程就处于运行状态了,开始执行quartz的主要工作–调度任务.

前面已介绍过,任务的调度过程大致分为三步:

1.获取待触发trigger

2.触发trigger

3.实例化并执行Job

下面分别分析三个阶段的源码.

QuartzSchedulerThread是调度器线程类,调度过程的三个步骤就承载在run()方法中,分析见代码注释:

按 Ctrl+C 复制代码

按 Ctrl+C 复制代码

调度器每次获取到的trigger是30s内需要执行的,所以要等待一段时间至trigger执行前2ms.在等待过程中涉及到一个新加进来更紧急的trigger的处理逻辑.分析写在注释中,不再赘述.

可以看到调度器的只要在运行状态,就会不停地执行调度流程.值得注意的是,在流程的最后线程会等待一个随机的时间.这就是quartz自带的负载平衡机制.

以下是三个步骤的跟进:

QRTZ_JOB_DETAILS 存储每一个已配置的 Job 的详细信息 

其次,在用法之外,我们还需要了解一些Quartz框架的基础概念:

触发器的获取

调度器调用:

triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

在数据库中查找一定时间范围内将会被触发的trigger.参数的意义如下:参数1:nolaterthan = now+3000ms,即未来30s内将会被触发.参数2 最大获取数量,大小取线程池线程剩余量与定义值得较小者.参数3 时间窗口 默认为0,程序会在nolaterthan后加上窗口大小来选择trigger.quratz会在每次触发trigger后计算出trigger下次要执行的时间,并在数据库QRTZ2_TRIGGERS中的NEXT_FIRE_TIME字段中记录.查找时将当前毫秒数与该字段比较,就能找出下一段时间内将会触发的触发器.查找时,调用在JobStoreSupport类中的方法:

图片 27

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {

        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) {
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName,
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
                new TransactionValidator<List<OperableTrigger>>() {
                    public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                        //...异常处理回调方法
                    }
                });
    }

图片 28

该方法关键的一点在于执行了executeInNonManagedTXLock()方法,这一方法指定了一个锁名,两个回调函数.在开始执行时获得锁,在方法执行完毕后随着事务的提交锁被释放.在该方法的底层,使用 for update语句,在数据库中加入行级锁,保证了在该方法执行过程中,其他的调度器对trigger进行获取时将会等待该调度器释放该锁.此方法是前面介绍的quartz集群策略的的具体实现,这一模板方法在后面的trigger触发过程还会被使用.

public static final String SELECT_FOR_LOCK = "SELECT * FROM "
            + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
            " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";

进一步解释:quratz在获取数据库资源之前,先要以for update方式访问LOCKS表中相应LOCK_NAME数据将改行锁定.如果在此前该行已经被锁定,那么等待,如果没有被锁定,那么读取满足要求的trigger,并把它们的status置为STATE_ACQUIRED,如果有tirgger已被置为STATE_ACQUIRED,那么说明该trigger已被别的调度器实例认领,无需再次认领,调度器会忽略此trigger.调度器实例之间的间接通信就体现在这里.

JobStoreSupport.acquireNextTrigger()方法中:

int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);

最后释放锁,这时如果下一个调度器在排队获取trigger的话,则仍会执行相同的步骤.这种机制保证了trigger不会被重复获取.按照这种算法正常运行状态下调度器每次读取的trigger中会有相当一部分已被标记为被获取.

获取trigger的过程进行完毕.

QRTZ_JOB_LISTENERS 存储有关已配置的 JobListener 的信息 

  • Quartz把触发job叫做fire。TRIGGERSTATE是当前trigger的状态,PREVFIRE_TIME是上一次触发的时间,NEXTFIRETIME是下一次触发的时间,misfire是指这个job在某一时刻要触发、却因为某些原因没有触发的情况。
  • Quartz在运行时,会起两类线程(不止两类),一类用于调度job的调度线程(单线程),一类是用于执行job具体业务的工作池。
  • Quartz自带的表里面,本文将涉及其中3张表:

触发trigger:

QuartzSchedulerThread line336:

List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

调用JobStoreSupport类的triggersFired()方法:

图片 29

public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
        return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
                new TransactionCallback<List<TriggerFiredResult>>() {
                    public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
                        List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
                        TriggerFiredResult result;
                        for (OperableTrigger trigger : triggers) {
                            try {
                              TriggerFiredBundle bundle = triggerFired(conn, trigger);
                              result = new TriggerFiredResult(bundle);
                            } catch (JobPersistenceException jpe) {
                                result = new TriggerFiredResult(jpe);
                            } catch(RuntimeException re) {
                                result = new TriggerFiredResult(re);
                            }
                            results.add(result);
                        }
                        return results;
                    }
                },
                new TransactionValidator<List<TriggerFiredResult>>() {
                    @Override
                    public Boolean validate(Connection conn, List<TriggerFiredResult> result) throws JobPersistenceException {
                        //...异常处理回调方法
                    }
                });
    }

图片 30

此处再次用到了quratz的行为规范:executeInNonManagedTXLock()方法,在获取锁的情况下对trigger进行触发操作.其中的触发细节如下:

图片 31

protected TriggerFiredBundle triggerFired(Connection conn,
            OperableTrigger trigger)
        throws JobPersistenceException {
        JobDetail job;
        Calendar cal = null;
        // Make sure trigger wasn't deleted, paused, or completed...
        try { // if trigger was deleted, state will be STATE_DELETED
            String state = getDelegate().selectTriggerState(conn,
                    trigger.getKey());
            if (!state.equals(STATE_ACQUIRED)) {
                return null;
            }
        } catch (SQLException e) {
            throw new JobPersistenceException("Couldn't select trigger state: "
                    + e.getMessage(), e);
        }
        try {
            job = retrieveJob(conn, trigger.getJobKey());
            if (job == null) { return null; }
        } catch (JobPersistenceException jpe) {
            try {
                getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                getDelegate().updateTriggerState(conn, trigger.getKey(),
                        STATE_ERROR);
            } catch (SQLException sqle) {
                getLog().error("Unable to set trigger state to ERROR.", sqle);
            }
            throw jpe;
        }
        if (trigger.getCalendarName() != null) {
            cal = retrieveCalendar(conn, trigger.getCalendarName());
            if (cal == null) { return null; }
        }
        try {
            getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
        } catch (SQLException e) {
            throw new JobPersistenceException("Couldn't insert fired trigger: "
                    + e.getMessage(), e);
        }
        Date prevFireTime = trigger.getPreviousFireTime();
        // call triggered - to update the trigger's next-fire-time state...
        trigger.triggered(cal);
        String state = STATE_WAITING;
        boolean force = true;

        if (job.isConcurrentExectionDisallowed()) {
            state = STATE_BLOCKED;
            force = false;
            try {
                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                        STATE_BLOCKED, STATE_WAITING);
                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                        STATE_BLOCKED, STATE_ACQUIRED);
                getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                        STATE_PAUSED_BLOCKED, STATE_PAUSED);
            } catch (SQLException e) {
                throw new JobPersistenceException(
                        "Couldn't update states of blocked triggers: "
                                + e.getMessage(), e);
            }
        }

        if (trigger.getNextFireTime() == null) {
            state = STATE_COMPLETE;
            force = true;
        }
        storeTrigger(conn, trigger, job, true, state, force, false);
        job.getJobDataMap().clearDirtyFlag();
        return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
                .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
                .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());
    }

图片 32

该方法做了以下工作:

1.获取trigger当前状态

2.通过trigger中的JobKey读取trigger包含的Job信息

3.将trigger更新至触发状态

4.结合calendar的信息触发trigger,涉及多次状态更新

5.更新数据库中trigger的信息,包括更改状态至STATE_COMPLETE,及计算下一次触发时间.

6.返回trigger触发结果的数据传输类TriggerFiredBundle

 

从该方法返回后,trigger的执行过程已基本完毕.回到执行quratz操作规范的executeInNonManagedTXLock方法,将数据库锁释放.

trigger触发操作完成

QRTZ_SIMPLE_TRIGGERS 存储简单的Trigger,包括重复次数,间隔,以及已触的次数 

Job执行过程:

再回到线程类QuartzSchedulerThread的 line353这时触发器都已出发完毕,job的详细信息都已就位

QuartzSchedulerThread line:368

 

qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
shell.initialize(qs);

 

为每个Job生成一个可运行的RunShell,并放入线程池运行.

在最后调度线程生成了一个随机的等待时间,进入短暂的等待,这使得其他节点的调度器都有机会获取数据库资源.如此就实现了quratz的负载平衡.

这样一次完整的调度过程就结束了.调度器线程进入下一次循环.

QRTZ_BLOG_TRIGGERS Trigger 作为 Blob 类型存储(用于 Quartz 用户用 JDBC创建他们自己定制的 Trigger 类型,JobStore 并不知道如何存储实例的时候) 

  • triggers表。triggers表里记录了某个trigger的PREVFIRETIME(上次触发时间),NEXT_FIRETIME(下一次触发时间),TRIGGERSTATE(当前状态)。虽未尽述,但是本文用到的只有这些。
  • locks表。Quartz支持分布式,也就是会存在多个线程同时抢占相同资源的情况,而Quartz正是依赖这张表处理这种状况,具体见下文。
  • fired_triggers表。记录正在触发的triggers信息。

总结:

简单地说,quartz的分布式调度策略是以数据库为边界资源的一种异步策略.各个调度器都遵守一个基于数据库锁的操作规则保证了操作的唯一性.同时多个节点的异步运行保证了服务的可靠.但这种策略有自己的局限性.摘录官方文档中对quratz集群特性的说明:

Only one node will fire the job for each firing. What I mean by that is, if the job has a repeating trigger that tells it to fire every 10 seconds, then at 12:00:00 exactly one node will run the job, and at 12:00:10 exactly one node will run the job, etc. It won't necessarily be the same node each time - it will more or less be random which node runs it. The load balancing mechanism is near-random for busy schedulers (lots of triggers) but favors the same node for non-busy (e.g. few triggers) schedulers. 

The clustering feature works best for scaling out long-running and/or cpu-intensive jobs (distributing the work-load over multiple nodes). If you need to scale out to support thousands of short-running (e.g 1 second) jobs, consider partitioning the set of jobs by using multiple distinct schedulers (including multiple clustered schedulers for HA). The scheduler makes use of a cluster-wide lock, a pattern that degrades performance as you add more nodes (when going beyond about three nodes - depending upon your database's capabilities, etc.).

说明指出,集群特性对于高cpu使用率的任务效果很好,但是对于大量的短任务,各个节点都会抢占数据库锁,这样就出现大量的线程等待资源.这种情况随着节点的增加会越来越严重.

QRTZ_TRIGGER_LISTENERS 存储已配置的 TriggerListener 的信息 

附:

通讯图中关键步骤的主要sql语句:

图片 33

3.
select TRIGGER_ACCESS from QRTZ2_LOCKS for update
4.
SELECT TRIGGER_NAME,
TRIGGER_GROUP,
NEXT_FIRE_TIME,
PRIORITY
FROM QRTZ2_TRIGGERS
WHERE SCHEDULER_NAME = 'CRMscheduler'
AND TRIGGER_STATE = 'ACQUIRED'
AND NEXT_FIRE_TIME <= '{timekey 30s latter}'
AND ( MISFIRE_INSTR = -1
OR ( MISFIRE_INSTR != -1
AND NEXT_FIRE_TIME >= '{timekey now}' ) )
ORDER BY NEXT_FIRE_TIME ASC,
PRIORITY DESC;
5.
SELECT *
FROM QRTZ2_JOB_DETAILS
WHERE SCHEDULER_NAME = CRMscheduler
AND JOB_NAME = ?
AND JOB_GROUP = ?;
6.
UPDATE TQRTZ2_TRIGGERS
SET TRIGGER_STATE = 'ACQUIRED'
WHERE SCHED_NAME = 'CRMscheduler'
AND TRIGGER_NAME = '{triggerName}'
AND TRIGGER_GROUP = '{triggerGroup}'
AND TRIGGER_STATE = 'waiting';
7.
INSERT INTO QRTZ2_FIRED_TRIGGERS
(SCHEDULER_NAME,
ENTRY_ID,
TRIGGER_NAME,
TRIGGER_GROUP,
INSTANCE_NAME,
FIRED_TIME,
SCHED_TIME,
STATE,
JOB_NAME,
JOB_GROUP,
IS_NONCONCURRENT,
REQUESTS_RECOVERY,
PRIORITY)
VALUES( 'CRMscheduler', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
8.
commit;
12.
select STAT_ACCESS from QRTZ2_LOCKS for update
13.
SELECT TRIGGER_STATE FROM QRTZ2_TRIGGERS WHERE SCHEDULER_NAME = 'CRMscheduler' AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?;
14.
SELECT TRIGGER_STATE
FROM QRTZ2_TRIGGERS
WHERE SCHEDULER_NAME = 'CRMscheduler'
AND TRIGGER_NAME = ?
AND TRIGGER_GROUP = ?;
14.
SELECT *
FROM QRTZ2_JOB_DETAILS
WHERE SCHEDULER_NAME = CRMscheduler
AND JOB_NAME = ?
AND JOB_GROUP = ?;
15.
SELECT *
FROM QRTZ2_CALENDARS
WHERE SCHEDULER_NAME = 'CRMscheduler'
AND CALENDAR_NAME = ?;
16.
UPDATE QRTZ2_FIRED_TRIGGERS
SET INSTANCE_NAME = ?,
FIRED_TIME = ?,
SCHED_TIME = ?,
ENTRY_STATE = ?,
JOB_NAME = ?,
JOB_GROUP = ?,
IS_NONCONCURRENT = ?,
REQUESTS_RECOVERY = ?
WHERE SCHEDULER_NAME = 'CRMscheduler'
AND ENTRY_ID = ?;
17.
UPDATE TQRTZ2_TRIGGERS
SET TRIGGER_STATE = ?
WHERE SCHED_NAME = 'CRMscheduler'
AND TRIGGER_NAME = '{triggerName}'
AND TRIGGER_GROUP = '{triggerGroup}'
AND TRIGGER_STATE = ?;
18.
UPDATE QRTZ2_TRIGGERS
SET JOB_NAME = ?,
JOB_GROUP = ?,
DESCRIPTION = ?,
NEXT_FIRE_TIME = ?,
PREV_FIRE_TIME = ?,
TRIGGER_STATE = ?,
TRIGGER_TYPE = ?,
START_TIME = ?,
END_TIME = ?,
CALENDAR_NAME = ?,
MISFIRE_INSTRUCTION = ?,
PRIORITY = ?,
JOB_DATAMAP = ?
WHERE SCHEDULER_NAME = SCHED_NAME_SUBST
AND TRIGGER_NAME = ?
AND TRIGGER_GROUP = ?;
19.
commit;

图片 34

原文地址:

QRTZ_TRIGGERS 存储已配置的 Trigger 的信息 

  • TRIGGER_STATE,也就是trigger的状态,主要有以下几类:

图片 35

 

图 1 trigger状态变化图

quartz 持久化数据库表格字段解释

trigger的初始状态是WAITING,处于WAITING状态的trigger等待被触发。调度线程会不停地扫triggers表,根据NEXTFIRETIME提前拉取即将触发的trigger,如果这个trigger被该调度线程拉取到,它的状态就会变为ACQUIRED。因为是提前拉取trigger,并未到达trigger真正的触发时刻,所以调度线程会等到真正触发的时刻,再将trigger状态由ACQUIRED改为EXECUTING。如果这个trigger不再执行,就将状态改为COMPLETE,否则为WAITING,开始新的周期。如果这个周期中的任何环节抛出异常,trigger的状态会变成ERROR。如果手动暂停这个trigger,状态会变成PAUSED。

建表,SQL语句在quartz-1.6.6docsdbTables文件夹中可以找到,介绍下主要的几张表: 
       表qrtz_job_details: 保存job详细信息,该表需要用户根据实际情况初始化 
       job_name:集群中job的名字,该名字用户自己可以随意定制,无强行要求 
       job_group:集群中job的所属组的名字,该名字用户自己随意定制,无强行要求 
       job_class_name:集群中个note job实现类的完全包名,quartz就是根据这个路径到classpath找到该job类 
       is_durable:是否持久化,把该属性设置为1,quartz会把job持久化到数据库中 
       job_data:一个blob字段,存放持久化job对象 

开始排查

       表qrtz_triggers: 保存trigger信息 
       trigger_name: trigger的名字,该名字用户自己可以随意定制,无强行要求 
       trigger_group:trigger所属组的名字,该名字用户自己随意定制,无强行要求 
       job_name: qrtz_job_details表job_name的外键 
       job_group: qrtz_job_details表job_group的外键 
       trigger_state:当前trigger状态,设置为ACQUIRED,如果设置为WAITING,则job不会触发 
       trigger_cron:触发器类型,使用cron表达式 

分布式状态下的数据访问

       表qrtz_cron_triggers:存储cron表达式表 
       trigger_name: qrtz_triggers表trigger_name的外键 
       trigger_group: qrtz_triggers表trigger_group的外键 
       cron_expression:cron表达式 
       
       表qrtz_scheduler_state:存储集群中note实例信息,quartz会定时读取该表的信息判断集群中每个实例的当前状态 
       instance_name:之前配置文件中org.quartz.scheduler.instanceId配置的名字,就会写入该字段,如果设置为AUTO,quartz会根据物理机名和当前时间产生一个名字 
       last_checkin_time:上次检查时间 
       checkin_interval:检查间隔时间 

前文提到,trigger的状态储存在数据库,Quartz支持分布式,所以如果起了多个Quartz服务,会有多个调度线程来抢夺触发同一个trigger。MySQL在默认情况下执行select 语句,是不上锁的,那么如果同时有1个以上的调度线程抢到同一个trigger,是否会导致这个trigger重复调度呢?我们来看看,Quartz是如何解决这个问题的。

步骤4
 配置quartz.properties文件:
#调度标识名 集群中每一个实例都必须使用相同的名称 org.quartz.scheduler.instanceName = scheduler

首先,我们先来看下JobStoreSupport类的executeInNonManagedTXLock()方法:

#ID设置为自动获取 每一个必须不同 org.quartz.scheduler.instanceId = AUTO

图片 36

#数据保存方式为持久化 org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

图 2 executeInNonManagedTXLock方法的具体实现

#数据库平台 org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.oracle.weblogic.WebLogicOracleDelegate #数据库别名 随便取org.quartz.jobStore.dataSource = myXADS

这个方法的官方介绍:

#表的前缀 org.quartz.jobStore.tablePrefix = QRTZ_

/**

#设置为TRUE不会出现序列化非字符串类到 BLOB 时产生的类版本问题 org.quartz.jobStore.useProperties = true

* Executethe given callback having acquired the given lock.

#加入集群 org.quartz.jobStore.isClustered = true

*Depending onthe JobStore,the surrounding transaction maybe

#调度实例失效的检查时间间隔 org.quartz.jobStore.clusterCheckinInterval = 20000 

*assumed tobe already present(managed).

#容许的最大作业延长时间 org.quartz.jobStore.misfireThreshold = 60000

*

#ThreadPool 实现的类名 org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool

*@param lockName The name of the lock toacquire, forexample

#线程数量 org.quartz.threadPool.threadCount = 10

* "TRIGGER_ACCESS". Ifnull, thenno lock isacquired ,but the

#线程优先级 org.quartz.threadPool.threadPriority = 5

*lockCallback isstill executed ina transaction.

#自创建父线程 org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true 

*/

#设置数据源org.quartz.dataSource.myXADS.jndiURL = CT

也就是说,传入的callback方法在执行过程中携带了指定的锁,并开启了事务,注释也提到,lockName就是指定的锁的名字,如果lockName是空的,那么callback方法的执行不在锁的保护下,但依然在事务中。

#jbdi类名 org.quartz.dataSource.myXADS.java.naming.factory.initial = weblogic.jndi.WLInitialContextFactory #URLorg.quartz.dataSource.myXADS.java.naming.provider.url = t3://localhost:7001

这意味着,我们使用这个方法,不仅可以保证事务,还可以选择保证callback方法的线程安全。

 

接下来,我们来看一下executeInNonManagedTXLock(…)中的obtainLock(conn,lockName)方法,即抢锁的过程。这个方法是在Semaphore接口中定义的,Semaphore接口通过锁住线程或者资源,来保护资源不被其他线程修改,由于我们的调度信息是存在数据库的,所以现在查看DBSemaphore.java中obtainLock方法的具体实现:

【注】:在J2EE工程中如果想用数据库管理Quartz的相关信息,就一定要配置数据源,这是Quartz的要求。

图片 37

图 3 obtainLock方法具体实现

我们通过调试查看expandedSQL和expandedInsertSQL这两个变量:

图片 38

图 4 expandedSQL和expandedInsertSQL的具体内容

图4可以看出,obtainLock方法通过locks表的一个行锁(lockName确定)来保证callback方法的事务和线程安全。拿到锁后,obtainLock方法将lockName写入threadlocal。当然在releaseLock的时候,会将lockName从threadlocal中删除。

总而言之,executeInNonManagedTXLock()方法保证了在分布式的情况下,同一时刻只有一个线程可以执行这个方法。

Quartz的调度过程

图片 39

图 5 Quartz的调度时序图

QuartzSchedulerThread是调度线程的具体实现,图5是这个线程run()方法的主要内容,图中只提到了正常的情况下,也就是流程中没有出现异常的情况下的处理过程。由图可以看出,调度流程主要分为以下三步:

1、拉取待触发trigger:

调度线程会一次性拉取距离现在一定时间窗口内的、一定数量内的、即将触发的trigger信息。那么,时间窗口和数量信息如何确定呢?我们先来看一下,以下几个参数:

  • idleWaitTime: 默认30s,可通过配置属性org.quartz.scheduler.idleWaitTime设置。
  • availThreadCount:获取可用(空闲)的工作线程数量,总会大于1,因为该方法会一直阻塞,直到有工作线程空闲下来。
  • maxBatchSize:一次拉取trigger的最大数量,默认是1,可通过org.quartz.scheduler.batchTriggerAcquisitionMaxCount改写。
  • batchTimeWindow:时间窗口调节参数,默认是0,可通过org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow改写。
  • misfireThreshold: 超过这个时间还未触发的trigger,被认为发生了misfire,默认60s,可通过org.quartz.jobStore.misfireThreshold设置。

调度线程一次会拉取NEXT_FIRETIME小于(now

  • idleWaitTime +batchTimeWindow),大于(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)个triggers,默认情况下,会拉取未来30s、过去60s之间还未fire的1个trigger。随后将这些triggers的状态由WAITING改为ACQUIRED,并插入firedtriggers表。

2、触发trigger:

首先,我们会检查每个trigger的状态是不是ACQUIRED,如果是,则将状态改为EXECUTING,然后更新trigger的NEXTFIRETIME,如果这个trigger的NEXTFIRETIME为空,也就是未来不再触发,就将其状态改为COMPLETE。如果trigger不允许并发执行(即Job的实现类标注了@DisallowConcurrentExecution),则将状态变为BLOCKED,否则就将状态改为WAITING。

3、包装trigger,丢给工作线程池:

遍历triggers,如果其中某个trigger在第二步出错,即返回值里面有exception或者为null,就会做一些triggers表,fired_triggers表的内容修正,跳过这个trigger,继续检查下一个。否则,则根据trigger信息实例化JobRunShell(实现了Thread接口),同时依据JOB_CLASS_NAME实例化Job,随后我们将JobRunShell实例丢入工作线。

在JobRunShell的run()方法,Quartz会在执行job.execute()的前后通知之前绑定的监听器,如果job.execute()执行的过程中有异常抛出,则执行结果jobExEx会保存异常信息,反之如果没有异常抛出,则jobExEx为null。然后根据jobExEx的不同,得到不同的执行指令instCode。

JobRunShell将trigger信息,job信息和执行指令传给triggeredJobComplete()方法来完成最后的数据表更新操作。例如如果job执行过程有异常抛出,就将这个trigger状态变为ERROR,如果是BLOCKED状态,就将其变为WAITING等等,最后从fired_triggers表中删除这个已经执行完成的trigger。注意,这些是在工作线程池异步完成。

排查问题

在前文,我们可以看到,Quartz的调度过程中有3次(可选的)上锁行为,为什么称为可选?因为这三个步骤虽然在executeInNonManagedTXLock方法的保护下,但executeInNonManagedTXLock方法可以通过设置传入参数lockName为空,取消上锁。

在翻阅代码时,我们看到第一步拉取待触发的trigger时:

java protectedTriggerFiredBundle triggerFired(Connection conn, OperableTrigger trigger)throwsJobPersistenceException { JobDetail job; Calendar cal = null; // Make sure trigger wasn't deleted, paused, or completed... try { // if trigger was deleted, state will be STATE_DELETED String state = getDelegate().selectTriggerState(conn,trigger.getKey()); if (!state.equals(STATE_ACQUIRED)) { return null; } } catch (SQLException e) { throw new JobPersistenceException("Couldn't select trigger state: "

  • e.getMessage(), e); }

在加锁之前对lockName做了一次判断,而非像其他加锁方法一样,默认传入的就是LOCKTRIGGERACCESS:

java public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException { //默认上锁 returnexecuteInNonManagedTXLock(LOCK_TRIGGER_ACCESS, new TransactionCallback<List<TriggerFiredResult >>() { //省略 },new TransactionValidator<List<TriggerFiredResult>>() { //省略 }); }

通过调试发现isAcquireTriggersWithinLock()的值是false,因而导致传入的lockName是null。我在代码中加入日志,可以更清楚地看到这个过程。

图片 40

图 6 调度日志

由图6可以清楚看到,在拉取待触发的trigger时,默认是不上锁。如果这种默认配置有问题,岂不是会频繁发生重复调度的问题?而事实上并没有,原因在于Quartz默认采取乐观锁,也就是允许多个线程同时拉取同一个trigger。我们看一下Quartz在调度流程的第二步fire trigger的时候做了什么,注意此时是上锁状态:

java protectedTriggerFiredBundle triggerFired(Connection conn, OperableTrigger trigger)throwsJobPersistenceException { JobDetail job; Calendar cal = null; // Make sure trigger wasn't deleted, paused, or completed... try { // if trigger was deleted, state will be STATE_DELETED String state = getDelegate().selectTriggerState(conn,trigger.getKey()); if (!state.equals(STATE_ACQUIRED)) { return null; } } catch (SQLException e) { throw new JobPersistenceException("Couldn't select trigger state: "

  • e.getMessage(), e); }

调度线程如果发现当前trigger的状态不是ACQUIRED,也就是说,这个trigger被其他线程fire了,就会返回null。在之前我们提到,在调度流程的第三步,如果发现某个trigger第二步的返回值是null,就会跳过第三步,取消fire。在通常的情况下,乐观锁能保证不发生重复调度,但是难免发生ABA问题,我们看一下这是发生重复调度时的日志:

图片 41

图 7 重复调度的日志

在第一步时,也就是Quartz在拉取到符合条件的triggers 到将他们的状态由WAITING改为ACQUIRED之间停顿了有超过9ms的时间,而另一台服务器正是趁着这9ms的空档完成了WAITING-->ACQUIRED-->EXECUTING-->WAITING(也就是一个完整的状态变化周期)的全部过程,参见下图。

图片 42

图 8 重复调度原因示意图

如何去解决这个问题呢?在配置文件加上org.quartz.jobStore.acquireTriggersWithinLock=true,这样,在调度流程的第一步,也就是拉取待即将触发的triggers时,是上锁的状态,即不会同时存在多个线程拉取到相同的trigger的情况,也就避免了重复调度的危险。

解决办法

如何去解决这个问题呢?在配置文件加上org.quartz.jobStore.acquireTriggersWithinLock=true,这样,在调度流程的第一步,也就是拉取待即将触发的triggers时,是上锁的状态,即不会同时存在多个线程拉取到相同的trigger的情况,也就避免的重复调度的危险。

心得

此次排查过程并非一帆风顺,走过一些坑,也有一些非技术相关的体会:

学习是一个需要不断打磨、修正的能力。就我个人而言,为了学Quartz,刚开始去翻一个2.4MB大小的源码时毫无头绪,且效率低下,所以立刻转换方向,先了解这个框架的运行模式,在做什么,有哪些模块,是怎么做的,再找主线,翻相关的源码。之后在一次次使用中,碰到问题再翻之前没看的源码,就越来越顺利。

之前也听过其他同事的学习方法,感觉并不完全适合自己,可能每个人状态经验不同,学习方法也稍有不同。在平时的学习中,需要去感受自己的学习效率,参考建议,尝试,感受效果,改进,会越来越清晰自己适合什么。这里很感谢我的师父,用简短的话先帮我捋顺了调度流程,这样我再看源码就不那么吃力了。

要质疑“经验”和“理所应当”,惯性思维会蒙住你的双眼。在大规模的代码中很容易被习惯迷惑,一开始,我们看到上锁的那个方法的时候,认为这个上锁技巧很棒,这个方法就是为了解决并发的问题,“应该”都上锁了,上锁了就不会有并发的问题了,怎么可能几次与数据库的交互都上锁,突然某一次不上锁呢?直到看到拉取待触发的trigger方法时,觉得有丝丝不对劲,打下日志,才发现实际上是没上锁的。

日志很重要。虽然我们可以调试,但是没有日志,我们是无法发现并证明程序发生了ABA问题。

最重要的是,不要害怕问题,即使是Quartz这样大型的框架,解决问题也不一定需要把2.4MB的源码通通读懂。只要有时间,问题都能解决,只是好的技巧能缩短这个时间,而我们需要在一次次实战中磨练技巧。

style="font-size: 16px;">作者介绍:余慧娟,拍拍贷研发工程师,平时喜欢写一些帮助新手入门的文章,翻译一些外文,看到点赞会很开心,希望自己码字的速度越来越快。yuhuijuan.com是我的个人博客,欢迎关注。

style="font-size: 16px;">声明:本文为作者投稿,版权归其个人所有。 class="backword">返回搜狐,查看更多

责任编辑:

本文由澳门威斯尼人平台登陆发布于智能科技,转载请注明出处:技术头条,quartz定时任务框架调度机制解析

关键词:

上一篇:没有了

下一篇:没有了