Revision 5e31048f trunk/Pithos.Core/Agents/FileAgent.cs

b/trunk/Pithos.Core/Agents/FileAgent.cs
7 7
using System.Linq;
8 8
using System.Text;
9 9
using System.Threading.Tasks;
10
using System.Threading.Tasks.Dataflow;
10 11
using Pithos.Interfaces;
11 12
using Pithos.Network;
12 13
using log4net;
......
17 18
    [Export]
18 19
    public class FileAgent
19 20
    {
20
        Agent<WorkflowState> _agent;
21
        //Agent<WorkflowState> _agent;
22
        TransformBlock<WorkflowState, WorkflowState> _agent;
21 23
        private FileSystemWatcher _watcher;
22 24

  
23 25
        [Import]
......
33 35

  
34 36
        private static readonly ILog Log = LogManager.GetLogger("FileAgent");
35 37

  
36
        public void Start(AccountInfo accountInfo,string rootPath)
38
        public FileAgent()
37 39
        {
38
            if (accountInfo==null)
39
                throw new ArgumentNullException("accountInfo");
40
            if (String.IsNullOrWhiteSpace(rootPath))
41
                throw new ArgumentNullException("rootPath");
42
            if (!Path.IsPathRooted(rootPath))
43
                throw new ArgumentException("rootPath must be an absolute path","rootPath");
44
            Contract.EndContractBlock();
45

  
46
            AccountInfo = accountInfo;
47
            RootPath = rootPath;
48
            _watcher = new FileSystemWatcher(rootPath);
49
            _watcher.IncludeSubdirectories = true;            
50
            _watcher.Changed += OnFileEvent;
51
            _watcher.Created += OnFileEvent;
52
            _watcher.Deleted += OnFileEvent;
53
            _watcher.Renamed += OnRenameEvent;
54
            _watcher.EnableRaisingEvents = true;
55

  
56

  
57
            _agent = Agent<WorkflowState>.Start(inbox =>
58
            {
59
                Action loop = null;
60
                loop = () =>
61
                {
62
                    var message = inbox.Receive();
63
                    var process=message.Then(Process,inbox.CancellationToken);
64

  
65
                    inbox.LoopAsync(process,loop,ex=>
66
                        Log.ErrorFormat("[ERROR] File Event Processing:\r{0}", ex));
67
                };
68
                loop();
69
            });
40
            _agent = new TransformBlock<WorkflowState, WorkflowState>(async message => 
41
                await TaskEx.Run(()=>Process(message)));
70 42
        }
71 43

  
72
        private Task<object> Process(WorkflowState state)
44
        private WorkflowState Process(WorkflowState message)
73 45
        {
74
            if (state==null)
75
                throw new ArgumentNullException("state");
76
            Contract.EndContractBlock();
77

  
78
            Debug.Assert(!Ignore(state.Path));
79

  
80
            var networkState = NetworkGate.GetNetworkState(state.Path);
81
            //Skip if the file is already being downloaded or uploaded and 
82
            //the change is create or modify
83
            if (networkState != NetworkOperation.None &&
84
                (
85
                    state.TriggeringChange == WatcherChangeTypes.Created ||
86
                    state.TriggeringChange == WatcherChangeTypes.Changed
87
                ))
88
                return CompletedTask<object>.Default;
46
            Debug.Assert(!Ignore(message.Path));
89 47

  
90 48
            try
91 49
            {
92
                UpdateFileStatus(state);
93
                UpdateOverlayStatus(state);
94
                UpdateFileChecksum(state);
95
                WorkflowAgent.Post(state);
50
                //Skip if the file is already being downloaded or uploaded and 
51
                //the change is create or modify
52
                var networkState = NetworkGate.GetNetworkState(message.Path);
53
                if (networkState != NetworkOperation.None &&
54
                    (message.TriggeringChange == WatcherChangeTypes.Created ||
55
                     message.TriggeringChange == WatcherChangeTypes.Changed))
56
                    return null;
57

  
58
                UpdateFileStatus(message);
59
                UpdateOverlayStatus(message);
60
                UpdateFileChecksum(message);
61
                return message;
96 62
            }
97 63
            catch (IOException exc)
98 64
            {
99
                if (File.Exists(state.Path))
65
                if (File.Exists(message.Path))
100 66
                {
101
                    Log.WarnFormat("File access error occured, retrying {0}\n{1}", state.Path, exc);
102
                    _agent.Post(state);
67
                    Log.WarnFormat("File access error occured, retrying {0}\n{1}", message.Path, exc);
68
                    _agent.Post(message);
103 69
                }
104 70
                else
105 71
                {
106
                    Log.WarnFormat("File {0} does not exist. Will be ignored\n{1}", state.Path, exc);
72
                    Log.WarnFormat("File {0} does not exist. Will be ignored\n{1}", message.Path, exc);
107 73
                }
108 74
            }
109 75
            catch (Exception exc)
110 76
            {
111
                Log.WarnFormat("Error occured while indexing{0. The file will be skipped}\n{1}", state.Path, exc);
77
                Log.WarnFormat("Error occured while indexing{0}. The file will be skipped}\n{1}",
78
                               message.Path, exc);
112 79
            }
113
            return CompletedTask<object>.Default;
114
        }
115 80

  
81
            return null;
82
        }
116 83

  
117
/*
118
        private Task Process(Task<WorkflowState> action)
84
        public void Start(AccountInfo accountInfo,string rootPath)
119 85
        {
120
            return action.ContinueWith(t => Process(t.Result));
121
        }
122
*/
86
            if (accountInfo==null)
87
                throw new ArgumentNullException("accountInfo");
88
            if (String.IsNullOrWhiteSpace(rootPath))
89
                throw new ArgumentNullException("rootPath");
90
            if (!Path.IsPathRooted(rootPath))
91
                throw new ArgumentException("rootPath must be an absolute path","rootPath");
92
            Contract.EndContractBlock();
93

  
94
            _agent.LinkTo(WorkflowAgent.Agent);            
123 95

  
96
            AccountInfo = accountInfo;
97
            RootPath = rootPath;
98
            _watcher = new FileSystemWatcher(rootPath);
99
            _watcher.IncludeSubdirectories = true;            
100
            _watcher.Changed += OnFileEvent;
101
            _watcher.Created += OnFileEvent;
102
            _watcher.Deleted += OnFileEvent;
103
            _watcher.Renamed += OnRenameEvent;
104
            _watcher.EnableRaisingEvents = true;
105
           
106
        }
124 107

  
125 108
        public bool Pause
126 109
        {
......
164 147
            _watcher = null;
165 148

  
166 149
            if (_agent!=null)
167
                _agent.Stop();
150
                _agent.Complete();
168 151
        }
169 152

  
170 153
        // Enumerate all files in the Pithos directory except those in the Fragment folder

Also available in: Unified diff