{
try
{
+
var accountInfo = action.AccountInfo;
var fileInfo = action.LocalFile;
if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
return;
+ //Do not upload files in conflict
+ if (action.FileState.FileStatus == FileStatus.Conflict )
+ {
+ Log.InfoFormat("Skipping file in conflict [{0}]",fileInfo.FullName);
+ return;
+ }
+ if (action.FileState.FileStatus == FileStatus.Forbidden)
+ {
+ Log.InfoFormat("Skipping forbidden file [{0}]",fileInfo.FullName);
+ return;
+ }
var relativePath = fileInfo.AsRelativeTo(accountInfo.AccountPath);
if (relativePath.StartsWith(FolderConstants.OthersFolder))
var cloudFile = action.CloudFile;
var account = cloudFile.Account ?? accountInfo.UserName;
+ try
+ {
var client = new CloudFilesClient(accountInfo);
//Even if GetObjectInfo times out, we can proceed with the upload
return;
//TODO: Check how a directory hash is calculated -> All dirs seem to have the same hash
- if (fileInfo is DirectoryInfo)
- {
- //If the directory doesn't exist the Hash property will be empty
- if (String.IsNullOrWhiteSpace(info.Hash))
- //Go on and create the directory
- await
- client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName,
- String.Empty, "application/directory");
- }
- else
- {
+ if (fileInfo is DirectoryInfo)
+ {
+ //If the directory doesn't exist the Hash property will be empty
+ if (String.IsNullOrWhiteSpace(info.Hash))
+ //Go on and create the directory
+ await
+ client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName,
+ String.Empty, "application/directory");
+ }
+ else
+ {
- var cloudHash = info.Hash.ToLower();
+ var cloudHash = info.Hash.ToLower();
- var hash = action.LocalHash.Value;
- var topHash = action.TopHash.Value;
+ var hash = action.LocalHash.Value;
+ var topHash = action.TopHash.Value;
- //If the file hashes match, abort the upload
- if (hash == cloudHash || topHash == cloudHash)
- {
- //but store any metadata changes
- StatusKeeper.StoreInfo(fullFileName, info);
- Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
- return;
- }
+ //If the file hashes match, abort the upload
+ if (hash == cloudHash || topHash == cloudHash)
+ {
+ //but store any metadata changes
+ StatusKeeper.StoreInfo(fullFileName, info);
+ Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
+ return;
+ }
- //Mark the file as modified while we upload it
- StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
- //And then upload it
+ //Mark the file as modified while we upload it
+ StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
+ //And then upload it
- //Upload even small files using the Hashmap. The server may already contain
- //the relevant block
+ //Upload even small files using the Hashmap. The server may already contain
+ //the relevant block
- //First, calculate the tree hash
- var treeHash = await Signature.CalculateTreeHashAsync(fullFileName, accountInfo.BlockSize,
- accountInfo.BlockHash, 2);
+ //First, calculate the tree hash
+ var treeHash = await Signature.CalculateTreeHashAsync(fullFileName, accountInfo.BlockSize,
+ accountInfo.BlockHash, 2);
- await
- UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash);
+ //TODO: If the upload fails with a 403, abort it and mark conflict
+
+ await
+ UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash);
+ }
+ //If everything succeeds, change the file and overlay status to normal
+ StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
+ }
+ catch (WebException exc)
+ {
+ var response=(exc.Response as HttpWebResponse);
+ if (response.StatusCode == HttpStatusCode.Forbidden)
+ {
+ StatusKeeper.SetFileState(fileInfo.FullName,FileStatus.Forbidden, FileOverlayStatus.Conflict);
+ }
}
- //If everything succeeds, change the file and overlay status to normal
- StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
}
//Notify the Shell to update the overlays
NativeMethods.RaiseChangeNotification(fullFileName);
namespace Pithos.Network\r
{\r
using System;\r
- using System.Collections.Generic;\r
- using System.Linq;\r
- using System.Text;\r
\r
/// <summary>\r
/// TODO: Update summary.\r
{\r
private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);\r
\r
+/*\r
public static Func<FileStream, int, string, ConcurrentDictionary<int, byte[]>, int, Task<ConcurrentDictionary<int, byte[]>>> CalculateBlockHash;\r
\r
public static Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesRecursiveAsync(FileStream stream, int blockSize, string algorithm, ConcurrentDictionary<int, byte[]> hashes = null, int index = 0)\r
int read;\r
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
{\r
- //TODO: identify the value of index\r
-\r
using (var hasher = HashAlgorithm.Create(algorithm))\r
{\r
//This code was added for compatibility with the way Pithos calculates the last hash\r
hashes[index] = hash;\r
}\r
index += read;\r
- };\r
+ }\r
return hashes;\r
}\r
\r
var size = stream.Length;\r
Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
\r
-/*\r
var options = new ExecutionDataflowBlockOptions {BoundedCapacity = parallelism,MaxDegreeOfParallelism=parallelism};\r
var hashBlock=new ActionBlock<Tuple<int,byte[]>>(input=>\r
{\r
hashes[idx] = hash;\r
} \r
},options);\r
-*/\r
\r
var buffer = new byte[blockSize];\r
int read;\r
int index = 0;\r
+ while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
+ {\r
+ var block = new byte[read];\r
+ Buffer.BlockCopy(buffer, 0, block, 0, read);\r
+ await hashBlock.SendAsync(Tuple.Create(index, block));\r
+ index += read;\r
+ }\r
+ \r
+\r
+ hashBlock.Complete();\r
+ await hashBlock.Completion;\r
+\r
+ return hashes;\r
+ } \r
+ \r
+ public static async Task<ConcurrentDictionary<int, byte[]>> CalculateBlockHashesInPlace(FileStream stream, int blockSize, string algorithm, int parallelism)\r
+ {\r
+ if (stream == null)\r
+ throw new ArgumentNullException("stream");\r
+ if (String.IsNullOrWhiteSpace(algorithm))\r
+ throw new ArgumentNullException("algorithm");\r
+ if (blockSize <= 0)\r
+ throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+ Contract.EndContractBlock();\r
+\r
+ var hashes = new ConcurrentDictionary<int, byte[]>();\r
+\r
+ var path = stream.Name;\r
+ var size = stream.Length;\r
+ Log.DebugFormat("Hashing [{0}] size [{1}]",path,size);\r
+ \r
+\r
+ var buffer = new byte[blockSize];\r
+ var index = 0;\r
using (var hasher = HashAlgorithm.Create(algorithm))\r
{\r
+ int read;\r
while ((read = await stream.ReadAsync(buffer, 0, blockSize)) > 0)\r
{\r
- // var block = new byte[read];\r
-\r
//This code was added for compatibility with the way Pithos calculates the last hash\r
//We calculate the hash only up to the last non-null byte\r
var lastByteIndex = Array.FindLastIndex(buffer, read - 1, aByte => aByte != 0);\r
hashes[index] = hash;\r
index += read;\r
}\r
+ }\r
+ return hashes;\r
+ }\r
+*/\r
+\r
+        /// <summary>\r
+        /// Hashes a file stream block by block. Up to <paramref name="parallelism"/> blocks are read\r
+        /// into separate buffers and each such batch is then hashed with Parallel.For.\r
+        /// </summary>\r
+        /// <param name="stream">The stream to hash. It is read until ReadAsync returns 0.</param>\r
+        /// <param name="blockSize">Maximum number of bytes per block. Must be greater than zero.</param>\r
+        /// <param name="algorithm">The algorithm name passed to HashAlgorithm.Create.</param>\r
+        /// <param name="parallelism">Number of buffers and hashers per batch. Must be greater than zero.</param>\r
+        /// <returns>\r
+        /// A dictionary with one entry per block. The key is the running count of bytes read\r
+        /// up to and including that block, the value is the block's hash.\r
+        /// </returns>\r
+        /// <exception cref="ArgumentNullException">stream is null, or algorithm is null or whitespace.</exception>\r
+        /// <exception cref="ArgumentOutOfRangeException">blockSize or parallelism is not greater than zero.</exception>\r
+        public static async Task<ConcurrentDictionary<long, byte[]>> CalculateBlockHashesInPlacePFor(FileStream stream, int blockSize, string algorithm, int parallelism)\r
+        {\r
+            if (stream == null)\r
+                throw new ArgumentNullException("stream");\r
+            if (String.IsNullOrWhiteSpace(algorithm))\r
+                throw new ArgumentNullException("algorithm");\r
+            if (blockSize <= 0)\r
+                throw new ArgumentOutOfRangeException("blockSize", "blockSize must be a value greater than zero ");\r
+            //Without this check a zero or negative value fails later with an index or overflow error\r
+            if (parallelism <= 0)\r
+                throw new ArgumentOutOfRangeException("parallelism", "parallelism must be a value greater than zero ");\r
+            Contract.EndContractBlock();\r
+\r
+            var hashes = new ConcurrentDictionary<long, byte[]>();\r
+\r
+            var path = stream.Name;\r
+            var size = stream.Length;\r
+            Log.DebugFormat("Hashing [{0}] size [{1}]", path, size);\r
\r
- /*\r
- Buffer.BlockCopy(buffer,0,block,0,read);\r
- await hashBlock.SendAsync(Tuple.Create(index, block));\r
- */\r
- \r
+\r
+            //One buffer and one hasher per parallel slot, so that no buffer or\r
+            //HashAlgorithm instance is shared between parallel iterations\r
+            var buffer = new byte[parallelism][];\r
+            var hashers = new HashAlgorithm[parallelism];\r
+            for (var i = 0; i < parallelism; i++)\r
+            {\r
+                buffer[i] = new byte[blockSize];\r
+                hashers[i] = HashAlgorithm.Create(algorithm);\r
}\r
- \r
+            try\r
+            {\r
+                var indices = new long[parallelism];\r
+                var bufferCount = new int[parallelism];\r
\r
-/*\r
- hashBlock.Complete();\r
- await hashBlock.Completion;\r
-*/\r
+                //Hashes the first [count] filled buffers in parallel and stores the results\r
+                Action<int> hashFilledBuffers = count =>\r
+                    Parallel.For(0, count, idx =>\r
+                    {\r
+                        //This code was added for compatibility with the way Pithos calculates the last hash\r
+                        //We calculate the hash only up to the last non-null byte\r
+                        var lastByteIndex = Array.FindLastIndex(buffer[idx],\r
+                                                                bufferCount[idx] - 1,\r
+                                                                aByte => aByte != 0);\r
+\r
+                        var hasher = hashers[idx];\r
+                        var hash = hasher.ComputeHash(buffer[idx], 0, lastByteIndex + 1);\r
+                        var filePosition = indices[idx];\r
+                        hashes[filePosition] = hash;\r
+                    });\r
+\r
+                int read;\r
+                //Number of buffers filled since the last batch was hashed\r
+                int filled = 0;\r
+                long index = 0;\r
+                while ((read = await stream.ReadAsync(buffer[filled], 0, blockSize)) > 0)\r
+                {\r
+                    index += read;\r
+                    indices[filled] = index;\r
+                    bufferCount[filled] = read;\r
+                    filled++;\r
+                    //If we have filled the last buffer or if we have read a short block,\r
+                    //we can calculate the blocks in parallel\r
+                    if (filled == parallelism || read < blockSize)\r
+                    {\r
+                        hashFilledBuffers(filled);\r
+                        filled = 0;\r
+                    }\r
+                }\r
+                //The stream can end exactly on a block boundary while the batch is only partly filled.\r
+                //Hash whatever is still pending, otherwise those blocks would be missing from the result\r
+                if (filled > 0)\r
+                {\r
+                    hashFilledBuffers(filled);\r
+                }\r
+            }\r
+            finally\r
+            {\r
+                for (var i = 0; i < parallelism; i++)\r
+                {\r
+                    if (hashers[i] != null)\r
+                    {\r
+                        hashers[i].Dispose();\r
+                    }\r
+                }\r
+            }\r
\r
return hashes;\r
}\r
\r
static BlockHashAlgorithms()\r
{\r
+/*\r
+ Disabled: the CalculateBlockHash field declaration sits inside a commented-out region earlier in this file, so this assignment is commented out as well.\r
CalculateBlockHash = CalculateBlockHashesRecursiveAsync;\r
+*/\r
}\r
}\r
}\r