Using ThreadPools to process M tasks of N users on K threads (N*M >> K) in Java


I have a RESTful-style RPC (remote procedure call) API running on a Tomcat server that processes data of N users with M tasks on K threads. Typically one user has around 20 to 500 tasks (but M can be anywhere between 1 and 5000). One task needs around 10 to 20 seconds to complete, but can take between 1 second and 20 minutes. Currently the system mostly has one user, sometimes up to three, but this will increase to around 10 simultaneous users in the near future. Our server has 10 cores, therefore I'd like to use 10 threads. At the moment every user gets 5 threads for processing, which works fine. But most of the time the machine is only utilized at 50% (which results in needless waiting in the "30-minute" range), while at other times the server load spikes up to 150%.

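For illustration, the 10-core figure need not be hard-coded; a minimal sketch that sizes a fixed pool from the core count reported at runtime:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolSizing {
    public static void main(String[] args) {
        // Match the pool size to the number of available cores
        // (10 on the server described above).
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        System.out.println("pool sized to " + cores + " threads");
        pool.shutdown();
    }
}
```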

Requirements for a solution:

  1. at all times the server is utilized at 100% (if there are tasks)
  2. all users are treated the same regarding thread execution (the same number of tasks finished as every other user)
  3. a new user does not have to wait until all tasks of an earlier user are done (this is especially important in the case where user1 has 5000 tasks and user2 has 1)

Solutions that come to mind:


  1. just use a fixed thread pool (Executors.newFixedThreadPool) with 10 threads; this violates condition 3

  2. use a PriorityBlockingQueue and implement the compareTo method in my task -> then I cannot use the ThreadPoolExecutor's submit method (and therefore I do not know when a submitted task is done)

  3. implement a "round robin"-like blocking queue, where the K threads (in our case 10) take new tasks from the N internal queues in a round-robin way -> to be able to put a task into the right queue, I need a "submit" method that takes more than one parameter (I would need to implement a ThreadPoolExecutor, too)

I tried to make an illustration of what I mean by a round-robin-like blocking queue (if it is not helpful, feel free to edit it out):

  --                       --
  --        --        --   --             queue task load, 
  --   --   --   --   --   --        --   one task denoted by --
  --   --   --   --   --   --   --   -- 
| Q1 | Q2 | Q3 | Q4 | Q5 | Q6 | Q7 | QN |
|                      *   ^            |
|                  last|   |next        |
|                           -------------
\                          /
 \    |    |    |    |    |
 | T1 | T2 | T3 | T4 | TK |
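A minimal sketch of the structure in the diagram, assuming a submit method keyed by user id and worker threads T1..TK calling take() (the class and method names here are mine, not a standard API):

```java
import java.util.*;
import java.util.concurrent.locks.*;

// Hypothetical "round robin"-like blocking queue: each user gets an
// internal queue, and take() hands out tasks by cycling over the users.
class RoundRobinTaskQueue {
    private final Map<String, Deque<Runnable>> queues = new LinkedHashMap<>();
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private Iterator<String> cursor = Collections.emptyIterator();

    // The "submit method that takes more than one parameter": task plus user id.
    public void submit(String userId, Runnable task) {
        lock.lock();
        try {
            queues.computeIfAbsent(userId, k -> new ArrayDeque<>()).add(task);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // Worker threads call take(); users are served in turn, and the call
    // blocks while no user has a pending task.
    public Runnable take() throws InterruptedException {
        lock.lock();
        try {
            while (true) {
                if (!cursor.hasNext()) {
                    cursor = new ArrayList<>(queues.keySet()).iterator();
                }
                while (cursor.hasNext()) {
                    Deque<Runnable> q = queues.get(cursor.next());
                    if (q != null && !q.isEmpty()) {
                        return q.poll();
                    }
                }
                if (allEmpty()) {
                    notEmpty.await();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    private boolean allEmpty() {
        for (Deque<Runnable> q : queues.values()) {
            if (!q.isEmpty()) return false;
        }
        return true;
    }
}
```

With this, a user who submits one task gets it picked up on the next pass of the cursor, regardless of how many tasks other users have queued.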

Is there an elegant solution that mostly uses standard Java APIs (or any other widespread Java API) to achieve this kind of processing behavior (be it one of my proposed solutions or another one)? Or do you have any other hints on how to tackle this issue?

3 Answers

#1



Addressing your requirements:


1) maximizing thread usage: any ThreadPoolExecutor will take care of this.
2) all users are treated the same: essentially requires a round-robin setup.
3) avoid new users waiting in FIFO order: same as #2.


Also you mentioned the ability to submit and get a result.


You might consider a standalone PriorityBlockingQueue<Job> using a wrapper object, e.g.:


class Job implements Comparable<Job> {
    private final int priority;
    private final YourCallable task;

    public Job(int priority, YourCallable task) {
        this.priority = priority;
        this.task = task;
    }

    public YourCallable getTask() {
        return task;
    }

    @Override
    public int compareTo(Job job) {
        // use whatever order you prefer, based on the priority int;
        // here, lower values are taken from the queue first
        return Integer.compare(this.priority, job.priority);
    }
}

Your producer offers a Job to the PriorityBlockingQueue with a priority assigned (based on your round-robin rule or whatever), and a task that implements Callable. Your consumer then calls queue.poll() (or the blocking queue.take()) to obtain a Job.

Once you have that in hand, you can grab the task contained inside that Job object and send it off for processing on a ThreadPoolExecutor of your choosing.

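A minimal end-to-end sketch of that pattern (the Job wrapper mirrors the snippet above, with Callable<String> standing in for YourCallable; the priority values are arbitrary):

```java
import java.util.concurrent.*;

public class PriorityDispatch {
    static class Job implements Comparable<Job> {
        final int priority;
        final Callable<String> task;

        Job(int priority, Callable<String> task) {
            this.priority = priority;
            this.task = task;
        }

        @Override
        public int compareTo(Job other) {
            // lower priority value = taken from the queue first
            return Integer.compare(this.priority, other.priority);
        }
    }

    public static void main(String[] args) throws Exception {
        PriorityBlockingQueue<Job> queue = new PriorityBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // Producer side: offer jobs with assigned priorities
        // (e.g. assigned per user by a round-robin rule).
        queue.offer(new Job(5, () -> "low-priority task"));
        queue.offer(new Job(1, () -> "high-priority task"));

        // Consumer side: take the most urgent Job, hand its task to the pool.
        Job next = queue.take();           // blocks until a job is available
        Future<String> result = pool.submit(next.task);
        System.out.println(result.get());  // prints "high-priority task"
        pool.shutdown();
    }
}
```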

#2



If you agree that minimizing the overall task latency is a good replacement for requirements 2 and 3, and you have good-enough task runtime estimates, then I may have an answer.


You store the task submit time with each task, so that later you can always compute its estimated latency. You can then build a PriorityBlockingQueue that, when inserting a new task, always inserts it at a queue position that provides some fairness and attempts to minimize overall latency. This will, at first, put long-running tasks at a disadvantage. I have not tried it myself, but I would assign a task priority based on your estimated run time and use estimatedRuntime - waitingTime as the priority (taking the lowest-priority job first). This gives heavy tasks a chance once they have waited long enough for their priority to go negative. Until then, light tasks have a better chance of going first, even if they have just been submitted. This scheduling will only be as fair as your estimates allow, though.
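A sketch of that priority rule (the class name is mine; note that the priority changes as waiting time grows, so a real queue would have to re-order or re-insert jobs periodically rather than rely on a one-time insert position):

```java
// Hypothetical job wrapper implementing the rule above:
// priority = estimatedRuntimeMillis - waitingTimeMillis, lowest value first.
class LatencyAwareJob implements Comparable<LatencyAwareJob> {
    final long submitTimeMillis;
    final long estimatedRuntimeMillis;
    final Runnable task;

    LatencyAwareJob(long estimatedRuntimeMillis, Runnable task) {
        this.submitTimeMillis = System.currentTimeMillis();  // stored submit time
        this.estimatedRuntimeMillis = estimatedRuntimeMillis;
        this.task = task;
    }

    long priority() {
        long waiting = System.currentTimeMillis() - submitTimeMillis;
        // goes negative once a job has waited longer than its estimated runtime
        return estimatedRuntimeMillis - waiting;
    }

    @Override
    public int compareTo(LatencyAwareJob other) {
        return Long.compare(this.priority(), other.priority());
    }
}
```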

As for the round-robin requirement: if that is really important, you can handle this in the queue as well. Basically, when you use a thread pool, you can implement your scheduling strategy in terms of where you insert new jobs in the queue. If you can estimate job latency, you can balance it across your users, too, by inserting at the right position.

#3



I have been working on a solution similar to the round-robin setup. It gets complicated fast, but I think I came up with a decent implementation. It is probably not a very elegant solution, but there are unit tests showing some of the functionality. Unfortunately, TaskQ is not yet at a "1.0" stage.


It does cover your points 1 to 3:


  • you can specify the number of threads to use, and if there are enough tasks, all threads are used
  • each user gets a turn as threads become available
  • if user A has 500 tasks in the queue and user B comes along with 1 task, the task from user B will get executed as soon as a thread is available

There is no manual/documentation yet; I hope you can find the time to investigate. A unit test showing some usage is here; the main class to extend/use is RunnableTaskQWithQos.


