using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
+using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
return FileOverlayStatus.Unversioned;
}
- public IEnumerable<string> StoreUnversionedFiles(ParallelQuery<string> paths)
+ //Seed the overlay/status/checksum caches for files that are not yet tracked.
+ //Cache keys are stored as lower-cased full paths (see FilePath below), so the
+ //membership test must also use the lower-cased path; otherwise files already
+ //stored would be re-processed on a case-preserving file system.
+ public void ProcessExistingFiles(IEnumerable<FileInfo> paths)
{
var newFiles = (from file in paths
- where !_overlayCache.ContainsKey(file)
+ where !_overlayCache.ContainsKey(file.FullName.ToLower())
select new
{
- FilePath = file,
+ FilePath = file.FullName.ToLower(),
OverlayStatus = FileOverlayStatus.Unversioned,
FileStatus = FileStatus.Created,
Checksum = Signature.CalculateMD5(file)
});
- newFiles.ForAll(state =>
- {
+ foreach (var state in newFiles)
+ {
_overlayCache[state.FilePath] = state.OverlayStatus;
_statusCache[state.FilePath] = state.FileStatus;
_checksums[state.FilePath] = state.Checksum;
- });
- return files.ToList();
+ }
}
agent.CloudClient.Authenticate("890329@vho.grnet.gr", "24989dce4e0fcb072f8cb60c8922be19");
- var fileName = @"vlc-1.1.11-win32.exe";
+ var fileName = @"AccessDatabaseEngine_x64.exe";
var filePath = Path.Combine(@"e:\pithos\", fileName);
if (File.Exists(filePath))
Tuple.Create(@"e:\pithos\0File4.txt", FileOverlayStatus.Modified)
};
- var checker = new InMemStatusChecker();
+ var checker = new MockStatusChecker();
foreach (var file in files)
{
Tuple.Create(@"e:\pithos\0File4.txt", FileOverlayStatus.Modified)
};
- var checker = new InMemStatusChecker();
+ var checker = new MockStatusChecker();
foreach (var file in files)
{
Tuple.Create(@"e:\pithos\0File4.txt", FileOverlayStatus.Modified)
};
- var checker = new InMemStatusChecker();
+ var checker = new MockStatusChecker();
foreach (var file in files)
{
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
+using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using Pithos.Network;
public TreeHash ServerHash { get; private set; }
public string TempPath { get; private set; }
-
+
//Prepares a block-by-block download: validates the target paths, derives the
//temporary ".download" file location under fragmentsPath and indexes any
//leftover block files from a previous interrupted download.
public BlockUpdater(string fragmentsPath, string filePath, string relativePath,TreeHash serverHash)
- {
+ {
+ //Both directory arguments must be absolute; relativePath must be relative
+ if (String.IsNullOrWhiteSpace(fragmentsPath))
+ throw new ArgumentNullException("fragmentsPath");
+ if (!Path.IsPathRooted(fragmentsPath))
+ throw new ArgumentException("The fragmentsPath must be rooted", "fragmentsPath");
+
+ if (string.IsNullOrWhiteSpace(filePath))
+ throw new ArgumentNullException("filePath");
+ if (!Path.IsPathRooted(filePath))
+ throw new ArgumentException("The filePath must be rooted", "filePath");
+
+ if (string.IsNullOrWhiteSpace(relativePath))
+ throw new ArgumentNullException("relativePath");
+ if (Path.IsPathRooted(relativePath))
+ throw new ArgumentException("The relativePath must NOT be rooted", "relativePath");
+
+ if (serverHash == null)
+ throw new ArgumentNullException("serverHash");
+ Contract.EndContractBlock();
+
FragmentsPath=fragmentsPath;
FilePath = filePath;
RelativePath=relativePath;
ServerHash = serverHash;
-
- Start();
- }
-
- public void Start()
- {
//The file will be stored in a temporary location while downloading with an extension .download
TempPath = Path.Combine(FragmentsPath, RelativePath + ".download");
+
var directoryPath = Path.GetDirectoryName(TempPath);
if (!Directory.Exists(directoryPath))
Directory.CreateDirectory(directoryPath);
+
+ //Pick up blocks left behind by an earlier, interrupted download
+ LoadOrphans(directoryPath);
+ }
+
+ //Scan directoryPath for leftover block files ("<name>.*") from a previous
+ //interrupted download and index them by block hash so matching blocks can be
+ //reused instead of re-downloaded (see UseOrphan).
+ private void LoadOrphans(string directoryPath)
+ {
+ if (string.IsNullOrWhiteSpace(directoryPath))
+ throw new ArgumentNullException("directoryPath");
+ if (!Path.IsPathRooted(directoryPath))
+ throw new ArgumentException("The directoryPath must be rooted", "directoryPath");
+ Contract.EndContractBlock();
+
+ var fileName = Path.GetFileName(FilePath);
+ var orphans = Directory.GetFiles(directoryPath, fileName + ".*");
+ foreach (var orphan in orphans)
+ {
+ using (HashAlgorithm hasher = HashAlgorithm.Create(ServerHash.BlockHash))
+ {
+ var buffer=File.ReadAllBytes(orphan);
+ //The server truncates nulls before calculating hashes, have to do the same
+ //Find the last non-null byte, starting from the end
+ var lastByteIndex = Array.FindLastIndex(buffer, buffer.Length-1, aByte => aByte != 0);
+ //ComputeHash takes a COUNT, not an index: hash lastByteIndex+1 bytes so the
+ //final non-null byte is included (and an all-null buffer hashes 0 bytes
+ //instead of throwing on a negative count).
+ var binHash = hasher.ComputeHash(buffer, 0, lastByteIndex + 1);
+ var hash = binHash.ToHashString();
+ _orphanBlocks[hash] = orphan;
+ }
+ }
}
public void Commit()
- {
+ {
+ if (String.IsNullOrWhiteSpace(FilePath))
+ throw new InvalidOperationException("FilePath is empty");
+ if (String.IsNullOrWhiteSpace(TempPath))
+ throw new InvalidOperationException("TempPath is empty");
+ Contract.EndContractBlock();
+
//Copy the file to a temporary location. Changes will be made to the
//temporary file, then it will replace the original file
- File.Copy(FilePath, TempPath, true);
+ if (File.Exists(FilePath))
+ File.Copy(FilePath, TempPath, true);
//Set the size of the file to the size specified in the treehash
//This will also create an empty file if the file doesn't exist
private void SwapFiles()
{
+ if (String.IsNullOrWhiteSpace(FilePath))
+ throw new InvalidOperationException("FilePath is empty");
+ if (String.IsNullOrWhiteSpace(TempPath))
+ throw new InvalidOperationException("TempPath is empty");
+ Contract.EndContractBlock();
+
if (File.Exists(FilePath))
File.Replace(TempPath, FilePath, null, true);
else
//Delete every temporary file created during the download — downloaded blocks,
//reused orphan blocks and the assembled ".download" file — then reset both indexes.
private void ClearBlocks()
{
- foreach (var block in _blocks)
+ //Get all the block paths, orphan or not
+ var paths= _blocks.Select(pair => pair.Value)
+ .Union(_orphanBlocks.Select(pair => pair.Value));
+ foreach (var filePath in paths)
{
- var filePath = block.Value;
File.Delete(filePath);
}
+
File.Delete(TempPath);
_blocks.Clear();
+ _orphanBlocks.Clear();
}
//Change the file's size, possibly truncating or adding to it
}*/
ConcurrentDictionary<int,string> _blocks=new ConcurrentDictionary<int, string>();
+ ConcurrentDictionary<string, string> _orphanBlocks = new ConcurrentDictionary<string, string>();
+
+ //Try to satisfy the block at blockIndex from a leftover (orphan) block file.
+ //Returns true and records the orphan's path as the block when an orphan with
+ //the given hash exists, false otherwise.
+ public bool UseOrphan(int blockIndex, string blockHash)
+ {
+ string orphanPath;
+ if (!_orphanBlocks.TryGetValue(blockHash, out orphanPath))
+ return false;
+ _blocks[blockIndex] = orphanPath;
+ return true;
+ }
//Asynchronously write a downloaded block to its numbered temp file
//("<temp>.NNNNNN", zero-padded so names sort in block order) and record it.
public Task StoreBlock(int blockIndex,byte[] buffer)
{
- var blockPath = String.Format("{0}.{1:3}", TempPath, blockIndex);
+ var blockPath = String.Format("{0}.{1:000000}", TempPath, blockIndex);
_blocks[blockIndex] = blockPath;
-
+ //Remove any orphan files
+ if (File.Exists(blockPath))
+ File.Delete(blockPath);
+
return FileAsync.WriteAllBytes(blockPath, buffer);
}
OldPath = oldPath;
NewFileName = newFileName;
NewPath = newPath;
- LocalHash = new Lazy<string>(() => Signature.CalculateMD5(NewFileName), LazyThreadSafetyMode.ExecutionAndPublication);
+ //This is a rename operation, a hash will not be used
+ LocalHash = new Lazy<string>(() => String.Empty, LazyThreadSafetyMode.ExecutionAndPublication);
}
- public CloudAction(CloudActionType action, FileInfo localFile, ObjectInfo cloudFile,FileState state)
+ public CloudAction(CloudActionType action, FileInfo localFile, ObjectInfo cloudFile,FileState state,int blockSize, string algorithm)
{
Action = action;
LocalFile = localFile;
FileState = state;
if (LocalFile != null)
{
- LocalHash = new Lazy<string>(() => Signature.CalculateMD5(LocalFile),
+ LocalHash = new Lazy<string>(() => LocalFile.CalculateHash(blockSize,algorithm),
LazyThreadSafetyMode.ExecutionAndPublication);
}
}
}
catch (IOException exc)
{
- Trace.TraceWarning("File access error occured, retrying {0}\n{1}", state.Path, exc);
- _agent.Post(state);
+ if (File.Exists(state.Path))
+ {
+ Trace.TraceWarning("File access error occured, retrying {0}\n{1}", state.Path, exc);
+ _agent.Post(state);
+ }
+ else
+ {
+ Trace.TraceWarning("File {0} does not exist. Will be ignored\n{1}", state.Path, exc);
+ }
}
catch (Exception exc)
{
{
if (filePath.StartsWith(FragmentsPath))
return true;
+ if (_ignoreFiles.ContainsKey(filePath.ToLower()))
+ return true;
return false;
}
{WatcherChangeTypes.Renamed,FileStatus.Renamed}
};
+ private Dictionary<string,string> _ignoreFiles=new Dictionary<string, string>();
+
private WorkflowState UpdateFileStatus(WorkflowState state)
{
Debug.Assert(!state.Path.Contains("fragments"));
this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
break;
case FileStatus.Deleted:
- this.StatusKeeper.RemoveFileOverlayStatus(state.Path);
+ //this.StatusKeeper.RemoveFileOverlayStatus(state.Path);
break;
case FileStatus.Renamed:
this.StatusKeeper.RemoveFileOverlayStatus(state.OldPath);
if (Directory.Exists(path))
return state;
- string hash = Signature.CalculateMD5(path);
-
+ var info = new FileInfo(path);
+ string hash = info.CalculateHash(StatusKeeper.BlockSize,StatusKeeper.BlockHash);
StatusKeeper.UpdateFileChecksum(path, hash);
state.Hash = hash;
}
+ //Delete the local file for a server-side deletion and clear its stored state.
+ //The deleted path is added to _ignoreFiles so the file watcher does not treat
+ //our own delete as a fresh local change.
+ public void Delete(string relativePath)
+ {
+ var absolutePath = Path.Combine(RootPath, relativePath);
+ if (File.Exists(absolutePath))
+ {
+ File.Delete(absolutePath);
+ var key = absolutePath.ToLower();
+ _ignoreFiles[key] = key;
+ }
+ StatusKeeper.ClearFileStatus(absolutePath);
+ }
}
}
using System.IO;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
+using Pithos.Network;
namespace Pithos.Core.Agents
{
}
+ //Return the hash used to compare this file with its server object:
+ //plain MD5 for files that fit in a single block, otherwise the top hash
+ //of the file's hash tree (matching the server's hashing scheme).
+ public static string CalculateHash(this FileInfo info,int blockSize,string algorithm)
+ {
+ return info.Length <= blockSize
+ ? Signature.CalculateMD5(info.FullName)
+ : Signature.CalculateTreeHash(info.FullName, blockSize, algorithm).TopHash.ToHashString();
+ }
+
+
}
}
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;
+using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
//Get the list of server objects changed since the last check
var listObjects = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000,()=>
CloudClient.ListObjects(PithosContainer,since));
+ //Get the list of deleted objects since the last check
+ var listTrash= Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000,()=>
+ CloudClient.ListObjects(TrashContainer,since));
+
+ var listAll = Task.Factory.TrackedSequence(() => listObjects, () => listTrash);
//Next time we will check for all changes since the current check minus 1 second
//This is done to ensure there are no discrepancies due to clock differences
DateTime nextSince = DateTime.Now.AddSeconds(-1);
+
+
+
-
- var enqueueFiles = listObjects.ContinueWith(task =>
+
+ var enqueueFiles = listAll.ContinueWith(task =>
{
if (task.IsFaulted)
{
Trace.CorrelationManager.StartLogicalOperation("Listener");
Trace.TraceInformation("[LISTENER] Start Processing");
- var remoteObjects = task.Result;
+ var trashObjects = ((Task<IList<ObjectInfo>>)task.Result[1]).Result;
+ var remoteObjects = ((Task<IList<ObjectInfo>>)task.Result[0]).Result;
+
+
+ //Items with the same name, hash may be both in the container and the trash
+ //Don't delete items that exist in the container
+ var realTrash = from trash in trashObjects
+ where !remoteObjects.Any(info => info.Hash == trash.Hash)
+ select trash;
+ ProcessDeletedFiles(realTrash);
+
var remote=from info in remoteObjects
let name=info.Name
//over the remote files
foreach (var objectInfo in remote)
{
- var relativePath= objectInfo.Name.RelativeUrlToFilePath();// fileInfo.AsRelativeUrlTo(FileAgent.RootPath);
+ var relativePath= objectInfo.Name.RelativeUrlToFilePath();
//and remove any matching objects from the list, adding them to the commonObjects list
if (FileAgent.Exists(relativePath))
{
var localFile = FileAgent.GetFileInfo(relativePath);
- var state=FileState.FindByFilePath(localFile.FullName);
- commonObjects.Add(Tuple.Create(objectInfo, localFile,state));
+ var state = FileState.FindByFilePath(localFile.FullName);
+ commonObjects.Add(Tuple.Create(objectInfo, localFile, state));
}
else
+ {
//If there is no match we add them to the localFiles list
- remoteOnly.Add(objectInfo);
+ //but only if the file is not marked for deletion
+ var targetFile = Path.Combine(FileAgent.RootPath, relativePath);
+ var fileStatus = StatusKeeper.GetFileStatus(targetFile);
+ if (fileStatus!=FileStatus.Deleted)
+ remoteOnly.Add(objectInfo);
+
+
+ }
}
//At the end of the iteration, the *remote* list will contain the files that exist
let localFile = pair.Item2
let state=pair.Item3
select new CloudAction(CloudActionType.MustSynch,
- localFile, objectInfo,state);
+ localFile, objectInfo,state,BlockSize,BlockHash);
+
+
+
//Collect all the actions
var allActions = actionsForRemote.Union(actionsForCommon);
return loop;
}
-
-
+ //Delete the local counterpart of every object found in the server-side trash.
+ private void ProcessDeletedFiles(IEnumerable<ObjectInfo> trashObjects)
+ {
+ foreach (var trashObject in trashObjects)
+ {
+ //Map the object's URL back to a local relative path and remove the file
+ FileAgent.Delete(trashObject.Name.RelativeUrlToFilePath());
+ }
+ }
+
private void RenameCloudFile(string oldFileName, string newPath, string newFileName)
{
Contract.EndContractBlock();
this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified);
-
- CloudClient.MoveObject(PithosContainer, fileName, TrashContainer, fileName);
+ CloudClient.DeleteObject(PithosContainer, fileName, TrashContainer);
this.StatusKeeper.ClearFileStatus(fileName);
this.StatusKeeper.RemoveFileOverlayStatus(fileName);
{
if (String.IsNullOrWhiteSpace(container))
throw new ArgumentNullException("container");
+ if (relativeUrl == null)
+ throw new ArgumentNullException("relativeUrl");
+ if (String.IsNullOrWhiteSpace(localPath))
+ throw new ArgumentNullException("localPath");
+ if (!Path.IsPathRooted(localPath))
+ throw new ArgumentException("The localPath must be rooted", "localPath");
+ Contract.EndContractBlock();
+
+ var download=Task.Factory.Iterate(DownloadIterator(container, relativeUrl, localPath));
+ download.Wait();
+ }
+
+ private IEnumerable<Task> DownloadIterator(string container, Uri relativeUrl, string localPath)
+ {
+ if (String.IsNullOrWhiteSpace(container))
+ throw new ArgumentNullException("container");
if (relativeUrl==null)
throw new ArgumentNullException("relativeUrl");
if (String.IsNullOrWhiteSpace(localPath))
var url = relativeUrl.ToString();
if (url.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase))
- return;
+ yield break;
//Are we already downloading or uploading the file?
using (var gate=NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
{
if (gate.Failed)
- return;
+ yield break;
//The file's hashmap will be stored in the same location with the extension .hashmap
//var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap");
//Retrieve the hashmap from the server
- var getHashMap = CloudClient.GetHashMap(container,url);
+ var getHashMap = CloudClient.GetHashMap(container,url);
+ yield return getHashMap;
- var downloadTask= getHashMap.ContinueWith(t =>
- {
- var serverHash=t.Result;
- //If it's a small file
- return serverHash.Hashes.Count == 1
- //Download it in one go
- ? DownloadEntireFile(container, relativeUrl, localPath)
- //Otherwise download it block by block
- : DownloadWithBlocks(container, relativeUrl, localPath, serverHash);
- });
+ var serverHash=getHashMap.Result;
+ //If it's a small file
+ var downloadTask=(serverHash.Hashes.Count == 1 )
+ //Download it in one go
+ ? DownloadEntireFile(container, relativeUrl, localPath)
+ //Otherwise download it block by block
+ : DownloadWithBlocks(container, relativeUrl, localPath, serverHash);
+
+ yield return downloadTask;
-
//Retrieve the object's metadata
- var getInfo = downloadTask.ContinueWith(t =>
- {
- t.PropagateExceptions();
- return CloudClient.GetObjectInfo(container, url);
- });
+ var info=CloudClient.GetObjectInfo(container, url);
//And store it
- var storeInfo = getInfo.ContinueWith(t =>
- {
- t.PropagateExceptions();
- StatusKeeper.StoreInfo(localPath, t.Result);
- });
-
- storeInfo.Wait();
+ StatusKeeper.StoreInfo(localPath, info);
+
+ //Notify listeners that a local file has changed
StatusNotification.NotifyChangedFile(localPath);
}
return getObject;
}
- public Task DownloadWithBlocks(string container,Uri relativeUrl, string localPath,TreeHash serverHash)
+ //Download a file asynchronously using blocks
+ //Validates the arguments, then drives BlockDownloadIterator with
+ //Task.Factory.Iterate so the blocks are fetched one after the other without
+ //blocking a thread between downloads. Returns the task for the whole download.
+ public Task DownloadWithBlocks(string container, Uri relativeUrl, string localPath, TreeHash serverHash)
+ {
+ if (String.IsNullOrWhiteSpace(container))
+ throw new ArgumentNullException("container");
+ if (relativeUrl == null)
+ throw new ArgumentNullException("relativeUrl");
+ if (String.IsNullOrWhiteSpace(localPath))
+ throw new ArgumentNullException("localPath");
+ if (!Path.IsPathRooted(localPath))
+ throw new ArgumentException("The localPath must be rooted", "localPath");
+ if (serverHash == null)
+ throw new ArgumentNullException("serverHash");
+ Contract.EndContractBlock();
+
+ return Task.Factory.Iterate(BlockDownloadIterator(container, relativeUrl, localPath, serverHash));
+ }
+
+ private IEnumerable<Task> BlockDownloadIterator(string container,Uri relativeUrl, string localPath,TreeHash serverHash)
{
if (String.IsNullOrWhiteSpace(container))
throw new ArgumentNullException("container");
var blockUpdater = new BlockUpdater(FileAgent.FragmentsPath, localPath, relativePath, serverHash);
- return Task.Factory.StartNew(() =>
- {
- //Calculate the temp file's treehash
- var treeHash = Signature.CalculateTreeHashAsync(localPath, this.BlockSize,BlockHash).Result;
+
+ //Calculate the file's treehash
+ var calcHash = Signature.CalculateTreeHashAsync(localPath, this.BlockSize,BlockHash);
+ yield return calcHash;
+ var treeHash = calcHash.Result;
- //And compare it with the server's hash
- var upHashes = serverHash.GetHashesAsStrings();
- var localHashes = treeHash.HashDictionary;
- for (int i = 0; i < upHashes.Length; i++)
+ //And compare it with the server's hash
+ var upHashes = serverHash.GetHashesAsStrings();
+ var localHashes = treeHash.HashDictionary;
+ for (int i = 0; i < upHashes.Length; i++)
+ {
+ //For every non-matching hash
+ var upHash = upHashes[i];
+ if (!localHashes.ContainsKey(upHash))
{
- //For every non-matching hash
- if (!localHashes.ContainsKey(upHashes[i]))
+ if (blockUpdater.UseOrphan(i, upHash))
{
- Trace.TraceInformation("[BLOCK GET] START {0} of {1} for {2}",i,upHashes.Length,localPath);
- var start = i*BlockSize;
- long? end = null;
- if (i < upHashes.Length - 1 )
- end= ((i + 1)*BlockSize) ;
+ Trace.TraceInformation("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
+ continue;
+ }
+ Trace.TraceInformation("[BLOCK GET] START {0} of {1} for {2}",i,upHashes.Length,localPath);
+ var start = i*BlockSize;
+ //To download the last block just pass a null for the end of the range
+ long? end = null;
+ if (i < upHashes.Length - 1 )
+ end= ((i + 1)*BlockSize) ;
- //Get its block
- var block= CloudClient.GetBlock(container, relativeUrl,start, end);
+ //Download the missing block
+ var getBlock = CloudClient.GetBlock(container, relativeUrl, start, end);
+ yield return getBlock;
+ var block = getBlock.Result;
- var store=block.Then(b => blockUpdater.StoreBlock(i, b));
- store.Wait();
+ //and store it
+ yield return blockUpdater.StoreBlock(i, block);
+
- Trace.TraceInformation("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
- }
+ Trace.TraceInformation("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
}
-
- blockUpdater.Commit();
- Trace.TraceInformation("[BLOCK GET] COMPLETE {0}", localPath);
- });
- }
+ }
+ blockUpdater.Commit();
+ Trace.TraceInformation("[BLOCK GET] COMPLETE {0}", localPath);
+ }
private void UploadCloudFile(FileInfo fileInfo, string hash,string topHash)
{
+ //Validate the contract before starting the asynchronous workflow
+ if (fileInfo == null)
+ throw new ArgumentNullException("fileInfo");
+ if (String.IsNullOrWhiteSpace(hash))
+ throw new ArgumentNullException("hash");
+ if (topHash == null)
+ throw new ArgumentNullException("topHash");
+ Contract.EndContractBlock();
+
+ //Drive the upload iterator to completion and block until it finishes
+ Task.Factory.Iterate(UploadIterator(fileInfo, hash, topHash)).Wait();
+ }
+
+ private IEnumerable<Task> UploadIterator(FileInfo fileInfo, string hash,string topHash)
+ {
if (fileInfo==null)
throw new ArgumentNullException("fileInfo");
if (String.IsNullOrWhiteSpace(hash))
throw new ArgumentNullException("hash");
+ if (topHash == null)
+ throw new ArgumentNullException("topHash");
Contract.EndContractBlock();
if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
- return;
+ yield break;
var url = fileInfo.AsRelativeUrlTo(FileAgent.RootPath);
{
//Abort if the file is already being uploaded or downloaded
if (gate.Failed)
- return;
+ yield break;
//Even if GetObjectInfo times out, we can proceed with the upload
//but store any metadata changes
this.StatusKeeper.StoreInfo(fullFileName, info);
Trace.TraceInformation("Skip upload of {0}, hashes match", fullFileName);
- return;
+ yield break;
}
//Mark the file as modified while we upload it
- var setStatus = Task.Factory.StartNew(() =>
- StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified));
+ StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
//And then upload it
//If the file is larger than the block size, try a hashmap PUT
{
//To upload using a hashmap
//First, calculate the tree hash
- var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, BlockSize, BlockHash);
-
- var putHashMap = setStatus.ContinueWith(t=>
- UploadWithHashMap(fileInfo,url,treeHash));
+ var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, BlockSize, BlockHash);
+ yield return treeHash;
- putHashMap.Wait();
+ yield return Task.Factory.Iterate(UploadWithHashMap(fileInfo,url,treeHash));
+
}
else
{
//Otherwise do a regular PUT
- var put = setStatus.ContinueWith(t =>
- CloudClient.PutObject(PithosContainer,url,fullFileName,hash));
- put.Wait();
+ yield return CloudClient.PutObject(PithosContainer,url,fullFileName,hash);
}
//If everything succeeds, change the file and overlay status to normal
this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal);
StatusNotification.NotifyChangedFile(fullFileName);
}
- public void UploadWithHashMap(FileInfo fileInfo,string url,Task<TreeHash> treeHash)
+ //Upload iterator: sends the file's hashmap to the server, uploads whichever
+ //blocks the server reports missing, and repeats until the server accepts the
+ //hashmap with no missing hashes.
+ public IEnumerable<Task> UploadWithHashMap(FileInfo fileInfo,string url,Task<TreeHash> treeHash)
{
+ if(fileInfo==null)
+ throw new ArgumentNullException("fileInfo");
+ if (String.IsNullOrWhiteSpace(url))
+ //Pass the parameter NAME to the exception, not its value
+ throw new ArgumentNullException("url");
+ if (treeHash==null)
+ throw new ArgumentNullException("treeHash");
+ Contract.EndContractBlock();
+
var fullFileName = fileInfo.FullName;
//Send the hashmap to the server
var hashPut = CloudClient.PutHashMap(PithosContainer, url, treeHash.Result);
- var missingHashes = hashPut.Result;
- if (missingHashes.Count == 0)
- return;
+ yield return hashPut;
- var buffer = new byte[BlockSize];
- foreach (var missingHash in missingHashes)
+ var missingHashes = hashPut.Result;
+ //If the server returns no missing hashes, we are done
+ while (missingHashes.Count > 0)
{
- int blockIndex = -1;
- try
+
+ var buffer = new byte[BlockSize];
+ foreach (var missingHash in missingHashes)
{
//Find the proper block
- blockIndex = treeHash.Result.HashDictionary[missingHash];
+ var blockIndex = treeHash.Result.HashDictionary[missingHash];
var offset = blockIndex*BlockSize;
var read = fileInfo.Read(buffer, offset, BlockSize);
- if (read > 0)
- {
- //Copy the actual block data out of the buffer
- var data = new byte[read];
- Buffer.BlockCopy(buffer, 0, data, 0, read);
-
- //And POST them
- CloudClient.PostBlock(PithosContainer, data).Wait();
- Trace.TraceInformation("[BLOCK] Block {0} of {1} uploaded", blockIndex,
- fullFileName);
- }
- }
- catch (Exception exc)
- {
- Trace.TraceError("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc);
+
+ //And upload the block
+ var postBlock = CloudClient.PostBlock(PithosContainer, buffer, 0, read);
+
+ //We have to handle possible exceptions in a continuation because
+ //*yield return* can't appear inside a try block
+ yield return postBlock.ContinueWith(t =>
+ t.ReportExceptions(
+ exc=>Trace.TraceError("[ERROR] uploading block {0} of {1}\n{2}",blockIndex, fullFileName, exc),
+ ()=>Trace.TraceInformation("[BLOCK] Block {0} of {1} uploaded", blockIndex,fullFileName)));
}
- }
- UploadWithHashMap(fileInfo, url, treeHash);
-
+ //Repeat until there are no more missing hashes
+ hashPut = CloudClient.PutHashMap(PithosContainer, url, treeHash.Result);
+ yield return hashPut;
+ missingHashes = hashPut.Result;
+ }
}
string fileName = Path.GetFileName(path);
//Bypass deleted files, unless the status is Deleted
- if (!(File.Exists(path) || state.Status != FileStatus.Deleted))
+ if (!File.Exists(path) && state.Status != FileStatus.Deleted)
{
state.Skip = true;
this.StatusKeeper.RemoveFileOverlayStatus(path);
return CompletedTask<object>.Default;
}
var fileState = FileState.FindByFilePath(path);
+ var blockHash = NetworkAgent.BlockHash;
+ var blockSize = NetworkAgent.BlockSize;
+
switch (state.Status)
{
case FileStatus.Created:
case FileStatus.Modified:
var info = new FileInfo(path);
- NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty, fileState));
+ NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty, fileState,blockSize,blockHash));
break;
case FileStatus.Deleted:
- NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo { Name = fileName }, fileState));
+ NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo { Name = fileName }, fileState, blockSize, blockHash));
break;
case FileStatus.Renamed:
NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName, state.OldPath, state.FileName, state.Path));
public void RestartInterruptedFiles()
{
- StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
- var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
-
- var pendingEntries = (from state in FileState.Queryable
- where interruptedStates.Contains(state.OverlayStatus) &&
- !state.FilePath.StartsWith(FragmentsPath) &&
- !state.FilePath.EndsWith(".ignore")
- select state).ToList();
- var staleEntries = from state in pendingEntries
- where !File.Exists(state.FilePath)
- select state;
- var staleKeys = staleEntries.Select(state=>state.Id);
- FileState.DeleteAll(staleKeys);
-
- var validEntries = from state in pendingEntries.Except(staleEntries)
- where File.Exists(state.FilePath)
+ StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
+
+ var pendingEntries = from state in FileState.Queryable
+ where state.FileStatus != FileStatus.Unchanged &&
+ !state.FilePath.StartsWith(FragmentsPath.ToLower()) &&
+ !state.FilePath.EndsWith(".ignore")
+ select state;
+
+ var validEntries = from state in pendingEntries
select new WorkflowState
{
Path = state.FilePath.ToLower(),
Hash = state.Checksum,
Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
FileStatus.Created :
- FileStatus.Modified,
+ state.FileStatus,
TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
WatcherChangeTypes.Created :
WatcherChangeTypes.Changed
using Castle.ActiveRecord;
using Castle.ActiveRecord.Framework;
using NHibernate.Engine;
+using Pithos.Core.Agents;
using Pithos.Interfaces;
using Pithos.Network;
[Property]
public string Checksum { get; set; }
+/*
[Property]
public string TopHash { get; set; }
+*/
[Property]
public long? Version { get; set; }
public static FileState FindByFilePath(string absolutePath)
{
+ if (string.IsNullOrWhiteSpace(absolutePath))
+ throw new ArgumentNullException("absolutePath");
+ Contract.EndContractBlock();
return Queryable.FirstOrDefault(s => s.FilePath == absolutePath.ToLower());
}
+ //Remove every stored FileState whose path matches absolutePath.
+ public static void DeleteByFilePath(string absolutePath)
+ {
+ if(string.IsNullOrWhiteSpace(absolutePath))
+ throw new ArgumentNullException("absolutePath");
+ Contract.EndContractBlock();
+
+ //States store lower-cased paths, so normalize before matching
+ var target = absolutePath.ToLower();
+ var stateKeys = FileState.Queryable
+ .Where(state => state.FilePath == target)
+ .Select(state => state.Id);
+ DeleteAll(stateKeys);
+ }
+
public static Task<FileState> CreateForAsync(string filePath,int blockSize,string algorithm)
{
if (blockSize <= 0)
//Skip updating the hash for folders
if (Directory.Exists(FilePath))
- return Task.Factory.FromResult(this);
+ return Task.Factory.FromResult(this);
- var results=Task.Factory.TrackedSequence(
- () => Task.Factory.StartNew(() => Signature.CalculateMD5(FilePath)),
- () => Signature.CalculateTreeHashAsync(FilePath, blockSize, algorithm)
- );
+ var results = Task.Factory.StartNew(() =>
+ {
+ var info = new FileInfo(FilePath);
+ return info.CalculateHash(blockSize, algorithm);
+ });
- var state=results.Then(hashes =>
+ var state=results.Then(hash =>
{
- Checksum = (hashes[0] as Task<string>).Result;
- TopHash = (hashes[1] as Task<TreeHash>).Result.TopHash.ToHashString();
+ Checksum = hash;
return Task.Factory.FromResult(this);
});
void ClearFileStatus(string path);
void SetPithosStatus(PithosStatus status);
FileOverlayStatus GetFileOverlayStatus(string path);
- IEnumerable<string> StoreUnversionedFiles(ParallelQuery<string> paths);
+ void ProcessExistingFiles(IEnumerable<FileInfo> paths);
void Stop();
void SetFileState(string path, FileStatus fileStatus, FileOverlayStatus overlayStatus);
void StoreInfo(string path, ObjectInfo objectInfo);
return default(FileOverlayStatus);
}
- public IEnumerable<string> StoreUnversionedFiles(ParallelQuery<string> paths)
+ //NOTE(review): looks like a Code Contracts contract-class stub — it declares
+ //the precondition only and is never executed; confirm against the interface.
+ public void ProcessExistingFiles(IEnumerable<FileInfo> paths)
{
Contract.Requires(paths!=null);
- return default(IEnumerable<string>);
}
public void Stop()
return FileOverlayStatus.Unversioned;
}
- public IEnumerable<string> StoreUnversionedFiles(ParallelQuery<string> paths)
+ public void StoreUnversionedFiles(IEnumerable<FileInfo> paths)
{
var newFiles = (from file in paths
<Compile Include="PithosMonitor.cs" />
<Compile Include="PithosWorkflow.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
- <Compile Include="InMemStatusChecker.cs" />
<Compile Include="StatusInfo.cs" />
<Compile Include="StatusService.cs" />
<Compile Include="TaskExtensions.cs" />
try
{
var fragmentsPath=Path.Combine(RootPath, FragmentsFolder);
+ var directory = new DirectoryInfo(path);
var files =
- from filePath in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).AsParallel()
- where !filePath.StartsWith(fragmentsPath,StringComparison.InvariantCultureIgnoreCase) &&
- !filePath.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase)
- select filePath.ToLower();
- StatusKeeper.StoreUnversionedFiles(files);
+ from file in directory.EnumerateFiles("*", SearchOption.AllDirectories)
+ where !file.FullName.StartsWith(fragmentsPath,StringComparison.InvariantCultureIgnoreCase) &&
+ !file.Extension.Equals("ignore",StringComparison.InvariantCultureIgnoreCase)
+ select file;
+ StatusKeeper.ProcessExistingFiles(files);
}
catch (Exception exc)
return false;
if (x.LocalFile != null && y.LocalFile != null && !x.LocalFile.FullName.Equals(y.LocalFile.FullName))
return false;
- if (x.CloudFile != null && y.CloudFile != null && !x.CloudFile.Hash.Equals(y.CloudFile.Hash))
- return false;
+ if (x.CloudFile != null && y.CloudFile != null )
+ {
+ if (x.CloudFile.Hash == null & y.CloudFile.Hash != null)
+ return false;
+ if (x.CloudFile.Hash != null & y.CloudFile.Hash == null)
+ return false;
+ if (x.CloudFile.Hash == null & y.CloudFile.Hash == null)
+ return (x.CloudFile.Name == y.CloudFile.Name);
+ if (!x.CloudFile.Hash.Equals(y.CloudFile.Hash))
+ return false;
+ }
if (x.CloudFile == null ^ y.CloudFile == null ||
x.LocalFile == null ^ y.LocalFile == null)
return false;
//Hash combines the local path, the cloud file's hash (falling back to its
//name when the hash is missing, matching Equals) and the action type.
public override int GetHashCode(CloudAction obj)
{
var hash1 = (obj.LocalFile == null) ? int.MaxValue : obj.LocalFile.FullName.GetHashCode();
- var hash2 = (obj.CloudFile == null) ? int.MaxValue : obj.CloudFile.Hash.GetHashCode();
+ var hash2 = (obj.CloudFile == null) ? int.MaxValue : (obj.CloudFile.Hash ?? obj.CloudFile.Name).GetHashCode();
var hash3 = obj.Action.GetHashCode();
//XOR all three components. '&' binds tighter than '^' in C#, so the previous
//'hash1 ^ hash2 & hash3' mostly discarded the action's contribution.
return hash1 ^ hash2 ^ hash3;
}
var stream=File.OpenRead(path);
return stream;
}
- catch (Exception ex)
+ catch
{
+ //Swallow and retry: presumably the file is transiently locked by another process — TODO confirm; gives up after 10 attempts (counter below)
Thread.Sleep(500);
if (++counter > 10)
using Castle.ActiveRecord.Framework.Config;
using NHibernate.Criterion;
using NHibernate.Impl;
+using Pithos.Core.Agents;
using Pithos.Interfaces;
using Pithos.Network;
if (!Directory.Exists(_pithosDataPath))
Directory.CreateDirectory(_pithosDataPath);
- File.Delete(Path.Combine(_pithosDataPath, "pithos.db"));
+ //NOTE(review): the state database is now preserved across runs instead of being wiped — confirm this is intentional
+ //File.Delete(Path.Combine(_pithosDataPath, "pithos.db"));
var source = GetConfiguration(_pithosDataPath);
ActiveRecordStarter.Initialize(source,typeof(FileState),typeof(FileTag));
}
- public IEnumerable<string> StoreUnversionedFiles(ParallelQuery<string> paths)
+ public void ProcessExistingFiles(IEnumerable<FileInfo> existingFiles)
{
- var existingFiles = (from state in FileState.Queryable
-                      select state.FilePath.ToLower()).ToList();
-
- var newFiles = paths.Except(existingFiles.AsParallel());
-
+ if(existingFiles ==null)
+     throw new ArgumentNullException("existingFiles");
+ Contract.EndContractBlock();
+ //NOTE(review): 'j' below appears to be an unused leftover declaration — candidate for removal once confirmed
Dictionary<int, int> j;
-
- newFiles.ForAll(file =>
+ //Find new or matching files with a left join to the stored states
+ var fileStates = FileState.Queryable;
+ var currentFiles=from file in existingFiles
+                  join state in fileStates on file.FullName.ToLower() equals state.FilePath.ToLower() into gs
+                  from substate in gs.DefaultIfEmpty()
+                  select new {File = file, State = substate};
+
+ //To get the deleted files we must get the states that have no corresponding
+ //files.
+ //We can't use the File.Exists method inside a query, so we get all file paths from the states
+ var statePaths = (from state in fileStates
+                   select new {state.Id, state.FilePath}).ToList();
+ //and check each one
+ var missingStates= (from path in statePaths
+                     where !File.Exists(path.FilePath)
+                     select path.Id).ToList();
+ //Finally, retrieve the states that correspond to the deleted files
+ var deletedFiles = from state in fileStates
+                    where missingStates.Contains(state.Id)
+                    select new { File = default(FileInfo), State = state };
+
+ var pairs = currentFiles.Union(deletedFiles);
+
+ Parallel.ForEach(pairs, pair =>
{
-
- var createState = FileState.CreateForAsync(file,this.BlockSize,this.BlockHash)
-     .ContinueWith(state =>{
-         _persistenceAgent.Post(state.Result.Create);
-         return state.Result;
-     });
-
- /*Func<Guid, Task<TreeHash>> treeBuilder = (stateId) =>
-     Signature.CalculateTreeHashAsync(file, BlockSize, BlockHash)
-     .ContinueWith(treeTask =>
+ var fileState = pair.State;
+ var file = pair.File;
+ if (fileState == null)
+ {
+     //This is a new file
+     var fullPath = pair.File.FullName.ToLower();
+     var createState = FileState.CreateForAsync(fullPath, this.BlockSize, this.BlockHash);
+     //NOTE(review): fire-and-forget continuation; a fault in CreateForAsync will surface as an unobserved exception — consider observing/logging faulted tasks
+     createState.ContinueWith(state => _persistenceAgent.Post(state.Result.Create));
+ }
+ else if (file == null)
+ {
+     //This file was deleted while we were down. We should mark it as deleted
+     //We have to go through UpdateStatus here because the state object we are using
+     //was created by a different ORM session.
+     UpdateStatus(fileState.Id,state=> state.FileStatus = FileStatus.Deleted);
+ }
+ else
+ {
+     //This file has a matching state. Need to check for possible changes
+     var hashString = file.CalculateHash(BlockSize,BlockHash);
+     //If the hashes don't match the file was changed
+     if (fileState.Checksum != hashString)
{
-         var treeHash = treeTask.Result;
-         treeHash.FileId = stateId;
-         return treeHash;
-     });*/
-
- /* var createTree=createState.ContinueWith(stateTask =>
-     treeBuilder(stateTask.Result.Id))
-     .Unwrap();
-
- var saveTree=createTree.ContinueWith(treeTask =>
-     treeTask.Result.Save(_pithosDataPath));*/
- });
-
- return newFiles;
-
+         UpdateStatus(fileState.Id, state => state.FileStatus = FileStatus.Modified);
+     }
+ }
+ });
+
}
+
+
public string BlockHash { get; set; }
});
}
+
+ /// <summary>
+ /// Sets the status of a specific state, via the persistence agent and inside a fresh ORM session
+ /// </summary>
+ /// <param name="stateID">Id of the FileState record to modify</param>
+ /// <param name="setter">Change to apply to the loaded state before saving</param>
+ private void UpdateStatus(Guid stateID, Action<FileState> setter)
+ {
+     _persistenceAgent.Post(() =>
+     {
+         using (new SessionScope())
+         {
+             var state = FileState.Find(stateID);
+             if (state == null)
+             {
+                 Trace.TraceWarning("[NOFILE] Unable to set status for {0}.", stateID);
+                 return;
+             }
+             setter(state);
+             state.Save();
+         }
+
+     });
+ }
public FileOverlayStatus GetFileOverlayStatus(string path)
{
state.Version = objectInfo.Version;
state.VersionTimeStamp = objectInfo.VersionTimestamp;
- if(objectInfo.Bytes>BlockSize)
- state.TopHash = objectInfo.Hash;
state.FileStatus = FileStatus.Unchanged;
state.OverlayStatus = FileOverlayStatus.Normal;
public void ClearFileStatus(string path)
{
//TODO:SHOULDN'T need both clear file status and remove overlay status
- _persistenceAgent.Post(()=>
-     FileState.DeleteAll(new[] { path.ToLower() }));
+ //NOTE(review): the replaced code lowercased the path first, and states are stored lowercased — confirm DeleteByFilePath normalizes case
+ _persistenceAgent.Post(()=> FileState.DeleteByFilePath(path));
}
public void UpdateFileChecksum(string path, string checksum)
{
return Then(first, next, CancellationToken.None);
}
-
- public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next,CancellationToken cancellationToken)
+
+ public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next, CancellationToken cancellationToken)
{
if (first == null) throw new ArgumentNullException("first");
if (next == null) throw new ArgumentNullException("next");
- var tcs = new TaskCompletionSource<T2>();
+ var tcs = new TaskCompletionSource<T2>();
first.ContinueWith(delegate
{
if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
}
catch (Exception exc) { tcs.TrySetException(exc); }
}
- },cancellationToken, TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Current);
+ }, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
return tcs.Task;
- }
+ }
- public static Task Then<T1>(this Task<T1> first, Func<T1, Task> next,CancellationToken cancellationToken)
+ public static Task Then<T1>(this Task<T1> first, Func<T1, Task> next, CancellationToken cancellationToken)
{
if (first == null) throw new ArgumentNullException("first");
if (next == null) throw new ArgumentNullException("next");
- var tcs = new TaskCompletionSource<object>();
+ var tcs = new TaskCompletionSource<object>();
first.ContinueWith(delegate
{
if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
}
catch (Exception exc) { tcs.TrySetException(exc); }
}
- },cancellationToken, TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Current);
+ }, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
return tcs.Task;
- }
+ }
+
+
+
+ /// <summary>
+ /// Routes a completed task's outcome to callbacks: OnError for a fault, OnSuccess otherwise
+ /// </summary>
+ public static void ReportExceptions(this Task task,Action<AggregateException> OnError,Action OnSuccess )
+ {
+     if (task == null) throw new ArgumentNullException("task");
+     if (OnError == null) throw new ArgumentNullException("OnError");
+     if (OnSuccess == null) throw new ArgumentNullException("OnSuccess");
+     if (!task.IsCompleted) throw new InvalidOperationException("The task has not completed.");
+     if (task.IsFaulted)
+     {
+         //Mark every inner exception as observed, then report the aggregate exactly once
+         //(invoking OnError inside Handle would call it once per inner exception)
+         task.Exception.Handle(exc => true);
+         OnError(task.Exception);
+     }
+     else
+     {
+         OnSuccess();
+     }
+ }
}
-}
+}
\ No newline at end of file
}
}
- catch(RetryException e)
+ catch(RetryException)
{
+ //BUG(review): the format string has a {0} placeholder but no argument — the literal "{0}" is logged instead of the object name
Trace.TraceWarning("[RETRY FAIL] GetObjectInfo for {0} failed.");
return ObjectInfo.Empty;
}
}
- public Task PostBlock(string container,byte[] block)
+ public Task PostBlock(string container,byte[] block,int offset,int count)
{
if (String.IsNullOrWhiteSpace(container))
throw new ArgumentNullException("container");
if (block == null)
throw new ArgumentNullException("block");
+ if (offset < 0 || offset >= block.Length)
+     throw new ArgumentOutOfRangeException("offset");
+ //The requested slice [offset, offset+count) must lie inside the block (count alone can be valid yet overrun the end)
+ if (count < 0 || offset + count > block.Length)
+     throw new ArgumentOutOfRangeException("count");
if (String.IsNullOrWhiteSpace(Token))
throw new InvalidOperationException("Invalid Token");
if (StorageUrl == null)
-     throw new InvalidOperationException("Invalid Storage Url");
+     throw new InvalidOperationException("Invalid Storage Url");
Contract.EndContractBlock();
var builder = GetAddressBuilder(container, "");
using (var client = new RestClient(_baseClient))
{
- client.Headers.Add("X-Copy-From", sourceUrl);
+ //X-Move-From asks the server to move (not copy) the object, so the explicit DeleteObject of the source removed below should no longer be needed — TODO confirm against the Pithos API
+ client.Headers.Add("X-Move-From", sourceUrl);
client.PutWithRetry(targetUrl, 3);
var expectedCodes = new[] {HttpStatusCode.OK, HttpStatusCode.NoContent, HttpStatusCode.Created};
- if (expectedCodes.Contains(client.StatusCode))
- {
-     this.DeleteObject(sourceContainer, oldObjectName);
- }
- else
+ if (!expectedCodes.Contains(client.StatusCode))
throw CreateWebException("MoveObject", client.StatusCode);
}
}
+ /// <summary>
+ /// "Deletes" an object by moving it to the target (trash) container with a server-side X-Move-From request
+ /// </summary>
+ /// <param name="sourceContainer">Container currently holding the object</param>
+ /// <param name="objectName">Name of the object to delete</param>
+ /// <param name="targetContainer">Container that receives the deleted object</param>
+ public void DeleteObject(string sourceContainer, string objectName, string targetContainer)
+ {
+     if (String.IsNullOrWhiteSpace(sourceContainer))
+         throw new ArgumentNullException("sourceContainer", "The container property can't be empty");
+     if (String.IsNullOrWhiteSpace(objectName))
+         throw new ArgumentNullException("objectName", "The objectName property can't be empty");
+     if (String.IsNullOrWhiteSpace(targetContainer))
+         throw new ArgumentNullException("targetContainer", "The container property can't be empty");
+
+     var targetUrl = targetContainer + "/" + objectName;
+     var sourceUrl = String.Format("/{0}/{1}", sourceContainer, objectName);
+
+     using (var client = new RestClient(_baseClient))
+     {
+         client.Headers.Add("X-Move-From", sourceUrl);
+         client.PutWithRetry(targetUrl, 3);
+
+         //NotFound is accepted: deleting an object that is already gone is not an error
+         var expectedCodes = new[] {HttpStatusCode.OK, HttpStatusCode.NoContent, HttpStatusCode.Created,HttpStatusCode.NotFound};
+         if (!expectedCodes.Contains(client.StatusCode))
+             throw CreateWebException("DeleteObject", client.StatusCode);
+     }
+ }
+
+
private static WebException CreateWebException(string operation, HttpStatusCode statusCode)
{
Task GetObject(string container, string objectName, string fileName);
Task PutObject(string container, string objectName, string fileName, string hash = null);
+ void DeleteObject(string container, string objectName, string trashContainer);
void DeleteObject(string container, string objectName);
void MoveObject(string sourceContainer, string oldObjectName, string targetContainer,string newObjectName);
bool ObjectExists(string container,string objectName);
Task<TreeHash> GetHashMap(string container, string objectName);
Task<IList<string>> PutHashMap(string container, string objectName, TreeHash hash);
- Task PostBlock(string container,byte[] block);
+ Task PostBlock(string container,byte[] block,int offset,int count);
Task<byte[]> GetBlock(string container, Uri relativeUrl, long start, long? end);
}
return default(Task);
}
- public void DeleteObject(string container, string objectName)
+ public void DeleteObject(string container, string objectName, string trashContainer)
{
Contract.Requires(!String.IsNullOrWhiteSpace(Token));
Contract.Requires(StorageUrl!=null);
Contract.Requires(!String.IsNullOrWhiteSpace(container));
Contract.Requires(!String.IsNullOrWhiteSpace(objectName));
+ Contract.Requires(!String.IsNullOrWhiteSpace(trashContainer));
+ }
+
+ public void DeleteObject(string container, string objectName)
+ {
+ Contract.Requires(!String.IsNullOrWhiteSpace(Token));
+ Contract.Requires(StorageUrl!=null);
+ Contract.Requires(!String.IsNullOrWhiteSpace(container));
+ Contract.Requires(!String.IsNullOrWhiteSpace(objectName));
}
public void MoveObject(string sourceContainer, string oldObjectName, string targetContainer,string newObjectName)
return default(Task<IList<string>>);
}
- public Task PostBlock(string container,byte[] block)
+ public Task PostBlock(string container,byte[] block,int offset,int count)
{
Contract.Requires(!String.IsNullOrWhiteSpace(Token));
Contract.Requires(StorageUrl != null);
Contract.Requires(!String.IsNullOrWhiteSpace(container));
Contract.Requires(block != null);
-
+ Contract.Requires(offset>=0);
+ Contract.Requires(offset < block.Length);
+ Contract.Requires(count>=0);
+ //Mirror the real client's validation: the slice [offset, offset+count) must lie inside the block
+ Contract.Requires(offset + count <= block.Length);
+
return default(Task);
}
ResponseHeaders.Clear();
TraceStart(method, uriString);
-
+ //NOTE(review): forces an empty body on every PUT issued through this code path — confirm PUTs that carry payloads use a different path
+ if (method == "PUT")
+     request.ContentLength = 0;
var response = (HttpWebResponse)GetWebResponse(request);
StatusCode = response.StatusCode;
StatusDescription = response.StatusDescription;
throw new ArgumentNullException("algorithm");
Contract.EndContractBlock();
- //DON'T calculate hashes for folders
- if (Directory.Exists(filePath))
-     return null;
-
-
- var list = new List<byte[]>();
- using (var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, blockSize, false))
- using (var hasher = HashAlgorithm.Create(algorithm))
- {
-     var buffer = new byte[blockSize];
-     int read;
-     while ((read=stream.Read(buffer, 0, blockSize)) > 0)
-     {
-         //This code was added for compatibility with the way Pithos calculates the last hash
-         //We calculate the hash only up to the last non-null byte
-         //TODO: Remove if the server starts using the full block instead of the trimmed block
-         var lastByteIndex=Array.FindLastIndex(buffer,read-1, aByte => aByte != 0);
-
-         var hash = hasher.ComputeHash(buffer, 0, lastByteIndex+1);
-         list.Add(hash);
-     }
-     return new TreeHash(algorithm) { Hashes = list,
-                                     BlockSize = blockSize,
-                                     Bytes = stream.Length};
- }
+ //DON'T calculate hashes for folders — preserve the original null result instead of letting the file open below fail
+ if (Directory.Exists(filePath))
+     return null;
+ var hash=CalculateTreeHashAsync(filePath, blockSize, algorithm);
+ //NOTE(review): sync-over-async; .Result blocks the caller and wraps failures in an AggregateException — confirm callers expect this
+ return hash.Result;
}
public static Task<TreeHash> CalculateTreeHashAsync(FileInfo fileInfo, int blockSize, string algorithm)