All files
[pithos-ms-client] / trunk / Libraries / ParallelExtensionsExtras / ParallelAlgorithms / ParallelAlgorithms_WhileNotEmpty.cs
1 //--------------------------------------------------------------------------
2 // 
3 //  Copyright (c) Microsoft Corporation.  All rights reserved. 
4 // 
5 //  File: ParallelAlgorithms_WhileNotEmpty.cs
6 //
7 //--------------------------------------------------------------------------
8
9 using System.Collections.Concurrent;
10 using System.Collections.Generic;
11 using System.Threading.Tasks;
12
13 namespace System.Threading.Algorithms
14 {
15     public static partial class ParallelAlgorithms
16     {
17         /// <summary>Processes data in parallel, allowing the processing function to add more data to be processed.</summary>
18         /// <typeparam name="T">Specifies the type of data being processed.</typeparam>
19         /// <param name="initialValues">The initial set of data to be processed.</param>
20         /// <param name="body">The operation to execute for each value.</param>
21         public static void WhileNotEmpty<T>(IEnumerable<T> initialValues, Action<T, Action<T>> body)
22         {
23             WhileNotEmpty(s_defaultParallelOptions, initialValues, body);
24         }
25
26         /// <summary>Processes data in parallel, allowing the processing function to add more data to be processed.</summary>
27         /// <typeparam name="T">Specifies the type of data being processed.</typeparam>
28         /// <param name="parallelOptions">A ParallelOptions instance that configures the behavior of this operation.</param>
29         /// <param name="initialValues">The initial set of data to be processed.</param>
30         /// <param name="body">The operation to execute for each value.</param>
31         public static void WhileNotEmpty<T>(
32             ParallelOptions parallelOptions,
33             IEnumerable<T> initialValues,
34             Action<T, Action<T>> body)
35         {
36             // Validate arguments
37             if (parallelOptions == null) throw new ArgumentNullException("parallelOptions");
38             if (initialValues == null) throw new ArgumentNullException("initialValues");
39             if (body == null) throw new ArgumentNullException("body");
40
41             // Create two lists to alternate between as source and destination.
42             var lists = new[] { new ConcurrentStack<T>(initialValues), new ConcurrentStack<T>() };
43
44             // Iterate until no more items to be processed
45             for (int i = 0; ; i++)
46             {
47                 // Determine which list is the source and which is the destination
48                 int fromIndex = i % 2;
49                 var from = lists[fromIndex];
50                 var to = lists[fromIndex ^ 1];
51
52                 // If the source is empty, we're done
53                 if (from.IsEmpty) break;
54
55                 // Otherwise, process all source items in parallel, adding any new items into the destination
56                 Action<T> adder = newItem => to.Push(newItem);
57                 Parallel.ForEach(from, parallelOptions, e => body(e, adder));
58
59                 // Clear out the source as it's now been fully processed
60                 from.Clear();
61             }
62         }
63     }
64 }