-// **CloudFilesClient** provides a simple client interface to CloudFiles and Pithos
+#region
+/* -----------------------------------------------------------------------
+ * <copyright file="CloudFilesClient.cs" company="GRNet">
+ *
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ * </copyright>
+ * -----------------------------------------------------------------------
+ */
+#endregion
+
+// **CloudFilesClient** provides a simple client interface to CloudFiles and Pithos
//
// The class provides methods to upload/download files, delete files, manage containers
using System;
using System.Collections.Generic;
+using System.Collections.Specialized;
using System.ComponentModel.Composition;
+using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
using System.Net;
+using System.Reflection;
using System.Security.Cryptography;
using System.Text;
+using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Pithos.Interfaces;
[Export(typeof(ICloudClient))]
public class CloudFilesClient:ICloudClient
{
+ private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
+
//CloudFilesClient uses *_baseClient* internally to communicate with the server
//RestClient provides a REST-friendly interface over the standard WebClient.
private RestClient _baseClient;
public Uri StorageUrl { get; set; }
- protected Uri RootAddressUri { get; set; }
+ public Uri RootAddressUri { get; set; }
- private Uri _proxy;
+ /* private WebProxy _proxy;
+ public WebProxy Proxy
+ {
+ get { return _proxy; }
+ set
+ {
+ _proxy = value;
+ if (_baseClient != null)
+ _baseClient.Proxy = value;
+ }
+ }
+*/
+
+ /* private Uri _proxy;
public Uri Proxy
{
get { return _proxy; }
if (_baseClient != null)
_baseClient.Proxy = new WebProxy(value);
}
- }
+ }*/
public double DownloadPercentLimit { get; set; }
public double UploadPercentLimit { get; set; }
public bool UsePithos { get; set; }
- private static readonly ILog Log = LogManager.GetLogger("CloudFilesClient");
public CloudFilesClient(string userName, string apiKey)
{
Contract.Ensures(StorageUrl != null);
Contract.Ensures(_baseClient != null);
Contract.Ensures(RootAddressUri != null);
- Contract.EndContractBlock();
+ Contract.EndContractBlock();
_baseClient = new RestClient
{
BaseAddress = accountInfo.StorageUri.ToString(),
- Timeout = 10000,
- Retries = 3
+ Timeout = 30000,
+ Retries = 3,
};
StorageUrl = accountInfo.StorageUri;
Token = accountInfo.Token;
Log.InfoFormat("[AUTHENTICATE] Start for {0}", UserName);
+ var groups = new List<Group>();
+
using (var authClient = new RestClient{BaseAddress=AuthenticationUrl})
- {
- if (Proxy != null)
- authClient.Proxy = new WebProxy(Proxy);
+ {
+ /* if (Proxy != null)
+ authClient.Proxy = Proxy;*/
Contract.Assume(authClient.Headers!=null);
{
BaseAddress = storageUrl,
Timeout = 10000,
- Retries = 3
+ Retries = 3,
+ //Proxy=Proxy
};
StorageUrl = new Uri(storageUrl);
throw new InvalidOperationException("Failed to obtain token url");
Token = token;
+ /* var keys = authClient.ResponseHeaders.AllKeys.AsQueryable();
+ groups = (from key in keys
+ where key.StartsWith("X-Account-Group-")
+ let name = key.Substring(16)
+ select new Group(name, authClient.ResponseHeaders[key]))
+ .ToList();
+
+*/
}
Log.InfoFormat("[AUTHENTICATE] End for {0}", UserName);
-
+ Debug.Assert(_baseClient!=null);
- return new AccountInfo {StorageUri = StorageUrl, Token = Token, UserName = UserName};
+ return new AccountInfo {StorageUri = StorageUrl, Token = Token, UserName = UserName,Groups=groups};
}
if (client.StatusCode == HttpStatusCode.NoContent)
return new List<ContainerInfo>();
var infos = JsonConvert.DeserializeObject<IList<ContainerInfo>>(content);
+
+ foreach (var info in infos)
+ {
+ info.Account = account;
+ }
return infos;
}
private string GetAccountUrl(string account)
{
- return new Uri(this.RootAddressUri, new Uri(account,UriKind.Relative)).AbsoluteUri;
+ return new Uri(RootAddressUri, new Uri(account,UriKind.Relative)).AbsoluteUri;
}
public IList<ShareAccountInfo> ListSharingAccounts(DateTime? since=null)
{
- using (log4net.ThreadContext.Stacks["Share"].Push("List Accounts"))
+ using (ThreadContext.Stacks["Share"].Push("List Accounts"))
{
if (Log.IsDebugEnabled) Log.DebugFormat("START");
client.IfModifiedSince = since;
//Extract the username from the base address
- client.BaseAddress = RootAddressUri.AbsoluteUri;
+ client.BaseAddress = RootAddressUri.AbsoluteUri;
var content = client.DownloadStringWithRetry(@"", 3);
}
}
- //Request listing of all objects in a container modified since a specific time.
- //If the *since* value is missing, return all objects
- public IList<ObjectInfo> ListSharedObjects(DateTime? since = null)
+
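+ /// <summary>
+ /// Request listing of all objects in every container of every account returned by ListSharingAccounts.
+ /// The *since* value is applied only to containers that are already listed in knownContainers.
+ /// Every other container is listed without a *since* value
+ /// </summary>
+ /// <param name="knownContainers">Keys of the form "account\container". Use the since variable only for the containers listed in knownContainers. Unknown containers are considered new
+ /// and should be polled anyway. The keys of all the containers that were found are added to this set before the method returns
+ /// </param>
+ /// <param name="since">Passed to ListObjects for known containers only. If the value is missing, all objects are requested</param>
+ /// <returns>The combined ListObjects results of all the containers that were found</returns>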
+ public IList<ObjectInfo> ListSharedObjects(HashSet<string> knownContainers,DateTime? since = null )
{
- using (log4net.ThreadContext.Stacks["Share"].Push("List Objects"))
+ using (ThreadContext.Stacks["Share"].Push("List Objects"))
{
if (Log.IsDebugEnabled) Log.DebugFormat("START");
+ //'since' is not passed to ListSharingAccounts because we need to have ListObjects return a NoChange result
+ //for all shared accounts and containers
+
+ Func<ContainerInfo, string> GetKey = c => String.Format("{0}\\{1}", c.Account, c.Name);
+
+ var accounts = ListSharingAccounts();
+ var containers = (from account in accounts
+ let conts = ListContainers(account.name)
+ from container in conts
+ select container).ToList();
+ var items = from container in containers
+ let actualSince=knownContainers.Contains(GetKey(container))?since:null
+ select ListObjects(container.Account , container.Name, actualSince);
+ var objects=items.SelectMany(r=> r).ToList();
+
+ //Store any new containers
+ foreach (var container in containers)
+ {
+ knownContainers.Add(GetKey(container));
+ }
+
+ if (Log.IsDebugEnabled) Log.DebugFormat("END");
+ return objects;
+ }
+ }
- var objects = new List<ObjectInfo>();
- var accounts = ListSharingAccounts(since);
- foreach (var account in accounts)
+ public void SetTags(ObjectInfo target,IDictionary<string,string> tags)
+ {
+ if (String.IsNullOrWhiteSpace(Token))
+ throw new InvalidOperationException("The Token is not set");
+ if (StorageUrl == null)
+ throw new InvalidOperationException("The StorageUrl is not set");
+ if (target == null)
+ throw new ArgumentNullException("target");
+ Contract.EndContractBlock();
+
+ using (ThreadContext.Stacks["Share"].Push("Share Object"))
+ {
+ if (Log.IsDebugEnabled) Log.DebugFormat("START");
+
+ using (var client = new RestClient(_baseClient))
{
- var containers = ListContainers(account.name);
- foreach (var container in containers)
+
+ client.BaseAddress = GetAccountUrl(target.Account);
+
+ client.Parameters.Clear();
+ client.Parameters.Add("update", "");
+
+ foreach (var tag in tags)
{
- var containerObjects = ListObjects(account.name, container.Name, null);
- objects.AddRange(containerObjects);
+ var headerTag = String.Format("X-Object-Meta-{0}", tag.Key);
+ client.Headers.Add(headerTag, tag.Value);
}
+
+ client.DownloadStringWithRetry(target.Container, 3);
+
+
+ client.AssertStatusOK("SetTags failed");
+ //If the status is NOT ACCEPTED we have a problem
+ if (client.StatusCode != HttpStatusCode.Accepted)
+ {
+ Log.Error("Failed to set tags");
+ throw new Exception("Failed to set tags");
+ }
+
+ if (Log.IsDebugEnabled) Log.DebugFormat("END");
}
- if (Log.IsDebugEnabled) Log.DebugFormat("END");
- return objects;
}
+
+
}
public void ShareObject(string account, string container, string objectName, string shareTo, bool read, bool write)
throw new ArgumentNullException("shareTo");
Contract.EndContractBlock();
- using (log4net.ThreadContext.Stacks["Share"].Push("Share Object"))
+ using (ThreadContext.Stacks["Share"].Push("Share Object"))
{
if (Log.IsDebugEnabled) Log.DebugFormat("START");
throw new ArgumentNullException("accountInfo");
Contract.EndContractBlock();
- using (log4net.ThreadContext.Stacks["Account"].Push("GetPolicies"))
+ using (ThreadContext.Stacks["Account"].Push("GetPolicies"))
{
if (Log.IsDebugEnabled) Log.DebugFormat("START");
}
}
+ /// <summary>
+ /// Sends the metadata of an object to the server: its tags, sharing permissions,
+ /// content disposition, content encoding, manifest and public flag.
+ /// The headers are posted to "{Container}/{Name}?update=" relative to the account address.
+ /// </summary>
+ /// <param name="objectInfo">The object whose metadata is sent. Can't be null</param>
+ /// <exception cref="ArgumentNullException">objectInfo is null</exception>
+ /// <exception cref="Exception">The response status is neither Accepted nor OK</exception>
+ public void UpdateMetadata(ObjectInfo objectInfo)
+ {
+     if (objectInfo == null)
+         throw new ArgumentNullException("objectInfo");
+     Contract.EndContractBlock();
+
+     using (ThreadContext.Stacks["Objects"].Push("UpdateMetadata"))
+     {
+         if (Log.IsDebugEnabled) Log.DebugFormat("START");
+
+         using (var client = new RestClient(_baseClient))
+         {
+             //Switch to the account's address only when an account name is available,
+             //as GetObject, GetBlock, PostBlock and GetHashMap do.
+             //GetAccountUrl builds a relative Uri from the name and can't do so from a null value
+             if (!String.IsNullOrWhiteSpace(objectInfo.Account))
+                 client.BaseAddress = GetAccountUrl(objectInfo.Account);
+
+             client.Parameters.Clear();
+
+             //Set Tags. An object without a tag collection has no tag headers to send
+             if (objectInfo.Tags != null)
+             {
+                 foreach (var tag in objectInfo.Tags)
+                 {
+                     var headerTag = String.Format("X-Object-Meta-{0}", tag.Key);
+                     client.Headers.Add(headerTag, tag.Value);
+                 }
+             }
+
+             //Set Permissions
+             var permissions = objectInfo.GetPermissionString();
+             client.SetNonEmptyHeaderValue("X-Object-Sharing", permissions);
+
+             client.SetNonEmptyHeaderValue("Content-Disposition", objectInfo.ContendDisposition);
+             client.SetNonEmptyHeaderValue("Content-Encoding", objectInfo.ContentEncoding);
+             client.SetNonEmptyHeaderValue("X-Object-Manifest", objectInfo.Manifest);
+
+             //The header value is protocol text, so it is lower-cased without using the current culture
+             var isPublic = objectInfo.IsPublic.ToString().ToLowerInvariant();
+             client.Headers.Add("X-Object-Public", isPublic);
+
+             //The update is requested through the query string of a relative address
+             var address = String.Format("{0}/{1}?update=", objectInfo.Container, objectInfo.Name);
+             client.PostWithRetry(address, "application/xml");
+
+             client.AssertStatusOK("UpdateMetadata failed");
+             //If the status is NOT ACCEPTED or OK we have a problem
+             if (!(client.StatusCode == HttpStatusCode.Accepted || client.StatusCode == HttpStatusCode.OK))
+             {
+                 Log.Error("Failed to update metadata");
+                 throw new Exception("Failed to update metadata");
+             }
+
+             if (Log.IsDebugEnabled) Log.DebugFormat("END");
+         }
+     }
+
+ }
+
+ /// <summary>
+ /// Sends the metadata of a container to the server: its tags and its policies.
+ /// The headers are uploaded to the container's address with an empty set of form values.
+ /// </summary>
+ /// <param name="containerInfo">The container whose metadata is sent. Can't be null</param>
+ /// <exception cref="ArgumentNullException">containerInfo is null</exception>
+ /// <exception cref="Exception">The response status is neither Accepted nor OK</exception>
+ public void UpdateMetadata(ContainerInfo containerInfo)
+ {
+     if (containerInfo == null)
+         throw new ArgumentNullException("containerInfo");
+     Contract.EndContractBlock();
+
+     using (ThreadContext.Stacks["Containers"].Push("UpdateMetadata"))
+     {
+         if (Log.IsDebugEnabled) Log.DebugFormat("START");
+
+         using (var client = new RestClient(_baseClient))
+         {
+             //Switch to the account's address only when an account name is available,
+             //as GetObject, GetBlock, PostBlock and GetHashMap do.
+             //GetAccountUrl builds a relative Uri from the name and can't do so from a null value
+             if (!String.IsNullOrWhiteSpace(containerInfo.Account))
+                 client.BaseAddress = GetAccountUrl(containerInfo.Account);
+
+             client.Parameters.Clear();
+
+             //Set Tags. A container without a tag collection has no tag headers to send
+             if (containerInfo.Tags != null)
+             {
+                 foreach (var tag in containerInfo.Tags)
+                 {
+                     var headerTag = String.Format("X-Container-Meta-{0}", tag.Key);
+                     client.Headers.Add(headerTag, tag.Value);
+                 }
+             }
+
+             //Set Policies. A container without a policy collection has no policy headers to send
+             if (containerInfo.Policies != null)
+             {
+                 foreach (var policy in containerInfo.Policies)
+                 {
+                     var headerPolicy = String.Format("X-Container-Policy-{0}", policy.Key);
+                     client.Headers.Add(headerPolicy, policy.Value);
+                 }
+             }
+
+             //The container name is joined with the client's BaseAddress to create the target address
+             var uriBuilder = client.GetAddressBuilder(containerInfo.Name, "");
+             var uri = uriBuilder.Uri;
+
+             //All the metadata travels in the headers, so the posted value collection is empty
+             client.UploadValues(uri, new NameValueCollection());
+
+             client.AssertStatusOK("UpdateMetadata failed");
+             //If the status is NOT ACCEPTED or OK we have a problem
+             if (!(client.StatusCode == HttpStatusCode.Accepted || client.StatusCode == HttpStatusCode.OK))
+             {
+                 Log.Error("Failed to update metadata");
+                 throw new Exception("Failed to update metadata");
+             }
+
+             if (Log.IsDebugEnabled) Log.DebugFormat("END");
+         }
+     }
+
+ }
+
public IList<ObjectInfo> ListObjects(string account, string container, DateTime? since = null)
{
throw new ArgumentNullException("container");
Contract.EndContractBlock();
- using (log4net.ThreadContext.Stacks["Objects"].Push("List"))
+ using (ThreadContext.Stacks["Objects"].Push("List"))
{
if (Log.IsDebugEnabled) Log.DebugFormat("START");
client.AssertStatusOK("ListObjects failed");
+ if (client.StatusCode==HttpStatusCode.NotModified)
+ return new[]{new NoModificationInfo(account,container)};
//If the result is empty, return an empty list,
var infos = String.IsNullOrWhiteSpace(content)
? new List<ObjectInfo>()
{
info.Container = container;
info.Account = account;
+ info.StorageUri = this.StorageUrl;
}
- if (Log.IsDebugEnabled) Log.DebugFormat("START");
+ if (Log.IsDebugEnabled) Log.DebugFormat("END");
return infos;
}
}
}
-
-
public IList<ObjectInfo> ListObjects(string account, string container, string folder, DateTime? since = null)
{
if (String.IsNullOrWhiteSpace(container))
throw new ArgumentNullException("container");
+/*
if (String.IsNullOrWhiteSpace(folder))
throw new ArgumentNullException("folder");
+*/
Contract.EndContractBlock();
- using (log4net.ThreadContext.Stacks["Objects"].Push("List"))
+ using (ThreadContext.Stacks["Objects"].Push("List"))
{
if (Log.IsDebugEnabled) Log.DebugFormat("START");
var content = client.DownloadStringWithRetry(container, 3);
client.AssertStatusOK("ListObjects failed");
- var infos = JsonConvert.DeserializeObject<IList<ObjectInfo>>(content);
+ if (client.StatusCode==HttpStatusCode.NotModified)
+ return new[]{new NoModificationInfo(account,container,folder)};
+ var infos = JsonConvert.DeserializeObject<IList<ObjectInfo>>(content);
+ foreach (var info in infos)
+ {
+ info.Account = account;
+ if (info.Container == null)
+ info.Container = container;
+ info.StorageUri = this.StorageUrl;
+ }
if (Log.IsDebugEnabled) Log.DebugFormat("END");
return infos;
}
throw new ArgumentNullException("container", "The container property can't be empty");
Contract.EndContractBlock();
- using (log4net.ThreadContext.Stacks["Containters"].Push("Exists"))
+ using (ThreadContext.Stacks["Containters"].Push("Exists"))
{
if (Log.IsDebugEnabled) Log.DebugFormat("START");
throw new ArgumentNullException("objectName", "The objectName property can't be empty");
Contract.EndContractBlock();
- using (log4net.ThreadContext.Stacks["Objects"].Push("GetObjectInfo"))
+ using (ThreadContext.Stacks["Objects"].Push("GetObjectInfo"))
{
using (var client = new RestClient(_baseClient))
case HttpStatusCode.OK:
case HttpStatusCode.NoContent:
var keys = client.ResponseHeaders.AllKeys.AsQueryable();
- var tags = (from key in keys
- where key.StartsWith("X-Object-Meta-")
- let name = key.Substring(14)
- select new {Name = name, Value = client.ResponseHeaders[name]})
- .ToDictionary(t => t.Name, t => t.Value);
+ var tags = client.GetMeta("X-Object-Meta-");
var extensions = (from key in keys
where key.StartsWith("X-Object-") && !key.StartsWith("X-Object-Meta-")
select new {Name = key, Value = client.ResponseHeaders[key]})
.ToDictionary(t => t.Name, t => t.Value);
+
+ var permissions=client.GetHeaderValue("X-Object-Sharing", true);
+
+
var info = new ObjectInfo
{
+ Account = account,
+ Container = container,
Name = objectName,
- Hash = client.GetHeaderValue("ETag"),
+ ETag = client.GetHeaderValue("ETag"),
+ UUID=client.GetHeaderValue("X-Object-UUID"),
+ X_Object_Hash = client.GetHeaderValue("X-Object-Hash"),
Content_Type = client.GetHeaderValue("Content-Type"),
+ Bytes = Convert.ToInt64(client.GetHeaderValue("Content-Length",true)),
Tags = tags,
Last_Modified = client.LastModified,
- Extensions = extensions
+ Extensions = extensions,
+ ContentEncoding=client.GetHeaderValue("Content-Encoding",true),
+ ContendDisposition = client.GetHeaderValue("Content-Disposition",true),
+ Manifest=client.GetHeaderValue("X-Object-Manifest",true),
+ PublicUrl=client.GetHeaderValue("X-Object-Public",true),
+ StorageUri=this.StorageUrl,
};
+ info.SetPermissions(permissions);
return info;
case HttpStatusCode.NotFound:
return ObjectInfo.Empty;
}
catch (RetryException)
{
- Log.WarnFormat("[RETRY FAIL] GetObjectInfo for {0} failed.");
+ Log.WarnFormat("[RETRY FAIL] GetObjectInfo for {0} failed.",objectName);
return ObjectInfo.Empty;
}
catch (WebException e)
}
}
+
+
public ContainerInfo GetContainerInfo(string account, string container)
{
if (String.IsNullOrWhiteSpace(container))
{
case HttpStatusCode.OK:
case HttpStatusCode.NoContent:
+ var tags = client.GetMeta("X-Container-Meta-");
+ var policies = client.GetMeta("X-Container-Policy-");
+
var containerInfo = new ContainerInfo
{
+ Account=account,
Name = container,
+ StorageUrl=this.StorageUrl.ToString(),
Count =
long.Parse(client.GetHeaderValue("X-Container-Object-Count")),
Bytes = long.Parse(client.GetHeaderValue("X-Container-Bytes-Used")),
BlockHash = client.GetHeaderValue("X-Container-Block-Hash"),
- BlockSize=int.Parse(client.GetHeaderValue("X-Container-Block-Size"))
+ BlockSize=int.Parse(client.GetHeaderValue("X-Container-Block-Size")),
+ Last_Modified=client.LastModified,
+ Tags=tags,
+ Policies=policies
};
+
+
return containerInfo;
case HttpStatusCode.NotFound:
return ContainerInfo.Empty;
}
public void CreateContainer(string account, string container)
- {
+ {
+ if (String.IsNullOrWhiteSpace(account))
+ throw new ArgumentNullException("account");
if (String.IsNullOrWhiteSpace(container))
- throw new ArgumentNullException("container", "The container property can't be empty");
+ throw new ArgumentNullException("container");
Contract.EndContractBlock();
using (var client = new RestClient(_baseClient))
/// <remarks>This method should have no timeout or a very long one</remarks>
//Asynchronously download the object specified by *objectName* in a specific *container* to
// a local file
- public Task GetObject(string account, string container, string objectName, string fileName)
+ public async Task GetObject(string account, string container, string objectName, string fileName,CancellationToken cancellationToken)
{
if (String.IsNullOrWhiteSpace(container))
throw new ArgumentNullException("container", "The container property can't be empty");
//object to avoid concurrency errors.
//
//Download operations take a long time therefore they have no timeout.
- var client = new RestClient(_baseClient) { Timeout = 0 };
- if (!String.IsNullOrWhiteSpace(account))
- client.BaseAddress = GetAccountUrl(account);
+ using(var client = new RestClient(_baseClient) { Timeout = 0 })
+ {
+ if (!String.IsNullOrWhiteSpace(account))
+ client.BaseAddress = GetAccountUrl(account);
- //The container and objectName are relative names. They are joined with the client's
- //BaseAddress to create the object's absolute address
- var builder = client.GetAddressBuilder(container, objectName);
- var uri = builder.Uri;
+ //The container and objectName are relative names. They are joined with the client's
+ //BaseAddress to create the object's absolute address
+ var builder = client.GetAddressBuilder(container, objectName);
+ var uri = builder.Uri;
- //Download progress is reported to the Trace log
- Log.InfoFormat("[GET] START {0}", objectName);
- client.DownloadProgressChanged += (sender, args) =>
- Log.InfoFormat("[GET PROGRESS] {0} {1}% {2} of {3}",
- fileName, args.ProgressPercentage,
- args.BytesReceived,
- args.TotalBytesToReceive);
+ //Download progress is reported to the Trace log
+ Log.InfoFormat("[GET] START {0}", objectName);
+ client.DownloadProgressChanged += (sender, args) =>
+ Log.InfoFormat("[GET PROGRESS] {0} {1}% {2} of {3}",
+ fileName, args.ProgressPercentage,
+ args.BytesReceived,
+ args.TotalBytesToReceive);
+
+ //Start downloading the object asynchronously
+ await client.DownloadFileTaskAsync(uri, fileName, cancellationToken).ConfigureAwait(false);
- //Start downloading the object asynchronously
- var downloadTask = client.DownloadFileTask(uri, fileName);
-
- //Once the download completes
- return downloadTask.ContinueWith(download =>
- {
- //Delete the local client object
- client.Dispose();
- //And report failure or completion
- if (download.IsFaulted)
- {
- Log.ErrorFormat("[GET] FAIL for {0} with \r{1}", objectName,
- download.Exception);
- }
- else
- {
- Log.InfoFormat("[GET] END {0}", objectName);
- }
- });
+ //Once the download completes, the using block
+ //disposes the local client object
+ }
+ //And report failure or completion
}
catch (Exception exc)
{
- Log.ErrorFormat("[GET] END {0} with {1}", objectName, exc);
+ Log.ErrorFormat("[GET] FAIL {0} with {1}", objectName, exc);
throw;
}
+ Log.InfoFormat("[GET] END {0}", objectName);
}
//Don't use a timeout because putting the hashmap may be a long process
- var client = new RestClient(_baseClient) { Timeout = 0 };
+ var client = new RestClient(_baseClient) { Timeout = 0 };
if (!String.IsNullOrWhiteSpace(account))
client.BaseAddress = GetAccountUrl(account);
//Send the tree hash as Json to the server
client.Headers[HttpRequestHeader.ContentType] = "application/octet-stream";
- var uploadTask=client.UploadStringTask(uri, "PUT", hash.ToJson());
-
-
+ var jsonHash = hash.ToJson();
+
+ client.Headers.Add("ETag",hash.MD5);
+ var uploadTask=client.UploadStringTask(uri, "PUT", jsonHash);
+ if (Log.IsDebugEnabled)
+ Log.DebugFormat("Hashes:\r\n{0}", jsonHash);
return uploadTask.ContinueWith(t =>
{
{
//In case of 409 the missing parts will be in the response content
using (var stream = response.GetResponseStream())
- using(var reader=new StreamReader(stream))
+ using(var reader=stream.GetLoggedReader(Log))
{
- //We need to cleanup the content before returning it because it contains
+ //We used to have to cleanup the content before returning it because it contains
//error content after the list of hashes
- var hashes = new List<string>();
- string line=null;
- //All lines up to the first empty line are hashes
- while(!String.IsNullOrWhiteSpace(line=reader.ReadLine()))
- {
- hashes.Add(line);
- }
-
+ //
+ //As of 30/1/2012, the result is a proper Json array so we don't need to read the content
+ //line by line
+
+ var serializer = new JsonSerializer();
+ serializer.Error += (sender, args) => Log.ErrorFormat("Deserialization error at [{0}] [{1}]", args.ErrorContext.Error, args.ErrorContext.Member);
+ var hashes = (List<string>)serializer.Deserialize(reader, typeof(List<string>));
return hashes;
}
- }
- else
- //Any other status code is unexpected and the exception should be rethrown
- throw ex;
+ }
+ //Any other status code is unexpected and the exception should be rethrown
+ Log.LogError(response);
+ throw ex;
}
+
//Any other status code is unexpected but there was no exception. We can probably continue processing
- else
- {
- Log.WarnFormat("Unexcpected status code when putting map: {0} - {1}",client.StatusCode,client.StatusDescription);
- }
+ Log.WarnFormat("Unexcpected status code when putting map: {0} - {1}",client.StatusCode,client.StatusDescription);
+
return empty;
});
}
- public Task<byte[]> GetBlock(string account, string container, Uri relativeUrl, long start, long? end)
+
+ public async Task<byte[]> GetBlock(string account, string container, Uri relativeUrl, long start, long? end, CancellationToken cancellationToken)
{
if (String.IsNullOrWhiteSpace(Token))
throw new InvalidOperationException("Invalid Token");
throw new InvalidOperationException("Invalid Storage Url");
if (String.IsNullOrWhiteSpace(container))
throw new ArgumentNullException("container");
- if (relativeUrl== null)
+ if (relativeUrl == null)
throw new ArgumentNullException("relativeUrl");
- if (end.HasValue && end<0)
+ if (end.HasValue && end < 0)
throw new ArgumentOutOfRangeException("end");
- if (start<0)
+ if (start < 0)
throw new ArgumentOutOfRangeException("start");
Contract.EndContractBlock();
-
//Don't use a timeout because putting the hashmap may be a long process
- var client = new RestClient(_baseClient) {Timeout = 0, RangeFrom = start, RangeTo = end};
- if (!String.IsNullOrWhiteSpace(account))
- client.BaseAddress = GetAccountUrl(account);
+ using (var client = new RestClient(_baseClient) {Timeout = 0, RangeFrom = start, RangeTo = end})
+ {
+ if (!String.IsNullOrWhiteSpace(account))
+ client.BaseAddress = GetAccountUrl(account);
- var builder = client.GetAddressBuilder(container, relativeUrl.ToString());
- var uri = builder.Uri;
+ var builder = client.GetAddressBuilder(container, relativeUrl.ToString());
+ var uri = builder.Uri;
+
+ client.DownloadProgressChanged += (sender, args) =>
+ {
+ Log.DebugFormat("[GET PROGRESS] {0} {1}% {2} of {3}",
+ uri.Segments.Last(), args.ProgressPercentage,
+ args.BytesReceived,
+ args.TotalBytesToReceive);
+ DownloadProgressChanged(sender, args);
+ };
- return client.DownloadDataTask(uri)
- .ContinueWith(t=>
- {
- client.Dispose();
- return t.Result;
- });
+
+ var result = await client.DownloadDataTaskAsync(uri, cancellationToken).ConfigureAwait(false);
+ return result;
+ }
}
+ public event UploadProgressChangedEventHandler UploadProgressChanged;
+ public event DownloadProgressChangedEventHandler DownloadProgressChanged;
- public Task PostBlock(string account, string container, byte[] block, int offset, int count)
+ public async Task PostBlock(string account, string container, byte[] block, int offset, int count,CancellationToken token)
{
if (String.IsNullOrWhiteSpace(container))
throw new ArgumentNullException("container");
throw new InvalidOperationException("Invalid Storage Url");
Contract.EndContractBlock();
-
- //Don't use a timeout because putting the hashmap may be a long process
- var client = new RestClient(_baseClient) { Timeout = 0 };
- if (!String.IsNullOrWhiteSpace(account))
- client.BaseAddress = GetAccountUrl(account);
- var builder = client.GetAddressBuilder(container, "");
- //We are doing an update
- builder.Query = "update";
- var uri = builder.Uri;
+ try
+ {
- client.Headers[HttpRequestHeader.ContentType] = "application/octet-stream";
+ //Don't use a timeout because uploading a block may be a long process
+ using (var client = new RestClient(_baseClient) { Timeout = 0 })
+ {
+ if (!String.IsNullOrWhiteSpace(account))
+ client.BaseAddress = GetAccountUrl(account);
- Log.InfoFormat("[BLOCK POST] START");
+ token.Register(client.CancelAsync);
- client.UploadProgressChanged += (sender, args) =>
- Log.InfoFormat("[BLOCK POST PROGRESS] {0}% {1} of {2}",
- args.ProgressPercentage, args.BytesSent,
- args.TotalBytesToSend);
- client.UploadFileCompleted += (sender, args) =>
- Log.InfoFormat("[BLOCK POST PROGRESS] Completed ");
+ var builder = client.GetAddressBuilder(container, "");
+ //We are doing an update
+ builder.Query = "update";
+ var uri = builder.Uri;
-
- //Send the block
- var uploadTask = client.UploadDataTask(uri, "POST", block)
- .ContinueWith(upload =>
- {
- client.Dispose();
+ client.Headers[HttpRequestHeader.ContentType] = "application/octet-stream";
- if (upload.IsFaulted)
- {
- var exception = upload.Exception.InnerException;
- Log.ErrorFormat("[BLOCK POST] FAIL with \r{0}", exception);
- throw exception;
+ Log.InfoFormat("[BLOCK POST] START");
+
+ client.UploadProgressChanged += (sender, args) =>
+ {
+ Log.InfoFormat("[BLOCK POST PROGRESS] {0}% {1} of {2}",
+ args.ProgressPercentage, args.BytesSent,
+ args.TotalBytesToSend);
+ UploadProgressChanged(sender, args);
+ };
+ client.UploadFileCompleted += (sender, args) =>
+ Log.InfoFormat("[BLOCK POST PROGRESS] Completed ");
+
+ var buffer = new byte[count];
+ Buffer.BlockCopy(block, offset, buffer, 0, count);
+ //Send the block
+ await client.UploadDataTaskAsync(uri, "POST", buffer).ConfigureAwait(false);
+ Log.InfoFormat("[BLOCK POST] END");
}
-
- Log.InfoFormat("[BLOCK POST] END");
- });
- return uploadTask;
+ }
+ catch (TaskCanceledException )
+ {
+ Log.Info("Aborting block");
+ throw;
+ }
+ catch (Exception exc)
+ {
+ Log.ErrorFormat("[BLOCK POST] FAIL with \r{0}", exc);
+ throw;
+ }
}
- public Task<TreeHash> GetHashMap(string account, string container, string objectName)
+ public async Task<TreeHash> GetHashMap(string account, string container, string objectName)
{
if (String.IsNullOrWhiteSpace(container))
throw new ArgumentNullException("container");
//object to avoid concurrency errors.
//
//Download operations take a long time therefore they have no timeout.
- //TODO: Do we really? this is a hashmap operation, not a download
- var client = new RestClient(_baseClient) { Timeout = 0 };
- if (!String.IsNullOrWhiteSpace(account))
- client.BaseAddress = GetAccountUrl(account);
-
-
- //The container and objectName are relative names. They are joined with the client's
- //BaseAddress to create the object's absolute address
- var builder = client.GetAddressBuilder(container, objectName);
- builder.Query = "format=json&hashmap";
- var uri = builder.Uri;
+ //TODO: Do they really? this is a hashmap operation, not a download
//Start downloading the object asynchronously
- var downloadTask = client.DownloadStringTask(uri);
-
- //Once the download completes
- return downloadTask.ContinueWith(download =>
+ using (var client = new RestClient(_baseClient) { Timeout = 0 })
{
- //Delete the local client object
- client.Dispose();
- //And report failure or completion
- if (download.IsFaulted)
- {
- Log.ErrorFormat("[GET HASH] FAIL for {0} with \r{1}", objectName,
- download.Exception);
- throw download.Exception;
- }
-
- //The server will return an empty string if the file is empty
- var json = download.Result;
+ if (!String.IsNullOrWhiteSpace(account))
+ client.BaseAddress = GetAccountUrl(account);
+
+ //The container and objectName are relative names. They are joined with the client's
+ //BaseAddress to create the object's absolute address
+ var builder = client.GetAddressBuilder(container, objectName);
+ builder.Query = "format=json&hashmap";
+ var uri = builder.Uri;
+
+
+ var json = await client.DownloadStringTaskAsync(uri).ConfigureAwait(false);
var treeHash = TreeHash.Parse(json);
- Log.InfoFormat("[GET HASH] END {0}", objectName);
+ Log.InfoFormat("[GET HASH] END {0}", objectName);
return treeHash;
- });
+ }
}
catch (Exception exc)
{
throw;
}
-
-
}
/// <param name="fileName"></param>
/// <param name="hash">Optional hash value for the file. If no hash is provided, the method calculates a new hash</param>
/// <remarks>This method should have no timeout or a very long one</remarks>
- public Task PutObject(string account, string container, string objectName, string fileName, string hash = null)
+ public async Task PutObject(string account, string container, string objectName, string fileName, string hash = null, string contentType = "application/octet-stream")
{
if (String.IsNullOrWhiteSpace(container))
throw new ArgumentNullException("container", "The container property can't be empty");
throw new ArgumentNullException("objectName", "The objectName property can't be empty");
if (String.IsNullOrWhiteSpace(fileName))
throw new ArgumentNullException("fileName", "The fileName property can't be empty");
- if (!File.Exists(fileName))
- throw new FileNotFoundException("The file does not exist",fileName);
- Contract.EndContractBlock();
+/*
+ if (!File.Exists(fileName) && !Directory.Exists(fileName))
+ throw new FileNotFoundException("The file or directory does not exist",fileName);
+*/
try
{
- var client = new RestClient(_baseClient){Timeout=0};
- if (!String.IsNullOrWhiteSpace(account))
- client.BaseAddress = GetAccountUrl(account);
-
- var builder = client.GetAddressBuilder(container, objectName);
- var uri = builder.Uri;
-
- string etag = hash ?? CalculateHash(fileName);
-
- client.Headers.Add("Content-Type", "application/octet-stream");
- client.Headers.Add("ETag", etag);
-
-
- Log.InfoFormat("[PUT] START {0}", objectName);
- client.UploadProgressChanged += (sender, args) =>
+ using (var client = new RestClient(_baseClient) { Timeout = 0 })
{
- using (log4net.ThreadContext.Stacks["PUT"].Push("Progress"))
- {
- Log.InfoFormat("{0} {1}% {2} of {3}", fileName, args.ProgressPercentage,
- args.BytesSent, args.TotalBytesToSend);
- }
- };
+ if (!String.IsNullOrWhiteSpace(account))
+ client.BaseAddress = GetAccountUrl(account);
- client.UploadFileCompleted += (sender, args) =>
- {
- using (log4net.ThreadContext.Stacks["PUT"].Push("Progress"))
- {
- Log.InfoFormat("Completed {0}", fileName);
- }
- };
- return client.UploadFileTask(uri, "PUT", fileName)
- .ContinueWith(upload=>
- {
- client.Dispose();
-
- if (upload.IsFaulted)
- {
- var exc = upload.Exception.InnerException;
- Log.ErrorFormat("[PUT] FAIL for {0} with \r{1}",objectName,exc);
- throw exc;
- }
- else
- Log.InfoFormat("[PUT] END {0}", objectName);
- });
+ var builder = client.GetAddressBuilder(container, objectName);
+ var uri = builder.Uri;
+
+ string etag = hash ?? CalculateHash(fileName);
+
+ client.Headers.Add("Content-Type", contentType);
+ client.Headers.Add("ETag", etag);
+
+
+ Log.InfoFormat("[PUT] START {0}", objectName);
+ client.UploadProgressChanged += (sender, args) =>
+ {
+ using (ThreadContext.Stacks["PUT"].Push("Progress"))
+ {
+ Log.InfoFormat("{0} {1}% {2} of {3}", fileName,
+ args.ProgressPercentage,
+ args.BytesSent, args.TotalBytesToSend);
+ }
+ };
+
+ client.UploadFileCompleted += (sender, args) =>
+ {
+ using (ThreadContext.Stacks["PUT"].Push("Progress"))
+ {
+ Log.InfoFormat("Completed {0}", fileName);
+ }
+ };
+ if (contentType=="application/directory")
+ await client.UploadDataTaskAsync(uri, "PUT", new byte[0]).ConfigureAwait(false);
+ else
+ await client.UploadFileTaskAsync(uri, "PUT", fileName).ConfigureAwait(false);
+ }
+
+ Log.InfoFormat("[PUT] END {0}", objectName);
}
catch (Exception exc)
{
client.Headers.Add("X-Move-From", sourceUrl);
client.AllowedStatusCodes.Add(HttpStatusCode.NotFound);
+ Log.InfoFormat("[TRASH] [{0}] to [{1}]",sourceUrl,targetUrl);
client.PutWithRetry(targetUrl, 3);
var expectedCodes = new[] {HttpStatusCode.OK, HttpStatusCode.NoContent, HttpStatusCode.Created,HttpStatusCode.NotFound};
return new WebException(String.Format("{0} failed with unexpected status code {1}", operation, statusCode));
}
-
+
+/*
+ public IEnumerable<ObjectInfo> ListDirectories(ContainerInfo container)
+ {
+ var directories=this.ListObjects(container.Account, container.Name, "/");
+ }
+*/
+
+ /// <summary>
+ /// Checks whether the account may upload next to <paramref name="cloudFile"/> by
+ /// uploading a uniquely named, empty ".pithos.ignore" probe object into the same
+ /// folder and then removing it again.
+ /// </summary>
+ /// <param name="account">Account whose URL becomes the client's base address. Must not be empty.</param>
+ /// <param name="cloudFile">Object whose Container and Name select the folder to probe. Must not be null.</param>
+ /// <returns>
+ /// true when the probe upload finishes with OK, NoContent or Created;
+ /// false for any other status code or when any step throws.
+ /// </returns>
+ public bool CanUpload(string account, ObjectInfo cloudFile)
+ {
+     Contract.Requires(!String.IsNullOrWhiteSpace(account));
+     Contract.Requires(cloudFile!=null);
+
+     using (var client = new RestClient(_baseClient))
+     {
+         if (!String.IsNullOrWhiteSpace(account))
+             client.BaseAddress = GetAccountUrl(account);
+
+         //The folder is everything in the object's name before the last '/'.
+         //It is empty when the object sits directly in the container root.
+         var parts = cloudFile.Name.Split('/');
+         var folder = String.Join("/", parts, 0, parts.Length - 1);
+
+         //Build the probe's name relative to the container. Skipping the empty folder
+         //avoids producing a "container//name" address for root-level objects.
+         var probeFile = String.Format("{0}.pithos.ignore", Guid.NewGuid());
+         var probeName = String.IsNullOrEmpty(folder)
+                             ? probeFile
+                             : String.Format("{0}/{1}", folder, probeFile);
+         var fileUrl = String.Format("{0}/{1}", cloudFile.Container, probeName);
+
+         client.Parameters.Clear();
+         try
+         {
+             client.PutWithRetry(fileUrl, 3, @"application/octet-stream");
+
+             var expectedCodes = new[] { HttpStatusCode.OK, HttpStatusCode.NoContent, HttpStatusCode.Created};
+             var result = expectedCodes.Contains(client.StatusCode);
+
+             //Clean up only if the probe was actually created. DeleteObject receives the
+             //container as its own argument, so it is given the container-relative name;
+             //passing fileUrl here would repeat the container inside the object name.
+             //NOTE(review): assumes DeleteObject combines container and name itself - confirm.
+             if (result)
+                 DeleteObject(account, cloudFile.Container, probeName);
+             return result;
+         }
+         catch (Exception exc)
+         {
+             //Any failure (including a failed cleanup) is reported as "cannot upload",
+             //but leave a trace instead of swallowing the exception silently.
+             Log.InfoFormat("[CAN UPLOAD] FAIL for {0} with \r{1}", fileUrl, exc);
+             return false;
+         }
+     }
+ }
}
public class ShareAccountInfo