Revision 283809f3 trunk/Pithos.Core/PithosMonitor.cs
b/trunk/Pithos.Core/PithosMonitor.cs | ||
---|---|---|
272 | 272 |
} |
273 | 273 |
} |
274 | 274 |
|
275 |
private BlockingCollection<ListenerAction> _listenerActions=new BlockingCollection<ListenerAction>();
|
|
275 |
private BlockingCollection<ListenerAction> _networkActions=new BlockingCollection<ListenerAction>();
|
|
276 | 276 |
|
277 | 277 |
private Timer timer; |
278 | 278 |
|
... | ... | |
303 | 303 |
var pithosDir = new DirectoryInfo(accountPath); |
304 | 304 |
|
305 | 305 |
var remoteFiles = from info in remoteObjects |
306 |
select info.Name; |
|
306 |
select info.Name.ToLower();
|
|
307 | 307 |
|
308 | 308 |
var onlyLocal = from localFile in pithosDir.EnumerateFiles() |
309 |
where !remoteFiles.Contains(localFile.Name) |
|
309 |
where !remoteFiles.Contains(localFile.Name.ToLower())
|
|
310 | 310 |
select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null); |
311 |
|
|
312 |
|
|
313 |
|
|
314 | 311 |
|
315 |
var localNames =pithosDir.EnumerateFiles().Select(info => info.Name); |
|
312 |
|
|
313 |
|
|
314 |
var localNames = from info in pithosDir.EnumerateFiles() |
|
315 |
select info.Name.ToLower(); |
|
316 |
|
|
316 | 317 |
var onlyRemote = from upFile in remoteObjects |
317 |
where !localNames.Contains(upFile.Name) |
|
318 |
where !localNames.Contains(upFile.Name.ToLower())
|
|
318 | 319 |
select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile); |
319 | 320 |
|
320 | 321 |
|
321 | 322 |
var commonObjects = from upFile in remoteObjects |
322 | 323 |
join localFile in pithosDir.EnumerateFiles() |
323 |
on upFile.Name equals localFile.Name
|
|
324 |
on upFile.Name.ToLower() equals localFile.Name.ToLower()
|
|
324 | 325 |
select new ListenerAction(CloudActionType.Download, localFile, upFile); |
325 | 326 |
|
326 | 327 |
var uniques = |
327 | 328 |
onlyLocal.Union(onlyRemote).Union(commonObjects) |
328 |
.Except(_listenerActions,new LocalFileComparer());
|
|
329 |
.Except(_networkActions,new LocalFileComparer());
|
|
329 | 330 |
|
330 |
_listenerActions.AddFromEnumerable(uniques, false);
|
|
331 |
_networkActions.AddFromEnumerable(uniques, false);
|
|
331 | 332 |
|
332 | 333 |
Trace.TraceInformation("[LISTENER] End Processing"); |
333 | 334 |
|
... | ... | |
348 | 349 |
|
349 | 350 |
private void ProcessListenerActions() |
350 | 351 |
{ |
351 |
foreach(var action in _listenerActions.GetConsumingEnumerable())
|
|
352 |
foreach(var action in _networkActions.GetConsumingEnumerable())
|
|
352 | 353 |
{ |
353 | 354 |
Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile); |
354 | 355 |
var localFile = action.LocalFile; |
... | ... | |
393 | 394 |
Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}", |
394 | 395 |
action.Action, action.LocalFile,action.CloudFile,exc); |
395 | 396 |
|
396 |
_listenerActions.Add(action);
|
|
397 |
_networkActions.Add(action);
|
|
397 | 398 |
} |
398 | 399 |
} |
399 | 400 |
} |
400 | 401 |
|
401 |
private void DownloadCloudFile(string container, string fileName, string localPath) |
|
402 |
{ |
|
403 |
using (var upstream = CloudClient.GetObject(container, fileName)) |
|
404 |
using (var fileStream = File.OpenWrite(localPath)) |
|
405 |
{ |
|
406 |
upstream.CopyTo(fileStream); |
|
407 |
} |
|
408 |
} |
|
402 |
|
|
409 | 403 |
|
410 | 404 |
private void StartMonitoringFiles(string path) |
411 | 405 |
{ |
... | ... | |
416 | 410 |
_watcher.Renamed += OnRenameEvent; |
417 | 411 |
_watcher.EnableRaisingEvents = true; |
418 | 412 |
|
419 |
Task.Factory.StartNew(() => |
|
420 |
{ |
|
421 |
foreach (var state in _fileEvents.GetConsumingEnumerable()) |
|
422 |
{ |
|
423 |
try |
|
424 |
{ |
|
425 |
UpdateFileStatus(state); |
|
426 |
UpdateOverlayStatus(state); |
|
427 |
UpdateFileChecksum(state); |
|
428 |
_uploadEvents.Add(state); |
|
429 |
} |
|
430 |
catch (OperationCanceledException) |
|
431 |
{ |
|
432 |
throw; |
|
433 |
} |
|
434 |
catch(Exception ex) |
|
435 |
{} |
|
436 |
} |
|
437 |
|
|
438 |
},_cancellationSource.Token); |
|
413 |
Task.Factory.StartNew(ProcesFileEvents,_cancellationSource.Token); |
|
414 |
} |
|
415 |
|
|
416 |
private void ProcesFileEvents() |
|
417 |
{ |
|
418 |
foreach (var state in _fileEvents.GetConsumingEnumerable()) |
|
419 |
{ |
|
420 |
try |
|
421 |
{ |
|
422 |
var networkState=StatusKeeper.GetNetworkState(state.Path); |
|
423 |
//Skip if the file is already being downloaded or uploaded and |
|
424 |
//the change is create or modify |
|
425 |
if (networkState != NetworkState.None && |
|
426 |
( |
|
427 |
state.TriggeringChange==WatcherChangeTypes.Created || |
|
428 |
state.TriggeringChange==WatcherChangeTypes.Changed |
|
429 |
)) |
|
430 |
continue; |
|
431 |
UpdateFileStatus(state); |
|
432 |
UpdateOverlayStatus(state); |
|
433 |
UpdateFileChecksum(state); |
|
434 |
_uploadEvents.Add(state); |
|
435 |
} |
|
436 |
catch (OperationCanceledException exc) |
|
437 |
{ |
|
438 |
Trace.TraceError("[ERROR] File Event Processing:\r{0}", exc); |
|
439 |
throw; |
|
440 |
} |
|
441 |
catch (Exception exc) |
|
442 |
{ |
|
443 |
Trace.TraceError("[ERROR] File Event Processing:\r{0}",exc); |
|
444 |
} |
|
445 |
} |
|
439 | 446 |
} |
440 | 447 |
|
441 | 448 |
private void StartSending() |
... | ... | |
510 | 517 |
this.StatusKeeper.RemoveFileOverlayStatus(fileName); |
511 | 518 |
} |
512 | 519 |
|
520 |
private void DownloadCloudFile(string container, string fileName, string localPath) |
|
521 |
{ |
|
522 |
StatusKeeper.SetNetworkState(localPath,NetworkState.Downloading); |
|
523 |
CloudClient.GetObject(container, fileName, localPath) |
|
524 |
.ContinueWith(t=> |
|
525 |
CloudClient.GetObjectInfo(container,fileName)) |
|
526 |
.ContinueWith(t=> |
|
527 |
StatusKeeper.StoreInfo(fileName,t.Result)) |
|
528 |
.ContinueWith(t=> |
|
529 |
StatusKeeper.SetNetworkState(localPath,NetworkState.None)) |
|
530 |
.Wait(); |
|
531 |
} |
|
532 |
|
|
513 | 533 |
private void UploadCloudFile(string fileName, long fileSize, string path,string hash) |
514 | 534 |
{ |
515 | 535 |
Contract.Requires(!Path.IsPathRooted(fileName)); |
516 |
//Even if GetObjectInfo times out, we can proceed with the upload |
|
536 |
|
|
537 |
StatusKeeper.SetNetworkState(fileName,NetworkState.Uploading); |
|
538 |
|
|
539 |
//Even if GetObjectInfo times out, we can proceed with the upload |
|
517 | 540 |
var info = CloudClient.GetObjectInfo("PITHOS", fileName); |
518 | 541 |
Task.Factory.StartNew(() => |
519 | 542 |
{ |
... | ... | |
524 | 547 |
.ContinueWith(t => |
525 | 548 |
CloudClient.PutObject("PITHOS", fileName, path, hash)); |
526 | 549 |
} |
550 |
else |
|
551 |
{ |
|
552 |
this.StatusKeeper.StoreInfo(path,info); |
|
553 |
} |
|
527 | 554 |
} |
528 | 555 |
) |
529 |
.ContinueWith(t =>{
|
|
530 |
this.StatusKeeper.SetFileStatus(path, FileStatus.Unchanged);
|
|
531 |
this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Normal);
|
|
532 |
})
|
|
556 |
.ContinueWith(t =>
|
|
557 |
this.StatusKeeper.SetFileState(path, FileStatus.Unchanged, FileOverlayStatus.Normal))
|
|
558 |
.ContinueWith(t=>
|
|
559 |
this.StatusKeeper.SetNetworkState(path,NetworkState.None))
|
|
533 | 560 |
.Wait(); |
534 | 561 |
Workflow.RaiseChangeNotification(path); |
535 | 562 |
} |
Also available in: Unified diff