//--------------------------------------------------------------------------
//
//  Copyright (c) Microsoft Corporation.  All rights reserved.
//
//  File: ParallelAlgorithms_Scan.cs
//
//--------------------------------------------------------------------------

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace System.Threading.Algorithms
{
    public static partial class ParallelAlgorithms
    {
        /// <summary>Computes a parallel inclusive prefix scan over the source enumerable using the specified function.</summary>
        /// <typeparam name="T">The type of the data in the source.</typeparam>
        /// <param name="source">The source data over which a prefix scan should be computed. It is copied, not modified.</param>
        /// <param name="function">
        /// The function to use for the scan, invoked as function(earlier, later). The parallel paths regroup
        /// the applications, so the function must be associative to match a sequential scan.
        /// </param>
        /// <returns>A new array whose element i is the combination of source elements 0..i.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="function"/> is null.</exception>
        /// <remarks>
        /// For very small functions, such as additions, an implementation targeted
        /// at the relevant type and operation will perform significantly better than
        /// this generalized implementation.
        /// </remarks>
        public static T[] Scan<T>(IEnumerable<T> source, Func<T, T, T> function)
        {
            return Scan(source, function, loadBalance: false);
        }

        /// <summary>Computes a parallel inclusive prefix scan over the source enumerable using the specified function.</summary>
        /// <typeparam name="T">The type of the data in the source.</typeparam>
        /// <param name="source">The source data over which a prefix scan should be computed. It is copied, not modified.</param>
        /// <param name="function">
        /// The function to use for the scan, invoked as function(earlier, later). The parallel paths regroup
        /// the applications, so the function must be associative to match a sequential scan.
        /// </param>
        /// <param name="loadBalance">
        /// When the parallel path is taken, selects the recursive Parallel.For-based algorithm instead of the
        /// one-range-per-processor algorithm. Ignored when the sequential path is taken.
        /// </param>
        /// <returns>A new array whose element i is the combination of source elements 0..i.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="function"/> is null.</exception>
        /// <remarks>
        /// For very small functions, such as additions, an implementation targeted
        /// at the relevant type and operation will perform significantly better than
        /// this generalized implementation.
        /// </remarks>
        public static T[] Scan<T>(IEnumerable<T> source, Func<T, T, T> function, bool loadBalance)
        {
            // Validate all arguments before the source is enumerated
            if (source == null) throw new ArgumentNullException("source");
            if (function == null) throw new ArgumentNullException("function");

            // Scan a private copy so the caller's data is left untouched
            var output = source.ToArray();
            ScanInPlace(output, function, loadBalance);
            return output;
        }

        /// <summary>Computes a parallel inclusive prefix scan in-place on an array using the specified function.</summary>
        /// <typeparam name="T">The type of the data in the array.</typeparam>
        /// <param name="data">The data over which a prefix scan should be computed. Upon exit, stores the results.</param>
        /// <param name="function">The associative function to use for the scan, invoked as function(earlier, later).</param>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> or <paramref name="function"/> is null.</exception>
        /// <remarks>
        /// For very small functions, such as additions, an implementation targeted
        /// at the relevant type and operation will perform significantly better than
        /// this generalized implementation.
        /// </remarks>
        public static void ScanInPlace<T>(T[] data, Func<T, T, T> function)
        {
            ScanInPlace(data, function, loadBalance: false);
        }

        /// <summary>Computes a parallel inclusive prefix scan in-place on an array using the specified function.</summary>
        /// <typeparam name="T">The type of the data in the array.</typeparam>
        /// <param name="data">The data over which a prefix scan should be computed. Upon exit, stores the results.</param>
        /// <param name="function">The associative function to use for the scan, invoked as function(earlier, later).</param>
        /// <param name="loadBalance">
        /// When the parallel path is taken, selects the recursive Parallel.For-based algorithm instead of the
        /// one-range-per-processor algorithm. Ignored when the sequential path is taken.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> or <paramref name="function"/> is null.</exception>
        /// <remarks>
        /// For very small functions, such as additions, an implementation targeted
        /// at the relevant type and operation will perform significantly better than
        /// this generalized implementation.
        /// </remarks>
        public static void ScanInPlace<T>(T[] data, Func<T, T, T> function, bool loadBalance)
        {
            // Validate arguments
            if (data == null) throw new ArgumentNullException("data");
            if (function == null) throw new ArgumentNullException("function");

            // The parallel implementations execute the function roughly twice as many
            // times as the sequential one, so they only have a chance of running faster
            // when there are more than 2 cores.
            if (Environment.ProcessorCount <= 2)
            {
                InclusiveScanInPlaceSerial(data, function, 0, data.Length, 1);
            }
            else if (loadBalance)
            {
                InclusiveScanInPlaceWithLoadBalancingParallel(data, function, 0, data.Length, 1);
            }
            else // parallel, non-loadbalance
            {
                InclusiveScanInPlaceParallel(data, function);
            }
        }

        /// <summary>Computes a sequential inclusive prefix scan over a strided view of the array.</summary>
        /// <typeparam name="T">The type of the data in the array.</typeparam>
        /// <param name="arr">The data, which will be overwritten with the computed prefix scan.</param>
        /// <param name="function">The function to use for the scan, invoked as function(earlier, later).</param>
        /// <param name="arrStart">The index of the first element of the view.</param>
        /// <param name="arrLength">
        /// The exclusive upper bound of the view, as an absolute index into <paramref name="arr"/>
        /// (not a count relative to <paramref name="arrStart"/>). No index at or beyond it is written.
        /// </param>
        /// <param name="skip">The distance between consecutive elements of the view.</param>
        /// <remarks>No parameter validation is performed.</remarks>
        private static void InclusiveScanInPlaceSerial<T>(T[] arr, Func<T, T, T> function, int arrStart, int arrLength, int skip)
        {
            for (int i = arrStart; i + skip < arrLength; i += skip)
            {
                arr[i + skip] = function(arr[i], arr[i + skip]);
            }
        }

        /// <summary>Computes a sequential exclusive prefix scan over a range of the array using the specified function.</summary>
        /// <typeparam name="T">The type of the data in the array.</typeparam>
        /// <param name="arr">The data, which will be overwritten with the computed prefix scan.</param>
        /// <param name="function">The function to use for the scan, invoked as function(earlier, later).</param>
        /// <param name="lowerBoundInclusive">The inclusive lower bound of the array at which to start the scan.</param>
        /// <param name="upperBoundExclusive">The exclusive upper bound of the array at which to end the scan.</param>
        /// <exception cref="ArgumentNullException"><paramref name="arr"/> or <paramref name="function"/> is null.</exception>
        /// <remarks>
        /// The first element of the range is set to default(T). The result is therefore only a true
        /// exclusive scan when default(T) is an identity of <paramref name="function"/> (e.g. 0 for addition).
        /// An empty range leaves the array untouched.
        /// </remarks>
        public static void ExclusiveScanInPlaceSerial<T>(T[] arr, Func<T, T, T> function, int lowerBoundInclusive, int upperBoundExclusive)
        {
            if (arr == null) throw new ArgumentNullException("arr");
            if (function == null) throw new ArgumentNullException("function");

            // Without this check an empty range would still read, and overwrite,
            // arr[lowerBoundInclusive], which lies outside the range.
            if (upperBoundExclusive <= lowerBoundInclusive) return;

            T total = arr[lowerBoundInclusive];
            arr[lowerBoundInclusive] = default(T);
            for (int i = lowerBoundInclusive + 1; i < upperBoundExclusive; i++)
            {
                // Each slot receives the combination of everything strictly before it
                T prevTotal = total;
                total = function(total, arr[i]);
                arr[i] = prevTotal;
            }
        }

        /// <summary>Computes a parallel inclusive prefix scan over a strided view of the array using recursive pairwise combination.</summary>
        /// <typeparam name="T">The type of the data in the array.</typeparam>
        /// <param name="arr">The data, which will be overwritten with the computed prefix scan.</param>
        /// <param name="function">The associative function to use for the scan, invoked as function(earlier, later).</param>
        /// <param name="arrStart">The index of the first element of the view.</param>
        /// <param name="arrLength">The number of elements in the view.</param>
        /// <param name="skip">The distance between consecutive elements of the view.</param>
        /// <remarks>No parameter validation is performed. Recursion depth grows with log2(arrLength).</remarks>
        private static void InclusiveScanInPlaceWithLoadBalancingParallel<T>(T[] arr, Func<T, T, T> function,
            int arrStart, int arrLength, int skip)
        {
            // A view of 0 or 1 elements is already its own prefix scan
            if (arrLength <= 1) return;
            int halfInputLength = arrLength / 2;

            // Combine each adjacent pair (2i, 2i+1) of the view, storing the pair total in the odd slot.
            // Iterations are independent, so Parallel.For may distribute them freely.
            Parallel.For(0, halfInputLength, i =>
            {
                int loc = arrStart + (i * 2 * skip);
                arr[loc + skip] = function(arr[loc], arr[loc + skip]);
            });

            // Recursively scan the odd slots (the pair totals) as a view with doubled stride;
            // afterwards every odd slot holds its full inclusive prefix.
            InclusiveScanInPlaceWithLoadBalancingParallel(arr, function, arrStart + skip, halfInputLength, skip * 2);

            // Fix up the even slots after the first: each combines the completed prefix just
            // before it with its own value. A trailing even slot exists only for odd lengths.
            Parallel.For(0, (arrLength % 2) == 0 ? halfInputLength - 1 : halfInputLength, i =>
            {
                int loc = arrStart + (i * 2 * skip) + skip;
                arr[loc + skip] = function(arr[loc], arr[loc + skip]);
            });
        }

        /// <summary>Computes a parallel inclusive prefix scan in-place over the array using one contiguous range per worker.</summary>
        /// <typeparam name="T">The type of the data in the array.</typeparam>
        /// <param name="arr">The data, which will be overwritten with the computed prefix scan.</param>
        /// <param name="function">The associative function to use for the scan, invoked as function(earlier, later).</param>
        /// <exception cref="ArgumentNullException"><paramref name="arr"/> or <paramref name="function"/> is null.</exception>
        /// <exception cref="AggregateException">Wraps any exception thrown by <paramref name="function"/>.</exception>
        /// <remarks>
        /// The array is split into at most Environment.ProcessorCount ranges, and never more ranges than elements.
        /// Each worker scans its range and publishes the range total. The barrier's post-phase action
        /// exclusive-scans those totals. Each worker except the first then folds the total of all
        /// preceding ranges into its own range.
        /// </remarks>
        public static void InclusiveScanInPlaceParallel<T>(T[] arr, Func<T, T, T> function)
        {
            if (arr == null) throw new ArgumentNullException("arr");
            if (function == null) throw new ArgumentNullException("function");

            // Every range must be non-empty. An empty range has no last element to publish, and the
            // resulting exception would leave the other workers blocked on the barrier.
            int rangeCount = Math.Min(Environment.ProcessorCount, arr.Length);
            if (rangeCount == 0) return;

            T[] intermediatePartials = new T[rangeCount];
            using (var phaseBarrier = new Barrier(rangeCount,
                _ => ExclusiveScanInPlaceSerial(intermediatePartials, function, 0, intermediatePartials.Length)))
            {
                // At least 1 because rangeCount <= arr.Length; the last range absorbs the remainder
                int rangeSize = arr.Length / rangeCount;

                // Create, store, and wait on all of the tasks
                var tasks = new Task[rangeCount];
                for (int i = 0, nextRangeStart = 0; i < rangeCount; i++, nextRangeStart += rangeSize)
                {
                    // Per-iteration copies so each task's closure sees its own range
                    int rangeNum = i;
                    int lowerRangeInclusive = nextRangeStart;
                    int upperRangeExclusive = i < rangeCount - 1 ? nextRangeStart + rangeSize : arr.Length;
                    tasks[rangeNum] = Task.Factory.StartNew(() =>
                    {
                        // Phase 1: prefix scan the assigned range and publish its total
                        try
                        {
                            InclusiveScanInPlaceSerial(arr, function, lowerRangeInclusive, upperRangeExclusive, 1);
                            intermediatePartials[rangeNum] = arr[upperRangeExclusive - 1];
                        }
                        catch
                        {
                            // Withdraw from the barrier so the remaining workers are not blocked
                            // forever; the exception still surfaces through Task.WaitAll below.
                            phaseBarrier.RemoveParticipant();
                            throw;
                        }

                        // Phase 2: the barrier's post-phase action (run once) exclusive-scans the range totals
                        phaseBarrier.SignalAndWait();

                        // Phase 3: fold the total of all preceding ranges into this range
                        if (rangeNum != 0)
                        {
                            T precedingTotal = intermediatePartials[rangeNum];
                            for (int j = lowerRangeInclusive; j < upperRangeExclusive; j++)
                            {
                                arr[j] = function(precedingTotal, arr[j]);
                            }
                        }
                    });
                }

                // Wait for all of the tasks to complete (rethrows any fault as an AggregateException)
                Task.WaitAll(tasks);
            }
        }
    }
}