}
}
-/*
- public void AddFromEnumerable(IEnumerable<TMessage> enumerable)
- {
- foreach (var message in enumerable)
- {
- Post(message);
- }
- }
-*/
-
public IEnumerable<TMessage> GetEnumerable()
{
return _messages;
if (onError != null)
onError(ex);
}
- });
+ },CancellationToken);
+ }
+ public Task<T> LoopAsync<T>(Task<T> process, Action loop,Action<Exception> onError=null)
+ {
+ return process.ContinueWith(t =>
+ {
+ //Spawn the Loop immediatelly
+ Task.Factory.StartNew(loop,CancellationToken);
+ //Then process possible exceptions
+ if (t.IsFaulted)
+ {
+ var ex = t.Exception.InnerException;
+ if (ex is OperationCanceledException)
+ Stop();
+ if (onError != null)
+ onError(ex);
+ }
+ return default(T);
+ },CancellationToken);
}
}
}
--- /dev/null
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics.Contracts;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Pithos.Network;
+
+namespace Pithos.Core.Agents
+{
+ class BlockUpdater
+ {
+ public string FilePath { get; private set; }
+ public string RelativePath { get; private set; }
+
+ public string FragmentsPath { get; private set; }
+
+ public TreeHash ServerHash { get; private set; }
+
+ public string TempPath { get; private set; }
+
+
+ public BlockUpdater(string fragmentsPath, string filePath, string relativePath,TreeHash serverHash)
+ {
+ FragmentsPath=fragmentsPath;
+ FilePath = filePath;
+ RelativePath=relativePath;
+ ServerHash = serverHash;
+
+ Start();
+ }
+
+ public void Start()
+ {
+ //The file will be stored in a temporary location while downloading with an extension .download
+ TempPath = Path.Combine(FragmentsPath, RelativePath + ".download");
+ var directoryPath = Path.GetDirectoryName(TempPath);
+ if (!Directory.Exists(directoryPath))
+ Directory.CreateDirectory(directoryPath);
+ }
+
+
+ public void Commit()
+ {
+ //Copy the file to a temporary location. Changes will be made to the
+ //temporary file, then it will replace the original file
+ File.Copy(FilePath, TempPath, true);
+
+ //Set the size of the file to the size specified in the treehash
+ //This will also create an empty file if the file doesn't exist
+ SetFileSize(TempPath, ServerHash.Bytes);
+
+ //Update the temporary file with the data from the blocks
+ using (var stream = File.OpenWrite(TempPath))
+ {
+ foreach (var block in _blocks)
+ {
+ var blockPath = block.Value;
+ var blockIndex = block.Key;
+ using (var blockStream = File.OpenRead(blockPath))
+ {
+ var offset = blockIndex*ServerHash.BlockSize;
+ stream.Seek(offset, SeekOrigin.Begin);
+ blockStream.CopyTo(stream);
+ }
+ }
+ }
+ SwapFiles();
+
+ ClearBlocks();
+ }
+
+ private void SwapFiles()
+ {
+ if (File.Exists(FilePath))
+ File.Replace(TempPath, FilePath, null, true);
+ else
+ File.Move(TempPath, FilePath);
+ }
+
+ private void ClearBlocks()
+ {
+ foreach (var block in _blocks)
+ {
+ var filePath = block.Value;
+ File.Delete(filePath);
+ }
+ File.Delete(TempPath);
+ _blocks.Clear();
+ }
+
+ //Change the file's size, possibly truncating or adding to it
+ private void SetFileSize(string filePath, long fileSize)
+ {
+ if (String.IsNullOrWhiteSpace(filePath))
+ throw new ArgumentNullException("filePath");
+ if (!Path.IsPathRooted(filePath))
+ throw new ArgumentException("The filePath must be rooted", "filePath");
+ if (fileSize < 0)
+ throw new ArgumentOutOfRangeException("fileSize");
+ Contract.EndContractBlock();
+
+ using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write))
+ {
+ stream.SetLength(fileSize);
+ }
+ }
+
+ /* //Check whether we should copy the local file to a temp path
+ private bool ShouldCopy(string localPath, string tempPath)
+ {
+ //No need to copy if there is no file
+ if (!File.Exists(localPath))
+ return false;
+
+ //If there is no temp file, go ahead and copy
+ if (!File.Exists(tempPath))
+ return true;
+
+ //If there is a temp file and is newer than the actual file, don't copy
+ var localLastWrite = File.GetLastWriteTime(localPath);
+ var tempLastWrite = File.GetLastWriteTime(tempPath);
+
+ //This could mean there is an interrupted download in progress
+ return (tempLastWrite < localLastWrite);
+ }*/
+
+ ConcurrentDictionary<int,string> _blocks=new ConcurrentDictionary<int, string>();
+
+ public Task StoreBlock(int blockIndex,byte[] buffer)
+ {
+ var blockPath = String.Format("{0}.{1:3}", TempPath, blockIndex);
+ _blocks[blockIndex] = blockPath;
+
+ return FileAsync.WriteAllBytes(blockPath, buffer);
+ }
+
+ private Task WriteAsync(string filePath, byte[] buffer, int offset, int count)
+ {
+ var stream = FileAsync.OpenWrite(filePath);
+ try
+ {
+ stream.Seek(offset, SeekOrigin.Begin);
+ var write = stream.WriteAsync(buffer, 0, count);
+ return write.ContinueWith(s => stream.Close());
+ }
+ catch (Exception ex)
+ {
+ stream.Close();
+ return Task.Factory.FromException(ex);
+ }
+ }
+
+ }
+}
using System.IO;
using System.Linq;
using System.Text;
+using System.Threading.Tasks;
using Pithos.Interfaces;
using Pithos.Network;
loop = () =>
{
var message = inbox.Receive();
- var process = message.ContinueWith(t =>
- {
- var state = t.Result;
- Process(state);
- inbox.DoAsync(loop);
- });
-
- process.ContinueWith(t =>
- {
- inbox.DoAsync(loop);
- if (t.IsFaulted)
- {
- var ex = t.Exception.InnerException;
- if (ex is OperationCanceledException)
- inbox.Stop();
- Trace.TraceError("[ERROR] File Event Processing:\r{0}", ex);
- }
- });
+ var process=message.Then(Process,inbox.CancellationToken);
+ inbox.LoopAsync(process,loop,ex=>
+ Trace.TraceError("[ERROR] File Event Processing:\r{0}", ex));
};
loop();
});
}
+ private Task<object> Process(WorkflowState state)
+ {
+ Debug.Assert(!Ignore(state.Path));
+
+ var networkState = NetworkGate.GetNetworkState(state.Path);
+ //Skip if the file is already being downloaded or uploaded and
+ //the change is create or modify
+ if (networkState != NetworkOperation.None &&
+ (
+ state.TriggeringChange == WatcherChangeTypes.Created ||
+ state.TriggeringChange == WatcherChangeTypes.Changed
+ ))
+ return CompletedTask<object>.Default;
+
+ try
+ {
+ UpdateFileStatus(state);
+ UpdateOverlayStatus(state);
+ UpdateFileChecksum(state);
+ WorkflowAgent.Post(state);
+ }
+ catch (IOException exc)
+ {
+ Trace.TraceWarning("File access error occured, retrying {0}\n{1}", state.Path, exc);
+ _agent.Post(state);
+ }
+ catch (Exception exc)
+ {
+ Trace.TraceWarning("Error occured while indexing{0. The file will be skipped}\n{1}", state.Path, exc);
+ }
+ return CompletedTask<object>.Default;
+ }
+
+
+/*
+ private Task Process(Task<WorkflowState> action)
+ {
+ return action.ContinueWith(t => Process(t.Result));
+ }
+*/
+
+
public bool Pause
{
get { return _watcher == null || !_watcher.EnableRaisingEvents; }
}
- private void Process(WorkflowState state)
- {
- Debug.Assert(!Ignore(state.Path));
-
- var networkState = NetworkGate.GetNetworkState(state.Path);
- //Skip if the file is already being downloaded or uploaded and
- //the change is create or modify
- if (networkState != NetworkOperation.None &&
- (
- state.TriggeringChange == WatcherChangeTypes.Created ||
- state.TriggeringChange == WatcherChangeTypes.Changed
- ))
- return;
-
- try
- {
- UpdateFileStatus(state);
- UpdateOverlayStatus(state);
- UpdateFileChecksum(state);
- WorkflowAgent.Post(state);
- }
- catch (IOException exc)
- {
- Trace.TraceWarning("File access error occured, retrying {0}\n{1}", state.Path, exc);
- _agent.Post(state);
- }
- catch (Exception exc)
- {
- Trace.TraceWarning("Error occured while indexing{0. The file will be skipped}\n{1}", state.Path, exc);
- }
- }
private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
{
if (File.Exists(absolutePath))
return true;
//Or a directory?
- if (Directory.Exists(RootPath))
+ if (Directory.Exists(absolutePath))
return true;
//Fail if it is neither
return false;
loop = () =>
{
var message = inbox.Receive();
-
-/*
- var process=Process(message);
+ var process=message.Then(Process,inbox.CancellationToken);
inbox.LoopAsync(process, loop);
-*/
-
-/*
- process1.ContinueWith(t =>
- {
- inbox.DoAsync(loop);
- if (t.IsFaulted)
- {
- var ex = t.Exception.InnerException;
- if (ex is OperationCanceledException)
- inbox.Stop();
- }
- });
-*/
- //inbox.DoAsync(loop);
+ };
+ loop();
+ });
+ }
+ private Task<object> Process(CloudAction action)
+ {
+ if (action == null)
+ throw new ArgumentNullException("action");
+ Contract.EndContractBlock();
- var process = message.ContinueWith(t =>
- {
- var action = t.Result;
-
- Process(action);
- inbox.DoAsync(loop);
- });
+ Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
+ var localFile = action.LocalFile;
+ var cloudFile = action.CloudFile;
+ var downloadPath = (cloudFile == null) ? String.Empty
+ : Path.Combine(FileAgent.RootPath, cloudFile.Name.RelativeUrlToFilePath());
- process.ContinueWith(t =>
- {
- inbox.DoAsync(loop);
- if (t.IsFaulted)
+ try
+ {
+ switch (action.Action)
+ {
+ case CloudActionType.UploadUnconditional:
+ UploadCloudFile(localFile, action.LocalHash.Value, action.TopHash.Value);
+ break;
+ case CloudActionType.DownloadUnconditional:
+ DownloadCloudFile(PithosContainer, new Uri(cloudFile.Name, UriKind.Relative), downloadPath);
+ break;
+ case CloudActionType.DeleteCloud:
+ DeleteCloudFile(cloudFile.Name);
+ break;
+ case CloudActionType.RenameCloud:
+ RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName);
+ break;
+ case CloudActionType.MustSynch:
+ if (File.Exists(downloadPath))
{
- var ex = t.Exception.InnerException;
- if (ex is OperationCanceledException)
- inbox.Stop();
- }
- });
+ var cloudHash = cloudFile.Hash;
+ var localHash = action.LocalHash.Value;
+ var topHash = action.TopHash.Value;
+ //Not enough to compare only the local hashes, also have to compare the tophashes
+ if (!cloudHash.Equals(localHash, StringComparison.InvariantCultureIgnoreCase) &&
+ !cloudHash.Equals(topHash, StringComparison.InvariantCultureIgnoreCase))
+ {
+ var lastLocalTime = localFile.LastWriteTime;
+ var lastUpTime = cloudFile.Last_Modified;
+ if (lastUpTime <= lastLocalTime)
+ {
+ //Local change while the app was down or Files in conflict
+ //Maybe need to store version as well, to check who has the latest version
+ //StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
+ UploadCloudFile(localFile, action.LocalHash.Value, action.TopHash.Value);
+ }
+ else
+ {
+ var status = StatusKeeper.GetFileStatus(downloadPath);
+ switch (status)
+ {
+ case FileStatus.Unchanged:
+ //It he cloud file has a later date, it was modified by another user or computer.
+ //If the local file's status is Unchanged, we should go on and download the cloud file
+ DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name, UriKind.Relative), downloadPath);
+ break;
+ case FileStatus.Modified:
+ //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict
+ //We can't ensure that a file modified online since the last time will appear as Modified, unless we
+ //index all files before we start listening.
+ StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
+ break;
+ case FileStatus.Created:
+ //If the local file is Created, it means that the local and cloud files aren't related yet have the same name
+ //In this case we must mark the file as in conflict
+ //Other cases should never occur. Mark them as Conflict as well but log a warning
+ StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
+ break;
+ default:
+ //If the local file is Created, it means that the local and cloud files aren't related yet have the same name
+ //In this case we must mark the file as in conflict
+ //Other cases should never occur. Mark them as Conflict as well but log a warning
+ StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
+ Trace.TraceWarning("Unexcepted status {0} for file {1}->{2}", status, downloadPath, action.CloudFile.Name);
+ break;
+ }
+ }
+ }
+ }
+ else
+ DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name, UriKind.Relative), downloadPath);
+ break;
+ }
+ Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
+ }
+ catch (OperationCanceledException)
+ {
+ throw;
+ }
+ catch (Exception exc)
+ {
+ Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
+ action.Action, action.LocalFile, action.CloudFile, exc);
- };
- loop();
- });
+ _agent.Post(action);
+ }
+ return CompletedTask<object>.Default;
}
}
}
+ //Remote files are polled periodically. Any changes are processed
public Task ProcessRemoteFiles(string accountPath,DateTime? since=null)
- {
+ {
+ if (String.IsNullOrWhiteSpace(accountPath))
+ throw new ArgumentNullException(accountPath);
+ Contract.EndContractBlock();
Trace.CorrelationManager.StartLogicalOperation();
Trace.TraceInformation("[LISTENER] Scheduled");
- var listObjects = Task.Factory.StartNewDelayed(10000).ContinueWith(t =>
- CloudClient.ListObjects(PithosContainer,since));
+
+ //Get the list of server objects changed since the last check
+ var listObjects = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000,()=>
+ CloudClient.ListObjects(PithosContainer,since));
+ //Next time we will check for all changes since the current check minus 1 second
+ //This is done to ensure there are no discrepancies due to clock differences
DateTime nextSince = DateTime.Now.AddSeconds(-1);
+
+
var enqueueFiles = listObjects.ContinueWith(task =>
{
}
- private Task Process(Task<CloudAction> action)
- {
- return action.ContinueWith(t=> Process(t.Result));
- }
-
-
- private void Process(CloudAction action)
- {
- if (action==null)
- throw new ArgumentNullException("action");
- Contract.EndContractBlock();
-
- Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
- var localFile = action.LocalFile;
- var cloudFile = action.CloudFile;
- var downloadPath = (cloudFile == null) ? String.Empty
- : Path.Combine(FileAgent.RootPath, cloudFile.Name.RelativeUrlToFilePath());
-
- try
- {
- switch (action.Action)
- {
- case CloudActionType.UploadUnconditional:
- UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value);
- break;
- case CloudActionType.DownloadUnconditional:
- DownloadCloudFile(PithosContainer, new Uri(cloudFile.Name,UriKind.Relative), downloadPath);
- break;
- case CloudActionType.DeleteCloud:
- DeleteCloudFile(cloudFile.Name);
- break;
- case CloudActionType.RenameCloud:
- RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName);
- break;
- case CloudActionType.MustSynch:
- if (File.Exists(downloadPath))
- {
- var cloudHash = cloudFile.Hash;
- var localHash = action.LocalHash.Value;
- var topHash = action.TopHash.Value;
- //Not enough to compare only the local hashes, also have to compare the tophashes
- if (!cloudHash.Equals(localHash, StringComparison.InvariantCultureIgnoreCase) &&
- !cloudHash.Equals(topHash, StringComparison.InvariantCultureIgnoreCase))
- {
- var lastLocalTime = localFile.LastWriteTime;
- var lastUpTime = cloudFile.Last_Modified;
- if (lastUpTime <= lastLocalTime)
- {
- //Local change while the app was down or Files in conflict
- //Maybe need to store version as well, to check who has the latest version
-
- //StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
- UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value);
- }
- else
- {
- var status = StatusKeeper.GetFileStatus(downloadPath);
- switch (status)
- {
- case FileStatus.Unchanged:
- //It he cloud file has a later date, it was modified by another user or computer.
- //If the local file's status is Unchanged, we should go on and download the cloud file
- DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
- break;
- case FileStatus.Modified:
- //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict
- //We can't ensure that a file modified online since the last time will appear as Modified, unless we
- //index all files before we start listening.
- StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
- break;
- case FileStatus.Created:
- //If the local file is Created, it means that the local and cloud files aren't related yet have the same name
- //In this case we must mark the file as in conflict
- //Other cases should never occur. Mark them as Conflict as well but log a warning
- StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
- break;
- default:
- //If the local file is Created, it means that the local and cloud files aren't related yet have the same name
- //In this case we must mark the file as in conflict
- //Other cases should never occur. Mark them as Conflict as well but log a warning
- StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
- Trace.TraceWarning("Unexcepted status {0} for file {1}->{2}",status,downloadPath,action.CloudFile.Name);
- break;
- }
- }
- }
- }
- else
- DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
- break;
- }
- Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
- }
- catch (OperationCanceledException)
- {
- throw;
- }
- catch (Exception exc)
- {
- Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
- action.Action, action.LocalFile, action.CloudFile, exc);
-
- _agent.Post(action);
- }
-
- }
-
-
+
private void RenameCloudFile(string oldFileName, string newPath, string newFileName)
{
throw new ArgumentNullException("serverHash");
Contract.EndContractBlock();
+
//Calculate the relative file path for the new file
var relativePath = relativeUrl.RelativeUriToFilePath();
- //The file will be stored in a temporary location while downloading with an extension .download
- var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download");
- var directoryPath = Path.GetDirectoryName(tempPath);
- if (!Directory.Exists(directoryPath))
- Directory.CreateDirectory(directoryPath);
-
- //If the local file exists we should make a copy of it to the
- //fragments folder, unless a newer temp copy already exists, which
- //means there is an interrupted download
- if (ShouldCopy(localPath, tempPath))
- File.Copy(localPath, tempPath, true);
-
- //Set the size of the file to the size specified in the treehash
- //This will also create an empty file if the file doesn't exist
- SetFileSize(tempPath, serverHash.Bytes);
+ var blockUpdater = new BlockUpdater(FileAgent.FragmentsPath, localPath, relativePath, serverHash);
+
return Task.Factory.StartNew(() =>
{
//Calculate the temp file's treehash
- var treeHash = Signature.CalculateTreeHashAsync(tempPath, this.BlockSize,BlockHash).Result;
+ var treeHash = Signature.CalculateTreeHashAsync(localPath, this.BlockSize,BlockHash).Result;
//And compare it with the server's hash
var upHashes = serverHash.GetHashesAsStrings();
end= ((i + 1)*BlockSize) ;
//Get its block
- var blockTask = CloudClient.GetBlock(container, relativeUrl,
- start, end);
+ var block= CloudClient.GetBlock(container, relativeUrl,start, end);
+
+ var store=block.Then(b => blockUpdater.StoreBlock(i, b));
+ store.Wait();
- blockTask.ContinueWith(b =>
- {
- //And apply it to the temp file
- var buffer = b.Result;
- var stream =FileAsync.OpenWrite(tempPath);
- stream.Seek(start,SeekOrigin.Begin);
- return stream.WriteAsync(buffer, 0, buffer.Length)
- .ContinueWith(s => stream.Close());
-
- }).Unwrap()
- .Wait();
Trace.TraceInformation("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
}
-
}
-
- //Replace the existing file with the temp
- if (File.Exists(localPath))
- File.Replace(tempPath, localPath, null, true);
- else
- File.Move(tempPath, localPath);
+ blockUpdater.Commit();
Trace.TraceInformation("[BLOCK GET] COMPLETE {0}", localPath);
});
}
- //Change the file's size, possibly truncating or adding to it
- private static void SetFileSize(string filePath, long fileSize)
- {
- if (String.IsNullOrWhiteSpace(filePath))
- throw new ArgumentNullException("filePath");
- if (!Path.IsPathRooted(filePath))
- throw new ArgumentException("The filePath must be rooted", "filePath");
- if (fileSize<0)
- throw new ArgumentOutOfRangeException("fileSize");
- Contract.EndContractBlock();
-
- using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write))
- {
- stream.SetLength(fileSize);
- }
- }
-
- //Check whether we should copy the local file to a temp path
- private static bool ShouldCopy(string localPath, string tempPath)
- {
- //No need to copy if there is no file
- if (!File.Exists(localPath))
- return false;
- //If there is no temp file, go ahead and copy
- if (!File.Exists(tempPath))
- return true;
-
- //If there is a temp file and is newer than the actual file, don't copy
- var localLastWrite = File.GetLastWriteTime(localPath);
- var tempLastWrite = File.GetLastWriteTime(tempPath);
-
- //This could mean there is an interrupted download in progress
- return (tempLastWrite < localLastWrite);
- }
private void UploadCloudFile(FileInfo fileInfo, string hash,string topHash)
{
using System.IO;
using System.Linq;
using System.Text;
+using System.Threading.Tasks;
using Pithos.Interfaces;
namespace Pithos.Core.Agents
loop = () =>
{
var message = inbox.Receive();
- var process = message.ContinueWith(t =>
- {
- var state = t.Result;
- Process(state);
- inbox.DoAsync(loop);
- });
- process.ContinueWith(t =>
- {
- inbox.DoAsync(loop);
- if (t.IsFaulted)
- {
- var ex = t.Exception.InnerException;
- if (ex is OperationCanceledException)
- inbox.Stop();
- Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex);
- }
- });
-
+ var process = message.Then(Process, inbox.CancellationToken);
+ inbox.LoopAsync(process,loop,ex=>
+ Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
};
loop();
});
}
+ private Task<object> Process(WorkflowState state)
+ {
+ if (state.Skip)
+ return CompletedTask<object>.Default;
+ string path = state.Path.ToLower();
+ string fileName = Path.GetFileName(path);
+
+ //Bypass deleted files, unless the status is Deleted
+ if (!(File.Exists(path) || state.Status != FileStatus.Deleted))
+ {
+ state.Skip = true;
+ this.StatusKeeper.RemoveFileOverlayStatus(path);
+ return CompletedTask<object>.Default;
+ }
+ var fileState = FileState.FindByFilePath(path);
+ switch (state.Status)
+ {
+ case FileStatus.Created:
+ case FileStatus.Modified:
+ var info = new FileInfo(path);
+ NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty, fileState));
+ break;
+ case FileStatus.Deleted:
+ NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo { Name = fileName }, fileState));
+ break;
+ case FileStatus.Renamed:
+ NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName, state.OldPath, state.FileName, state.Path));
+ break;
+ }
+
+ return CompletedTask<object>.Default;
+ }
+
+
+
public void RestartInterruptedFiles()
{
}
- private void Process(WorkflowState state)
- {
- if (state.Skip)
- return;
- string path = state.Path.ToLower();
- string fileName = Path.GetFileName(path);
-
- //Bypass deleted files, unless the status is Deleted
- if (!(File.Exists(path) || state.Status != FileStatus.Deleted))
- {
- state.Skip = true;
- this.StatusKeeper.RemoveFileOverlayStatus(path);
- return;
- }
- var fileState = FileState.FindByFilePath(path);
- switch (state.Status)
- {
- case FileStatus.Created:
- case FileStatus.Modified:
- var info = new FileInfo(path);
- NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty,fileState));
- break;
- case FileStatus.Deleted:
- NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo {Name=fileName},fileState));
- break;
- case FileStatus.Renamed:
- NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName,state.OldPath,state.FileName,state.Path));
- break;
- }
-
- return;
- }
-
public void Post(WorkflowState workflowState)
if (String.IsNullOrWhiteSpace(algorithm))
throw new ArgumentNullException("algorithm");
Contract.EndContractBlock();
-
+
//Skip updating the hash for folders
if (Directory.Exists(FilePath))
- return Task.Factory.StartNew(() => this);
+ return Task.Factory.FromResult(this);
var results=Task.Factory.TrackedSequence(
() => Task.Factory.StartNew(() => Signature.CalculateMD5(FilePath)),
() => Signature.CalculateTreeHashAsync(FilePath, blockSize, algorithm)
- );
+ );
- results.ContinueWith(t =>
+ var state=results.Then(hashes =>
{
- var hashes = t.Result;
Checksum = (hashes[0] as Task<string>).Result;
- TopHash = (hashes[0] as Task<TreeHash>).Result.TopHash.ToHashString();
+ TopHash = (hashes[1] as Task<TreeHash>).Result.TopHash.ToHashString();
+ return Task.Factory.FromResult(this);
});
- return results.ContinueWith(t => this);
+ return state;
}
}
</ItemGroup>
<ItemGroup>
<Compile Include="Agents\Agent.cs" />
+ <Compile Include="Agents\BlockUpdater.cs" />
<Compile Include="Agents\CloudAction.cs" />
<Compile Include="Agents\FileAgent.cs" />
<Compile Include="Agents\FileInfoExtensions.cs" />
<Compile Include="InMemStatusChecker.cs" />
<Compile Include="StatusInfo.cs" />
<Compile Include="StatusService.cs" />
+ <Compile Include="TaskExtensions.cs" />
<Compile Include="WorkflowState.cs" />
</ItemGroup>
<ItemGroup>
--- /dev/null
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Pithos.Core
+{
+ static class TaskExtensions
+ {
+ public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next)
+ {
+ return Then(first, next, CancellationToken.None);
+ }
+
+ public static Task Then<T1>(this Task<T1> first, Func<T1, Task> next)
+ {
+ return Then(first, next, CancellationToken.None);
+ }
+
+ public static Task<T2> Then<T1, T2>(this Task<T1> first, Func<T1, Task<T2>> next,CancellationToken cancellationToken)
+ {
+ if (first == null) throw new ArgumentNullException("first");
+ if (next == null) throw new ArgumentNullException("next");
+
+ var tcs = new TaskCompletionSource<T2>();
+ first.ContinueWith(delegate
+ {
+ if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
+ else if (first.IsCanceled) tcs.TrySetCanceled();
+ else
+ {
+ try
+ {
+ var t = next(first.Result);
+ if (t == null) tcs.TrySetCanceled();
+ else t.ContinueWith(delegate
+ {
+ if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
+ else if (t.IsCanceled) tcs.TrySetCanceled();
+ else tcs.TrySetResult(t.Result);
+ }, TaskContinuationOptions.ExecuteSynchronously);
+ }
+ catch (Exception exc) { tcs.TrySetException(exc); }
+ }
+ },cancellationToken, TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Current);
+ return tcs.Task;
+ }
+
+ public static Task Then<T1>(this Task<T1> first, Func<T1, Task> next,CancellationToken cancellationToken)
+ {
+ if (first == null) throw new ArgumentNullException("first");
+ if (next == null) throw new ArgumentNullException("next");
+
+ var tcs = new TaskCompletionSource<object>();
+ first.ContinueWith(delegate
+ {
+ if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
+ else if (first.IsCanceled) tcs.TrySetCanceled();
+ else
+ {
+ try
+ {
+ var t = next(first.Result);
+ if (t == null) tcs.TrySetCanceled();
+ else t.ContinueWith(delegate
+ {
+ if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
+ else if (t.IsCanceled) tcs.TrySetCanceled();
+ else tcs.TrySetResult(null);
+ }, TaskContinuationOptions.ExecuteSynchronously);
+ }
+ catch (Exception exc) { tcs.TrySetException(exc); }
+ }
+ },cancellationToken, TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Current);
+ return tcs.Task;
+ }
+
+ }
+}
Trace.TraceInformation("[BLOCK POST] START");
- //Not much point to this as the server will timeout if we try to get the
- //hashmap too quickly.
- var tcs = new TaskCompletionSource<bool>();
-
- client.UploadProgressChanged += (sender, args) =>
- {
+ client.UploadProgressChanged += (sender, args) =>
Trace.TraceInformation("[BLOCK POST PROGRESS] {0}% {1} of {2}",
- args.ProgressPercentage, args.BytesSent,
- args.TotalBytesToSend);
- if (args.BytesSent == args.TotalBytesToSend)
- tcs.SetResult(false);
- };
+ args.ProgressPercentage, args.BytesSent,
+ args.TotalBytesToSend);
+ client.UploadFileCompleted += (sender, args) =>
+ Trace.TraceInformation("[BLOCK POST PROGRESS] Completed ");
- client.UploadFileCompleted += (sender, args) =>
+
+ //Send the block
+ var uploadTask = client.UploadDataTask(uri, "POST", block)
+ .ContinueWith(upload =>
{
- if (args.Error != null)
- tcs.SetException(args.Error);
- else
- {
- Trace.TraceInformation("[BLOCK POST PROGRESS] Completed ");
- tcs.TrySetResult(true);
- }
- };
-
-
+ client.Dispose();
- client.UploadDataTask(uri, "POST", block);
-
- //Send the block
- var uploadTask = tcs.Task
- .ContinueWith(upload =>
+ if (upload.IsFaulted)
{
- client.Dispose();
-
- if (upload.IsFaulted)
- {
- var exception = upload.Exception.InnerException;
- Trace.TraceError("[BLOCK POST] FAIL with \r{0}", exception);
- throw exception;
- }
- else
- {
- Trace.TraceInformation("[BLOCK POST] END");
- }
- });
+ var exception = upload.Exception.InnerException;
+ Trace.TraceError("[BLOCK POST] FAIL with \r{0}", exception);
+ throw exception;
+ }
+
+ Trace.TraceInformation("[BLOCK POST] END");
+ });
return uploadTask;
}
//DON'T calculate hashes for folders
if (Directory.Exists(filePath))
return Task.Factory.StartNew(()=>new TreeHash(algorithm));
+ //The hash of a non-existent file is the empty hash
+ if (!File.Exists(filePath))
+ return Task.Factory.StartNew(()=>new TreeHash(algorithm));
//Calculate the hash of all blocks using a blockhash iterator
var treeHash =Iterate<TreeHash>(BlockHashIterator(filePath, blockSize, algorithm));
{
var blocks =
new ConcurrentDictionary<string, int>();
+ if (Hashes == null)
+ return blocks;
var blockIndex = 0;
foreach (var hash in this.Hashes)