Using ThreadPools to process M tasks of N users on K threads (N*M >> K) in Java

I have a RESTful-style RPC (remote procedure call) API running on a Tomcat server that processes the data of N users with M tasks on K threads. A user typically has around 20 to 500 tasks (but M can be anywhere from 1 to 5000). A task typically needs around 10 to 20 seconds to complete, but can take anywhere between 1 second and 20 minutes. Currently the system mostly has one user at a time, sometimes up to three, but this will increase to around 10 concurrent users in the near future. Our server has 10 cores, so I'd like to use 10 threads. At the moment every user gets 5 threads for processing, which works fine, but a) most of the time the machine is only 50% utilized (which results in needless waiting in the "30-minute" range), and b) sometimes the server load goes up to 150%.


Requirements for the solution:


  1. at all times the server is utilized to 100% (if there are tasks)
  2. all users are treated the same regarding thread execution (the same number of threads finished as for every other user)
  3. a new user does not have to wait until all tasks of an earlier user are done (this is especially important in the case where user1 has 5000 tasks and user2 has 1)

Solutions that come to mind:


  1. just use a fixed thread pool (Executors.newFixedThreadPool) with 10 threads; this violates condition 3


  2. use a PriorityBlockingQueue and implement the compareTo method in my task -> I cannot use the ThreadPoolExecutor's submit method (and therefore I do not know when a submitted task is finished)


  3. implement a "round robin"-like blocking queue, where the K threads (in our case 10) take new tasks from the N internal queues in a round-robin way -> to be able to put a task into the right queue, I need a "submit" method that takes more than one parameter (so I need to implement a ThreadPoolExecutor, too)


I tried to make an illustration of what I mean by round robin like blocking queue (if not helpful feel free to edit it out):


  --                       --
  --        --        --   --             queue task load, 
  --   --   --   --   --   --        --   one task denoted by --
  --   --   --   --   --   --   --   -- 
| Q1 | Q2 | Q3 | Q4 | Q5 | Q6 | Q7 | QN |
|                      *   ^            |
|                  last|   |next        |
|                           -------------
\                          /
 \    |    |    |    |    |
 | T1 | T2 | T3 | T4 | TK |

Is there an elegant solution that uses mostly Java standard APIs (or any other widespread Java API) to achieve this kind of processing behavior (be it one of my proposed solutions or any other solution)? Or do you have any other hints on how to tackle this issue?


3 solutions



Addressing your requirements:


1) maximizing thread usage: any ThreadPoolExecutor will take care of this.
2) all users are treated the same: essentially requires a round-robin setup.
3) avoid new users waiting in FIFO order: same as #2.


Also you mentioned the ability to submit and get a result.


You might consider a standalone PriorityBlockingQueue<Job> using a wrapper object, e.g.:


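class Job implements Comparable<Job> {
    private final int priority;
    private final YourCallable task;

    public Job(int priority, YourCallable task) {
        this.priority = priority;
        this.task = task;
    }

    public YourCallable getTask() {
        return task;
    }

    @Override
    public int compareTo(Job job) {
        // use whatever order you prefer, based on the priority int;
        // here the lowest priority value is polled first
        return Integer.compare(this.priority, job.priority);
    }
}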

Your producer offers a Job to the PriorityBlockingQueue with a priority assigned (based on your round-robin rule or whatever), and a task that implements Callable. Your consumer then does queue.poll for a Job.


Once you have that in hand, you can grab the task contained inside that Job object and send it off for processing on a ThreadPoolExecutor of your choosing.
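If you also want a Future back (the concern raised under proposed solution 2), one possible variation is to hand the PriorityBlockingQueue to the ThreadPoolExecutor as its work queue and pass in a FutureTask that is itself Comparable. This is only a sketch under assumptions: PrioritizedFuture and PriorityPool are made-up names for illustration, not standard classes, and it differs from the standalone queue described above in that the executor does the polling.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: a FutureTask that carries a priority,
// so it can be ordered inside a PriorityBlockingQueue.
class PrioritizedFuture<T> extends FutureTask<T>
        implements Comparable<PrioritizedFuture<?>> {
    private static final AtomicLong SEQ = new AtomicLong();
    final int priority;
    // Tie-breaker: equal priorities run in submission order.
    final long seq = SEQ.getAndIncrement();

    PrioritizedFuture(int priority, Callable<T> task) {
        super(task);
        this.priority = priority;
    }

    @Override
    public int compareTo(PrioritizedFuture<?> other) {
        int c = Integer.compare(priority, other.priority);
        return c != 0 ? c : Long.compare(seq, other.seq);
    }
}

// Hypothetical wrapper around a ThreadPoolExecutor with a priority work queue.
class PriorityPool {
    private final ThreadPoolExecutor executor;

    PriorityPool(int threads) {
        executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());
    }

    // Lowest priority value runs first once all threads are busy.
    <T> Future<T> submit(int priority, Callable<T> task) {
        PrioritizedFuture<T> future = new PrioritizedFuture<>(priority, task);
        // execute(), not submit(): submit() would wrap the task in a plain
        // FutureTask, which is not Comparable and would fail in the queue.
        executor.execute(future);
        return future;
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Note that the priority only matters for tasks that are actually waiting in the queue; a task handed directly to an idle thread starts immediately regardless of its priority.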




If you agree that minimizing the overall task latency is a good replacement for requirements 2 and 3, and you have good-enough task runtime estimates, then I may have an answer.


You store the task submit time with each task, so that later you can always compute its estimated latency. You can then build a PriorityBlockingQueue that, when inserting a new task, always inserts it at a queue position that provides some fairness and attempts to minimize overall latency. This will, at first, put long-running tasks at a disadvantage. I have not tried it myself, but I would assign a task priority based on your estimated run time, and use estimatedRuntime-waitingTime as priority (taking the lowest-priority job first). This will give heavy tasks a chance after they have waited long enough to have negative priority. Until then, light tasks will have a better chance to be first, even if they have just been submitted. This scheduling will only be as fair as your estimates allow, though.
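One practical detail: a PriorityBlockingQueue does not re-sort elements as time passes, so the priority has to be a fixed key. Since waitingTime = now - submitTime, the priority estimatedRuntime - waitingTime equals (submitTime + estimatedRuntime) - now, and now is the same for every queued task at any given moment. Ordering by submitTime + estimatedRuntime therefore gives the same order. A minimal sketch under that reasoning, where TimedTask, LatencyQueue and their field names are assumptions made up for illustration:

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical task descriptor; field names are illustrative only.
class TimedTask {
    final String name;
    final long submitMillis;           // when the task was submitted
    final long estimatedRuntimeMillis; // your runtime estimate for the task

    TimedTask(String name, long submitMillis, long estimatedRuntimeMillis) {
        this.name = name;
        this.submitMillis = submitMillis;
        this.estimatedRuntimeMillis = estimatedRuntimeMillis;
    }

    // Static sort key equivalent to (estimatedRuntime - waitingTime):
    // the shared "- now" term is dropped because it does not affect the order.
    long key() {
        return submitMillis + estimatedRuntimeMillis;
    }
}

class LatencyQueue {
    // Smallest key is polled first.
    static PriorityBlockingQueue<TimedTask> create() {
        return new PriorityBlockingQueue<>(11, Comparator.comparingLong(TimedTask::key));
    }
}
```

With this key, a 60-second task submitted at t=0 is overtaken by a 5-second task submitted at t=50s, but runs before a 5-second task submitted at t=70s, which is the aging effect described above.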


As for the round-robin requirement: If that is really important, you can handle this in the queue as well. Basically, when you use a thread pool, you can implement your scheduling strategy in terms of where you insert new jobs in the queue. If you can estimate job latency, you can balance that across your users, too, by inserting at the right position.
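For the plain round-robin case, a rough sketch of such a queue could look like the following. It keeps one internal queue per user and rotates between users on every take. RoundRobinTaskQueue and RoundRobinWorkers are hypothetical names, the user is identified by a String key as an assumption, and the workers here are plain threads rather than a ThreadPoolExecutor.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical round-robin queue: one FIFO per user, users served in rotation.
class RoundRobinTaskQueue<T> {
    // The map's insertion order is the rotation order; stored deques are never empty.
    private final LinkedHashMap<String, Deque<T>> queues = new LinkedHashMap<>();

    synchronized void put(String user, T task) {
        queues.computeIfAbsent(user, u -> new ArrayDeque<>()).addLast(task);
        notifyAll(); // wake up workers blocked in take()
    }

    synchronized T take() throws InterruptedException {
        while (queues.isEmpty()) {
            wait();
        }
        // Serve the user at the head of the rotation.
        Iterator<Map.Entry<String, Deque<T>>> it = queues.entrySet().iterator();
        Map.Entry<String, Deque<T>> head = it.next();
        it.remove();
        T task = head.getValue().pollFirst();
        // Move the user to the back of the rotation if tasks remain.
        if (!head.getValue().isEmpty()) {
            queues.put(head.getKey(), head.getValue());
        }
        return task;
    }
}

// Hypothetical worker starter: K daemon threads that drain the queue.
class RoundRobinWorkers {
    static void start(int threads, RoundRobinTaskQueue<Runnable> queue) {
        for (int i = 0; i < threads; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        queue.take().run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop on interrupt
                }
            }, "rr-worker-" + i);
            worker.setDaemon(true);
            worker.start();
        }
    }
}
```

If you put FutureTask instances into the queue, callers can still wait for a result, and an exception thrown by a task is captured in the future instead of killing the worker thread. With user1 holding three tasks and user2 arriving with one, the take order is user1, user2, user1, user1.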




I have been working on a solution similar to the round-robin setup. It gets complicated real fast but I think I came up with a decent implementation. It is probably not a very elegant solution but there are unit-tests showing some functions. Unfortunately, TaskQ is not yet at a "1.0" stage.


It does cover your points 1 to 3:


  • you can specify the amount of threads to use and if there are enough tasks, all threads are used.
  • each user will get a turn as threads become available
  • if one user A has 500 tasks in queue and another user B comes along with 1 task, the task from user B will get executed as soon as a thread is available.

There is no manual/documentation yet, I hope you can find the time to investigate. A unit test showing some usage is here, the main class to extend/use is RunnableTaskQWithQos.



