AsyncTask -- Java異步調用框架


版權聲明:本文為博主原創文章,未經博主允許不得轉載。
目錄(?)[-]
原創文章轉載請注明作者jmppok及出處httpblogcsdnnetjmppokarticledetails44590991
AsyncTask是個人編寫的一個Java異步調用框架支持用戶
為什么需要AsyncTask與Asyn4J 區別
1Java傳統的Thread 和 Runable功能不足
2 Asyn4J
設計及實現
1接口設計
2 代碼實現
測試
1自定義MyTask
2自定義MyTaskExecutor
3配置MyTaskExecutor
4提交Task並實時監聽進度
5終止Task執行然后重新啟動程序進行Task恢復測試
不足
原創文章,轉載請注明作者:jmppok及出處:http://blog.csdn.net/jmppok/article/details/44590991

AsyncTask是個人編寫的一個Java異步調用框架,支持用戶:
1)自定義Task,並可設置Task的類型(Type), 子類型(subType),超時時間(TImeout),標識(Flag-可用來區分不同的Task),Task的輸入參數(input)等。
2)可通過submitTask,提交 到框架中異步執行,框架查找對應的TaskExectuor,多線程執行。
3)可自定義對應TaskExecutor,通過配置添加到框架中。TaskExecutor支持Execotor Chain, 多個Executor可以組合在一起順序執行。並且支持在Task執行過程中,實時通知任務調用者Task的狀態,進度等。
4)用戶可使用TaskCollector通過TaskManager查詢所有的Task,支持按Task Id,Task Type, Task SubType, Task State, Task Flag, Task beginTIme, Task finishTime等多種方式的組合查詢。
5)支持持久化,用戶提交的Task可以被存儲在數據庫中。即使Task在執行過程中被中斷,重新啟動后會從數據庫中恢復后繼續執行。
6)用戶可通過查詢接口可獲取Task的引用ITaskReference,通過ITaskReference可實時獲取Task的狀態(State)和進度Progress。
7)用戶可定義Task的FinishedCallBack回調,在Submit Task時傳入,在Task完成后自動回調。
8)通過ITaskReference的waitForTask,支持用戶以同步方式使用。
9)用戶可通過ITaskReference獲取Task的執行結果或錯誤信息。

代碼:https://git.oschina.net/jmpp/AsyncTask

1.為什么需要AsyncTask?與Asyn4J 區別?
1.1Java傳統的Thread 和 Runable功能不足
Java提供了Thread,ThreadPool等多線程編程接口。但這些都是基礎接口,雖然使用方便,但功能比較簡單,很多場景下都無法滿足需求。
比如下面的幾個場景:
1)我需要提交一個任務,改任務在后台異步執行,同時我要實時觀察任務的狀態,進度等信息。
2)在提交任務時希望傳入參數,任務完成后能主動通知我,並能獲取結果。
3)任務持久化,我希望在任務執行完畢后,可以查詢到執行的任務列表。或者任務失敗后能重新執行。
如果要實現這些場景,Java本身自帶的接口顯然無法滿足,必須要有一個新的框架來實現。

1.2 Asyn4J
Asyn4J也是一個類似的框架,但它目前還不支持任務的超時設置,持久化,任務回調等功能。

2.設計及實現
2.1接口設計
直接上圖

2.2 代碼實現
具體實現代碼見 Git@OSChttps://git.oschina.net/jmpp/AsyncTask
代碼結構如下:

這里簡單說一下實現思路:
1) 整個實現還是基於Java的Thread和ThreadPool,沒有用第三方框架。
2)持久化基於MySQL,只有一個數據庫表tasks,見tasks.sql.
3)持久化層的實現用到了Mybatis,給予Mybatis的代碼生成工具,直接生成了tasks表對應的數據結構。
4)要持久化必然還要用到對象序列化,這里使用了Kryo。為啥用Kryo,見我的另一篇文章:Java對象序列化小結。
5)日志使用了Log4j。

3.測試
具體可見代碼:https://git.oschina.net/jmpp/AsyncTask
3.1自定義MyTask
[java] view plain copy 在CODE上查看代碼片派生到我的代碼片
package test.mytask;

import com.lenovo.asynctask.Task;

/**
* 類 MyTask 的實現描述:TODO 類實現描述
*
* @author ligh4 2015年3月12日下午2:42:56
*/
public class MyTask extends Task {

/** 
* @param taskType
* @param inputParam
* @param timeoutMills
*/
public MyTask(Object inputParam, int timeoutMills) {
super(MyTask.class.getSimpleName(), inputParam, timeoutMills);
setNeedPersistence(true);
}

}

3.2自定義MyTaskExecutor
[java] view plain copy 在CODE上查看代碼片派生到我的代碼片
package test.mytask;

import com.lenovo.asynctask.ITaskExecutor;
import com.lenovo.asynctask.ITaskReferenceInternal;
import com.lenovo.asynctask.TaskState;
import com.lenovo.asynctask.util.LogHelper;

/**
* 類 TestTaskExecutor 的實現描述:TODO 類實現描述
*
* @author ligh4 2015年3月12日下午2:43:19
*/
public class MyTaskExecutor extends ITaskExecutor {

/** 
* @author ligh4 2015年3月12日下午2:46:51
*/
@Override
public Object execute(ITaskReferenceInternal taskRef) {
LogHelper.debug("begin execute MyTask...");

for (int i = 0; i < 100; i++) {
try {
Thread.sleep(1000);
} catch (Exception e) {
LogHelper.exception(e);
}
taskRef.setProgress(i + 1);
}

return taskRef.getInput().toString().toUpperCase();
}

/**
* @author ligh4 2015年3月12日下午2:46:51
*/
@Override
public Object continue_execute(ITaskReferenceInternal taskRef) {
if (taskRef.getState() == TaskState.running) {
int i = taskRef.getProgress();
for (; i < 100; i++) {
try {
Thread.sleep(1000);
} catch (Exception e) {
LogHelper.exception(e);
}
taskRef.setProgress(i + 1);
}

return taskRef.getInput().toString().toUpperCase();
} else {
taskRef.setState(TaskState.failed, "");
return null;
}

}

}

3.3配置MyTaskExecutor
taskexecutors.properties中添加:
[java] view plain copy 在CODE上查看代碼片派生到我的代碼片
MyTask = test.mytask.MyTaskExecutor
其實是task的type = task的Executor
3.4提交Task並實時監聽進度
[java] view plain copy 在CODE上查看代碼片派生到我的代碼片
package test.mytask;

import java.util.List;

import com.lenovo.asynctask.ITaskFinishedCallback;
import com.lenovo.asynctask.ITaskReference;
import com.lenovo.asynctask.TaskCollector;
import com.lenovo.asynctask.TaskManager;
import com.lenovo.asynctask.TaskState;
import com.lenovo.asynctask.util.DateUtil;
import com.lenovo.asynctask.util.LogHelper;

/**
* 類 TestContinueTask 的實現描述:TODO 類實現描述
*
* @author ligh4 2015年3月23日上午9:42:14
*/
public class TestContinueTask {

/** 
* @author ligh4 2015年3月12日下午2:52:45
* @param args
*/
public static void main(String[] args) throws Exception {
TaskManager.instance().start();

List<ITaskReference> tasks = queryRunningTasks();
if (tasks == null || tasks.size() == 0) {
submitAndWaitTask();
} else {
for (ITaskReference taskReference : tasks) {
queryTaskProgress(taskReference);
}
}

TaskManager.instance().stop();
}

public static void submitAndWaitTask() throws Exception {
MyTask task = new MyTask("liguanghui", 200000);
ITaskReference taskReference = TaskManager.instance().submitTask(task,
new ITaskFinishedCallback() {

@Override
public void onTaskFinished(ITaskReference taskRef) {
LogHelper.debug(taskRef.getId() + ";" + taskRef.getState().toString() + ";"
+ DateUtil.format(taskRef.getStartedTime()) + " "
+ DateUtil.format(taskRef.getFinishedTime()) + ";"
+ taskRef.getResult().toString());

}
});

queryTaskProgress(taskReference);
}

public static void queryTaskProgress(ITaskReference taskReference) throws Exception {
String taskID = taskReference.getId();
while (!taskReference.isFinished()) {
LogHelper.debug(taskID + ": progress " + taskReference.getProgress());
Thread.sleep(1000);
}
LogHelper.debug(taskID + ": finished. ");
}

public static List<ITaskReference> queryRunningTasks() {
TaskCollector collector = new TaskCollector();
collector.setTaskStateFilter(new TaskState[] { TaskState.running });
collector.setTaskTypeFilter(new String[] { MyTask.class.getSimpleName() });
return TaskManager.instance().findTasks(collector);

}

}

3.5終止Task執行然后重新啟動程序,進行Task恢復測試
還是同3.4一樣的代碼
1)第一次運行沒有Task,會提交一個Task。 submitAndWaitTask();
2)如果改Task沒有被執行完畢就被終止,第二次啟動后該Task就會恢復。
3)這時queryRunningTasks就會查詢到正在運行的Task,並且進度到等待該Task的分支。
4)當然如果你停止的時間很長才重新啟動,會發現Task超時。
4.不足
1.整體實現比較簡單,特別是數據庫表中存儲Java對象序列化的字段,偷懶用的varchar(2000),可能超出,最好改為Blob。(為啥當時不用Blob,因為偷懶,如果Blob的話mybatis生成的代碼就比較復雜了,會有一個XXwithBlob,調用不方便….)
2.線程池個數寫死了,應該可以配置。
3.測試比較簡單,可能有未知bug。


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com