Revision 4ec636f6 trunk/Pithos.Core/Agents/FileAgent.cs

b/trunk/Pithos.Core/Agents/FileAgent.cs
7 7
using System.Linq;
8 8
using System.Text;
9 9
using System.Threading.Tasks;
10
using System.Threading.Tasks.Dataflow;
11 10
using Pithos.Interfaces;
12 11
using Pithos.Network;
13 12
using log4net;
......
15 14

  
16 15
namespace Pithos.Core.Agents
17 16
{
18
    [Export]
17
//    [Export]
19 18
    public class FileAgent
20 19
    {
21
        //Agent<WorkflowState> _agent;
22
        TransformBlock<WorkflowState, WorkflowState> _agent;
20
        Agent<WorkflowState> _agent;
23 21
        private FileSystemWatcher _watcher;
24 22

  
25
        [Import]
23
        //[Import]
26 24
        public IStatusKeeper StatusKeeper { get; set; }
27
        [Import]
25
        //[Import]
28 26
        public IPithosWorkflow Workflow { get; set; }
29
        [Import]
27
        //[Import]
30 28
        public WorkflowAgent WorkflowAgent { get; set; }
31 29

  
32 30
        private AccountInfo AccountInfo { get; set; }
......
35 33

  
36 34
        private static readonly ILog Log = LogManager.GetLogger("FileAgent");
37 35

  
38
        public FileAgent()
36
        public void Start(AccountInfo accountInfo,string rootPath)
39 37
        {
40
            _agent = new TransformBlock<WorkflowState, WorkflowState>(async message => 
41
                await TaskEx.Run(()=>Process(message)));
38
            if (accountInfo==null)
39
                throw new ArgumentNullException("accountInfo");
40
            if (String.IsNullOrWhiteSpace(rootPath))
41
                throw new ArgumentNullException("rootPath");
42
            if (!Path.IsPathRooted(rootPath))
43
                throw new ArgumentException("rootPath must be an absolute path","rootPath");
44
            Contract.EndContractBlock();
45

  
46
            AccountInfo = accountInfo;
47
            RootPath = rootPath;
48
            _watcher = new FileSystemWatcher(rootPath);
49
            _watcher.IncludeSubdirectories = true;            
50
            _watcher.Changed += OnFileEvent;
51
            _watcher.Created += OnFileEvent;
52
            _watcher.Deleted += OnFileEvent;
53
            _watcher.Renamed += OnRenameEvent;
54
            _watcher.EnableRaisingEvents = true;
55

  
56

  
57
            _agent = Agent<WorkflowState>.Start(inbox =>
58
            {
59
                Action loop = null;
60
                loop = () =>
61
                {
62
                    var message = inbox.Receive();
63
                    var process=message.Then(Process,inbox.CancellationToken);
64

  
65
                    inbox.LoopAsync(process,loop,ex=>
66
                        Log.ErrorFormat("[ERROR] File Event Processing:\r{0}", ex));
67
                };
68
                loop();
69
            });
42 70
        }
43 71

  
44
        private WorkflowState Process(WorkflowState message)
72
        private Task<object> Process(WorkflowState state)
45 73
        {
46
            Debug.Assert(!Ignore(message.Path));
74
            if (state==null)
75
                throw new ArgumentNullException("state");
76
            Contract.EndContractBlock();
77

  
78
            Debug.Assert(!Ignore(state.Path));
79

  
80
            var networkState = NetworkGate.GetNetworkState(state.Path);
81
            //Skip if the file is already being downloaded or uploaded and 
82
            //the change is create or modify
83
            if (networkState != NetworkOperation.None &&
84
                (
85
                    state.TriggeringChange == WatcherChangeTypes.Created ||
86
                    state.TriggeringChange == WatcherChangeTypes.Changed
87
                ))
88
                return CompletedTask<object>.Default;
47 89

  
48 90
            try
49 91
            {
50
                //Skip if the file is already being downloaded or uploaded and 
51
                //the change is create or modify
52
                var networkState = NetworkGate.GetNetworkState(message.Path);
53
                if (networkState != NetworkOperation.None &&
54
                    (message.TriggeringChange == WatcherChangeTypes.Created ||
55
                     message.TriggeringChange == WatcherChangeTypes.Changed))
56
                    return null;
57

  
58
                UpdateFileStatus(message);
59
                UpdateOverlayStatus(message);
60
                UpdateFileChecksum(message);
61
                return message;
92
                UpdateFileStatus(state);
93
                UpdateOverlayStatus(state);
94
                UpdateFileChecksum(state);
95
                WorkflowAgent.Post(state);
62 96
            }
63 97
            catch (IOException exc)
64 98
            {
65
                if (File.Exists(message.Path))
99
                if (File.Exists(state.Path))
66 100
                {
67
                    Log.WarnFormat("File access error occured, retrying {0}\n{1}", message.Path, exc);
68
                    _agent.Post(message);
101
                    Log.WarnFormat("File access error occured, retrying {0}\n{1}", state.Path, exc);
102
                    _agent.Post(state);
69 103
                }
70 104
                else
71 105
                {
72
                    Log.WarnFormat("File {0} does not exist. Will be ignored\n{1}", message.Path, exc);
106
                    Log.WarnFormat("File {0} does not exist. Will be ignored\n{1}", state.Path, exc);
73 107
                }
74 108
            }
75 109
            catch (Exception exc)
76 110
            {
77
                Log.WarnFormat("Error occured while indexing{0}. The file will be skipped}\n{1}",
78
                               message.Path, exc);
111
                Log.WarnFormat("Error occured while indexing{0}. The file will be skipped\n{1}",
112
                               state.Path, exc);
79 113
            }
80

  
81
            return null;
114
            return CompletedTask<object>.Default;
82 115
        }
83 116

  
84
        public void Start(AccountInfo accountInfo,string rootPath)
85
        {
86
            if (accountInfo==null)
87
                throw new ArgumentNullException("accountInfo");
88
            if (String.IsNullOrWhiteSpace(rootPath))
89
                throw new ArgumentNullException("rootPath");
90
            if (!Path.IsPathRooted(rootPath))
91
                throw new ArgumentException("rootPath must be an absolute path","rootPath");
92
            Contract.EndContractBlock();
93 117

  
94
            _agent.LinkTo(WorkflowAgent.Agent);            
95

  
96
            AccountInfo = accountInfo;
97
            RootPath = rootPath;
98
            _watcher = new FileSystemWatcher(rootPath);
99
            _watcher.IncludeSubdirectories = true;            
100
            _watcher.Changed += OnFileEvent;
101
            _watcher.Created += OnFileEvent;
102
            _watcher.Deleted += OnFileEvent;
103
            _watcher.Renamed += OnRenameEvent;
104
            _watcher.EnableRaisingEvents = true;
105
           
118
/*
119
        private Task Process(Task<WorkflowState> action)
120
        {
121
            return action.ContinueWith(t => Process(t.Result));
106 122
        }
123
*/
124

  
107 125

  
108 126
        public bool Pause
109 127
        {
......
147 165
            _watcher = null;
148 166

  
149 167
            if (_agent!=null)
150
                _agent.Complete();
168
                _agent.Stop();
151 169
        }
152 170

  
153 171
        // Enumerate all files in the Pithos directory except those in the Fragment folder

Also available in: Unified diff