using System.IO;
using System.Linq;
using System.Text;
+using System.Threading.Tasks;
using Pithos.Interfaces;
namespace Pithos.Core.Agents
loop = () =>
{
var message = inbox.Receive();
- var process = message.ContinueWith(t =>
- {
- var state = t.Result;
- Process(state);
- inbox.DoAsync(loop);
- });
- process.ContinueWith(t =>
- {
- inbox.DoAsync(loop);
- if (t.IsFaulted)
- {
- var ex = t.Exception.InnerException;
- if (ex is OperationCanceledException)
- inbox.Stop();
- Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex);
- }
- });
-
+ var process = message.Then(Process, inbox.CancellationToken);
+ inbox.LoopAsync(process,loop,ex=>
+ Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
};
loop();
});
}
- public void RestartInterruptedFiles()
- {
-
- StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
- var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
-
- var pendingEntries = (from state in FileState.Queryable
- where interruptedStates.Contains(state.OverlayStatus) &&
- !state.FilePath.StartsWith(FragmentsPath) &&
- !state.FilePath.EndsWith(".ignore")
- select state).ToList();
- var staleEntries = from state in pendingEntries
- where !File.Exists(state.FilePath)
- select state;
- var staleKeys = staleEntries.Select(state=>state.Id);
- FileState.DeleteAll(staleKeys);
-
- var validEntries = from state in pendingEntries.Except(staleEntries)
- where File.Exists(state.FilePath)
- select new WorkflowState
- {
- Path = state.FilePath.ToLower(),
- FileName = Path.GetFileName(state.FilePath).ToLower(),
- Hash = state.Checksum,
- Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
- FileStatus.Created :
- FileStatus.Modified,
- TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
- WatcherChangeTypes.Created :
- WatcherChangeTypes.Changed
- };
-
- _agent.AddFromEnumerable(validEntries);
-
-
- }
-
- private void Process(WorkflowState state)
+ private Task<object> Process(WorkflowState state)
{
if (state.Skip)
- return;
+ return CompletedTask<object>.Default;
string path = state.Path.ToLower();
string fileName = Path.GetFileName(path);
//Bypass deleted files, unless the status is Deleted
- if (!(File.Exists(path) || state.Status != FileStatus.Deleted))
+ if (!File.Exists(path) && state.Status != FileStatus.Deleted)
{
state.Skip = true;
this.StatusKeeper.RemoveFileOverlayStatus(path);
- return;
+ return CompletedTask<object>.Default;
}
+ var fileState = FileState.FindByFilePath(path);
+ var blockHash = NetworkAgent.BlockHash;
+ var blockSize = NetworkAgent.BlockSize;
switch (state.Status)
{
case FileStatus.Created:
case FileStatus.Modified:
var info = new FileInfo(path);
- NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty));
+ NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty, fileState,blockSize,blockHash));
break;
case FileStatus.Deleted:
- NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo {Name=fileName}));
+ NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo { Name = fileName }, fileState, blockSize, blockHash));
break;
case FileStatus.Renamed:
- NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName,state.OldPath,state.FileName,state.Path));
+ NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName, state.OldPath, state.FileName, state.Path));
break;
}
- return;
+ return CompletedTask<object>.Default;
}
+
+
+ public void RestartInterruptedFiles()
+ {
+
+ StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
+
+ var pendingEntries = from state in FileState.Queryable
+ where state.FileStatus != FileStatus.Unchanged &&
+ !state.FilePath.StartsWith(FragmentsPath.ToLower()) &&
+ !state.FilePath.EndsWith(".ignore")
+ select state;
+
+ var validEntries = from state in pendingEntries
+ select new WorkflowState
+ {
+ Path = state.FilePath.ToLower(),
+ FileName = Path.GetFileName(state.FilePath).ToLower(),
+ Hash = state.Checksum,
+ Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
+ FileStatus.Created :
+ state.FileStatus,
+ TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
+ WatcherChangeTypes.Created :
+ WatcherChangeTypes.Changed
+ };
+ foreach (var entry in validEntries)
+ {
+ Post(entry);
+ }
+
+ }
+
public void Post(WorkflowState workflowState)