TPL Dataflow LinkTo multiple consumers not working


I have a BufferBlock to which I post messages:

public class DelimitedFileBlock : ISourceBlock<string>
{
    private ISourceBlock<string> _source;
    _source = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = 10000 });

    // Read a file
    while (!eof)
    {
        row = read one row
        // If consumers are slow, then sleep for a while
        while (!(_source as BufferBlock<string>).Post<string>(row))
        {
            Thread.Sleep(5000);
        }
    }
}

This is a 5GB file with 24 million rows.

I now have a target block which uses an ActionBlock:

public class SolaceTargetBlock : ITargetBlock<string>
{
    private ActionBlock<IBasicDataContract> _publishToSolaceBlock;

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue, ISourceBlock<string> source, bool consumeToAccept)
    {
        // Post to another block to publish
        bool success = _publishToSolaceBlock.Post(messageValue);

Now in a console application, I specify:

SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam",
    new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam",
    new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam",
    new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });

DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo);

I have set the bounded capacity to 1 for testing only.

And now I link these three consumers to my source using LinkTo:

delimitedFileBlock.LinkTo(solaceTargetBlock1);
delimitedFileBlock.LinkTo(solaceTargetBlock2);
delimitedFileBlock.LinkTo(solaceTargetBlock3);

After 10,003 rows, execution reaches the Thread.Sleep(5000) statement, and from then on the Post in the while loop always returns false.

I was expecting that, because I used LinkTo, the solaceTargetBlocks would pick up the next messages once they were done, but LinkTo is not draining the BufferBlock. So how can I load balance between multiple consumers? Do I have to call Receive and write simple load-balancing logic to distribute messages between consumers?

1 Answer

#1

From the documentation of the Post method on the DataflowBlock class:

This method will return once the target block has decided to accept or decline the item,

This means that the target can choose to decline the item (which is the behavior that you're seeing).

Further on, it states:

For target blocks that support postponing offered messages, or for blocks that may do more processing in their Post implementation, consider using SendAsync, which will return immediately and will enable the target to postpone the posted message and later consume it after SendAsync returns.

This means that you may get better results (depending on the target block): your message may be postponed but still processed, rather than rejected outright.
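
To make the difference between the two quotes concrete, here is a minimal sketch. It uses C# top-level statements and assumes System.Threading.Tasks.Dataflow is available in the project. The BufferBlock<int> has a deliberately tiny BoundedCapacity of 2, standing in for the 10000 in the question; the values are arbitrary. Post on the full block returns false and the item is gone. SendAsync instead returns a task that stays pending until a Receive makes room.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// A BufferBlock that holds at most two messages.
var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 2 });

bool first = buffer.Post(1);   // accepted (1 of 2 slots used)
bool second = buffer.Post(2);  // accepted (2 of 2 slots used)
bool third = buffer.Post(3);   // declined: the block is full and 3 is not kept

// SendAsync offers the message in a way the block may postpone;
// the returned task stays incomplete while the block is full.
Task<bool> pending = buffer.SendAsync(4);
bool pendingWhileFull = !pending.IsCompleted;

// Receiving one item frees a slot, so the block consumes the postponed 4.
int received = buffer.Receive();
bool acceptedLater = await pending;

Console.WriteLine($"Post results: {first}, {second}, {third}");
Console.WriteLine($"SendAsync pending while full: {pendingWhileFull}; accepted later: {acceptedLater}");
Console.WriteLine($"Received: {received}");
```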

I imagine that the BoundedCapacity property settings on both the BufferBlock<T> and the three ActionBlock<TInput> instances have something to do with what you're seeing:

  • Your maximum buffer on the BufferBlock<T> is 10,000. Once 10,000 items are in the queue, Post will decline the rest (see the first quote above), because the block can't hold them. SendAsync would postpone those extra items rather than drop them, but the tasks it returns only complete once a consumer makes room, so it doesn't help while nothing drains the buffer.

  • Your maximum buffer on the ActionBlock<TInput> instances is 1, and you have three of them.

10,000 + (1 * 3) = 10,000 + 3 = 10,003

To get around this, you need to do a few things.

First, you need to set a more reasonable value for the MaxDegreeOfParallelism property of the ExecutionDataflowBlockOptions when creating the ActionBlock<TInput> instances.

By default, MaxDegreeOfParallelism for an ActionBlock<TInput> is 1. This guarantees that calls to its delegate are serialized, so you don't have to worry about thread safety. If your delegate isn't thread-safe, keep this setting.

If the delegate is thread-safe, you have no reason to throttle it, and you should set MaxDegreeOfParallelism to DataflowBlockOptions.Unbounded.
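
The following sketch measures how many invocations of an ActionBlock<int> delegate overlap. It uses C# top-level statements and assumes System.Threading.Tasks.Dataflow is available. MaxObservedConcurrencyAsync is a hypothetical helper written only for this illustration, and the Task.Delay stands in for real work. With the default MaxDegreeOfParallelism of 1, the peak should always be 1. With Unbounded, the peak depends on the thread pool and is typically higher, but it is not guaranteed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: pushes `count` items through an ActionBlock and returns
// the highest number of delegate invocations observed running at once.
async Task<int> MaxObservedConcurrencyAsync(int maxDegreeOfParallelism, int count)
{
    int inFlight = 0;
    int maxInFlight = 0;

    var block = new ActionBlock<int>(async _ =>
    {
        int now = Interlocked.Increment(ref inFlight);

        // Record a new peak with a compare-and-swap loop.
        int seen;
        while (now > (seen = Volatile.Read(ref maxInFlight)) &&
               Interlocked.CompareExchange(ref maxInFlight, now, seen) != seen)
        {
        }

        await Task.Delay(10); // stand-in for real work
        Interlocked.Decrement(ref inFlight);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

    for (int i = 0; i < count; i++) block.Post(i);
    block.Complete();
    await block.Completion;
    return maxInFlight;
}

int serial = await MaxObservedConcurrencyAsync(1, 20);
int parallel = await MaxObservedConcurrencyAsync(DataflowBlockOptions.Unbounded, 20);

Console.WriteLine($"MaxDegreeOfParallelism = 1: peak {serial}");
Console.WriteLine($"MaxDegreeOfParallelism = Unbounded: peak {parallel}");
```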

If the ActionBlock<TInput> accesses some shared resource that can only be used concurrently to a limited degree, chances are you're doing the wrong thing.

If you do have a shared resource like that, you should probably run it through another block and set MaxDegreeOfParallelism on that block.
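
Here is a minimal sketch of that split. It assumes the per-item work is thread-safe; ToUpperInvariant is only a stand-in for it. It also assumes the shared resource is not thread-safe; a plain List<string> plays that role. The TransformBlock runs with unbounded parallelism. Only the ActionBlock, which keeps the default MaxDegreeOfParallelism of 1, touches the list.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Thread-safe per-item work runs with unbounded parallelism.
// ToUpperInvariant is only a stand-in for real processing.
var process = new TransformBlock<string, string>(
    row => row.ToUpperInvariant(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

// The shared, non-thread-safe resource (a plain List<string>) is only touched
// by this block, which keeps the default MaxDegreeOfParallelism of 1.
var results = new List<string>();
var store = new ActionBlock<string>(row => results.Add(row));

process.LinkTo(store, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 1000; i++) process.Post($"row {i}");
process.Complete();
await store.Completion;

Console.WriteLine($"{results.Count} rows stored; first: {results[0]}");
```

TransformBlock preserves input order by default, so the list ends up in the same order the rows were posted.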

Second, set the BoundedCapacity property only if you are concerned with throughput and are OK with dropped items: when a bounded block is full, Post declines the item instead of queueing it.

Also note that you say "if consumers are slow, then sleep for a while". There's no reason to do this if you wire up your blocks correctly: let the data flow through and place restrictions only where you need them. Your producer shouldn't be responsible for throttling the consumer; let the consumer be responsible for the throttling.
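
In the question's loop, awaiting SendAsync on a bounded BufferBlock<string> could take the place of the Thread.Sleep(5000) retry, with BoundedCapacity on the consumers doing the throttling. This sketch makes several assumptions. Three built-in ActionBlock<string> consumers replace the custom SolaceTargetBlock. A Task.Delay stands in for publishing. The sizes (100 for the buffer, 300 rows) are arbitrary.

Each consumer is bounded to one message. While it is busy, it postpones further offers, and the buffer offers the message to the next consumer. An idle consumer later pulls a postponed message, which spreads the rows across all three.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Bounded buffer: when it is full, the producer waits instead of sleeping.
var buffer = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 100 });

// Three consumers, each bounded to one message at a time. The work is a
// stand-in (Task.Delay) for publishing a row somewhere.
var processed = new int[3];
ActionBlock<string>[] consumers = Enumerable.Range(0, 3)
    .Select(index => new ActionBlock<string>(async row =>
    {
        await Task.Delay(1);
        Interlocked.Increment(ref processed[index]);
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }))
    .ToArray();

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
foreach (var consumer in consumers)
{
    buffer.LinkTo(consumer, linkOptions);
}

// Producer: each SendAsync completes once the buffer has accepted the row,
// so a full buffer slows the producer down without dropping anything.
for (int i = 0; i < 300; i++)
{
    await buffer.SendAsync($"row {i}");
}

buffer.Complete();
await Task.WhenAll(consumers.Select(c => c.Completion));

Console.WriteLine($"Rows per consumer: {string.Join(", ", processed)}; total: {processed.Sum()}");
```

The exact split between consumers depends on timing. The total does not: no row is lost, because the producer waits for room instead of relying on Post succeeding.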

Lastly, from your code it doesn't look like you need to implement the dataflow block interfaces yourself. You could construct it like so:

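using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// The source; the rows you read will be posted here.
var delimitedFileBlock = new BufferBlock<string>();

// The action for the action blocks.
Action<string> action =
    s => { /* Do something with the string here. */ };

// Create the action blocks, assuming that
// action is thread-safe: no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock2 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock3 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link everything; PropagateCompletion lets the action blocks complete
// once the BufferBlock<T> has been completed and drained.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
delimitedFileBlock.LinkTo(solaceActionBlock1, linkOptions);
delimitedFileBlock.LinkTo(solaceActionBlock2, linkOptions);
delimitedFileBlock.LinkTo(solaceActionBlock3, linkOptions);

// Now read the file and post each row to the BufferBlock<T>.
// csvFilePath is a placeholder for the path to your delimited file.
string csvFilePath = "data.csv";
foreach (string row in File.ReadLines(csvFilePath))
{
    delimitedFileBlock.Post(row);
}

// Signal that no more rows are coming, then wait until every row is processed.
delimitedFileBlock.Complete();
await Task.WhenAll(solaceActionBlock1.Completion,
    solaceActionBlock2.Completion, solaceActionBlock3.Completion);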

Also note that having three ActionBlock<TInput> instances is unnecessary unless you need to filter the output to different actions (which you aren't doing here). The BufferBlock<T> offers each message to its linked targets in order, and the first ActionBlock<TInput> has unbounded capacity. It therefore accepts every message, and the other two never receive anything. So the above really reduces to this (assuming your action is thread-safe, so you're going to set MaxDegreeOfParallelism to Unbounded anyway):

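using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// The source; the rows you read will be posted here.
var delimitedFileBlock = new BufferBlock<string>();

// The action for the action block.
Action<string> action =
    s => { /* Do something with the string here. */ };

// Create the action block, assuming that
// action is thread-safe: no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link everything, propagating completion to the action block.
delimitedFileBlock.LinkTo(solaceActionBlock,
    new DataflowLinkOptions { PropagateCompletion = true });

// Now read the file and post each row to the BufferBlock<T>.
// csvFilePath is a placeholder for the path to your delimited file.
string csvFilePath = "data.csv";
foreach (string row in File.ReadLines(csvFilePath))
{
    delimitedFileBlock.Post(row);
}

// No more rows: complete the source and wait until every row is processed.
delimitedFileBlock.Complete();
await solaceActionBlock.Completion;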
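
To check the claim that the extra unbounded blocks are redundant, this sketch links three unbounded ActionBlock<int> instances to one BufferBlock<int> and counts how many messages each receives. The figure of 1,000 messages is arbitrary, and the code assumes C# top-level statements with System.Threading.Tasks.Dataflow available. The buffer offers each message to its targets in link order, and an unbounded ActionBlock<int> always accepts, so the first block should receive everything.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var source = new BufferBlock<int>();
var counts = new int[3];
var blocks = new ActionBlock<int>[3];

// Three unbounded ActionBlocks, linked in the order 0, 1, 2.
for (int i = 0; i < blocks.Length; i++)
{
    int index = i; // copy the loop variable for the closure
    blocks[i] = new ActionBlock<int>(
        _ => { Interlocked.Increment(ref counts[index]); },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    source.LinkTo(blocks[i], new DataflowLinkOptions { PropagateCompletion = true });
}

for (int n = 0; n < 1000; n++) source.Post(n);
source.Complete();
await Task.WhenAll(blocks.Select(b => b.Completion));

// The first linked block accepts every message; the others stay idle.
Console.WriteLine($"Messages per block: {string.Join(", ", counts)}");
```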
