Refactored to agents
[pithos-ms-client] / trunk / Pithos.Core / Agents / WorkflowAgent.cs
1 using System;
2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics;
5 using System.Diagnostics.Contracts;
6 using System.IO;
7 using System.Linq;
8 using System.Text;
9 using Pithos.Interfaces;
10
11 namespace Pithos.Core.Agents
12 {
13     [Export]
14     public class WorkflowAgent
15     {
16         Agent<WorkflowState> _agent;
17                 
18         public IStatusNotification StatusNotification { get; set; }
19         [Import]
20         public IStatusKeeper StatusKeeper { get; set; }
21         [Import]
22         public IPithosWorkflow Workflow { get; set; }
23
24         [Import]
25         public NetworkAgent NetworkAgent { get; set; }
26
27         public void Start()
28         {
29             _agent = Agent<WorkflowState>.Start(inbox =>
30             {
31                 Action loop = null;
32                 loop = () =>
33                 {
34                     var message = inbox.Receive();
35                     var process = message.ContinueWith(t =>
36                     {
37                         var state = t.Result;
38                         Process(state);
39                         inbox.DoAsync(loop);
40                     });
41                     process.ContinueWith(t =>
42                     {
43                         inbox.DoAsync(loop);
44                         if (t.IsFaulted)
45                         {
46                             var ex = t.Exception.InnerException;
47                             if (ex is OperationCanceledException)
48                                 inbox.Stop();
49                             Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex);
50                         }
51                     });
52
53                 };
54                 loop();
55             });
56         }
57
58         public void RestartInterruptedFiles()
59         {
60             StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
61             var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
62             var filesQuery = from state in FileState.Queryable
63                              where interruptedStates.Contains(state.OverlayStatus)
64                              select new WorkflowState
65                              {
66                                  Path = state.FilePath.ToLower(),
67                                  FileName = Path.GetFileName(state.FilePath).ToLower(),
68                                  Hash = state.Checksum,
69                                  Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
70                                                    FileStatus.Created :
71                                                    FileStatus.Modified,
72                                  TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
73                                                    WatcherChangeTypes.Created :
74                                                    WatcherChangeTypes.Changed
75                              };
76             _agent.AddFromEnumerable(filesQuery);
77
78         }
79
80
81         private void Process(WorkflowState state)
82         {
83             if (state.Skip)
84                 return;
85             string path = state.Path.ToLower();
86             string fileName = Path.GetFileName(path);
87
88             //Bypass deleted files, unless the status is Deleted
89             if (!(File.Exists(path) || state.Status != FileStatus.Deleted))
90             {
91                 state.Skip = true;
92                 this.StatusKeeper.RemoveFileOverlayStatus(path);
93                 return;
94             }
95
96             switch (state.Status)
97             {
98                 case FileStatus.Created:
99                 case FileStatus.Modified:
100                     var info = new FileInfo(path);
101                     NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty));
102                     break;
103                 case FileStatus.Deleted:
104                     NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo {Name=fileName}));                    
105                     break;
106                 case FileStatus.Renamed:
107                     NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName,state.OldPath,state.FileName,state.Path));                    
108                     break;
109             }
110
111             return;
112         }
113
114        
115
116         public void Post(WorkflowState workflowState)
117         {
118             _agent.Post(workflowState);
119         }
120     }
121 }