Fixed ceiling calculation
[pithos-ms-client] / trunk / Pithos.Core / Agents / WorkflowAgent.cs
1 using System;
2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
6 using System.IO;
7 using System.Linq;
8 using System.Text;
9 using Pithos.Interfaces;
10
11 namespace Pithos.Core.Agents
12 {
13     [Export]
14     public class WorkflowAgent
15     {
16         Agent<WorkflowState> _agent;
17                 
18         public IStatusNotification StatusNotification { get; set; }
19         [Import]
20         public IStatusKeeper StatusKeeper { get; set; }
21
22         //We should avoid processing files stored in the Fragments folder
23         //The Full path to the fragments folder is stored in FragmentsPath
24         public string FragmentsPath { get; set; }
25
26         [Import]
27         public NetworkAgent NetworkAgent { get; set; }
28
29         public void Start()
30         {
31             _agent = Agent<WorkflowState>.Start(inbox =>
32             {
33                 Action loop = null;
34                 loop = () =>
35                 {
36                     var message = inbox.Receive();
37                     var process = message.ContinueWith(t =>
38                     {
39                         var state = t.Result;
40                         Process(state);
41                         inbox.DoAsync(loop);
42                     });
43                     process.ContinueWith(t =>
44                     {
45                         inbox.DoAsync(loop);
46                         if (t.IsFaulted)
47                         {
48                             var ex = t.Exception.InnerException;
49                             if (ex is OperationCanceledException)
50                                 inbox.Stop();
51                             Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex);
52                         }
53                     });
54
55                 };
56                 loop();
57             });
58         }
59
60         public void RestartInterruptedFiles()
61         {
62             
63             StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
64             var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
65
66             var pendingEntries = (from state in FileState.Queryable
67                                  where interruptedStates.Contains(state.OverlayStatus) &&
68                                        !state.FilePath.StartsWith(FragmentsPath) &&
69                                        !state.FilePath.EndsWith(".ignore")
70                                  select state).ToList();
71             var staleEntries = from state in pendingEntries
72                                   where !File.Exists(state.FilePath)
73                                   select state;
74             var staleKeys = staleEntries.Select(state=>state.Id);
75             FileState.DeleteAll(staleKeys);
76
77             var validEntries = from state in pendingEntries.Except(staleEntries)
78                              where File.Exists(state.FilePath)
79                              select new WorkflowState
80                              {
81                                  Path = state.FilePath.ToLower(),
82                                  FileName = Path.GetFileName(state.FilePath).ToLower(),
83                                  Hash = state.Checksum,
84                                  Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
85                                                    FileStatus.Created :
86                                                    FileStatus.Modified,
87                                  TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
88                                                    WatcherChangeTypes.Created :
89                                                    WatcherChangeTypes.Changed
90                              };
91
92             _agent.AddFromEnumerable(validEntries);
93
94
95         }       
96
97         private void Process(WorkflowState state)
98         {
99             if (state.Skip)
100                 return;
101             string path = state.Path.ToLower();
102             string fileName = Path.GetFileName(path);
103
104             //Bypass deleted files, unless the status is Deleted
105             if (!(File.Exists(path) || state.Status != FileStatus.Deleted))
106             {
107                 state.Skip = true;
108                 this.StatusKeeper.RemoveFileOverlayStatus(path);
109                 return;
110             }
111
112             switch (state.Status)
113             {
114                 case FileStatus.Created:
115                 case FileStatus.Modified:
116                     var info = new FileInfo(path);
117                     NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty));
118                     break;
119                 case FileStatus.Deleted:
120                     NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo {Name=fileName}));                    
121                     break;
122                 case FileStatus.Renamed:
123                     NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName,state.OldPath,state.FileName,state.Path));                    
124                     break;
125             }
126
127             return;
128         }
129
130        
131
132         public void Post(WorkflowState workflowState)
133         {
134             _agent.Post(workflowState);
135         }
136     }
137 }