I have a TPL Dataflow based application that worked fine using only a BatchBlock followed by an ActionBlock.
I've added a TransformBlock to transform the data from the source before posting it to the batch block, but now my action block is never hit. No errors or exceptions are thrown.
I am unsure whether I need to complete my transform block, as it only seems to be hit once.
Is there a step that I need to add to my transform code other than returning an object of the output type?
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        public const int BATCH_SIZE = 10;

        static void Main(string[] args)
        {
            Console.WriteLine("Application started");

            //Create the pipeline of actions
            var transformBlock = new TransformBlock<string, string>(
                input => TransformString(input),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
            var batchBlock = new BatchBlock<string>(BATCH_SIZE);
            var uploadFilesToAzureBlock = new ActionBlock<IEnumerable<string>>(
                strings => OutputStrings(strings),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
            Console.WriteLine("Blocks created");

            //link the actions
            transformBlock.LinkTo(batchBlock);
            batchBlock.LinkTo(uploadFilesToAzureBlock);
            batchBlock.Completion.ContinueWith(obj => uploadFilesToAzureBlock.Complete());
            Console.WriteLine("Blocks linked");

            var testInputs = new List<string>
            {
                "Kyle", "Stephen", "Jon", "Conor", "Adrian", "Marty",
                "Richard", "Norbert", "Kerri", "Mark", "Declan", "Ray",
                "Paul", "Andrew", "Rachel", "David", "Darrell"
            };
            Console.WriteLine("Data created");

            var i = 0;
            foreach (var name in testInputs)
            {
                Console.WriteLine("Posting name {0}", i);
                transformBlock.Post(name);
                i++;
            }

            batchBlock.Complete();
            uploadFilesToAzureBlock.Completion.Wait();

            Console.WriteLine("Finishing");
            Console.ReadKey();
        }

        private static void OutputStrings(IEnumerable<string> strings)
        {
            Console.WriteLine("Beginning Batch...");
            foreach (var s in strings)
            {
                Console.WriteLine(s);
            }
            Console.WriteLine("Completing Batch...");
        }

        private static string TransformString(string input)
        {
            return input + " has been processed";
        }
    }
}
```

Recommended answer

As mentioned by "usr" above, I had not propagated the completion of the blocks. The fix is to link the blocks with PropagateCompletion = true and to call Complete() on the first block (transformBlock) instead of on batchBlock. The following code works as expected.
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        public const int BATCH_SIZE = 10;

        static void Main(string[] args)
        {
            Console.WriteLine("Application started");

            //Create the pipeline of actions
            var transformBlock = new TransformBlock<string, string>(
                input => TransformString(input),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
            var batchBlock = new BatchBlock<string>(BATCH_SIZE);
            var outputStringsBlock = new ActionBlock<IEnumerable<string>>(
                strings => OutputStrings(strings),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
            Console.WriteLine("Blocks created");

            //link the actions; PropagateCompletion passes completion downstream
            transformBlock.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true });
            batchBlock.LinkTo(outputStringsBlock, new DataflowLinkOptions { PropagateCompletion = true });
            // Redundant once PropagateCompletion is set on the link above; kept from the original.
            batchBlock.Completion.ContinueWith(obj => outputStringsBlock.Complete());
            Console.WriteLine("Blocks linked");

            var testInputs = new List<string>
            {
                "Kyle", "Stephen", "Jon", "Conor", "Adrian", "Marty",
                "Richard", "Norbert", "Kerri", "Mark", "Declan", "Ray",
                "Paul", "Andrew", "Rachel", "David", "Darrell"
            };
            Console.WriteLine("Data created");

            var i = 0;
            foreach (var name in testInputs)
            {
                Console.WriteLine("Posting name {0}", i);
                transformBlock.Post(name);
                i++;
            }

            // Complete the head of the pipeline; completion then flows to the later blocks.
            transformBlock.Complete();
            outputStringsBlock.Completion.Wait();

            Console.WriteLine("Finishing");
            Console.ReadKey();
        }

        private static void OutputStrings(IEnumerable<string> strings)
        {
            Console.WriteLine("Beginning Batch...");
            Console.WriteLine("");
            foreach (var s in strings)
            {
                Console.WriteLine(s);
            }
            Console.WriteLine("");
            Console.WriteLine("Completing Batch...");
        }

        private static string TransformString(string input)
        {
            return input + " has been processed";
        }
    }
}
```
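To illustrate why the action block was never hit in the question's version, here is a minimal sketch showing that a block stops accepting input once Complete() has been called on it. In the original code batchBlock.Complete() ran right after the Post loop, so the batch block was likely already completed by the time the transform block tried to forward its output. The class and method names (CompletedBlockDemo, PostAroundComplete) are hypothetical and only for illustration; Post is used here as a stand-in for the message offer that a linked source makes.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class CompletedBlockDemo
{
    // Hypothetical helper: reports whether Post was accepted
    // before and after Complete() was called on the block.
    public static (bool before, bool after) PostAroundComplete()
    {
        var batchBlock = new BatchBlock<string>(10);

        // The block is still open, so the item is accepted.
        bool before = batchBlock.Post("Kyle");

        // Signal that no more input will arrive.
        batchBlock.Complete();

        // A completed block declines any further input.
        bool after = batchBlock.Post("Stephen");

        return (before, after);
    }

    public static void Main()
    {
        var (before, after) = PostAroundComplete();
        Console.WriteLine("Accepted before Complete(): {0}", before);
        Console.WriteLine("Accepted after Complete():  {0}", after);
    }
}
```

Checking the return value of Post (or using SendAsync) in the real pipeline is one way to notice declined items, since a declined Post does not throw.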