跳到主要内容

定时任务机制说明

架构说明

全新的定时任务不再面向xxl-job开发,而是面向mq开发。

通过xxl-job解决多节点定时任务重复执行问题,开发过程中只需要一个xxl-job任务入口,然后对需要执行的任务发送mq,即可实现定时任务。

类图

11

*jobHandler 为xxl-job对接的,分别对应执行的任务

*Execute 为任务接口,分别代表每小时/每天/每月/每年任务,继承接口,即可在特定时间进行调用

JobAmqpExchange 为定时任务的AMQP消息静态常量,定时任务触发为发送消息,交给具体的消息处理去完成任务

OrderStatusChangeJobReceiver为定时任务接口的实现类。

11

11

由于类图关联关系不是很紧密,这里表明一个示例:

OrderStatusCheckJob实现各个任务接口,来完成定时任务的调用。

Every*ExecuteJobHandler 为xxl-job的核心类的继承,继承这个类,且加入对应的注解,才可以与xxl-job对接。

流程图

11

1、 xxl-job定时任务触发,调用javashop中的任务

2、 调用xxl任务类,发送AMQP消息

3、 AMQP消费者监听消息

4、 具体业务在AMQP消费监听中循环调用接口的实现类(这里为EveryDayExecute)

5、 业务进行处理,修改数据库或者缓存等

部署

需要配置文件 application.yml进行配置

xxl:
job:
admin:
addresses: http://127.0.0.1:8080/xxl-job-admin
executor:
appname: timing-job-execute
ip:
port: 6999
logpath: /Users/chopper/log
logretentiondays: -1
accessToken:

需要修改内容:

xxl.job.admin.address===>调用地址

xxl.job.executor.logpath===>日志路径

修改xxl-job-admin中的配置文件 xxl-job-admin.properties 数据库。

附件:

sql 右键新窗口打开在线下载

xxl-job 右键新窗口打开在线下载

开发

消息静态常量

进行AMQP消息常量等设置,新增定时任务,把这里的常量进行增加

package com.enation.app.javashop.job;

/**
* 定时任务AMQP消息定义
*
* @author kingapex
* @version 1.0
* @since 6.4
* 2017-08-17 18:00
*/
public class JobAmqpExchange {


/**
* 每小时执行
*/
public final static String EVERY_HOUR_EXECUTE = "EVERY_HOUR_EXECUTE";

/**
* 每日执行
*/
public final static String EVERY_DAY_EXECUTE = "EVERY_DAY_EXECUTE";

/**
* 每月执行
*/
public final static String EVERY_MONTH_EXECUTE = "EVERY_MONTH_EXECUTE";

/**
* 每年执行
*/
public final static String EVERY_YEAR_EXECUTE = "EVERY_YEAR_EXECUTE";

}

任务接口

Every***Execute为任务接口,示例代码如下,实现接口,注入spring即可


/**
* 每日执行
*
* @author liushuai
* @version v1.0
* @since v7.0
* 2018-07-06 上午4:24
*/
@JobHandler(value = "everyDayExecuteJobHandler")
@Component
public class EveryDayExecuteJobHandler extends IJobHandler {

protected final Log logger = LogFactory.getLog(this.getClass());

@Autowired
private AmqpTemplate amqpTemplate;
@Override
public ReturnT<String> execute(String param) throws Exception {

try {
amqpTemplate.convertAndSend(JobAmqpExchange.EVERY_DAY_EXECUTE,
JobAmqpExchange.EVERY_DAY_EXECUTE + "_ROUTING",
"");
} catch (Exception e) {
this.logger.error("每日任务AMQP消息发送异常:", e);
return ReturnT.FAIL;
}
return ReturnT.SUCCESS;
}
}

消费者

示例代码

package com.enation.app.javashop.consumer.core.receiver;

import com.enation.app.javashop.consumer.job.execute.EveryDayExecute;
import com.enation.app.javashop.core.base.JobAmqpExchange;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

/**
* 每日执行调用
*
* @author Chopper
* @version v1.0
* @since v7.0
* 2018-07-25 上午8:26
*/
@Component
public class EveryDayExecuteReceiver {
protected final Log logger = LogFactory.getLog(this.getClass());

@Autowired
private List<EveryDayExecute> everyDayExecutes;

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = JobAmqpExchange.EVERY_DAY_EXECUTE + "_QUEUE"),
exchange = @Exchange(value = JobAmqpExchange.EVERY_DAY_EXECUTE, type = ExchangeTypes.FANOUT)
))
public void everyDay() {

for (EveryDayExecute everyDayExecute : everyDayExecutes) {
try {
everyDayExecute.everyDay();
} catch (Exception e) {
logger.error("每日任务异常:", e);
}
}

}

}

业务实现

package com.enation.app.javashop.consumer.job.execute.impl;

import com.enation.app.javashop.core.base.JobAmqpExchange;
import com.enation.app.javashop.consumer.job.execute.EveryDayExecute;
import com.enation.app.javashop.core.trade.order.service.OrderTaskManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 订单状态扫描
*
* @author chopper
* @version v1.0
* @since v7.0
* 2018-07-05 下午2:11
*/
@Component
public class OrderStatusCheckJob implements EveryDayExecute {

protected final Log logger = LogFactory.getLog(this.getClass());

@Autowired
private OrderTaskManager orderTaskManager;

/**
* 每晚23:30执行
*/
@Override
public void everyDay() {
/** 自动取消 */
try {
// 款到发货,新订单24小时未付款要自动取消
this.orderTaskManager.cancelTask();
} catch (Exception e) {
logger.error("自动取消出错", e);
}

/** 自动确认收货 */
try {
// 发货之后10天要自动确认收货
this.orderTaskManager.rogTask();
} catch (Exception e) {
logger.error("自动确认收货出错", e);
}

/** 自动完成天数 */
try {
// 确认收货7天后标记为完成
this.orderTaskManager.completeTask();
} catch (Exception e) {
logger.error("订单7天后标记为完成出错", e);
}

/** 自动支付天数 */
try {
this.orderTaskManager.payTask();
} catch (Exception e) {
logger.error("订单自动支付完成出错", e);
}

/** 售后失效天数 */
try {
// 完成后一个月没有申请售后,标记为售后过期
this.orderTaskManager.serviceTask();
} catch (Exception e) {
logger.error("订单标记为售后过期出错", e);
}

try {
// 超过14天不能评价
this.orderTaskManager.commentTask();
} catch (Exception e) {
logger.error("订单超过14天不能评价出错", e);
}

}

}

代码命名规范及存放位置

imgimg