2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
4 using System.ComponentModel.Composition;
5 using System.Diagnostics;
6 using System.Diagnostics.Contracts;
9 using System.Net.NetworkInformation;
10 using System.Security.Cryptography;
12 using System.Threading;
13 using System.Threading.Tasks;
14 using Microsoft.WindowsAPICodePack.Net;
15 using Pithos.Interfaces;
16 using System.ServiceModel;
20 [Export(typeof(PithosMonitor))]
21 public class PithosMonitor:IDisposable
24 public IPithosSettings Settings{get;set;}
27 public IStatusKeeper StatusKeeper { get; set; }
30 public IPithosWorkflow Workflow { get; set; }
33 public ICloudClient CloudClient { get; set; }
36 public ICloudClient CloudListeningClient { get; set; }
38 private ServiceHost _statusService { get; set; }
40 private FileSystemWatcher _watcher;
44 get { return _watcher == null || !_watcher.EnableRaisingEvents; }
48 _watcher.EnableRaisingEvents = !value;
53 CancellationTokenSource _cancellationSource;
55 readonly BlockingCollection<WorkflowState> _fileEvents = new BlockingCollection<WorkflowState>();
56 readonly BlockingCollection<WorkflowState> _uploadEvents = new BlockingCollection<WorkflowState>();
63 if (_cancellationSource != null)
65 if (!_cancellationSource.IsCancellationRequested)
68 _cancellationSource = new CancellationTokenSource();
70 string path = Settings.PithosPath;
71 StartMonitoringFiles(path);
78 private void StartStatusService()
80 Uri baseAddress = new Uri("http://localhost:30000/pithos/statuscache");
81 string address = "net.pipe://localhost/pithos/statuscache";
83 // Create a ServiceHost for the CalculatorService type and provide the base address.
84 _statusService= new ServiceHost(typeof(StatusService), baseAddress);
86 NetNamedPipeBinding binding = new NetNamedPipeBinding(NetNamedPipeSecurityMode.None);
87 _statusService.AddServiceEndpoint(typeof(IStatusService), binding, address);
88 _statusService.Open();
91 private void StopStatusService()
93 if (_statusService == null)
96 if (_statusService.State == CommunicationState.Faulted)
97 _statusService.Abort();
98 else if (_statusService.State != CommunicationState.Closed)
99 _statusService.Close();
100 _statusService = null;
105 private void StartNetwork()
108 bool connected = NetworkListManager.IsConnectedToInternet;
109 //If we are not connected retry later
112 Task.Factory.StartNewDelayed(10000, StartNetwork);
118 CloudClient.Authenticate(Settings.UserName, Settings.ApiKey);
125 //Faild to authenticate due to network or account error
126 //Retry after a while
127 Task.Factory.StartNewDelayed(10000, StartNetwork);
131 internal enum CloudActionType
136 DownloadUnconditional,
141 internal class ListenerAction
143 public CloudActionType Action { get; set; }
144 public FileInfo LocalFile { get; set; }
145 public ObjectInfo CloudFile { get; set; }
147 public Lazy<string> LocalHash { get; set; }
149 public ListenerAction(CloudActionType action, FileInfo localFile, ObjectInfo cloudFile)
152 LocalFile = localFile;
153 CloudFile = cloudFile;
154 LocalHash=new Lazy<string>(()=>CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication);
159 internal class LocalFileComparer:EqualityComparer<ListenerAction>
161 public override bool Equals(ListenerAction x, ListenerAction y)
163 if (x.Action != y.Action)
165 if (x.LocalFile != null && y.LocalFile != null && !x.LocalFile.FullName.Equals(y.LocalFile.FullName))
167 if (x.CloudFile != null && y.CloudFile != null && !x.CloudFile.Hash.Equals(y.CloudFile.Hash))
169 if (x.CloudFile == null ^ y.CloudFile == null ||
170 x.LocalFile == null ^ y.LocalFile == null)
175 public override int GetHashCode(ListenerAction obj)
177 var hash1 = (obj.LocalFile == null) ? int.MaxValue : obj.LocalFile.FullName.GetHashCode();
178 var hash2 = (obj.CloudFile == null) ? int.MaxValue : obj.CloudFile.Hash.GetHashCode();
179 var hash3 = obj.Action.GetHashCode();
180 return hash1 ^ hash2 & hash3;
184 private BlockingCollection<ListenerAction> _listenerActions=new BlockingCollection<ListenerAction>();
188 private void StartListening()
191 Func<Task> listener = ()=>Task.Factory.StartNew(()=>CloudClient.ListObjects("PITHOS"))
192 .ContinueWith(task =>
195 var objects = task.Result;
196 if (objects.Count == 0)
199 var pithosDir = new DirectoryInfo(Settings.PithosPath);
201 var upFiles = from info in objects
204 var onlyLocal = from localFile in pithosDir.EnumerateFiles()
205 where !upFiles.Contains(localFile.Name)
206 select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null);
211 var localNames =pithosDir.EnumerateFiles().Select(info => info.Name);
212 var onlyRemote = from upFile in objects
213 where !localNames.Contains(upFile.Name)
214 select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile);
217 var existingObjects = from upFile in objects
218 join localFile in pithosDir.EnumerateFiles()
219 on upFile.Name equals localFile.Name
220 select new ListenerAction(CloudActionType.Download, localFile, upFile);
223 onlyLocal.Union(onlyRemote).Union(existingObjects)
224 .Except(_listenerActions,new LocalFileComparer());
226 _listenerActions.AddFromEnumerable(uniques, false);
231 Task.Factory.StartNew(() =>
233 foreach (var action in _listenerActions.GetConsumingEnumerable())
235 var localFile = action.LocalFile;
236 var cloudFile = action.CloudFile;
237 var downloadPath = (cloudFile==null)? String.Empty:Path.Combine(Settings.PithosPath,cloudFile.Name);
240 switch (action.Action)
242 case CloudActionType.UploadUnconditional:
244 UploadCloudFile(localFile.Name, localFile.Length,
245 localFile.FullName, action.LocalHash.Value);
247 case CloudActionType.DownloadUnconditional:
248 DownloadCloudFile("PITHOS", cloudFile.Name, downloadPath);
250 case CloudActionType.Download:
251 if (File.Exists(downloadPath))
253 if (cloudFile.Hash != action.LocalHash.Value)
255 var lastLocalTime = localFile.LastWriteTime;
256 var lastUpTime = cloudFile.Last_Modified;
257 if (lastUpTime <= lastLocalTime)
260 StatusKeeper.SetFileOverlayStatus(downloadPath,
265 DownloadCloudFile("PITHOS", action.CloudFile.Name,
270 DownloadCloudFile("PITHOS", action.CloudFile.Name,
275 catch (Exception exc)
277 Debug.WriteLine("Processing of {0}:{1}->{2} failed. Putting it back in the queue",action.Action,action.LocalFile,action.CloudFile);
278 Debug.WriteLine(exc.ToString());
279 _listenerActions.Add(action);
285 timer = new Timer(o => listener(), null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
289 private void DownloadCloudFile(string container, string fileName, string localPath)
291 using (var upstream = CloudClient.GetObject(container, fileName))
292 using (var fileStream = File.OpenWrite(localPath))
294 upstream.CopyTo(fileStream);
298 private void StartMonitoringFiles(string path)
300 _watcher = new FileSystemWatcher(path);
301 _watcher.Changed += OnFileEvent;
302 _watcher.Created += OnFileEvent;
303 _watcher.Deleted += OnFileEvent;
304 _watcher.Renamed += OnRenameEvent;
305 _watcher.EnableRaisingEvents = true;
307 Task.Factory.StartNew(() =>
309 foreach (var state in _fileEvents.GetConsumingEnumerable())
313 UpdateFileStatus(state);
314 UpdateOverlayStatus(state);
315 UpdateFileChecksum(state);
316 _uploadEvents.Add(state);
318 catch (OperationCanceledException)
326 },_cancellationSource.Token);
329 private void StartSending()
331 Task.Factory.StartNew(() =>
333 foreach (var state in _uploadEvents.GetConsumingEnumerable())
339 catch (OperationCanceledException)
347 },_cancellationSource.Token);
351 private WorkflowState SynchToCloud(WorkflowState state)
355 string path = state.Path;
356 string fileName = Path.GetFileName(path);
360 case FileStatus.Created:
361 case FileStatus.Modified:
362 var info = new FileInfo(path);
363 long fileSize = info.Length;
364 UploadCloudFile(fileName, fileSize, path,state.Hash);
366 case FileStatus.Deleted:
367 DeleteCloudFile(fileName);
369 case FileStatus.Renamed:
370 RenameCloudFile(state);
376 private void RenameCloudFile(WorkflowState state)
378 this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Synch);
382 CloudClient.MoveObject("PITHOS", state.OldFileName, state.FileName);
384 this.StatusKeeper.SetFileStatus(state.Path, FileStatus.Unchanged);
385 this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
386 Workflow.RaiseChangeNotification(state.Path);
389 private void DeleteCloudFile(string fileName)
391 Contract.Requires(!Path.IsPathRooted(fileName));
393 this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Synch);
394 CloudClient.DeleteObject("PITHOS", fileName);
395 this.StatusKeeper.ClearFileStatus(fileName);
396 this.StatusKeeper.RemoveFileOverlayStatus(fileName);
399 private void UploadCloudFile(string fileName, long fileSize, string path,string hash)
401 Contract.Requires(!Path.IsPathRooted(fileName));
402 //Even if GetObjectInfo times out, we can proceed with the upload
403 var info=CloudClient.GetObjectInfo("PITHOS", fileName);
404 if ( hash != info.Hash)
406 this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Synch);
408 CloudClient.PutObject("PITHOS", fileName, path);
411 this.StatusKeeper.SetFileStatus(path,FileStatus.Unchanged);
412 this.StatusKeeper.SetFileOverlayStatus(path,FileOverlayStatus.Normal);
413 Workflow.RaiseChangeNotification(path);
416 private Dictionary<WatcherChangeTypes, FileStatus> _statusDict = new Dictionary<WatcherChangeTypes, FileStatus>
418 {WatcherChangeTypes.Created,FileStatus.Created},
419 {WatcherChangeTypes.Changed,FileStatus.Modified},
420 {WatcherChangeTypes.Deleted,FileStatus.Deleted},
421 {WatcherChangeTypes.Renamed,FileStatus.Renamed}
424 private WorkflowState UpdateFileStatus(WorkflowState state)
426 string path = state.Path;
427 FileStatus status = _statusDict[state.TriggeringChange];
428 var oldStatus = Workflow.StatusKeeper.GetFileStatus(path);
429 if (status == oldStatus)
431 state.Status = status;
435 if (state.Status == FileStatus.Renamed)
436 Workflow.ClearFileStatus(path);
438 state.Status = Workflow.SetFileStatus(path, status);
442 private WorkflowState UpdateOverlayStatus(WorkflowState state)
447 switch (state.Status)
449 case FileStatus.Created:
450 case FileStatus.Modified:
451 this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
453 case FileStatus.Deleted:
454 this.StatusKeeper.RemoveFileOverlayStatus(state.Path);
456 case FileStatus.Renamed:
457 this.StatusKeeper.RemoveFileOverlayStatus(state.OldPath);
458 this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Modified);
460 case FileStatus.Unchanged:
461 this.StatusKeeper.SetFileOverlayStatus(state.Path, FileOverlayStatus.Normal);
465 if (state.Status==FileStatus.Deleted)
466 Workflow.RaiseChangeNotification(Path.GetDirectoryName(state.Path));
468 Workflow.RaiseChangeNotification(state.Path);
473 private WorkflowState UpdateFileChecksum(WorkflowState state)
478 if (state.Status == FileStatus.Deleted)
481 string path = state.Path;
482 string hash = CalculateHash(path);
484 StatusKeeper.UpdateFileChecksum(path, hash);
490 private static string CalculateHash(string path)
493 using (var hasher = MD5.Create())
494 using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, true))
496 var hashBytes = hasher.ComputeHash(stream);
497 var hashBuilder = new StringBuilder();
498 foreach (byte b in hasher.ComputeHash(stream))
499 hashBuilder.Append(b.ToString("x2").ToLower());
500 hash = hashBuilder.ToString();
506 private FileSystemEventArgs CalculateSignature(FileSystemEventArgs arg)
508 Debug.WriteLine(String.Format("{0} {1} {2}", arg.ChangeType, arg.Name, arg.FullPath), "INFO");
512 void OnFileEvent(object sender, FileSystemEventArgs e)
514 _fileEvents.Add(new WorkflowState{Path=e.FullPath,FileName = e.Name,TriggeringChange=e.ChangeType});
517 void OnRenameEvent(object sender, RenamedEventArgs e)
519 _fileEvents.Add(new WorkflowState { OldPath=e.OldFullPath,OldFileName=e.OldName,
520 Path = e.FullPath, FileName = e.Name, TriggeringChange = e.ChangeType });
525 if (_watcher != null)
527 _watcher.Changed -= OnFileEvent;
528 _watcher.Created -= OnFileEvent;
529 _watcher.Deleted -= OnFileEvent;
530 _watcher.Renamed -= OnRenameEvent;
534 _fileEvents.CompleteAdding();
547 public void Dispose()
550 GC.SuppressFinalize(this);
553 protected virtual void Dispose(bool disposing)