Revision 5e31048f trunk/Pithos.Core/Agents/WorkflowAgent.cs

b/trunk/Pithos.Core/Agents/WorkflowAgent.cs
7 7
using System.Linq;
8 8
using System.Text;
9 9
using System.Threading.Tasks;
10
using System.Threading.Tasks.Dataflow;
10 11
using Pithos.Interfaces;
11 12
using Pithos.Network;
12 13
using log4net;
......
16 17
    [Export]
17 18
    public class WorkflowAgent
18 19
    {
19
        Agent<WorkflowState> _agent;
20
        private ActionBlock<WorkflowState> _agent;
20 21
                
21 22
        public IStatusNotification StatusNotification { get; set; }
22 23
        [Import]
......
25 26
        [Import]
26 27
        public NetworkAgent NetworkAgent { get; set; }
27 28

  
29
        public ActionBlock<WorkflowState> Agent
30
        {
31
            get { return _agent; }
32
        }
33

  
28 34
        private static readonly ILog Log = LogManager.GetLogger("WorkflowAgent");
29 35

  
30 36
        public void Start()
31 37
        {
32
            _agent = Agent<WorkflowState>.Start(inbox =>
38
            /*_agent = new ActionBlock<WorkflowState>(message =>
33 39
            {
34 40
                Action loop = null;
35 41
                loop = () =>
36 42
                {
37
                    var message = inbox.Receive();
43
                    //var message = inbox.Receive();
44
                    Process(message);
38 45
                    var process = message.Then(Process, inbox.CancellationToken);                        
39 46
                    inbox.LoopAsync(process,loop,ex=>
40 47
                            Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.Result.FileName, ex));
41 48
                };
42 49
                loop();
43
            });
50
            });*/
51
            _agent = new ActionBlock<WorkflowState>(async message =>
52
            {
53
                try
54
                {
55
                    var action=await TaskEx.Run(()=>Process(message));
56
                    if (action!=null)
57
                        NetworkAgent.Post(action);
58
                }
59
                catch (Exception ex)
60
                {
61
                    Log.ErrorFormat("[ERROR] Synch for {0}:\r{1}", message.FileName, ex);                    
62
                }
63
            });            
44 64
        }
45 65

  
46
        private Task<object> Process(WorkflowState state)
66
        private CloudAction Process(WorkflowState state)
47 67
        {
48 68
            var accountInfo = state.AccountInfo;
49 69
            using (log4net.ThreadContext.Stacks["Workflow"].Push("Process"))
......
54 74
                {
55 75
                    if (Log.IsDebugEnabled) Log.DebugFormat("Skipping {0}",state.FileName);
56 76
                    
57
                    return CompletedTask<object>.Default;
77
                    return null;
58 78
                }
59 79
                string path = state.Path.ToLower();
60 80

  
......
66 86
                    
67 87
                    if (Log.IsDebugEnabled) Log.DebugFormat("Skipped missing {0}", state.FileName);
68 88

  
69
                    return CompletedTask<object>.Default;
89
                    return null;
70 90
                }
71 91

  
72 92
                var fileState = FileState.FindByFilePath(path);
......
77 97
                {
78 98
                    case FileStatus.Created:
79 99
                    case FileStatus.Modified:
80
                        NetworkAgent.Post(new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize,
81
                            accountInfo.BlockHash));
82
                        break;
100
                        return new CloudUploadAction(accountInfo,info, fileState, accountInfo.BlockSize,
101
                            accountInfo.BlockHash);
83 102
                    case FileStatus.Deleted:                       
84
                        NetworkAgent.Post(new CloudDeleteAction(accountInfo,info, fileState));
85
                        break;
103
                        return new CloudDeleteAction(accountInfo,info, fileState);
86 104
                    case FileStatus.Renamed:
87
                        NetworkAgent.Post(new CloudMoveAction(accountInfo,CloudActionType.RenameCloud, new FileInfo(state.OldPath),
88
                                                              new FileInfo(state.Path)));
89
                        break;
105
                        return new CloudMoveAction(accountInfo,CloudActionType.RenameCloud, 
106
                            new FileInfo(state.OldPath),
107
                            new FileInfo(state.Path));
90 108
                }
91

  
92
                return CompletedTask<object>.Default;
109
                return null;
93 110
            }
94 111
        }
95 112

  
......
146 163
        {
147 164
            if (Log.IsDebugEnabled)
148 165
                Log.DebugFormat("Posted {0} {1} {2}", workflowState.Path, workflowState.Status, workflowState.TriggeringChange);
149
            _agent.Post(workflowState);
166
            Agent.Post(workflowState);
150 167
        }
151 168
    }
152 169
}

Also available in: Unified diff