Added processing of batch files
author pkanavos <pkanavos@gmail.com>
Thu, 14 Jun 2012 20:38:29 +0000 (23:38 +0300)
committer pkanavos <pkanavos@gmail.com>
Thu, 14 Jun 2012 20:38:29 +0000 (23:38 +0300)
trunk/Pithos.Core/Agents/FileAgent.cs
trunk/Pithos.Core/Agents/PollAgent.cs

index 5b9fb0c..978ace2 100644 (file)
@@ -83,7 +83,9 @@ namespace Pithos.Core.Agents
 
         private void ProcessBatchedEvents(Dictionary<string, FileSystemEventArgs[]> fileEvents)
         {
-            PollAgent.SynchNow();
+            var paths = fileEvents.Keys;
+
+            PollAgent.SynchNow(paths);
         }
 
 /*
index 3336406..476f904 100644 (file)
@@ -204,11 +204,13 @@ namespace Pithos.Core.Agents
         /// <summary>\r
         /// Start a manual synchronization\r
         /// </summary>\r
-        public void SynchNow()\r
+        public void SynchNow(IEnumerable<string> paths=null)\r
         {            \r
+            _batchQueue.Enqueue(paths);\r
             _syncEvent.Set();\r
         }\r
 \r
+        readonly ConcurrentQueue<IEnumerable<string>> _batchQueue=new ConcurrentQueue<IEnumerable<string>>();\r
 \r
         /// <summary>\r
         /// Remote files are polled periodically. Any changes are processed\r
@@ -233,8 +235,23 @@ namespace Pithos.Core.Agents
                     await _unPauseEvent.WaitAsync();\r
                     UpdateStatus(PithosStatus.PollSyncing);\r
 \r
-                    var tasks = from accountInfo in _accounts.Values\r
-                                select ProcessAccountFiles(accountInfo, since);\r
+                    var accountBatches=new Dictionary<Uri, IEnumerable<string>>();\r
+                    IEnumerable<string> batch = null;\r
+                    if (_batchQueue.TryDequeue(out batch) && batch != null)\r
+                        foreach (var account in _accounts.Values)\r
+                        {\r
+                            var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath));\r
+                            accountBatches[account.AccountKey] = accountBatch;\r
+                        }\r
+\r
+\r
+                    IEnumerable<Task<DateTime?>> tasks = new List<Task<DateTime?>>();\r
+                    foreach(var accountInfo in _accounts.Values)\r
+                    {\r
+                        IEnumerable<string> accountBatch ;\r
+                        accountBatches.TryGetValue(accountInfo.AccountKey,out accountBatch);\r
+                        ProcessAccountFiles (accountInfo, accountBatch, since);\r
+                    }\r
 \r
                     var nextTimes=await TaskEx.WhenAll(tasks.ToList());\r
 \r
@@ -298,7 +315,7 @@ namespace Pithos.Core.Agents
             return since;\r
         }\r
 \r
-        public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, DateTime? since = null)\r
+        public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, IEnumerable<string> accountBatch, DateTime? since = null)\r
         {\r
             if (accountInfo == null)\r
                 throw new ArgumentNullException("accountInfo");\r
@@ -414,8 +431,9 @@ namespace Pithos.Core.Agents
 \r
                         var tuples = MergeSources(infos, files, states).ToList();\r
 \r
-                        \r
-                        foreach (var tuple in tuples)\r
+\r
+                        var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath));\r
+                        foreach (var tuple in stateTuples)\r
                         {\r
                             await _unPauseEvent.WaitAsync();\r
 \r