All files
[pithos-ms-client] / trunk / Libraries / ParallelExtensionsExtras / Partitioners / ChunkPartitioner.cs
diff --git a/trunk/Libraries/ParallelExtensionsExtras/Partitioners/ChunkPartitioner.cs b/trunk/Libraries/ParallelExtensionsExtras/Partitioners/ChunkPartitioner.cs
new file mode 100644 (file)
index 0000000..744f54b
--- /dev/null
@@ -0,0 +1,290 @@
+//--------------------------------------------------------------------------
+// 
+//  Copyright (c) Microsoft Corporation.  All rights reserved. 
+// 
+//  File: ChunkPartitioner.cs
+//
+//--------------------------------------------------------------------------
+
+using System.Collections.Generic;
+using System.Threading;
+
+namespace System.Collections.Concurrent.Partitioners
+{
+    /// <summary>
+    /// Factory methods for partitioners that split an enumerable into chunks whose sizes
+    /// are chosen by the caller.
+    /// </summary>
+    public static class ChunkPartitioner
+    {
+        /// <summary>Creates a partitioner whose chunk sizes are computed by a caller-provided function.</summary>
+        /// <typeparam name="TSource">The element type of the data being partitioned.</typeparam>
+        /// <param name="source">The data to partition.</param>
+        /// <param name="nextChunkSizeFunc">A function that, given the previous chunk size, returns
+        /// the size to use for the next chunk.</param>
+        /// <returns>An orderable partitioner over <paramref name="source"/>.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or
+        /// <paramref name="nextChunkSizeFunc"/> is null.</exception>
+        public static OrderablePartitioner<TSource> Create<TSource>(
+            IEnumerable<TSource> source, Func<int, int> nextChunkSizeFunc)
+        {
+            OrderablePartitioner<TSource> partitioner =
+                new ChunkPartitioner<TSource>(source, nextChunkSizeFunc);
+            return partitioner;
+        }
+
+        /// <summary>Creates a partitioner in which every chunk request uses the same fixed size.</summary>
+        /// <typeparam name="TSource">The element type of the data being partitioned.</typeparam>
+        /// <param name="source">The data to partition.</param>
+        /// <param name="chunkSize">The size requested for every chunk; must be positive.</param>
+        /// <returns>An orderable partitioner over <paramref name="source"/>.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is zero or negative.</exception>
+        public static OrderablePartitioner<TSource> Create<TSource>(
+            IEnumerable<TSource> source, int chunkSize)
+        {
+            OrderablePartitioner<TSource> partitioner =
+                new ChunkPartitioner<TSource>(source, chunkSize);
+            return partitioner;
+        }
+
+        /// <summary>Creates a partitioner whose chunk sizes range from a minimum up to a maximum.</summary>
+        /// <typeparam name="TSource">The element type of the data being partitioned.</typeparam>
+        /// <param name="source">The data to partition.</param>
+        /// <param name="minChunkSize">The smallest chunk size to use; must be positive.</param>
+        /// <param name="maxChunkSize">The largest chunk size to use; must not be smaller than
+        /// <paramref name="minChunkSize"/>.</param>
+        /// <returns>An orderable partitioner over <paramref name="source"/>.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="minChunkSize"/> is zero or
+        /// negative, or is greater than <paramref name="maxChunkSize"/>.</exception>
+        public static OrderablePartitioner<TSource> Create<TSource>(
+            IEnumerable<TSource> source, int minChunkSize, int maxChunkSize)
+        {
+            OrderablePartitioner<TSource> partitioner =
+                new ChunkPartitioner<TSource>(source, minChunkSize, maxChunkSize);
+            return partitioner;
+        }
+    }
+
+    /// <summary>
+    /// Partitions an enumerable into chunks based on user-supplied criteria.
+    /// </summary>
+    internal sealed class ChunkPartitioner<T> : OrderablePartitioner<T>
+    {
+        private readonly IEnumerable<T> _source;
+        private readonly Func<int, int> _nextChunkSizeFunc;
+
+        /// <summary>
+        /// Initializes a partitioner over <paramref name="source"/> that asks
+        /// <paramref name="nextChunkSizeFunc"/> how large each chunk should be.
+        /// </summary>
+        /// <param name="source">The data to partition.</param>
+        /// <param name="nextChunkSizeFunc">Maps the previous chunk size to the next chunk size.</param>
+        /// <exception cref="ArgumentNullException">Either argument is null.</exception>
+        public ChunkPartitioner(IEnumerable<T> source, Func<int, int> nextChunkSizeFunc)
+            // Keys are ordered inside each partition, ordered across partitions, and normalized.
+            : base(keysOrderedInEachPartition: true, keysOrderedAcrossPartitions: true, keysNormalized: true)
+        {
+            // Reject missing arguments before anything is stored; source is checked first.
+            if (source == null)
+            {
+                throw new ArgumentNullException("source");
+            }
+            if (nextChunkSizeFunc == null)
+            {
+                throw new ArgumentNullException("nextChunkSizeFunc");
+            }
+
+            _nextChunkSizeFunc = nextChunkSizeFunc;
+            _source = source;
+        }
+
+        /// <summary>
+        /// Initializes a partitioner over <paramref name="source"/> that requests the same
+        /// <paramref name="chunkSize"/> for every chunk.
+        /// </summary>
+        /// <param name="source">The data to partition.</param>
+        /// <param name="chunkSize">The fixed chunk size; must be positive.</param>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is zero or negative.</exception>
+        public ChunkPartitioner(IEnumerable<T> source, int chunkSize)
+            // Constant sizing: the function ignores its input and always answers chunkSize.
+            : this(source, delegate(int previousSize) { return chunkSize; })
+        {
+            if (chunkSize < 1)
+            {
+                throw new ArgumentOutOfRangeException("chunkSize");
+            }
+        }
+
+        /// <summary>
+        /// Initializes a partitioner over <paramref name="source"/> whose chunk sizes grow from
+        /// <paramref name="minChunkSize"/> up to <paramref name="maxChunkSize"/>.
+        /// </summary>
+        /// <param name="source">The data to partition.</param>
+        /// <param name="minChunkSize">The smallest chunk size; must be positive.</param>
+        /// <param name="maxChunkSize">The largest chunk size; must be at least <paramref name="minChunkSize"/>.</param>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="minChunkSize"/> is zero or
+        /// negative, or exceeds <paramref name="maxChunkSize"/>.</exception>
+        public ChunkPartitioner(IEnumerable<T> source, int minChunkSize, int maxChunkSize)
+            // The sizing function is built from the bounds; see CreateFuncFromMinAndMax.
+            : this(source, CreateFuncFromMinAndMax(minChunkSize, maxChunkSize))
+        {
+            bool minIsPositive = minChunkSize > 0;
+            bool minWithinMax = minChunkSize <= maxChunkSize;
+            if (!(minIsPositive && minWithinMax))
+            {
+                throw new ArgumentOutOfRangeException("minChunkSize");
+            }
+        }
+
+        /// <summary>
+        /// Builds a chunk-size function that starts at <paramref name="minChunkSize"/> and doubles
+        /// the previous size on each call without ever exceeding <paramref name="maxChunkSize"/>.
+        /// </summary>
+        /// <param name="minChunkSize">The size returned whenever the previous size is below it.</param>
+        /// <param name="maxChunkSize">The upper bound on any returned size.</param>
+        /// <returns>A function mapping the previous chunk size to the next chunk size.</returns>
+        private static Func<int, int> CreateFuncFromMinAndMax(int minChunkSize, int maxChunkSize)
+        {
+            return delegate(int prev)
+            {
+                if (prev < minChunkSize) return minChunkSize;
+                if (prev >= maxChunkSize) return maxChunkSize;
+
+                // Double in 64-bit arithmetic. An int multiplication would wrap to a negative
+                // value for large sizes, and would throw OverflowException in an assembly
+                // compiled with arithmetic overflow checking enabled.
+                long next = (long)prev * 2;
+                if (next >= maxChunkSize || next < 0) return maxChunkSize;
+
+                // Here 0 <= next < maxChunkSize <= int.MaxValue, so the narrowing cast is lossless.
+                return (int)next;
+            };
+        }
+
+        /// <summary>
+        /// Splits the underlying collection into a fixed number of orderable partitions.
+        /// </summary>
+        /// <param name="partitionCount">How many partitions to create; must be positive.</param>
+        /// <returns>A list holding <paramref name="partitionCount"/> partition enumerators.</returns>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="partitionCount"/> is zero or negative.</exception>
+        public override IList<IEnumerator<KeyValuePair<long, T>>> GetOrderablePartitions(int partitionCount)
+        {
+            if (partitionCount < 1)
+            {
+                throw new ArgumentOutOfRangeException("partitionCount");
+            }
+
+            // Every slot is filled from one dynamic-partition source created with reference
+            // counting turned on.
+            IEnumerator<KeyValuePair<long, T>>[] result = new IEnumerator<KeyValuePair<long, T>>[partitionCount];
+            IEnumerable<KeyValuePair<long, T>> sharedSource = GetOrderableDynamicPartitions(true);
+            int slot = 0;
+            while (slot < result.Length)
+            {
+                result[slot] = sharedSource.GetEnumerator();
+                slot++;
+            }
+            return result;
+        }
+
+        /// <summary>
+        /// Reports whether additional partitions can be created dynamically; always true for
+        /// this partitioner.
+        /// </summary>
+        public override bool SupportsDynamicPartitions
+        {
+            get { return true; }
+        }
+
+        /// <summary>
+        /// Creates an object from which any number of partitions over the underlying collection
+        /// can be obtained.
+        /// </summary>
+        /// <returns>
+        /// An enumerable whose enumerators are the partitions over the underlying data source.
+        /// </returns>
+        public override IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions()
+        {
+            // No reference counting on this path: the shared source enumerator is released when
+            // the returned object itself is disposed.
+            return GetOrderableDynamicPartitions(false);
+        }
+
+        /// <summary>Creates the dynamic-partition source for this partitioner.</summary>
+        /// <param name="referenceCountForDisposal">Whether the created object should count its
+        /// outstanding enumerators to decide when the shared source enumerator is disposed.</param>
+        /// <returns>An enumerable whose enumerators are the partitions.</returns>
+        private IEnumerable<KeyValuePair<long, T>> GetOrderableDynamicPartitions(bool referenceCountForDisposal)
+        {
+            EnumerableOfEnumerators dynamicSource = new EnumerableOfEnumerators(this, referenceCountForDisposal);
+            return dynamicSource;
+        }
+
+        /// <summary>
+        /// The object used to dynamically create partitions. Every enumerator it hands out pulls
+        /// chunks of elements from one shared enumerator over the parent partitioner's source.
+        /// </summary>
+        private class EnumerableOfEnumerators : IEnumerable<KeyValuePair<long, T>>, IDisposable
+        {
+            private readonly ChunkPartitioner<T> _parentPartitioner;
+            // Dedicated lock object: held while a partition reads a chunk from _sharedEnumerator
+            // and updates _nextSharedIndex / _noMoreElements.
+            private readonly object _sharedLock = new object();
+            private readonly IEnumerator<T> _sharedEnumerator;
+            // Key assigned to the most recently retrieved element; -1 until the first one is read.
+            private long _nextSharedIndex;
+            // Number of outstanding enumerators; only maintained when reference counting is on.
+            private int _activeEnumerators;
+            // Set once the shared enumerator has reported that it is exhausted.
+            private bool _noMoreElements;
+            private bool _disposed;
+            private readonly bool _referenceCountForDisposal;
+
+            /// <summary>Creates the partition source and opens an enumerator over the parent's data.</summary>
+            /// <param name="parentPartitioner">The partitioner supplying the data and the chunk-size function.</param>
+            /// <param name="referenceCountForDisposal">True to dispose the shared enumerator when the
+            /// last outstanding partition enumerator is disposed; false to dispose it when this
+            /// object is disposed.</param>
+            /// <exception cref="ArgumentNullException"><paramref name="parentPartitioner"/> is null.</exception>
+            public EnumerableOfEnumerators(ChunkPartitioner<T> parentPartitioner, bool referenceCountForDisposal)
+            {
+                // Validate parameters
+                if (parentPartitioner == null) throw new ArgumentNullException("parentPartitioner");
+
+                // Store the data, including creating an enumerator from the underlying data source
+                _parentPartitioner = parentPartitioner;
+                _sharedEnumerator = parentPartitioner._source.GetEnumerator();
+                _nextSharedIndex = -1;
+                _referenceCountForDisposal = referenceCountForDisposal;
+            }
+
+            IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
+
+            /// <summary>Creates a new partition that draws chunks from the shared enumerator.</summary>
+            /// <returns>An enumerator over the key/value pairs handed to this partition.</returns>
+            public IEnumerator<KeyValuePair<long, T>> GetEnumerator()
+            {
+                if (_referenceCountForDisposal)
+                {
+                    Interlocked.Increment(ref _activeEnumerators);
+                }
+                return new Enumerator(this);
+            }
+
+            /// <summary>
+            /// Called when a partition enumerator is disposed. With reference counting enabled, the
+            /// shared enumerator is disposed once the outstanding count drops to zero.
+            /// </summary>
+            /// <param name="enumerator">The partition enumerator being disposed (not otherwise used).</param>
+            private void DisposeEnumerator(Enumerator enumerator)
+            {
+                if (_referenceCountForDisposal)
+                {
+                    if (Interlocked.Decrement(ref _activeEnumerators) == 0)
+                    {
+                        _sharedEnumerator.Dispose();
+                    }
+                }
+            }
+
+            /// <summary>
+            /// A single partition: caches one chunk of elements at a time and refills the cache
+            /// from the shared enumerator when it runs out.
+            /// </summary>
+            private class Enumerator : IEnumerator<KeyValuePair<long, T>>
+            {
+                private EnumerableOfEnumerators _parentEnumerable;
+                private List<KeyValuePair<long, T>> _currentChunk = new List<KeyValuePair<long, T>>();
+                private int _currentChunkCurrentIndex;
+                // Size returned by the chunk-size function on the previous request; 0 before the first.
+                private int _lastRequestedChunkSize;
+                private bool _disposed;
+
+                /// <summary>Creates a partition bound to the given partition source.</summary>
+                /// <param name="parentEnumerable">The source that owns the shared enumerator.</param>
+                /// <exception cref="ArgumentNullException"><paramref name="parentEnumerable"/> is null.</exception>
+                public Enumerator(EnumerableOfEnumerators parentEnumerable)
+                {
+                    if (parentEnumerable == null) throw new ArgumentNullException("parentEnumerable");
+                    _parentEnumerable = parentEnumerable;
+                }
+
+                /// <summary>
+                /// Advances to the next element, fetching a new chunk from the shared enumerator
+                /// when the cached chunk has been consumed.
+                /// </summary>
+                /// <returns>True if an element is available; false if the source is exhausted.</returns>
+                /// <exception cref="ObjectDisposedException">The enumerator has been disposed.</exception>
+                /// <exception cref="InvalidOperationException">The chunk-size function returned a
+                /// value that is zero or negative.</exception>
+                public bool MoveNext()
+                {
+                    if (_disposed) throw new ObjectDisposedException(GetType().Name);
+
+                    // Move to the next cached element. If we already retrieved a chunk and if there's still
+                    // data left in it, just use the next item from it.
+                    ++_currentChunkCurrentIndex;
+                    if (_currentChunkCurrentIndex >= 0 &&
+                        _currentChunkCurrentIndex < _currentChunk.Count) return true;
+
+                    // First, figure out how much new data we want. The previous requested chunk size is used
+                    // as input to figure out how much data the user now wants.  The initial chunk size
+                    // supplied is 0 so that the user delegate is made aware that this is the initial request
+                    // such that it can select the initial chunk size on first request.
+                    int nextChunkSize = _parentEnumerable._parentPartitioner._nextChunkSizeFunc(_lastRequestedChunkSize);
+                    if (nextChunkSize <= 0) throw new InvalidOperationException(
+                        "Invalid chunk size requested: chunk sizes must be positive.");
+                    _lastRequestedChunkSize = nextChunkSize;
+
+                    // Reset the list
+                    _currentChunk.Clear();
+                    _currentChunkCurrentIndex = 0;
+                    if (nextChunkSize > _currentChunk.Capacity) _currentChunk.Capacity = nextChunkSize;
+
+                    // Try to grab the next chunk of data. Synchronize on the private lock object
+                    // rather than on the shared enumerator: that enumerator comes from the
+                    // caller-supplied source, so code outside this class could also lock on it.
+                    lock (_parentEnumerable._sharedLock)
+                    {
+                        // If we've already discovered that no more elements exist (and we've gotten this
+                        // far, which means we don't have any elements cached), we're done.
+                        if (_parentEnumerable._noMoreElements) return false;
+
+                        // Get another chunk
+                        for (int i = 0; i < nextChunkSize; i++)
+                        {
+                            // If there are no more elements to be retrieved from the shared enumerator, mark
+                            // that so that other partitions don't have to check again. Return whether we
+                            // were able to retrieve any data at all.
+                            if (!_parentEnumerable._sharedEnumerator.MoveNext())
+                            {
+                                _parentEnumerable._noMoreElements = true;
+                                return _currentChunk.Count > 0;
+                            }
+
+                            // Keys are assigned in the order elements are read from the source.
+                            ++_parentEnumerable._nextSharedIndex;
+                            _currentChunk.Add(new KeyValuePair<long, T>(
+                                _parentEnumerable._nextSharedIndex,
+                                _parentEnumerable._sharedEnumerator.Current));
+                        }
+                    }
+
+                    // We got at least some data
+                    return true;
+                }
+
+                /// <summary>Gets the element at the current position within the cached chunk.</summary>
+                /// <exception cref="InvalidOperationException">There is no current element.</exception>
+                public KeyValuePair<long, T> Current
+                {
+                    get
+                    {
+                        if (_currentChunkCurrentIndex >= _currentChunk.Count)
+                        {
+                            throw new InvalidOperationException("There is no current item.");
+                        }
+                        return _currentChunk[_currentChunkCurrentIndex];
+                    }
+                }
+
+                /// <summary>Notifies the owning source, once, that this partition is finished.</summary>
+                public void Dispose()
+                {
+                    if (!_disposed)
+                    {
+                        _parentEnumerable.DisposeEnumerator(this);
+                        _disposed = true;
+                    }
+                }
+
+                object IEnumerator.Current { get { return Current; } }
+
+                /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
+                public void Reset() { throw new NotSupportedException(); }
+            }
+
+            /// <summary>
+            /// Disposes the shared enumerator unless its lifetime is managed by reference counting.
+            /// Calls after the first have no effect.
+            /// </summary>
+            public void Dispose()
+            {
+                if (!_disposed)
+                {
+                    if (!_referenceCountForDisposal) _sharedEnumerator.Dispose();
+                    _disposed = true;
+                }
+            }
+        }
+    }
+}
\ No newline at end of file