TBSchedule源碼學習筆記-線程組任務調度


根據上文的啟動過程,找到了線程組的實現。com.taobao.pamirs.schedule.taskmanager.TBScheduleManager

/**
* 1、任務調度分配器的目標: 讓所有的任務不重復,不遺漏的被快速處理。
* 2、一個Manager只管理一種任務類型的一組工作線程。
* 3、在一個JVM里面可能存在多個處理相同任務類型的Manager,也可能存在處理不同任務類型的Manager。
* 4、在不同的JVM里面可以存在處理相同任務的Manager
* 5、調度的Manager可以動態的隨意增加和停止
*
* 主要的職責:
* 1、定時向集中的數據配置中心更新當前調度服務器的心跳狀態
* 2、向數據配置中心獲取所有服務器的狀態來重新計算任務的分配。這么做的目標是避免集中任務調度中心的單點問題。
* 3、在每個批次數據處理完畢后,檢查是否有其它處理服務器申請自己把持的任務隊列,如果有,則釋放給相關處理服務器。
*
* 其它:
* 如果當前服務器在處理當前任務的時候超時,需要清除當前隊列,並釋放已經把持的任務。並向控制主動中心報警。
*
* @author xuannan
*
*/

@SuppressWarnings({ "rawtypes", "unchecked" })
abstract class TBScheduleManager implements IStrategyTask {
//.......//
}

這個類的構造方法是這樣的

TBScheduleManager(TBScheduleManagerFactory aFactory,String baseTaskType,String ownSign ,IScheduleDataManager aScheduleCenter) throws Exception{
//.......//
}

這里發現有兩個老朋友,aFactory就是那個調度服務器對象,baseTaskType是設置的平台類型,aScheduleCenter調度配置中心客戶端接口,那么ownSign是什么?
以下取自官方文檔:

OwnSign環境區域
是對運行環境的划分,進行調度任務和數據隔離。例如:開發環境、測試環境、預發環境、生產環境。
不同的開發人員需要進行數據隔離也可以用OwnSign來實現,避免不同人員的數據沖突。缺省配置的環境區域OwnSign=’BASE’。
例如:TaskType=’DataDeal’,配置的隊列是0、1、2、3、4、5、6、7、8、9。缺省的OwnSign=’BASE’。
此時如果再啟動一個測試環境,則Schedule會動態生成一個TaskType=’DataDeal-Test’的任務類型,環境會作為一個變量傳遞給業務接口,
由業務接口的實現類,在讀取數據和處理數據的時候進行確定。業務系統一種典型的做法就是在數據表中增加一個OWN_SIGN字段。
在創建數據的時候根據運行環境填入對應的環境名稱,在Schedule中就可以環境的區分了。

com.taobao.pamirs.schedule.taskmanager.IScheduleDataManager 這個接口要好好看下,定義的方法有點多,但是一眼看下來更多的方法是為控制台頁面提供服務了,例如創建任務。諸如任務和任務相等信息的維護和查詢都在這個接口中定義。

那么TBScheduleManager 這個構造函數就用到了一個,繼續打開構造函數看

TBScheduleManager(TBScheduleManagerFactory aFactory,String baseTaskType,String ownSign ,IScheduleDataManager aScheduleCenter) throws Exception{
this.factory = aFactory;
//private static int nextSerialNumber = 0; 生成用戶標志不同的線程序號,用於區分不同的線程組
this.currentSerialNumber = serialNumber();
this.scheduleCenter = aScheduleCenter;
//按照任務類型加載調度任務信息
this.taskTypeInfo = this.scheduleCenter.loadTaskTypeBaseInfo(baseTaskType);
log.info("create TBScheduleManager for taskType:"+baseTaskType);
//清除已經過期1天的TASK,OWN_SIGN的組合。超過一天沒有活動server的視為過期
//為什么要清除?如果任務確實能跑一天怎么辦
this.scheduleCenter.clearExpireTaskTypeRunningInfo(baseTaskType,ScheduleUtil.getLocalIP() + "清除過期OWN_SIGN信息",this.taskTypeInfo.getExpireOwnSignInterval());
//通過調度服務器提供的方法獲得對應的bean
Object dealBean = aFactory.getBean(this.taskTypeInfo.getDealBeanName());
if (dealBean == null) {
throw new Exception( "SpringBean " + this.taskTypeInfo.getDealBeanName() + " 不存在");
}
//如果沒有實現調度器對外的基礎接口,肯定不能用啊
if (dealBean instanceof IScheduleTaskDeal == false) {
throw new Exception( "SpringBean " + this.taskTypeInfo.getDealBeanName() + " 沒有實現 IScheduleTaskDeal接口");
}
this.taskDealBean = (IScheduleTaskDeal)dealBean;
//任務的配置校驗,為什么要大於5倍 出發點是什么
if(this.taskTypeInfo.getJudgeDeadInterval() < this.taskTypeInfo.getHeartBeatRate() * 5){
throw new Exception("數據配置存在問題,死亡的時間間隔,至少要大於心跳線程的5倍。當前配置數據:JudgeDeadInterval = "
+ this.taskTypeInfo.getJudgeDeadInterval()
+ ",HeartBeatRate = " + this.taskTypeInfo.getHeartBeatRate());
}
//這個currenScheduleServer是類"com.taobao.pamirs.schedule.taskmanager.ScheduleServer"的實例,存儲了當前調度服務的信息
this.currenScheduleServer = ScheduleServer.createScheduleServer(this.scheduleCenter,baseTaskType,ownSign,this.taskTypeInfo.getThreadNumber());
this.currenScheduleServer.setManagerFactoryUUID(this.factory.getUuid());
//向調度中心客戶端注冊調度服務信息
scheduleCenter.registerScheduleServer(this.currenScheduleServer);
this.mBeanName = "pamirs:name=" + "schedule.ServerMananger." +this.currenScheduleServer.getUuid();
//又啟動了一個定時任務,看這名字就知道是心跳
this.heartBeatTimer = new Timer(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-HeartBeat");
this.heartBeatTimer.schedule(new HeartBeatTimerTask(this),
new java.util.Date(System.currentTimeMillis() + 500),
this.taskTypeInfo.getHeartBeatRate());
//對象創建時需要做的初始化工作,模版方法。
initial();
}

目前能觀察到的這個構造函數流程

1.按照任務名稱加載任務配置
2.清除過期OWN_SIGN信息
3.檢查任務配置是否正確
4.將調度服務信息注冊到調度中心客戶端
5.其他初始化操作

記錄一下tbschedule對任務配置的存儲方式(控制台頁面”任務管理”)
控制台:
這里寫圖片描述
zk節點:
這里寫圖片描述

節點內容格式化之后

{
"baseTaskType": "commonSyncAdvertiserTask",
"heartBeatRate": 5000,
"judgeDeadInterval": 60000,
"sleepTimeNoData": 500,
"sleepTimeInterval": 0,
"fetchDataNumber": 500,
"executeNumber": 10,
"threadNumber": 5,
"processorType": "SLEEP",
"permitRunStartTime": "0 * * * * ?",
"expireOwnSignInterval": 1,
"dealBeanName": "commonSyncAdvertiserTask",
"taskParameter": "3",
"taskKind": "static",
"taskItems": [
"0",
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9"
]
,
"maxTaskItemsOfOneThreadGroup": 0,
"version": 0,
"sts": "resume"
}

第4步主要包含以下代碼行

//這個currenScheduleServer是類"com.taobao.pamirs.schedule.taskmanager.ScheduleServer"的實例,存儲了當前調度服務的信息
this.currenScheduleServer = ScheduleServer.createScheduleServer(this.scheduleCenter,baseTaskType,ownSign,this.taskTypeInfo.getThreadNumber());
this.currenScheduleServer.setManagerFactoryUUID(this.factory.getUuid());
//向調度中心客戶端注冊調度服務信息
scheduleCenter.registerScheduleServer(this.currenScheduleServer);

當前調度服務信息都包含什么,有什么是狀態量,向調度中心客戶端注冊的意圖是什么?打開ScheduleServer.createScheduleServer(this.scheduleCenter,baseTaskType,ownSign,this.taskTypeInfo.getThreadNumber()); 這個代碼看一下,這個ScheduleServer實例融合了哪些信息

public static ScheduleServer createScheduleServer(IScheduleDataManager aScheduleCenter,String aBaseTaskType,
String aOwnSign, int aThreadNum)
throws Exception {
ScheduleServer result = new ScheduleServer();
//調度任務類型(beanName)
result.baseTaskType = aBaseTaskType;
result.ownSign = aOwnSign;
//會將調度類型和環境區域 按這種方式組合baseType+"$" + ownSign
result.taskType = ScheduleUtil.getTaskTypeByBaseAndOwnSign(
aBaseTaskType, aOwnSign);
result.ip = ScheduleUtil.getLocalIP();
result.hostName = ScheduleUtil.getLocalHostName();
//aScheduleCenter.getSystemTime() 從哪來
result.registerTime = new Timestamp(aScheduleCenter.getSystemTime());
//任務配置的線程數
result.threadNum = aThreadNum;
result.heartBeatTime = null;
result.dealInfoDesc = "調度初始化";
result.version = 0;
//厲害了這里生成了一個自己的uuid
result.uuid = result.ip
+ "$"
+ (UUID.randomUUID().toString().replaceAll("-", "")
.toUpperCase());
SimpleDateFormat DATA_FORMAT_yyyyMMdd = new SimpleDateFormat("yyMMdd");
String s = DATA_FORMAT_yyyyMMdd.format(new Date(aScheduleCenter.getSystemTime()));
//這個作用是啥
result.id = Long.parseLong(s) * 100000000
+ Math.abs(result.uuid.hashCode() % 100000000);
return result;
}

在這里就可以發現aOwnSign 的作用原理了,就是作為一個標記和任務類型拼接在一塊,這樣對於同樣的任務不同的ownSign有不同的調度域,這個方法里有幾個關鍵點:

result.registerTime = new Timestamp(aScheduleCenter.getSystemTime()); 這個屬性的作用是?
result.uuid 這個作用是?
result.id 這個的作用是?為什么這么生成?為什么和aScheduleCenter.getSystemTime() 相關
aScheduleCenter.getSystemTime() 取到的是一個什么時間呢?

aScheduleCenter.getSystemTime() 取到的是個什么時間?之前只是初始化了IScheduleDataManager實現,估計構造函數里會有答案,看實現類com.taobao.pamirs.schedule.zk.ScheduleDataManager4ZK

public ScheduleDataManager4ZK(ZKManager aZkManager) throws Exception {
this.zkManager = aZkManager;
gson = new GsonBuilder().registerTypeAdapter(Timestamp.class,new TimestampTypeAdapter()).setDateFormat("yyyy-MM-dd HH:mm:ss").create();

this.PATH_BaseTaskType = this.zkManager.getRootPath() +"/baseTaskType";

if (this.getZooKeeper().exists(this.PATH_BaseTaskType, false) == null) {
//創建了一個永久節點
ZKTools.createPath(getZooKeeper(),this.PATH_BaseTaskType, CreateMode.PERSISTENT, this.zkManager.getAcl());
}
//當前服務器時間
loclaBaseTime = System.currentTimeMillis();
//Zookeeper服務器時間
String tempPath = this.zkManager.getZooKeeper().create(this.zkManager.getRootPath() + "/systime",null, this.zkManager.getAcl(), CreateMode.EPHEMERAL_SEQUENTIAL);
Stat tempStat = this.zkManager.getZooKeeper().exists(tempPath, false);

zkBaseTime = tempStat.getCtime();
ZKTools.deleteTree(getZooKeeper(), tempPath);
if(Math.abs(this.zkBaseTime - this.loclaBaseTime) > 5000){
log.error("請注意,Zookeeper服務器時間與本地時間相差 : " + Math.abs(this.zkBaseTime - this.loclaBaseTime) +" ms");
}
}
public long getSystemTime(){
//zk服務器時間+客戶端啟動時間
return this.zkBaseTime + ( System.currentTimeMillis() - this.loclaBaseTime);
}

可見這里返回了一個zk服務器時間+本地時間偏移量這樣一個值,這里做法我是這么理解的,在分布式環境下可能存在各機器時鍾不同步的情況(上下偏差),通過zk服務器時間加相對時間相當於把整個集群的時間都撥正了,這樣時鍾在tbschedule框架這里是可信的,所以可以把aScheduleCenter.getSystemTime()方法看作返回調度服務的集群可信時間,

目前為止以下兩個問題還沒有得到解答

result.uuid 這個作用是?
result.id 這個的作用是?為什么這么生成?為什么和aScheduleCenter.getSystemTime() 相關

只能接着看第4步里的scheduleCenter.registerScheduleServer(this.currenScheduleServer); 這個代碼行,在對這個currenScheduleServer小寶貝做了些什么

public void registerScheduleServer(ScheduleServer server) throws Exception {
//簡單
if(server.isRegister() == true){
throw new Exception(server.getUuid() + " 被重復注冊");
}
//本例 rootPath /dsp_official_0928_wd/schedule/
//本例 /dsp_official_0928_wd/schedule/baseTaskType/commonSyncAdvertiserTask/commonSyncAdvertiserTask
String zkPath = this.PATH_BaseTaskType + "/" + server.getBaseTaskType() +"/" + server.getTaskType();
if (this.getZooKeeper().exists(zkPath, false) == null) {
this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
}
//本例 /dsp_official_0928_wd/schedule/baseTaskType/commonSyncAdvertiserTask/commonSyncAdvertiserTask/server
zkPath = zkPath +"/" + this.PATH_Server;
if (this.getZooKeeper().exists(zkPath, false) == null) {
this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
}
String realPath = null;
//此處必須增加UUID作為唯一性保障
String zkServerPath = zkPath + "/" + server.getTaskType() + "$"+ server.getIp() + "$"
+ (UUID.randomUUID().toString().replaceAll("-", "").toUpperCase())+"$";

//創建臨時節點
realPath = this.getZooKeeper().create(zkServerPath, null, this.zkManager.getAcl(),CreateMode.PERSISTENT_SEQUENTIAL);

//之前已經生成過uuid,那么為什么這里要重新設置一次????
server.setUuid(realPath.substring(realPath.lastIndexOf("/") + 1));

//
Timestamp heartBeatTime = new Timestamp(this.getSystemTime());
server.setHeartBeatTime(heartBeatTime);

//server序列化存儲到zk的節點上
String valueString = this.gson.toJson(server);
this.getZooKeeper().setData(realPath,valueString.getBytes(),-1);
server.setRegister(true);
}

這個registerScheduleServer方法看到這里,都是寫zk的查詢操作和設置操作,在這里也會發現zk維護的內容,就是說創建了zk節點用於維護調度服務數據,便於控制台頁面查詢和調度服務器間同步

rootPath/baseTaskType/ {策略管理>任務名稱}/${任務管理>任務處理的SpringBean}/server 下每個任務組都有自己的節點,這個節點會存儲調度服務數據,從數據內容上看該數據會因每一次調度而修改。

這里是一個調度節點的例子:
這里寫圖片描述

節點內容的格式化后如下:

{
"uuid": "commonSyncAdvertiserTask$127.0.0.1$35542D3EE3984FEAB15525C319358DE6$0000000060",
"id": 17120684352326,
"taskType": "commonSyncAdvertiserTask",
"baseTaskType": "commonSyncAdvertiserTask",
"ownSign": "BASE",
"ip": "127.0.0.1",
"hostName": "adm_adc02",
"threadNum": 5,
"registerTime": "2017-12-06 18:04:30",
"heartBeatTime": "2017-12-08 10:43:17",
"lastFetchDataTime": "2017-12-08 10:43:00",
"dealInfoDesc": "沒有數據,暫停調度:FetchDataCount=2439,FetchDataNum=0,DealDataSucess=0,DealDataFail=0,DealSpendTime=0,otherCompareCount=0",
"nextRunStartTime": "2017-12-08 10:44:00",
"nextRunEndTime": "當不能獲取到數據的時候pause",
"version": 34144,
"isRegister": true,
"managerFactoryUUID": "127.0.0.1$adm_adc02$AB1899D1EA4F4783B25EC4B4E04A6A79$0000000081"
}

控制台頁面觀察到的數據如下:
這里寫圖片描述

那么問題來了,存儲調度服務數據內容的節點值是什么時候被修改的,調度任務是怎么被啟動的?
回到com.taobao.pamirs.schedule.taskmanager.TBScheduleManager 構造方法,最后一行

TBScheduleManager(TBScheduleManagerFactory aFactory,String baseTaskType,String ownSign ,IScheduleDataManager aScheduleCenter) throws Exception{
//.......//
initial();
}

這個initial() 方法的實現有兩個,分別位於com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerStaticcom.taobao.pamirs.schedule.taskmanager.TBScheduleManagerDynamic 經過之前的學習,已經很明確的知道就是使用的com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerStatic實例,那么看一下這個方法的實現。這個方法里第一眼就看到一個關鍵字”start()”

public void initial() throws Exception{
//線程啊。。
new Thread(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-StartProcess"){
@SuppressWarnings("static-access")
public void run(){
try{
//唉呀媽呀,這個log老熟悉了
log.info("開始獲取調度任務隊列...... of " + currenScheduleServer.getUuid());
while (isRuntimeInfoInitial == false) {
if(isStopSchedule == true){
log.debug("外部命令終止調度,退出調度隊列獲取:" + currenScheduleServer.getUuid());
return;
}
//log.error("isRuntimeInfoInitial = " + isRuntimeInfoInitial);
try{
//初始化運行期信息
initialRunningInfo();
//運行期信息是否初始化成功
isRuntimeInfoInitial = scheduleCenter.isInitialRunningInfoSucuss(
currenScheduleServer.getBaseTaskType(),
currenScheduleServer.getOwnSign());
}catch(Throwable e){
//忽略初始化的異常
log.error(e.getMessage(),e);
}
//每隔一秒重試一次
if(isRuntimeInfoInitial == false){
Thread.currentThread().sleep(1000);
}
}

int count =0;
//獲得框架系統時間
lastReloadTaskItemListTime = scheduleCenter.getSystemTime();
//這個代碼的動作是什么?getCurrentScheduleTaskItemListNow();,好像是獲取調度隊列,如果沒拿到就每隔一秒重試
while(getCurrentScheduleTaskItemListNow().size() <= 0){
if(isStopSchedule == true){
log.debug("外部命令終止調度,退出調度隊列獲取:" + currenScheduleServer.getUuid());
return;
}
Thread.currentThread().sleep(1000);
count = count + 1;
// log.error("嘗試獲取調度隊列,第" + count + "次 ") ;
}

String tmpStr ="TaskItemDefine:";
//遍歷調度隊列
for(int i=0;i< currentTaskItemList.size();i++){
if(i>0){
tmpStr = tmpStr +",";
}
//哦?循環體內字符串拼接......
tmpStr = tmpStr + currentTaskItemList.get(i);
}

log.info("獲取到任務處理隊列,開始調度:" + tmpStr +" of "+ currenScheduleServer.getUuid());

//任務總量
taskItemCount = scheduleCenter.loadAllTaskItem(currenScheduleServer.getTaskType()).size();
//只有在已經獲取到任務處理隊列后才開始啟動任務處理器
computerStart();
}catch(Exception e){
log.error(e.getMessage(),e);
String str = e.getMessage();
if(str.length() > 300){
str = str.substring(0,300);
}
startErrorInfo = "啟動處理異常:" + str;
}
}
}.start();
}

看到這里疑問滿滿,初始化運行期信息都做了些什么??,這里貌似沒有看到任務項?任務項在哪里初始化的?線程組怎么按照任務項拆分的??
看初始化運行期信息相關的代碼,目前就兩行

public void initial() throws Exception{
new Thread(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-StartProcess"){
@SuppressWarnings("static-access")
public void run(){
//......//
//初始化運行期信息
initialRunningInfo();
//運行期信息是否初始化成功
isRuntimeInfoInitial = scheduleCenter.isInitialRunningInfoSucuss(
currenScheduleServer.getBaseTaskType(),
currenScheduleServer.getOwnSign());
//......//
}
}.start();
}

initialRunningInfo() 在做些什么?打開代碼看:

public void initialRunningInfo() throws Exception{
//清除過期的調度任務
scheduleCenter.clearExpireScheduleServer(this.currenScheduleServer.getTaskType(),this.taskTypeInfo.getJudgeDeadInterval());
//拿到任務數據節點名
List<String> list = scheduleCenter.loadScheduleServerNames(this.currenScheduleServer.getTaskType());
//如果當前是leader節點
if(scheduleCenter.isLeader(this.currenScheduleServer.getUuid(),list)){
//是第一次啟動,先清楚所有的垃圾數據
log.debug(this.currenScheduleServer.getUuid() + ":" + list.size());
//初始化任務調度的域信息和靜態任務信息
//如果是leader節點,做這樣的操作
//清除zk任務項節點並重建
// //本例 rootPath /dsp_official_0928_wd/schedule/
//本例 /dsp_official_0928_wd/schedule/baseTaskType/commonSyncAdvertiserTask/commonSyncAdvertiserTask/taskItem
//調用public void createScheduleTaskItem(String baseTaskType, String ownSign,String[] baseTaskItems) throws Exception 方法創建任務項
this.scheduleCenter.initialRunningInfo4Static(this.currenScheduleServer.getBaseTaskType(), this.currenScheduleServer.getOwnSign(),this.currenScheduleServer.getUuid());
}
}

createScheduleTaskItem(String baseTaskType, String ownSign,String[] baseTaskItems) throws Exception 方法創建任務項就是說根據任務管理里面的任務項配置在zk下創建節點
這里寫圖片描述

/**
* 根據基礎配置里面的任務項來創建各個域里面的任務項
* @param baseTaskType
* @param ownSign
* @param baseTaskItems
* @throws Exception
*/

public void createScheduleTaskItem(String baseTaskType, String ownSign,String[] baseTaskItems) throws Exception {
ScheduleTaskItem[] taskItems = new ScheduleTaskItem[baseTaskItems.length];
Pattern p = Pattern.compile("\\s*:\\s*\\{");

for (int i=0;i<baseTaskItems.length;i++){
taskItems[i] = new ScheduleTaskItem();
taskItems[i].setBaseTaskType(baseTaskType);
taskItems[i].setTaskType(ScheduleUtil.getTaskTypeByBaseAndOwnSign(baseTaskType, ownSign));
taskItems[i].setOwnSign(ownSign);
Matcher matcher = p.matcher(baseTaskItems[i]);
if(matcher.find()){
taskItems[i].setTaskItem(baseTaskItems[i].substring(0,matcher.start()).trim());
taskItems[i].setDealParameter(baseTaskItems[i].substring(matcher.end(),baseTaskItems[i].length()-1).trim());
}else{
taskItems[i].setTaskItem(baseTaskItems[i]);
}
taskItems[i].setSts(ScheduleTaskItem.TaskItemSts.ACTIVTE);
}
createScheduleTaskItem(taskItems);
}

/**
* 創建任務項,注意其中的 CurrentSever和RequestServer不會起作用
* @param taskItems
* @throws Exception
*/

public void createScheduleTaskItem(ScheduleTaskItem[] taskItems) throws Exception {
for (ScheduleTaskItem taskItem : taskItems){
String zkPath = this.PATH_BaseTaskType + "/" + taskItem.getBaseTaskType() + "/" + taskItem.getTaskType() +"/" + this.PATH_TaskItem;
if(this.getZooKeeper().exists(zkPath, false)== null){
ZKTools.createPath(this.getZooKeeper(), zkPath, CreateMode.PERSISTENT, this.zkManager.getAcl());
}
String zkTaskItemPath = zkPath + "/" + taskItem.getTaskItem();
this.getZooKeeper().create(zkTaskItemPath,null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
this.getZooKeeper().create(zkTaskItemPath + "/cur_server",null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
this.getZooKeeper().create(zkTaskItemPath + "/req_server",null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
this.getZooKeeper().create(zkTaskItemPath + "/sts",taskItem.getSts().toString().getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
this.getZooKeeper().create(zkTaskItemPath + "/parameter",taskItem.getDealParameter().getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
this.getZooKeeper().create(zkTaskItemPath + "/deal_desc",taskItem.getDealDesc().getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
}
}

回到 public void initial() throws Exception 方法,方法最后調用了一個computerStart();方法,這個方法干嘛的?

public void initial() throws Exception{
new Thread(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-StartProcess"){
@SuppressWarnings("static-access")
public void run(){
//......//
computerStart();
//......//
}
}.start();
}

/**
* 開始的時候,計算第一次執行時間
* @throws Exception
*/

public void computerStart() throws Exception{
//只有當存在可執行隊列后再開始啟動隊列
//
boolean isRunNow = false;
if(this.taskTypeInfo.getPermitRunStartTime() == null){
isRunNow = true;
}else{
//獲得任務"執行開始時間"的設置
//以startrun:開始,則表示開機立即啟動調度.
String tmpStr = this.taskTypeInfo.getPermitRunStartTime();
if(tmpStr.toLowerCase().startsWith("startrun:")){
isRunNow = true;
tmpStr = tmpStr.substring("startrun:".length());
}


CronExpression cexpStart = new CronExpression(tmpStr);
//獲得集群校准后的系統當前時間(zk服務器時間+時間偏移量)
Date current = new Date( this.scheduleCenter.getSystemTime());
//第一次的開始執行時間
Date firstStartTime = cexpStart.getNextValidTimeAfter(current);
//到達開始執行時間后,啟動調度任務
this.heartBeatTimer.schedule(
new PauseOrResumeScheduleTask(this,this.heartBeatTimer,
PauseOrResumeScheduleTask.TYPE_RESUME,tmpStr),
firstStartTime);
//看到沒有在這里改了ScheduleServer,用於后續調度使用。
this.currenScheduleServer.setNextRunStartTime(ScheduleUtil.transferDataToString(firstStartTime));
//如果沒有設置任務的終止時間,代表"當不能獲取到數據的時候pause"
if( this.taskTypeInfo.getPermitRunEndTime() == null
|| this.taskTypeInfo.getPermitRunEndTime().equals("-1")){
this.currenScheduleServer.setNextRunEndTime("當不能獲取到數據的時候pause");
}else{ // 如果設置了任務的終止時間
try {
//拿到“執行結束時間”設置
String tmpEndStr = this.taskTypeInfo.getPermitRunEndTime();
CronExpression cexpEnd = new CronExpression(tmpEndStr);
//根據第一次執行開始時間,計算第一次執行結束時間
Date firstEndTime = cexpEnd.getNextValidTimeAfter(firstStartTime);
//根據當前時間計算一個執行結束時間
Date nowEndTime = cexpEnd.getNextValidTimeAfter(current);
//避免任務不被執行的情況
if(!nowEndTime.equals(firstEndTime) && current.before(nowEndTime)){
isRunNow = true;
firstEndTime = nowEndTime;
}
//到達執行結束時間后暫停任務
this.heartBeatTimer.schedule(
new PauseOrResumeScheduleTask(this,this.heartBeatTimer,
PauseOrResumeScheduleTask.TYPE_PAUSE,tmpEndStr),
firstEndTime);
this.currenScheduleServer.setNextRunEndTime(ScheduleUtil.transferDataToString(firstEndTime));
} catch (Exception e) {
log.error("計算第一次執行時間出現異常:" + currenScheduleServer.getUuid(), e);
throw new Exception("計算第一次執行時間出現異常:" + currenScheduleServer.getUuid(), e);
}
}
}
if(isRunNow == true){
this.resume("開啟服務立即啟動");
}
//重寫zk節點信息,就是之前提到的server節點之后重寫
this.rewriteScheduleInfo();

}

這里小心下PauseOrResumeScheduleTask ,理解起來有點討厭,在run方法里又套了個調度任務,利用cron表達式計算一個下一次運行時間(啟動或暫停),再去跑一個PauseOrResumeScheduleTask調度任務。如果不打開看這個類很容易懵逼,以下是這個類的源碼,發現他會在run方法內利用cron再計算自己下一次的運行時間。然后再調用這樣一個猥瑣操作this.timer.schedule(new PauseOrResumeScheduleTask(this.manager,this.timer,this.type,this.cronTabExpress) , nextTime);

class PauseOrResumeScheduleTask extends java.util.TimerTask {
private static transient Logger log = LoggerFactory
.getLogger(HeartBeatTimerTask.class);
public static int TYPE_PAUSE = 1;
public static int TYPE_RESUME = 2;
TBScheduleManager manager;
Timer timer;
int type;
String cronTabExpress;
public PauseOrResumeScheduleTask(TBScheduleManager aManager,Timer aTimer,int aType,String aCronTabExpress) {
this.manager = aManager;
this.timer = aTimer;
this.type = aType;
this.cronTabExpress = aCronTabExpress;
}
public void run() {
try {
Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
this.cancel();//取消調度任務
Date current = new Date(System.currentTimeMillis());
CronExpression cexp = new CronExpression(this.cronTabExpress);
Date nextTime = cexp.getNextValidTimeAfter(current);
if(this.type == TYPE_PAUSE){
manager.pause("到達終止時間,pause調度");
this.manager.getScheduleServer().setNextRunEndTime(ScheduleUtil.transferDataToString(nextTime));
}else{
manager.resume("到達開始時間,resume調度");
this.manager.getScheduleServer().setNextRunStartTime(ScheduleUtil.transferDataToString(nextTime));
}
this.timer.schedule(new PauseOrResumeScheduleTask(this.manager,this.timer,this.type,this.cronTabExpress) , nextTime);
} catch (Throwable ex) {
log.error(ex.getMessage(), ex);
}
}
}

可以看到最后會重寫調度服務信息到zk對應的節點上,方法大概處理流程。

1.該方法先判斷是否設置有任務的開始執行時間,如果沒有設置那么立即開始執行任務項任務
2.如果有設置 “執行開始時間”設置,以當前時間為基准計算下一次開始執行時間,設置一個定時任務,在指定時間開始任務線程
3.如果有設置“執行結束時間”設置,則計算一個執行結束時間,設置一個定時任務,在指定的時間暫停線程

這里使用的“開啟服務”和“暫停服務”,分別使用了com.taobao.pamirs.schedule.taskmanager.TBScheduleManager類的public void resume(String message) throws Exception方法以及public void pause(String message) throws Exception方法


/**
* 處在了可執行的時間區間,恢復運行
* @throws Exception
*/

public void resume(String message) throws Exception{
if (this.isPauseSchedule == true) {
if(log.isDebugEnabled()){
log.debug("恢復調度:" + this.currenScheduleServer.getUuid());
}
this.isPauseSchedule = false;
this.pauseMessage = message;
if (this.taskDealBean != null) {
if (this.taskTypeInfo.getProcessorType() != null &&
this.taskTypeInfo.getProcessorType().equalsIgnoreCase("NOTSLEEP")==true){
this.taskTypeInfo.setProcessorType("NOTSLEEP");
this.processor = new TBScheduleProcessorNotSleep(this,
taskDealBean,this.statisticsInfo);
}else{
this.processor = new TBScheduleProcessorSleep(this,
taskDealBean,this.statisticsInfo);
this.taskTypeInfo.setProcessorType("SLEEP");
}
}
rewriteScheduleInfo();
}
}
/**
* 超過運行的運行時間,暫時停止調度
* @throws Exception
*/

public void pause(String message) throws Exception{
if (this.isPauseSchedule == false) {
this.isPauseSchedule = true;
this.pauseMessage = message;
if (log.isDebugEnabled()) {
log.debug("暫停調度 :" + this.currenScheduleServer.getUuid()+":" + this.statisticsInfo.getDealDescription());
}
if (this.processor != null) {
this.processor.stopSchedule();
}
rewriteScheduleInfo();
}
}

發現resume()方法涉及tbschedule的SLEEP和NOTSLEEP模式的處理。先借用官方描述說明這兩種模式有什么區別

現有的工作線程模式分為Sleep模式和NotSleep模式。缺省是缺省是NOTSLEEP模式。在通常模式下,在通常情況下用Sleep模式。
在一些特殊情況需要用NotSleep模式。兩者之間的差異在后續進行描述。
4、Sleep模式和NotSleep模式的區別
1、ScheduleServer啟動的工作線程組線程是共享一個任務池的。
2、在Sleep的工作模式:當某一個線程任務處理完畢,從任務池中取不到任務的時候,檢查其它線程是否處於活動狀態。如果是,則自己休眠;
如果其它線程都已經因為沒有任務進入休眠,當前線程是最后一個活動線程的時候,就調用業務接口,獲取需要處理的任務,放入任務池中,
同時喚醒其它休眠線程開始工作。
3、在NotSleep的工作模式:當一個線程任務處理完畢,從任務池中取不到任務的時候,立即調用業務接口獲取需要處理的任務,放入任務池中。
4、Sleep模式在實現邏輯上相對簡單清晰,但存在一個大任務處理時間長,導致其它線程不工作的情況。
5、在NotSleep模式下,減少了線程休眠的時間,避免大任務阻塞的情況,但為了避免數據被重復處理,增加了CPU在數據比較上的開銷。
同時要求業務接口實現對象的比較接口。
6、如果對任務處理不允許停頓的情況下建議用NotSleep模式,其它情況建議用sleep模式。

估計是個大活,后續再繼續整理。到這里先整理下業務流程,有點亂:

1.集成tbSchedule的應用,初始化一個com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory 對象
2.在這個調度服務器里,檢查zk的連接狀態,判斷zk是否連接成功
3.如果zk連接成功,初始化調度策略管理器(com.taobao.pamirs.schedule.zk.ScheduleStrategyDataManager4ZK)和調度任務客戶端(com.taobao.pamirs.schedule.zk.ScheduleDataManager4ZK)
4.啟動一個定時任務,這個定時任務會以2秒為一個周期對當前的zk狀態做檢查,如果zk連接失敗(重試次數5)會停止當前服務,否則刷新調度服務器,按照以下步驟做刷新
.1 取的本應用所有的可用策略,然后每個策略按照策略的總任務組數和機器數目分配任務組數目,並將計算的數目存儲到策略的應用數據節點上。
.2 根據zk的機器節點的分配數目(requestNum)創建任務組(com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerStatic)
5.任務組中根據factory和調度任務客戶端 創建調度服務數據並存儲在server節點
6.任務組里面啟動一個任務線程,拉取控制台設置的數據,並按照配置設置調度任務到指定時間操作processor開始或停止,執行后將新信息回寫到server節點以便於下次計算。


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com