Revision b5061ac8 trunk/Pithos.Core/PithosMonitor.cs

b/trunk/Pithos.Core/PithosMonitor.cs
79 79
            string path = Settings.PithosPath;
80 80
            var proxyUri = ProxyFromSettings();            
81 81
            CloudClient.Proxy = proxyUri;
82
            IndexLocalFiles(path);
82 83
            StartMonitoringFiles(path);
83 84

  
84 85
            StartStatusService();
......
105 106
            return null;
106 107
        }
107 108

  
109
        private void IndexLocalFiles(string path)
110
        {
111
            Trace.TraceInformation("[START] Inxed Local");
112
            try
113
            {
114
                var files =
115
                    from filePath in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).AsParallel()
116
                    select filePath;
117
                StatusKeeper.StoreUnversionedFiles(files);
118

  
119
                var newFiles= FileState.FindAllByProperty("OverlayStatus", FileOverlayStatus.Unversioned)
120
                            .Select(state=>state.FilePath);
121
                foreach (var newFile in newFiles)
122
                {
123
                    _uploadEvents.Add(new WorkflowState
124
                                          {
125
                                              Path = newFile,
126
                                              FileName = Path.GetFileName(newFile),
127
                                              TriggeringChange = WatcherChangeTypes.Created
128
                                          });
129
                }
130

  
131
            }
132
            catch (Exception exc)
133
            {
134
                Trace.TraceError("[ERROR] Index Local - {0}", exc);
135
            }
136
            finally
137
            {
138
                Trace.TraceInformation("[END] Inxed Local");
139
            }
140
        }
141

  
108 142
        private void StartStatusService()
109 143
        {
110 144
            // Create a ServiceHost for the CalculatorService type and provide the base address.
......
159 193
                
160 194
                CloudClient.Authenticate(Settings.UserName, Settings.ApiKey);
161 195

  
162
                StartListening();
196
                StartListening(Settings.PithosPath);
163 197
                StartSending();
164 198
            }
165 199
            catch (Exception)
......
193 227
                Action = action;
194 228
                LocalFile = localFile;
195 229
                CloudFile = cloudFile;
196
                LocalHash=new Lazy<string>(()=>CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication);
230
                LocalHash=new Lazy<string>(()=>Signature.CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication);
197 231
            }
198 232
            
199 233
        }
......
227 261

  
228 262
        private Timer timer;
229 263

  
230
        private void StartListening()
264
        private void StartListening(string accountPath)
231 265
        {
232 266
            
233
            Func<Task> listener = ()=>Task.Factory.StartNew(()=>CloudClient.ListObjects("PITHOS"))
267
            ProcessRemoteFiles(accountPath);
268

  
269
            Task.Factory.StartNew(ProcessListenerActions);
270
                        
271
        }
272

  
273
        private Task ProcessRemoteFiles(string accountPath)
274
        {
275
            Trace.TraceInformation("[LISTENER] Scheduled");    
276
            return Task.Factory.StartNewDelayed(10000)
277
                .ContinueWith(t=>CloudClient.ListObjects("PITHOS"))
234 278
                .ContinueWith(task =>
235 279
                                  {
236
                                      
237
                                      var objects = task.Result;
238
                                      if (objects.Count == 0)
280
                                      Trace.TraceInformation("[LISTENER] Start Processing");
281

  
282
                                      var remoteObjects = task.Result;
283
/*
284
                                      if (remoteObjects.Count == 0)
239 285
                                          return;
286
*/
240 287

  
241
                                      var pithosDir = new DirectoryInfo(Settings.PithosPath);
288
                                      var pithosDir = new DirectoryInfo(accountPath);
242 289
                                      
243
                                      var upFiles = from info in objects
290
                                      var remoteFiles = from info in remoteObjects
244 291
                                                    select info.Name;
245 292
                                      
246 293
                                      var onlyLocal = from localFile in pithosDir.EnumerateFiles()
247
                                                      where !upFiles.Contains(localFile.Name) 
294
                                                      where !remoteFiles.Contains(localFile.Name) 
248 295
                                                      select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null);
249 296
                                      
250 297
                                      
251 298
                                    
252 299

  
253 300
                                      var localNames =pithosDir.EnumerateFiles().Select(info => info.Name);
254
                                      var onlyRemote = from upFile in objects
301
                                      var onlyRemote = from upFile in remoteObjects
255 302
                                                       where !localNames.Contains(upFile.Name)
256 303
                                                       select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile);
257 304

  
258 305

  
259
                                      var existingObjects = from  upFile in objects
306
                                      var commonObjects = from  upFile in remoteObjects
260 307
                                                            join  localFile in pithosDir.EnumerateFiles()
261
                                                            on upFile.Name equals localFile.Name 
262
                                                       select new ListenerAction(CloudActionType.Download, localFile, upFile);
308
                                                                on upFile.Name equals localFile.Name 
309
                                                            select new ListenerAction(CloudActionType.Download, localFile, upFile);
263 310

  
264 311
                                      var uniques =
265
                                          onlyLocal.Union(onlyRemote).Union(existingObjects)
266
                                          .Except(_listenerActions,new LocalFileComparer());
267

  
312
                                          onlyLocal.Union(onlyRemote).Union(commonObjects)
313
                                              .Except(_listenerActions,new LocalFileComparer());
314
                                      
268 315
                                      _listenerActions.AddFromEnumerable(uniques, false);
269
                                     
270
                                 }
271
                );
272 316

  
273
            Task.Factory.StartNew(() =>
274
                                      {
275
                                          foreach (var action in _listenerActions.GetConsumingEnumerable())
276
                                          {
277
                                              var localFile = action.LocalFile;
278
                                              var cloudFile = action.CloudFile;
279
                                              var downloadPath = (cloudFile==null)? String.Empty:Path.Combine(Settings.PithosPath,cloudFile.Name);
280
                                              try
281
                                              {
282
                                                  switch (action.Action)
283
                                                  {
284
                                                      case CloudActionType.UploadUnconditional:
285

  
286
                                                          UploadCloudFile(localFile.Name, localFile.Length,
287
                                                                          localFile.FullName, action.LocalHash.Value);
288
                                                          break;
289
                                                      case CloudActionType.DownloadUnconditional:
290
                                                          DownloadCloudFile("PITHOS", cloudFile.Name, downloadPath);
291
                                                          break;
292
                                                      case CloudActionType.Download:
293
                                                          if (File.Exists(downloadPath))
294
                                                          {
295
                                                              if (cloudFile.Hash != action.LocalHash.Value)
296
                                                              {
297
                                                                  var lastLocalTime = localFile.LastWriteTime;
298
                                                                  var lastUpTime = cloudFile.Last_Modified;
299
                                                                  if (lastUpTime <= lastLocalTime)
300
                                                                  {
301
                                                                      //Files in conflict
302
                                                                      StatusKeeper.SetFileOverlayStatus(downloadPath,
303
                                                                                                        FileOverlayStatus
304
                                                                                                            .Conflict);
305
                                                                  }
306
                                                                  else
307
                                                                      DownloadCloudFile("PITHOS", action.CloudFile.Name,
308
                                                                                        downloadPath);
309
                                                              }
310
                                                          }
311
                                                          else
312
                                                              DownloadCloudFile("PITHOS", action.CloudFile.Name,
313
                                                                                downloadPath);
314
                                                          break;
315
                                                  }
316
                                              }
317
                                              catch (Exception exc)
318
                                              {
319
                                                  Debug.WriteLine("Processing of {0}:{1}->{2} failed. Putting it back in the queue",action.Action,action.LocalFile,action.CloudFile);
320
                                                  Debug.WriteLine(exc.ToString());
321
                                                  _listenerActions.Add(action);
322
                                              }
323
                                          }
324
                                      }
325
                );
326
            
327
            timer = new Timer(o => listener(), null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
328
            
317
                                      Trace.TraceInformation("[LISTENER] End Processing");
318
                                      
319
                                  }
320
                ).ContinueWith(t=>
321
                {
322
                    if (t.IsFaulted)
323
                    {
324
                        Trace.TraceError("[LISTENER] Exception: {0}",t.Exception);                                           
325
                    }
326
                    else
327
                    {
328
                        Trace.TraceInformation("[LISTENER] Finished");                                           
329
                    }                    
330
                    ProcessRemoteFiles(accountPath);
331
                });
332
        }
333

  
334
        private void ProcessListenerActions()
335
        {
336
            foreach(var action in _listenerActions.GetConsumingEnumerable())
337
            {
338
                Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile);
339
                var localFile = action.LocalFile;
340
                var cloudFile = action.CloudFile;
341
                var downloadPath = (cloudFile == null)? String.Empty
342
                                        : Path.Combine(Settings.PithosPath,cloudFile.Name);
343
                try
344
                {
345
                    switch (action.Action)
346
                    {
347
                        case CloudActionType.UploadUnconditional:
348

  
349
                            UploadCloudFile(localFile.Name,localFile.Length,localFile.FullName,action.LocalHash.Value);
350
                            break;
351
                        case CloudActionType.DownloadUnconditional:
352
                            DownloadCloudFile("PITHOS",cloudFile.Name,downloadPath);
353
                            break;
354
                        case CloudActionType.Download:
355
                            if (File.Exists(downloadPath))
356
                            {
357
                                if (cloudFile.Hash !=action.LocalHash.Value)
358
                                {
359
                                    var lastLocalTime =localFile.LastWriteTime;
360
                                    var lastUpTime =cloudFile.Last_Modified;
361
                                    if (lastUpTime <=lastLocalTime)
362
                                    {
363
                                        //Files in conflict
364
                                        StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
365
                                    }
366
                                    else
367
                                        DownloadCloudFile("PITHOS",action.CloudFile.Name,downloadPath);
368
                                }
369
                            }
370
                            else
371
                                DownloadCloudFile("PITHOS",action.CloudFile.Name,downloadPath);
372
                            break;
373
                    }
374
                    Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile);
375
                }
376
                catch (Exception exc)
377
                {
378
                    Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
379
                                    action.Action, action.LocalFile,action.CloudFile,exc);
380
                    Trace.TraceError(exc.ToString());
381

  
382
                    _listenerActions.Add(action);
383
                }
384
            }
329 385
        }
330 386

  
331 387
        private void DownloadCloudFile(string container, string fileName, string localPath)
......
446 502
            if ( hash != info.Hash)
447 503
            {
448 504
                this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Synch);
449
                
450
                    CloudClient.PutObject("PITHOS", fileName, path);
451
                
505

  
506
                CloudClient.PutObject("PITHOS", fileName, path, hash).Wait();
452 507
            }
453 508
            this.StatusKeeper.SetFileStatus(path,FileStatus.Unchanged);
454 509
            this.StatusKeeper.SetFileOverlayStatus(path,FileOverlayStatus.Normal);
......
521 576
                return state;
522 577

  
523 578
            string path = state.Path;
524
            string hash = CalculateHash(path);
579
            string hash = Signature.CalculateHash(path);
525 580

  
526 581
            StatusKeeper.UpdateFileChecksum(path, hash);
527 582

  
......
529 584
            return state;
530 585
        }
531 586

  
532
        private static string CalculateHash(string path)
533
        {
534
            string hash;
535
            using (var hasher = MD5.Create())
536
            using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, true))
537
            {
538
                var hashBytes = hasher.ComputeHash(stream);
539
                var hashBuilder = new StringBuilder();
540
                foreach (byte b in hasher.ComputeHash(stream))
541
                    hashBuilder.Append(b.ToString("x2").ToLower());
542
                hash = hashBuilder.ToString();
543

  
544
            }
545
            return hash;
546
        }
587
       
547 588

  
548 589
        private FileSystemEventArgs CalculateSignature(FileSystemEventArgs arg)
549 590
        {

Also available in: Unified diff