Revision 283809f3 trunk/Pithos.Core/PithosMonitor.cs

b/trunk/Pithos.Core/PithosMonitor.cs
272 272
            }
273 273
        }
274 274

  
275
        private BlockingCollection<ListenerAction> _listenerActions=new BlockingCollection<ListenerAction>();
275
        private BlockingCollection<ListenerAction> _networkActions=new BlockingCollection<ListenerAction>();
276 276

  
277 277
        private Timer timer;
278 278

  
......
303 303
                                      var pithosDir = new DirectoryInfo(accountPath);
304 304
                                      
305 305
                                      var remoteFiles = from info in remoteObjects
306
                                                    select info.Name;
306
                                                    select info.Name.ToLower();
307 307
                                      
308 308
                                      var onlyLocal = from localFile in pithosDir.EnumerateFiles()
309
                                                      where !remoteFiles.Contains(localFile.Name) 
309
                                                      where !remoteFiles.Contains(localFile.Name.ToLower()) 
310 310
                                                      select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null);
311
                                      
312
                                      
313
                                    
314 311

  
315
                                      var localNames =pithosDir.EnumerateFiles().Select(info => info.Name);
312

  
313

  
314
                                      var localNames = from info in pithosDir.EnumerateFiles()
315
                                                           select info.Name.ToLower();
316

  
316 317
                                      var onlyRemote = from upFile in remoteObjects
317
                                                       where !localNames.Contains(upFile.Name)
318
                                                       where !localNames.Contains(upFile.Name.ToLower())
318 319
                                                       select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile);
319 320

  
320 321

  
321 322
                                      var commonObjects = from  upFile in remoteObjects
322 323
                                                            join  localFile in pithosDir.EnumerateFiles()
323
                                                                on upFile.Name equals localFile.Name 
324
                                                                on upFile.Name.ToLower() equals localFile.Name.ToLower() 
324 325
                                                            select new ListenerAction(CloudActionType.Download, localFile, upFile);
325 326

  
326 327
                                      var uniques =
327 328
                                          onlyLocal.Union(onlyRemote).Union(commonObjects)
328
                                              .Except(_listenerActions,new LocalFileComparer());
329
                                              .Except(_networkActions,new LocalFileComparer());
329 330
                                      
330
                                      _listenerActions.AddFromEnumerable(uniques, false);
331
                                      _networkActions.AddFromEnumerable(uniques, false);
331 332

  
332 333
                                      Trace.TraceInformation("[LISTENER] End Processing");
333 334
                                      
......
348 349

  
349 350
        private void ProcessListenerActions()
350 351
        {
351
            foreach(var action in _listenerActions.GetConsumingEnumerable())
352
            foreach(var action in _networkActions.GetConsumingEnumerable())
352 353
            {
353 354
                Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile);
354 355
                var localFile = action.LocalFile;
......
393 394
                    Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
394 395
                                    action.Action, action.LocalFile,action.CloudFile,exc);                    
395 396

  
396
                    _listenerActions.Add(action);
397
                    _networkActions.Add(action);
397 398
                }
398 399
            }
399 400
        }
400 401

  
401
        private void DownloadCloudFile(string container, string fileName, string localPath)
402
        {
403
            using (var upstream = CloudClient.GetObject(container, fileName))
404
            using (var fileStream = File.OpenWrite(localPath))
405
            {
406
                upstream.CopyTo(fileStream);
407
            }
408
        }
402
      
409 403

  
410 404
        private void StartMonitoringFiles(string path)
411 405
        {
......
416 410
            _watcher.Renamed += OnRenameEvent;
417 411
            _watcher.EnableRaisingEvents = true;
418 412

  
419
            Task.Factory.StartNew(() =>
420
                                      {
421
                                          foreach (var state in _fileEvents.GetConsumingEnumerable())
422
                                          {
423
                                              try
424
                                              {
425
                                                  UpdateFileStatus(state);
426
                                                  UpdateOverlayStatus(state);
427
                                                  UpdateFileChecksum(state);
428
                                                  _uploadEvents.Add(state);                                                  
429
                                              }
430
                                              catch (OperationCanceledException)
431
                                              {
432
                                                  throw;
433
                                              }
434
                                              catch(Exception ex)
435
                                              {}
436
                                          }
437
                                          
438
                                      },_cancellationSource.Token);
413
            Task.Factory.StartNew(ProcesFileEvents,_cancellationSource.Token);
414
        }
415

  
416
        private void ProcesFileEvents()
417
        {
418
            foreach (var state in _fileEvents.GetConsumingEnumerable())
419
            {
420
                try
421
                {
422
                    var networkState=StatusKeeper.GetNetworkState(state.Path);
423
                    //Skip if the file is already being downloaded or uploaded and 
424
                    //the change is create or modify
425
                    if (networkState != NetworkState.None && 
426
                        (
427
                            state.TriggeringChange==WatcherChangeTypes.Created ||
428
                            state.TriggeringChange==WatcherChangeTypes.Changed
429
                        ))
430
                        continue;
431
                    UpdateFileStatus(state);
432
                    UpdateOverlayStatus(state);
433
                    UpdateFileChecksum(state);
434
                    _uploadEvents.Add(state);
435
                }
436
                catch (OperationCanceledException exc)
437
                {
438
                    Trace.TraceError("[ERROR] File Event Processing:\r{0}", exc);
439
                    throw;
440
                }
441
                catch (Exception exc)
442
                {
443
                    Trace.TraceError("[ERROR] File Event Processing:\r{0}",exc);
444
                }
445
            }
439 446
        }
440 447

  
441 448
        private void StartSending()
......
510 517
            this.StatusKeeper.RemoveFileOverlayStatus(fileName);            
511 518
        }
512 519

  
520
        private void DownloadCloudFile(string container, string fileName, string localPath)
521
        {
522
            StatusKeeper.SetNetworkState(localPath,NetworkState.Downloading);
523
            CloudClient.GetObject(container, fileName, localPath)
524
            .ContinueWith(t=>
525
                CloudClient.GetObjectInfo(container,fileName))
526
            .ContinueWith(t=>
527
                StatusKeeper.StoreInfo(fileName,t.Result))
528
            .ContinueWith(t=>
529
                StatusKeeper.SetNetworkState(localPath,NetworkState.None))
530
            .Wait();
531
        }
532

  
513 533
        private void UploadCloudFile(string fileName, long fileSize, string path,string hash)
514 534
        {
515 535
            Contract.Requires(!Path.IsPathRooted(fileName));
516
            //Even if GetObjectInfo times out, we can proceed with the upload
536

  
537
            StatusKeeper.SetNetworkState(fileName,NetworkState.Uploading);
538
            
539
            //Even if GetObjectInfo times out, we can proceed with the upload            
517 540
            var info = CloudClient.GetObjectInfo("PITHOS", fileName);
518 541
            Task.Factory.StartNew(() =>
519 542
            {
......
524 547
                    .ContinueWith(t => 
525 548
                        CloudClient.PutObject("PITHOS", fileName, path, hash));
526 549
                }
550
                else
551
                {
552
                    this.StatusKeeper.StoreInfo(path,info);
553
                }
527 554
            }
528 555
            )
529
            .ContinueWith(t =>{
530
                        this.StatusKeeper.SetFileStatus(path, FileStatus.Unchanged);
531
                        this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Normal);
532
            })
556
            .ContinueWith(t => 
557
                this.StatusKeeper.SetFileState(path, FileStatus.Unchanged, FileOverlayStatus.Normal))
558
                .ContinueWith(t=>
559
                    this.StatusKeeper.SetNetworkState(path,NetworkState.None))
533 560
            .Wait();
534 561
            Workflow.RaiseChangeNotification(path);
535 562
        }

Also available in: Unified diff