Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / WorkflowAgent.cs @ a27aa447

History | View | Annotate | Download (5.3 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Text;
9
using Pithos.Interfaces;
10

    
11
namespace Pithos.Core.Agents
12
{
13
    [Export]
14
    public class WorkflowAgent
15
    {
16
        Agent<WorkflowState> _agent;
17
                
18
        public IStatusNotification StatusNotification { get; set; }
19
        [Import]
20
        public IStatusKeeper StatusKeeper { get; set; }
21

    
22
        //We should avoid processing files stored in the Fragments folder
23
        //The Full path to the fragments folder is stored in FragmentsPath
24
        public string FragmentsPath { get; set; }
25

    
26
        [Import]
27
        public NetworkAgent NetworkAgent { get; set; }
28

    
29
        public void Start()
30
        {
31
            _agent = Agent<WorkflowState>.Start(inbox =>
32
            {
33
                Action loop = null;
34
                loop = () =>
35
                {
36
                    var message = inbox.Receive();
37
                    var process = message.ContinueWith(t =>
38
                    {
39
                        var state = t.Result;
40
                        Process(state);
41
                        inbox.DoAsync(loop);
42
                    });
43
                    process.ContinueWith(t =>
44
                    {
45
                        inbox.DoAsync(loop);
46
                        if (t.IsFaulted)
47
                        {
48
                            var ex = t.Exception.InnerException;
49
                            if (ex is OperationCanceledException)
50
                                inbox.Stop();
51
                            Trace.TraceError("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex);
52
                        }
53
                    });
54

    
55
                };
56
                loop();
57
            });
58
        }
59

    
60
        public void RestartInterruptedFiles()
61
        {
62
            
63
            StatusNotification.NotifyChange("Restart processing interrupted files", TraceLevel.Verbose);
64
            var interruptedStates = new[] { FileOverlayStatus.Unversioned, FileOverlayStatus.Modified };
65

    
66
            var pendingEntries = (from state in FileState.Queryable
67
                                 where interruptedStates.Contains(state.OverlayStatus) &&
68
                                       !state.FilePath.StartsWith(FragmentsPath) &&
69
                                       !state.FilePath.EndsWith(".ignore")
70
                                 select state).ToList();
71
            var staleEntries = from state in pendingEntries
72
                                  where !File.Exists(state.FilePath)
73
                                  select state;
74
            var staleKeys = staleEntries.Select(state=>state.Id);
75
            FileState.DeleteAll(staleKeys);
76

    
77
            var validEntries = from state in pendingEntries.Except(staleEntries)
78
                             where File.Exists(state.FilePath)
79
                             select new WorkflowState
80
                             {
81
                                 Path = state.FilePath.ToLower(),
82
                                 FileName = Path.GetFileName(state.FilePath).ToLower(),
83
                                 Hash = state.Checksum,
84
                                 Status = state.OverlayStatus == FileOverlayStatus.Unversioned ?
85
                                                   FileStatus.Created :
86
                                                   FileStatus.Modified,
87
                                 TriggeringChange = state.OverlayStatus == FileOverlayStatus.Unversioned ?
88
                                                   WatcherChangeTypes.Created :
89
                                                   WatcherChangeTypes.Changed
90
                             };
91
            foreach (var entry in validEntries)
92
            {
93
                Post(entry);
94
            }            
95

    
96
        }       
97

    
98
        private void Process(WorkflowState state)
99
        {
100
            if (state.Skip)
101
                return;
102
            string path = state.Path.ToLower();
103
            string fileName = Path.GetFileName(path);
104

    
105
            //Bypass deleted files, unless the status is Deleted
106
            if (!(File.Exists(path) || state.Status != FileStatus.Deleted))
107
            {
108
                state.Skip = true;
109
                this.StatusKeeper.RemoveFileOverlayStatus(path);
110
                return;
111
            }
112
            var fileState = FileState.FindByFilePath(path);
113
            switch (state.Status)
114
            {
115
                case FileStatus.Created:
116
                case FileStatus.Modified:
117
                    var info = new FileInfo(path);
118
                    NetworkAgent.Post(new CloudAction(CloudActionType.UploadUnconditional, info, ObjectInfo.Empty,fileState));
119
                    break;
120
                case FileStatus.Deleted:
121
                    NetworkAgent.Post(new CloudAction(CloudActionType.DeleteCloud, null, new ObjectInfo {Name=fileName},fileState));                    
122
                    break;
123
                case FileStatus.Renamed:
124
                    NetworkAgent.Post(new CloudAction(CloudActionType.RenameCloud, state.OldFileName,state.OldPath,state.FileName,state.Path));                    
125
                    break;
126
            }
127

    
128
            return;
129
        }
130

    
131
       
132

    
133
        public void Post(WorkflowState workflowState)
134
        {
135
            _agent.Post(workflowState);
136
        }
137
    }
138
}