Modified download methods to use async download overrides with progress
author: pkanavos <pkanavos@gmail.com>
Wed, 4 Jul 2012 14:17:19 +0000 (17:17 +0300)
committer: pkanavos <pkanavos@gmail.com>
Wed, 4 Jul 2012 14:17:19 +0000 (17:17 +0300)
Created WebClient.UploadDataTaskAsync extension method with cancellation and progress

trunk/Pithos.Network/CloudFilesClient.cs
trunk/Pithos.Network/WebExtensions.cs

index 8f79dd4..4bc2a7c 100644 (file)
@@ -1062,15 +1062,22 @@ namespace Pithos.Network
 
                     //Download progress is reported to the Trace log
                     Log.InfoFormat("[GET] START {0}", objectName);
-                    client.DownloadProgressChanged += (sender, args) =>
+                    /*client.DownloadProgressChanged += (sender, args) =>
                                                       Log.InfoFormat("[GET PROGRESS] {0} {1}% {2} of {3}",
                                                                      fileName, args.ProgressPercentage,
                                                                      args.BytesReceived,
-                                                                     args.TotalBytesToReceive);
-
+                                                                     args.TotalBytesToReceive);*/
+                    var progress = new Progress<DownloadProgressChangedEventArgs>(args =>
+                                {
+                                    Log.InfoFormat("[GET PROGRESS] {0} {1}% {2} of {3}",
+                                                   fileName, args.ProgressPercentage,
+                                                   args.BytesReceived,
+                                                   args.TotalBytesToReceive);
+                                    DownloadProgressChanged(this, args);
+                                });
                     
                     //Start downloading the object asynchronously
-                    await client.DownloadFileTaskAsync(uri, fileName, cancellationToken).ConfigureAwait(false);
+                    await client.DownloadFileTaskAsync(uri, fileName, cancellationToken,progress).ConfigureAwait(false);
 
                     //Once the download completes
                     //Delete the local client object
@@ -1200,17 +1207,25 @@ namespace Pithos.Network
                 var builder = client.GetAddressBuilder(container, relativeUrl.ToString());
                 var uri = builder.Uri;
 
-                client.DownloadProgressChanged += (sender, args) =>
+/*                client.DownloadProgressChanged += (sender, args) =>
                                                       {
                                                           Log.DebugFormat("[GET PROGRESS] {0} {1}% {2} of {3}",
                                                                           uri.Segments.Last(), args.ProgressPercentage,
                                                                           args.BytesReceived,
                                                                           args.TotalBytesToReceive);
                                                           DownloadProgressChanged(sender, args);
-                                                      };
+                                                      };*/
+                var progress = new Progress<DownloadProgressChangedEventArgs>(args =>
+                {
+                    Log.DebugFormat("[GET PROGRESS] {0} {1}% {2} of {3}",
+                                    uri.Segments.Last(), args.ProgressPercentage,
+                                    args.BytesReceived,
+                                    args.TotalBytesToReceive);
+                    DownloadProgressChanged(this, args);
+                });
 
 
-                var result = await client.DownloadDataTaskAsync(uri, cancellationToken).ConfigureAwait(false);
+                var result = await client.DownloadDataTaskAsync(uri, cancellationToken,progress).ConfigureAwait(false);
                 return result;
             }
         }
@@ -1255,6 +1270,7 @@ namespace Pithos.Network
 
                     Log.InfoFormat("[BLOCK POST] START");
 
+/*
                     client.UploadProgressChanged += (sender, args) =>
                                                         {
                                                             Log.InfoFormat("[BLOCK POST PROGRESS] {0}% {1} of {2}",
@@ -1262,16 +1278,21 @@ namespace Pithos.Network
                                                                            args.TotalBytesToSend);
                                                             UploadProgressChanged(sender, args);
                                                         };
+*/
                     client.UploadFileCompleted += (sender, args) =>
                                                   Log.InfoFormat("[BLOCK POST PROGRESS] Completed ");
 
+                    var progress=new Progress<UploadProgressChangedEventArgs>(args=>
+                    {
+                        Log.InfoFormat("[BLOCK POST PROGRESS] {0}% {1} of {2}",
+                                        args.ProgressPercentage, args.BytesSent,
+                                        args.TotalBytesToSend);
+                        UploadProgressChanged(this, args);                                                                                      
+                    });
                     var buffer = new byte[count];
                     Buffer.BlockCopy(block, offset, buffer, 0, count);
                     //Send the block
-                    using (var ctr = token.Register(client.CancelAsync))
-                    {
-                        await client.UploadDataTaskAsync(uri, "POST", buffer).ConfigureAwait(false);
-                    }
+                    await client.UploadDataTaskAsync(uri, "POST", buffer,token,progress).ConfigureAwait(false);
                     Log.InfoFormat("[BLOCK POST] END");
                 }
             }
index 84afde8..5aae235 100644 (file)
@@ -1,10 +1,13 @@
 using System;
 using System.Collections.Generic;
+using System.ComponentModel;
 using System.Diagnostics.Contracts;
 using System.Linq;
 using System.Text;
 using System.Net;
 using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
 using log4net;
 
 namespace Pithos.Network
@@ -25,15 +28,15 @@ namespace Pithos.Network
                 }
             }
         }
-    
-        public static void LogError(this ILog log,HttpWebResponse response)
+
         /// <summary>
         /// Writes the headers and body of an HTTP response to the log at error level.
         /// </summary>
         /// <param name="log">The logger to write to.</param>
         /// <param name="response">The response to dump; nothing is logged when it is null.</param>
         /// <remarks>
         /// The entry is written only when debug logging is enabled, even though it is emitted
         /// with ErrorFormat. NOTE(review): presumably this avoids reading response bodies in
         /// normal operation - confirm that suppressing the error entry is intended.
         /// </remarks>
+        public static void LogError(this ILog log, HttpWebResponse response)
         {
             if (log.IsDebugEnabled)
             {
                 if (response != null)
                 {
                     // The body is read through the ReadToEnd() helper on the response.
                     var body = response.ReadToEnd();
+                    log.ErrorFormat("Headers:\n{0}\nBody:{1}", response.Headers, body);
                 }
             }
         }
@@ -43,7 +46,7 @@ namespace Pithos.Network
             var reader = new StreamReader(stream);
             if (!log.IsDebugEnabled)
                 return reader;
-            
+
             using (reader)
             {
                 var body = reader.ReadToEnd();
@@ -61,24 +64,92 @@ namespace Pithos.Network
 
         }
 
-        public static IEnumerable<T> Range<T>(this IList<T> list, int start,int end)
+        /// <summary>
+        /// Lazily yields the elements of <paramref name="list"/> from index
+        /// <paramref name="start"/> through index <paramref name="end"/>, both inclusive.
+        /// </summary>
+        /// <param name="list">The source list; a null list produces an empty sequence.</param>
+        /// <param name="start">Zero-based index of the first element to return.</param>
+        /// <param name="end">Zero-based index of the last element to return.</param>
+        /// <returns>The elements in the requested index range, in list order.</returns>
+        public static IEnumerable<T> Range<T>(this IList<T> list, int start, int end)
         {
-            Contract.Requires(start>=0);
-            Contract.Requires(end<list.Count);
-            Contract.Requires(start<=end);
+            Contract.Requires(start >= 0);
+            Contract.Requires(end < list.Count);
+            Contract.Requires(start <= end);
 
             if (list == null)
                 yield break;
-            
+
-            for (var i = 0; i <= end; i++)
+            // Begin at 'start': the previous loop began at 0 and ignored the lower bound.
+            for (var i = start; i <= end; i++)
             {
                 yield return list[i];
             }
-            
+
         }
 
+        /// <summary>
+        /// Uploads a byte buffer through WebClient.UploadDataAsync and exposes the operation
+        /// as a task that supports cancellation and progress reporting.
+        /// </summary>
+        /// <param name="webClient">The client that performs the upload.</param>
+        /// <param name="address">The target URI; also stored as the state object of the returned task.</param>
+        /// <param name="method">The HTTP method handed to the upload call.</param>
+        /// <param name="data">The bytes to send.</param>
+        /// <param name="cancellationToken">Token whose cancellation calls CancelAsync on the client.</param>
+        /// <param name="progress">Optional progress sink; when null no progress handler is created.</param>
+        /// <returns>
+        /// A task that yields the response bytes. It is returned already canceled, without
+        /// starting an upload, when the token is canceled on entry.
+        /// </returns>
+        public static Task<byte[]> UploadDataTaskAsync(this WebClient webClient, Uri address, string method, byte[] data, CancellationToken cancellationToken, IProgress<UploadProgressChangedEventArgs> progress)
+        {
+            var tcs = new TaskCompletionSource<byte[]>(address);
+            if (cancellationToken.IsCancellationRequested)
+            {
+                tcs.TrySetCanceled();
+                return tcs.Task;
+            }
+
+            CancellationTokenRegistration ctr = cancellationToken.Register(webClient.CancelAsync);
+            UploadDataCompletedEventHandler completedHandler = null;
+            UploadProgressChangedEventHandler progressHandler = null;
+            if (progress != null)
+                progressHandler = (s, e) => PithosEAPCommon.HandleProgress(tcs, e, () => e, progress);
+
+            // One clean-up routine for both the completion path and the synchronous-failure path,
+            // so the cancellation registration is always released together with the handlers.
+            Action cleanup = () =>
+            {
+                ctr.Dispose();
+                webClient.UploadDataCompleted -= completedHandler;
+                webClient.UploadProgressChanged -= progressHandler;
+            };
+            completedHandler = (sender, e) => PithosEAPCommon.HandleCompletion(tcs, true, e, () => e.Result, cleanup);
+
+            webClient.UploadDataCompleted += completedHandler;
+            webClient.UploadProgressChanged += progressHandler;
+            try
+            {
+                // tcs is passed as the user token so the handlers can recognise this operation's events.
+                webClient.UploadDataAsync(address, method, data, tcs);
+                // Re-check the token: cancellation may have been requested after the entry
+                // check but before the upload was started.
+                if (cancellationToken.IsCancellationRequested)
+                    webClient.CancelAsync();
+            }
+            catch
+            {
+                // The upload never started, so no completion event will run the clean-up.
+                // Previously the token registration was left undisposed on this path.
+                cleanup();
+                throw;
+            }
+            return tcs.Task;
+        }
 
+    }
 
+    /// <summary>
+    /// Helpers that connect event-based asynchronous operations to a TaskCompletionSource.
+    /// </summary>
+    internal static class PithosEAPCommon
+    {
+        /// <summary>
+        /// Reports progress to <paramref name="callback"/> when the event's UserState is the
+        /// given completion source; events carrying any other state are ignored.
+        /// </summary>
+        /// <param name="tcs">Completion source identifying the operation of interest.</param>
+        /// <param name="eventArgs">The progress event whose UserState is compared to <paramref name="tcs"/>.</param>
+        /// <param name="getProgress">Produces the value handed to the callback.</param>
+        /// <param name="callback">Receives the progress value.</param>
+        internal static void HandleProgress<T, E>(TaskCompletionSource<T> tcs, ProgressChangedEventArgs eventArgs, Func<E> getProgress, IProgress<E> callback)
+        {
+            var belongsToOperation = object.ReferenceEquals(eventArgs.UserState, tcs);
+            if (belongsToOperation)
+            {
+                callback.Report(getProgress());
+            }
+        }

+        /// <summary>
+        /// Runs <paramref name="unregisterHandler"/> and then completes <paramref name="tcs"/>
+        /// as canceled, faulted or successful according to the completion event.
+        /// </summary>
+        /// <param name="tcs">Completion source to transition.</param>
+        /// <param name="requireMatch">When true, events whose UserState is not <paramref name="tcs"/> are ignored.</param>
+        /// <param name="e">The completion event.</param>
+        /// <param name="getResult">Produces the result; called only when the event is neither canceled nor failed.</param>
+        /// <param name="unregisterHandler">Clean-up action executed before the task is completed.</param>
+        internal static void HandleCompletion<T>(TaskCompletionSource<T> tcs, bool requireMatch, AsyncCompletedEventArgs e, Func<T> getResult, Action unregisterHandler)
+        {
+            if (requireMatch && !object.ReferenceEquals(e.UserState, tcs))
+            {
+                return;
+            }
+
+            try
+            {
+                unregisterHandler();
+            }
+            finally
+            {
+                // The task is completed even if the clean-up action throws.
+                if (e.Cancelled)
+                {
+                    tcs.TrySetCanceled();
+                }
+                else
+                {
+                    var failure = e.Error;
+                    if (failure == null)
+                        tcs.TrySetResult(getResult());
+                    else
+                        tcs.TrySetException(failure);
+                }
+            }
+        }
     }
+
 }