欢迎您的访问
专注架构,Java,数据结构算法,Python技术分享

Quartz源码(二):scheduler.start()启动源码分析

scheduler.start()是Quartz的启动方式!下面开始源码的分析!
说明:我都是分析的jobStore 方式为jdbc的SimpleTrigger!RAM的方式类似分析方式!


源码中相关sql中内容解释:

{0} : 表的前缀 ,如表qrtz_trigger ,{0}== qrtz_

{1} :quartz.properties 中配置的 org.quartz.scheduler.instanceName: myInstanceName ,{1} ==myInstanceName


scheduler.start() 调用 .QuartzScheduler.start();

Quartz 的启动要调用start()方法进行线程的启动,并执行需要出发的Trigger,start方法里面进行的操作:

  1. 启动的初始化
  2. 判断是否集群,对应不同的操作
  3. 若是非集群,首先有恢复机制,恢复任何失败或misfire的作业,并根据需要清理数据存储。
  4. 初始化线程管理,唤醒所有等待的线程!

线程中启动线程是调用start()方法,但是真正执行线程任务的操作在run()中!

开启 scheduler调用start();

QuartzScheduler.start();下面就是简单的源码分析:

    public void start() throws SchedulerException {

        if (shuttingDown|| closed) {
            throw new SchedulerException(
                "The Scheduler cannot be restarted after shutdown() has been called.");
        }

        // QTZ-212 : calling new schedulerStarting() method on the listeners
        // right after entering start()
        notifySchedulerListenersStarting();

        if (initialStart == null) {//初始化标识为null,进行初始化操作
            initialStart = new Date();
            this.resources.getJobStore().schedulerStarted();//1 主要分析的地方      
            startPlugins();
        } else {

            resources.getJobStore().schedulerResumed();//2 如果已经初始化过,则恢复jobStore
        }

        schedThread.togglePause(false);//3 唤醒所有等待的线程

        getLog().info(
            "Scheduler " + resources.getUniqueIdentifier() + " started.");

        notifySchedulerListenersStarted();
    }

一、启动调度任务

this.resources.getJobStore().schedulerStarted() ;主要分析的地方,实际上 是调用 QuartzSchedulerResources中的JobStore进行启动!看下面代码:

    public void schedulerStarted() throws SchedulerException {
        //是集群
        if (isClustered()) {
            clusterManagementThread = new ClusterManager();
            if(initializersLoader != null)
                clusterManagementThread.setContextClassLoader(initializersLoader);
            clusterManagementThread.initialize();
        } else {//不是集群
            try {
                recoverJobs();/1、恢复job 
            } catch (SchedulerException se) {
                throw new SchedulerConfigException(
                    "Failure occured during job recovery.", se);
            }
        }

        misfireHandler = new MisfireHandler();
        if(initializersLoader != null)
            misfireHandler.setContextClassLoader(initializersLoader);
        misfireHandler.initialize();//2、 获取ThreadExecutor 线程管理
        schedulerRunning = true;

        getLog().debug("JobStore background threads started (as scheduler was started).");
    }   

1 、恢复job recoverJobs();

    //启动的时候 有一个恢复机制:
    //recoverJobs -----  将恢复任何失败或misfire的作业,并根据需要清理数据存储。
    protected void recoverJobs() throws JobPersistenceException {
        executeInNonManagedTXLock(
            LOCK_TRIGGER_ACCESS,
            new VoidTransactionCallback() {
                public void executeVoid(Connection conn) throws JobPersistenceException {
                    recoverJobs(conn);//恢复job
                }
            }, null);
    }

    protected void recoverJobs(Connection conn) throws JobPersistenceException {
        try {
            1//更新不一致的作业状态     先修改状态,将 ACQUIRED 和 BLOCKED ---> WAITING
                int rows = getDelegate().updateTriggerStatesFromOtherStates(conn,
                                                                            STATE_WAITING, STATE_ACQUIRED, STATE_BLOCKED);

            rows += getDelegate().updateTriggerStatesFromOtherStates(conn,
                                                                     STATE_PAUSED, STATE_PAUSED_BLOCKED, STATE_PAUSED_BLOCKED);

            //----更新sql---      
            //"UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND (TRIGGER_STATE = ? OR TRIGGER_STATE = ?)"

            getLog().info(
                "Freed " + rows
                + " triggers from 'acquired' / 'blocked' state.");

            // clean up misfired jobs
            //1.1 清理misfire的jobs
            recoverMisfiredJobs(conn, true);

            // recover jobs marked for recovery that were not fully executed
            //1.2 恢复未完全执行的标记为恢复的作业 --查询 qrtz_fire_trigger
            List<OperableTrigger> recoveringJobTriggers = getDelegate()
                .selectTriggersForRecoveringJobs(conn);
            getLog()
                .info(
                "Recovering "
                + recoveringJobTriggers.size()
                + " jobs that were in-progress at the time of the last shut-down.");

            for (OperableTrigger recoveringJobTrigger: recoveringJobTriggers) {
                if (jobExists(conn, recoveringJobTrigger.getJobKey())) {
                    recoveringJobTrigger.computeFirstFireTime(null);
                    storeTrigger(conn, recoveringJobTrigger, null, false,
                                 STATE_WAITING, false, true);
                }
            }
            getLog().info("Recovery complete.");

            // remove lingering 'complete' triggers...
            //1.3 移除state == complete   
            List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
            for(TriggerKey ct: cts) {
                removeTrigger(conn, ct);
            }
            getLog().info(
                "Removed " + cts.size() + " 'complete' triggers.");

            // clean up any fired trigger entries
            //1.4 清理任何已触发的触发器条目
            int n = getDelegate().deleteFiredTriggers(conn);
            getLog().info("Removed " + n + " stale fired job entries.");
        } catch (JobPersistenceException e) {
            throw e;
        } catch (Exception e) {
            throw new JobPersistenceException("Couldn't recover jobs: "
                                              + e.getMessage(), e);
        }
    }

1.1 清理misfire的jobs recoverMisfiredJobs(conn, true);

    //是否有misfire的Trigger
    //我们必须仍然寻找MISFIRED状态,以防触发器被遗忘
    //在此状态下升级到此版本不支持
    a1hasMisfiredTriggersInState(conn, STATE_WAITING, getMisfireTime(),    
                                   maxMisfiresToHandleAtATime, misfiredTriggers);   
    ////getMisfireTime() 当前时间 -(减去) 一分钟 ,maxMisfiresToHandleAtATime == -1   ,misfiredTriggers== null 

    "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND NOT (MISFIRE_INSTR = -1) AND NEXT_FIRE_TIME < ? AND TRIGGER_STATE = ? ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC"

        上面sql查询出来结果是个list
                    (aa1)若resultList.size() == count 返回 TRUE!! 否则 返回false
                    (aa2)不等于 count ,封装数据,到resultList中,triggername  TriggerGroup

            //查询出来有misfire 的 Trigger        
        b2 misfiredTriggers.size() > 0
        (bb1)输出日志信息   getLog().info(
        "Handling " + misfiredTriggers.size() + 
        " trigger(s) that missed their scheduled fire-time.");

    bb2)循环 misfiredTriggers List集合
                for (TriggerKey triggerKey: misfiredTriggers) {
                    //retrieveTrigger ,检索Trigger,检索到进行数据封装
                    OperableTrigger trig = 
                        retrieveTrigger(conn, triggerKey);
                         //retrieveTrigger 执行的操作
                        1"SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
                        2)关联Trigger对应的类型,如simpleTrigger

                    if (trig == null) {
                        continue;
                    }

                    //do 更新misfire的触发器
                    doUpdateOfMisfiredTrigger(conn, trig, false, STATE_WAITING, recovering); //recovering===TRUE
                        1cal = retrieveCalendar(conn, trig.getCalendarName()); 搞这个表,qrtz_calendar
                        2trig.updateAfterMisfire(cal); //simpleTrigger默认的misfire 机制  
                             setNextFireTime(new Date()); //设置下次执行的时间(next_fire_time)为当前时间!这里比较重要!!!
                        3 getNextFireTime != null
                             if (trig.getNextFireTime() == null) {
                                storeTrigger(conn, trig,
                                    null, true, STATE_COMPLETE, forceState, recovering);
                                schedSignaler.notifySchedulerListenersFinalized(trig);
                            } else {
                                storeTrigger(conn, trig, null, true, newStateIfNotComplete,
                                        forceState, false);

                                // job == null  replaceExisting ==true  state==waitting   forceState==false  recovering==false
                                storeTrigger(Connection conn,
                                    OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state,
                                    boolean forceState, boolean recovering)

                                    //Insert or update a trigger.
                                   boolean existingTrigger = triggerExists(conn, newTrigger.getKey());
                                    if (existingTrigger) {

                                        //state == waitting 
                                        getDelegate().updateTrigger(conn, newTrigger, state, job);

                                        //更新sql
                                        /*
                                        "UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"*/

                                    } else {
                                        getDelegate().insertTrigger(conn, newTrigger, state, job);
                                        //插入sql
                                        /*"INSERT INTO {0}TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY)  VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"*/
                                    }
                            }

        c3 long earliestNewTime = Long.MAX_VALUE; // long earliestNewTime = Long.MAX_VALUE; === 9223372036854775807
              if(trig.getNextFireTime() != null && trig.getNextFireTime().getTime() < earliestNewTime){
                    earliestNewTime = trig.getNextFireTime().getTime();
              }

1.2、 恢复未完全执行的标记为恢复的作业

    List<OperableTrigger> recoveringJobTriggers = getDelegate()
        .selectTriggersForRecoveringJobs(conn);                 
    // INSTANCE_NAME == dufy_test    REQUESTS_RECOVERY == true  实际封装到数据库查询是 REQUESTS_RECOVERY== 1
    "SELECT * FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1} AND INSTANCE_NAME = ? AND REQUESTS_RECOVERY = ?"
        //具体怎么是 true是怎么转换成为 1的见附1图片!

      Recovery complete.恢复完成!!  

附1:true 是如何转换为 1 的:
202002201005\_1.png

1.3 移除state == complete

    List<TriggerKey> cts = getDelegate().selectTriggersInState(conn, STATE_COMPLETE);
    -----------------------------------------------------------------------------
        "SELECT TRIGGER_NAME, TRIGGER_GROUP FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ?"
        -----------------------------------------------------------------------------
        for(TriggerKey ct: cts) {
            removeTrigger(conn, ct);
            ---------------------------------------------------------------------
                a)删除前,先查询jobDetail
                JobDetail job = getDelegate().selectJobForTrigger(conn,getClassLoadHelper(), key, false);
            "SELECT J.JOB_NAME, J.JOB_GROUP, J.IS_DURABLE, J.JOB_CLASS_NAME, J.REQUESTS_RECOVERY FROM {0}TRIGGERS T, {0}JOB_DETAILS J WHERE T.SCHED_NAME = {1} AND J.SCHED_NAME = {1} AND T.TRIGGER_NAME = ? AND T.TRIGGER_GROUP = ? AND T.JOB_NAME = J.JOB_NAME AND T.JOB_GROUP = J.JOB_GROUP"

                b)删除触发器,其侦听器及其Simple / Cron / BLOB子表条目。
                boolean removedTrigger = deleteTriggerAndChildren(conn, key);
            deleteTrigger(Connection conn, TriggerKey triggerKey)
                b1deleteTriggerExtension
                "DELETE FROM {0}SIMPLE_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"
                "DELETE FROM {0}BLOB_TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"

                b2"DELETE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?"

                c)是否删除jobdetail ,判断 isDurable 默认 false
                if (null != job && !job.isDurable()) {
                    int numTriggers = getDelegate().selectNumTriggersForJob(conn,
                                                                            job.getKey());
                    ---------------------------------------------------------
                        "SELECT COUNT(TRIGGER_NAME) FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
                        ---------------------------------------------------------
                        if (numTriggers == 0) {
                            // Don't call removeJob() because we don't want to check for
                            // triggers again.
                            //不要调用removeJob(),因为我们不想再次检查触发器。
                            deleteJobAndChildren(conn, job.getKey()); //删除作业及其侦听器。
                            -----------------------------------------------------
                                //deleteJobDetail(Connection conn, JobKey jobKey) 删除给定作业的作业明细记录。
                                "DELETE FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"
                                -----------------------------------------------------
                        }
                }
        }

1.4 清理任何已触发的触发器条目

     int n = getDelegate().deleteFiredTriggers(conn);
     ----------------------------------------------------------------------------
     "DELETE FROM {0}FIRED_TRIGGERS WHERE SCHED_NAME = {1}"
     ----------------------------------------------------------------------------

2、获取ThreadExecutor 线程管理 misfireHandler.initialize();

    public void initialize() {
            ThreadExecutor executor = getThreadExecutor();
            //getThreadExecutor ==  private ThreadExecutor threadExecutor = new DefaultThreadExecutor();
            executor.execute(MisfireHandler.this); //启动线程执行 对应job的 execute方法
            //MisfireHandler  ==  class MisfireHandler extends Thread  继承了Thread
    }

二、 如果已经初始化过,则恢复

jobStoreresources.getJobStore().schedulerResumed();.如果已经初始化过,则恢复调度器运行 !

    private volatile boolean schedulerRunning = false;//默认schedulerRunning = false
    public void schedulerResumed() {
        schedulerRunning = true;
    }

三、 唤醒所有等待的线程

schedThread.togglePause(false);

    schedThread.togglePause(false);
    //指示主处理循环在下一个可能的点暂停。
    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;

            if (paused) {
                signalSchedulingChange(0);
                ------------------------------------------
                    //发信号通知主要处理循环,已经进行了调度的改变 - 以便中断在等待misfire时间到达时可能发生的任何睡眠。
                    public void signalSchedulingChange(long candidateNewNextFireTime) {
                    synchronized(sigLock) {
                        signaled = true;
                        signaledNextFireTime = candidateNewNextFireTime;
                        sigLock.notifyAll();  // private final Object sigLock = new Object();
                    }
                }
                ------------------------------------------
            } else {
                sigLock.notifyAll();//唤醒所有等待的线程
            }
        }
    }   

四、总结

在看源码的时候最好要自己能够结合源码,通过文章在debug一下,可以加深印象和理解!

作者:阿飞云 | 来源:https://blog.csdn.net/u010648555/column/info/14251

赞(0) 打赏
版权归原创作者所有,任何形式转载请联系作者;码农code之路 » Quartz源码(二):scheduler.start()启动源码分析

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏