Uploading and downloading with hashes
[pithos-ms-client] / trunk / Pithos.Core / Agents / WorkflowAgent.cs
1 using System;
2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
6 using System.IO;
7 using System.Linq;
8 using System.Text;
9 using Pithos.Interfaces;
10
11 namespace Pithos.Core.Agents
12 {
13     [Export]
14     public class WorkflowAgent
15     {
16         Agent<WorkflowState> _agent;
17                 
18         public IStatusNotification StatusNotification { get; set; }
19         [Import]
20         public IStatusKeeper StatusKeeper { get; set; }
21
22         //We should avoid processing files stored in the Fragments folder
23         //The Full path to the fragments folder is stored in FragmentsPath
24         public string FragmentsPath { get; set; }
25
26         [Import]
27         public NetworkAgent NetworkAgent { get; set; }
28
29         public void Start()
30         {
31             _agent = Agent<WorkflowState>.Start(inbox =>
32             {
33                 Action loop = null;
34                 loop = () =>
35                 {
36                     var message = inbox.Receive();
37                     var process = message.ContinueWith(t =>
38                     {
39                         var state = t.Result;
40                         Process(state);
41                         inbox.DoAsync(loop);
42                     });
43                     process.ContinueWith(t =>
44                     {
45                         inbox.DoAsync(loop);
46                         if (t.IsFaulted)
47                         {
48                             var ex = t.Exception.InnerException;
49                             if (ex is OperationCanceledException)
50                                 inbox.Stop();
51                             Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex);
52                         }
53                     });
54
55                 };
56                 loop();
57             });
58         }
59
60         public void RestartInterruptedFiles()
61         {
62             
63             StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
64             var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
65
66             var pendingEntries = (from state in FileState.Queryable
67                                  where interruptedStates.Contains(state.OverlayStatus) &&
68                                        !state.FilePath.StartsWith(FragmentsPath) &&
69                                        !state.FilePath.EndsWith(".ignore")
70                                  select state).ToList();
71             var staleEntries = from state in pendingEntries
72                                   where !File.Exists(state.FilePath)
73                                   select state;
74             var staleKeys = staleEntries.Select(state=>state.Id);
75             FileState.DeleteAll(staleKeys);
76
77             var validEntries = from state in pendingEntries.Except(staleEntries)
78                              where File.Exists(state.FilePath)
79                              select new WorkflowState
80                              {
81                                  Path = state.FilePath.ToLower(),
82                                  FileName = Path.GetFileName(state.FilePath).ToLower(),
83                                  Hash = state.Checksum,
84                                  Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
85                                                    FileStatus.Created :
86                                                    FileStatus.Modified,
87                                  TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
88                                                    WatcherChangeTypes.Created :
89                                                    WatcherChangeTypes.Changed
90                              };
91             foreach (var entry in validEntries)
92             {
93                 Post(entry);
94             }            
95
96         }       
97
98         private void Process(WorkflowState state)
99         {
100             if (state.Skip)
101                 return;
102             string path = state.Path.ToLower();
103             string fileName = Path.GetFileName(path);
104
105             //Bypass deleted files, unless the status is Deleted
106             if (!(File.Exists(path) || state.Status != FileStatus.Deleted))
107             {
108                 state.Skip = true;
109                 this.StatusKeeper.RemoveFileOverlayStatus(path);
110                 return;
111             }
112             var fileState = FileState.FindByFilePath(path);
113             switch (state.Status)
114             {
115                 case FileStatus.Created:
116                 case FileStatus.Modified:
117                     var info = new FileInfo(path);
118                     NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty,fileState));
119                     break;
120                 case FileStatus.Deleted:
121                     NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo {Name=fileName},fileState));                    
122                     break;
123                 case FileStatus.Renamed:
124                     NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName,state.OldPath,state.FileName,state.Path));                    
125                     break;
126             }
127
128             return;
129         }
130
131        
132
133         public void Post(WorkflowState workflowState)
134         {
135             _agent.Post(workflowState);
136         }
137     }
138 }