//Download progress is reported to the Trace log
Log.InfoFormat("[GET] START {0}", objectName);
- client.DownloadProgressChanged += (sender, args) =>
+ /*client.DownloadProgressChanged += (sender, args) =>
Log.InfoFormat("[GET PROGRESS] {0} {1}% {2} of {3}",
fileName, args.ProgressPercentage,
args.BytesReceived,
- args.TotalBytesToReceive);
-
+ args.TotalBytesToReceive);*/
+ var progress = new Progress<DownloadProgressChangedEventArgs>(args =>
+ {
+ Log.InfoFormat("[GET PROGRESS] {0} {1}% {2} of {3}",
+ fileName, args.ProgressPercentage,
+ args.BytesReceived,
+ args.TotalBytesToReceive);
+ DownloadProgressChanged(this, args);
+ });
//Start downloading the object asynchronously
- await client.DownloadFileTaskAsync(uri, fileName, cancellationToken).ConfigureAwait(false);
+ await client.DownloadFileTaskAsync(uri, fileName, cancellationToken,progress).ConfigureAwait(false);
//Once the download completes
//Delete the local client object
var builder = client.GetAddressBuilder(container, relativeUrl.ToString());
var uri = builder.Uri;
- client.DownloadProgressChanged += (sender, args) =>
+/* client.DownloadProgressChanged += (sender, args) =>
{
Log.DebugFormat("[GET PROGRESS] {0} {1}% {2} of {3}",
uri.Segments.Last(), args.ProgressPercentage,
args.BytesReceived,
args.TotalBytesToReceive);
DownloadProgressChanged(sender, args);
- };
+ };*/
+ var progress = new Progress<DownloadProgressChangedEventArgs>(args =>
+ {
+ Log.DebugFormat("[GET PROGRESS] {0} {1}% {2} of {3}",
+ uri.Segments.Last(), args.ProgressPercentage,
+ args.BytesReceived,
+ args.TotalBytesToReceive);
+ DownloadProgressChanged(this, args);
+ });
- var result = await client.DownloadDataTaskAsync(uri, cancellationToken).ConfigureAwait(false);
+ var result = await client.DownloadDataTaskAsync(uri, cancellationToken,progress).ConfigureAwait(false);
return result;
}
}
Log.InfoFormat("[BLOCK POST] START");
+/*
client.UploadProgressChanged += (sender, args) =>
{
Log.InfoFormat("[BLOCK POST PROGRESS] {0}% {1} of {2}",
args.TotalBytesToSend);
UploadProgressChanged(sender, args);
};
+*/
client.UploadFileCompleted += (sender, args) =>
Log.InfoFormat("[BLOCK POST PROGRESS] Completed ");
+ var progress=new Progress<UploadProgressChangedEventArgs>(args=>
+ {
+ Log.InfoFormat("[BLOCK POST PROGRESS] {0}% {1} of {2}",
+ args.ProgressPercentage, args.BytesSent,
+ args.TotalBytesToSend);
+ UploadProgressChanged(this, args);
+ });
var buffer = new byte[count];
Buffer.BlockCopy(block, offset, buffer, 0, count);
//Send the block
- using (var ctr = token.Register(client.CancelAsync))
- {
- await client.UploadDataTaskAsync(uri, "POST", buffer).ConfigureAwait(false);
- }
+ await client.UploadDataTaskAsync(uri, "POST", buffer,token,progress).ConfigureAwait(false);
Log.InfoFormat("[BLOCK POST] END");
}
}
using System;
using System.Collections.Generic;
+using System.ComponentModel;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Text;
using System.Net;
using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
using log4net;
namespace Pithos.Network
}
}
}
-
- public static void LogError(this ILog log,HttpWebResponse response)
+
+ /// <summary>
+ /// Writes the headers and body of <paramref name="response"/> to the log at error level.
+ /// Nothing is read or logged unless debug logging is enabled and the response is non-null.
+ /// </summary>
+ public static void LogError(this ILog log, HttpWebResponse response)
{
- if (log.IsDebugEnabled)
- {
- if (response != null)
- {
- var body = response.ReadToEnd();
- log.ErrorFormat("Headers:\n{0}\nBody:{1}", response.Headers,body);
- }
- }
+ // Guard clause: the debug switch is checked first, then the response, as before.
+ if (!log.IsDebugEnabled || response == null)
+ return;
+
+ var content = response.ReadToEnd();
+ log.ErrorFormat("Headers:\n{0}\nBody:{1}", response.Headers, content);
}
var reader = new StreamReader(stream);
if (!log.IsDebugEnabled)
return reader;
-
+
using (reader)
{
var body = reader.ReadToEnd();
}
- public static IEnumerable<T> Range<T>(this IList<T> list, int start,int end)
+ /// <summary>
+ /// Lazily yields the items of <paramref name="list"/> at indexes
+ /// <paramref name="start"/> through <paramref name="end"/>, both inclusive.
+ /// Yields nothing when <paramref name="list"/> is null.
+ /// </summary>
+ /// <param name="list">The list to read from.</param>
+ /// <param name="start">Index of the first item to yield; contract: not negative and not greater than end.</param>
+ /// <param name="end">Index of the last item to yield; contract: less than list.Count.</param>
+ /// <returns>A deferred sequence of the selected items.</returns>
+ public static IEnumerable<T> Range<T>(this IList<T> list, int start, int end)
{
- Contract.Requires(start>=0);
- Contract.Requires(end<list.Count);
- Contract.Requires(start<=end);
+ Contract.Requires(start >= 0);
+ Contract.Requires(end < list.Count);
+ Contract.Requires(start <= end);
if (list == null)
yield break;
-
- for (var i = 0; i <= end; i++)
+
+ // Begin at start, not 0: the previous loop ignored start and always yielded the prefix [0, end].
+ for (var i = start; i <= end; i++)
{
yield return list[i];
}
-
+
}
+ /// <summary>
+ /// Starts <c>WebClient.UploadDataAsync</c> and exposes the operation as a task that
+ /// supports cancellation and progress reporting.
+ /// </summary>
+ /// <param name="webClient">The client that performs the upload.</param>
+ /// <param name="address">Destination passed to <c>UploadDataAsync</c>.</param>
+ /// <param name="method">Method string passed to <c>UploadDataAsync</c>.</param>
+ /// <param name="data">The buffer to send.</param>
+ /// <param name="cancellationToken">When signalled, <c>webClient.CancelAsync</c> is invoked.</param>
+ /// <param name="progress">Receives upload progress events; may be null, in which case none are reported.</param>
+ /// <returns>
+ /// A task that yields the completed event's <c>Result</c>, faults with its <c>Error</c>,
+ /// or is cancelled.
+ /// </returns>
+ public static Task<byte[]> UploadDataTaskAsync(this WebClient webClient, Uri address, string method, byte[] data, CancellationToken cancellationToken, IProgress<UploadProgressChangedEventArgs> progress)
+ {
+     var tcs = new TaskCompletionSource<byte[]>(address);
+     if (cancellationToken.IsCancellationRequested)
+     {
+         tcs.TrySetCanceled();
+     }
+     else
+     {
+         CancellationTokenRegistration ctr = cancellationToken.Register(webClient.CancelAsync);
+         UploadDataCompletedEventHandler completedHandler = null;
+         UploadProgressChangedEventHandler progressHandler = null;
+         if (progress != null)
+             progressHandler = (s, e) => PithosEAPCommon.HandleProgress(tcs, e, () => e, progress);
+         completedHandler = (sender, e) => PithosEAPCommon.HandleCompletion(tcs, true, e, () => e.Result, () =>
+         {
+             ctr.Dispose();
+             webClient.UploadDataCompleted -= completedHandler;
+             webClient.UploadProgressChanged -= progressHandler;
+         });
+         webClient.UploadDataCompleted += completedHandler;
+         // progressHandler may still be null here; adding or removing a null delegate is a no-op.
+         webClient.UploadProgressChanged += progressHandler;
+         try
+         {
+             // tcs is passed as the user token so the handlers can recognise events of this upload.
+             webClient.UploadDataAsync(address, method, data, tcs);
+             // The token may have fired between Register and the start of the upload,
+             // when CancelAsync had nothing to cancel yet; check once more.
+             if (cancellationToken.IsCancellationRequested)
+                 webClient.CancelAsync();
+         }
+         catch
+         {
+             // The upload never started, so no completion event will run the cleanup above.
+             // Drop the cancellation registration too; otherwise a later cancellation would
+             // still call CancelAsync on this client.
+             ctr.Dispose();
+             webClient.UploadDataCompleted -= completedHandler;
+             webClient.UploadProgressChanged -= progressHandler;
+             throw;
+         }
+     }
+     return tcs.Task;
+ }
+ }
+ /// <summary>
+ /// Helpers that route progress and completion events of an event-based asynchronous
+ /// operation into a <see cref="TaskCompletionSource{T}"/>.
+ /// </summary>
+ internal static class PithosEAPCommon
+ {
+     /// <summary>
+     /// Reports <paramref name="getProgress"/>'s value to <paramref name="callback"/>,
+     /// but only for events whose <c>UserState</c> is <paramref name="tcs"/> itself.
+     /// </summary>
+     internal static void HandleProgress<T, E>(TaskCompletionSource<T> tcs, ProgressChangedEventArgs eventArgs, Func<E> getProgress, IProgress<E> callback)
+     {
+         // Reference comparison: the event must carry this exact tcs instance.
+         if (object.ReferenceEquals(eventArgs.UserState, tcs))
+         {
+             callback.Report(getProgress());
+         }
+     }
+
+     /// <summary>
+     /// Runs <paramref name="unregisterHandler"/> and then moves <paramref name="tcs"/> to
+     /// cancelled, faulted or completed according to <paramref name="e"/>.
+     /// </summary>
+     /// <param name="tcs">The completion source to transition.</param>
+     /// <param name="requireMatch">When true, events whose <c>UserState</c> is not <paramref name="tcs"/> are ignored.</param>
+     /// <param name="e">The completion event.</param>
+     /// <param name="getResult">Produces the result; called only when the event is neither cancelled nor failed.</param>
+     /// <param name="unregisterHandler">Cleanup to run before the task is transitioned.</param>
+     internal static void HandleCompletion<T>(TaskCompletionSource<T> tcs, bool requireMatch, AsyncCompletedEventArgs e, Func<T> getResult, Action unregisterHandler)
+     {
+         if (requireMatch && !object.ReferenceEquals(e.UserState, tcs))
+         {
+             return;
+         }
+
+         try
+         {
+             unregisterHandler();
+         }
+         finally
+         {
+             // In a finally block so the task is transitioned even when the cleanup throws.
+             if (e.Cancelled)
+             {
+                 tcs.TrySetCanceled();
+             }
+             else if (e.Error == null)
+             {
+                 tcs.TrySetResult(getResult());
+             }
+             else
+             {
+                 tcs.TrySetException(e.Error);
+             }
+         }
+     }
}
+
}