Revision b5061ac8 trunk/Pithos.Core/PithosMonitor.cs
b/trunk/Pithos.Core/PithosMonitor.cs | ||
---|---|---|
79 | 79 |
string path = Settings.PithosPath; |
80 | 80 |
var proxyUri = ProxyFromSettings(); |
81 | 81 |
CloudClient.Proxy = proxyUri; |
82 |
IndexLocalFiles(path); |
|
82 | 83 |
StartMonitoringFiles(path); |
83 | 84 |
|
84 | 85 |
StartStatusService(); |
... | ... | |
105 | 106 |
return null; |
106 | 107 |
} |
107 | 108 |
|
109 |
private void IndexLocalFiles(string path) |
|
110 |
{ |
|
111 |
Trace.TraceInformation("[START] Inxed Local"); |
|
112 |
try |
|
113 |
{ |
|
114 |
var files = |
|
115 |
from filePath in Directory.EnumerateFiles(path, "*", SearchOption.AllDirectories).AsParallel() |
|
116 |
select filePath; |
|
117 |
StatusKeeper.StoreUnversionedFiles(files); |
|
118 |
|
|
119 |
var newFiles= FileState.FindAllByProperty("OverlayStatus", FileOverlayStatus.Unversioned) |
|
120 |
.Select(state=>state.FilePath); |
|
121 |
foreach (var newFile in newFiles) |
|
122 |
{ |
|
123 |
_uploadEvents.Add(new WorkflowState |
|
124 |
{ |
|
125 |
Path = newFile, |
|
126 |
FileName = Path.GetFileName(newFile), |
|
127 |
TriggeringChange = WatcherChangeTypes.Created |
|
128 |
}); |
|
129 |
} |
|
130 |
|
|
131 |
} |
|
132 |
catch (Exception exc) |
|
133 |
{ |
|
134 |
Trace.TraceError("[ERROR] Index Local - {0}", exc); |
|
135 |
} |
|
136 |
finally |
|
137 |
{ |
|
138 |
Trace.TraceInformation("[END] Inxed Local"); |
|
139 |
} |
|
140 |
} |
|
141 |
|
|
108 | 142 |
private void StartStatusService() |
109 | 143 |
{ |
110 | 144 |
// Create a ServiceHost for the CalculatorService type and provide the base address. |
... | ... | |
159 | 193 |
|
160 | 194 |
CloudClient.Authenticate(Settings.UserName, Settings.ApiKey); |
161 | 195 |
|
162 |
StartListening(); |
|
196 |
StartListening(Settings.PithosPath);
|
|
163 | 197 |
StartSending(); |
164 | 198 |
} |
165 | 199 |
catch (Exception) |
... | ... | |
193 | 227 |
Action = action; |
194 | 228 |
LocalFile = localFile; |
195 | 229 |
CloudFile = cloudFile; |
196 |
LocalHash=new Lazy<string>(()=>CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication); |
|
230 |
LocalHash=new Lazy<string>(()=>Signature.CalculateHash(LocalFile.FullName),LazyThreadSafetyMode.ExecutionAndPublication);
|
|
197 | 231 |
} |
198 | 232 |
|
199 | 233 |
} |
... | ... | |
227 | 261 |
|
228 | 262 |
private Timer timer; |
229 | 263 |
|
230 |
private void StartListening() |
|
264 |
private void StartListening(string accountPath)
|
|
231 | 265 |
{ |
232 | 266 |
|
233 |
Func<Task> listener = ()=>Task.Factory.StartNew(()=>CloudClient.ListObjects("PITHOS")) |
|
267 |
ProcessRemoteFiles(accountPath); |
|
268 |
|
|
269 |
Task.Factory.StartNew(ProcessListenerActions); |
|
270 |
|
|
271 |
} |
|
272 |
|
|
273 |
private Task ProcessRemoteFiles(string accountPath) |
|
274 |
{ |
|
275 |
Trace.TraceInformation("[LISTENER] Scheduled"); |
|
276 |
return Task.Factory.StartNewDelayed(10000) |
|
277 |
.ContinueWith(t=>CloudClient.ListObjects("PITHOS")) |
|
234 | 278 |
.ContinueWith(task => |
235 | 279 |
{ |
236 |
|
|
237 |
var objects = task.Result; |
|
238 |
if (objects.Count == 0) |
|
280 |
Trace.TraceInformation("[LISTENER] Start Processing"); |
|
281 |
|
|
282 |
var remoteObjects = task.Result; |
|
283 |
/* |
|
284 |
if (remoteObjects.Count == 0) |
|
239 | 285 |
return; |
286 |
*/ |
|
240 | 287 |
|
241 |
var pithosDir = new DirectoryInfo(Settings.PithosPath);
|
|
288 |
var pithosDir = new DirectoryInfo(accountPath);
|
|
242 | 289 |
|
243 |
var upFiles = from info in objects
|
|
290 |
var remoteFiles = from info in remoteObjects
|
|
244 | 291 |
select info.Name; |
245 | 292 |
|
246 | 293 |
var onlyLocal = from localFile in pithosDir.EnumerateFiles() |
247 |
where !upFiles.Contains(localFile.Name)
|
|
294 |
where !remoteFiles.Contains(localFile.Name)
|
|
248 | 295 |
select new ListenerAction(CloudActionType.UploadUnconditional, localFile,null); |
249 | 296 |
|
250 | 297 |
|
251 | 298 |
|
252 | 299 |
|
253 | 300 |
var localNames =pithosDir.EnumerateFiles().Select(info => info.Name); |
254 |
var onlyRemote = from upFile in objects
|
|
301 |
var onlyRemote = from upFile in remoteObjects
|
|
255 | 302 |
where !localNames.Contains(upFile.Name) |
256 | 303 |
select new ListenerAction(CloudActionType.DownloadUnconditional,null,upFile); |
257 | 304 |
|
258 | 305 |
|
259 |
var existingObjects = from upFile in objects
|
|
306 |
var commonObjects = from upFile in remoteObjects
|
|
260 | 307 |
join localFile in pithosDir.EnumerateFiles() |
261 |
on upFile.Name equals localFile.Name |
|
262 |
select new ListenerAction(CloudActionType.Download, localFile, upFile); |
|
308 |
on upFile.Name equals localFile.Name
|
|
309 |
select new ListenerAction(CloudActionType.Download, localFile, upFile);
|
|
263 | 310 |
|
264 | 311 |
var uniques = |
265 |
onlyLocal.Union(onlyRemote).Union(existingObjects)
|
|
266 |
.Except(_listenerActions,new LocalFileComparer()); |
|
267 |
|
|
312 |
onlyLocal.Union(onlyRemote).Union(commonObjects)
|
|
313 |
.Except(_listenerActions,new LocalFileComparer());
|
|
314 |
|
|
268 | 315 |
_listenerActions.AddFromEnumerable(uniques, false); |
269 |
|
|
270 |
} |
|
271 |
); |
|
272 | 316 |
|
273 |
Task.Factory.StartNew(() => |
|
274 |
{ |
|
275 |
foreach (var action in _listenerActions.GetConsumingEnumerable()) |
|
276 |
{ |
|
277 |
var localFile = action.LocalFile; |
|
278 |
var cloudFile = action.CloudFile; |
|
279 |
var downloadPath = (cloudFile==null)? String.Empty:Path.Combine(Settings.PithosPath,cloudFile.Name); |
|
280 |
try |
|
281 |
{ |
|
282 |
switch (action.Action) |
|
283 |
{ |
|
284 |
case CloudActionType.UploadUnconditional: |
|
285 |
|
|
286 |
UploadCloudFile(localFile.Name, localFile.Length, |
|
287 |
localFile.FullName, action.LocalHash.Value); |
|
288 |
break; |
|
289 |
case CloudActionType.DownloadUnconditional: |
|
290 |
DownloadCloudFile("PITHOS", cloudFile.Name, downloadPath); |
|
291 |
break; |
|
292 |
case CloudActionType.Download: |
|
293 |
if (File.Exists(downloadPath)) |
|
294 |
{ |
|
295 |
if (cloudFile.Hash != action.LocalHash.Value) |
|
296 |
{ |
|
297 |
var lastLocalTime = localFile.LastWriteTime; |
|
298 |
var lastUpTime = cloudFile.Last_Modified; |
|
299 |
if (lastUpTime <= lastLocalTime) |
|
300 |
{ |
|
301 |
//Files in conflict |
|
302 |
StatusKeeper.SetFileOverlayStatus(downloadPath, |
|
303 |
FileOverlayStatus |
|
304 |
.Conflict); |
|
305 |
} |
|
306 |
else |
|
307 |
DownloadCloudFile("PITHOS", action.CloudFile.Name, |
|
308 |
downloadPath); |
|
309 |
} |
|
310 |
} |
|
311 |
else |
|
312 |
DownloadCloudFile("PITHOS", action.CloudFile.Name, |
|
313 |
downloadPath); |
|
314 |
break; |
|
315 |
} |
|
316 |
} |
|
317 |
catch (Exception exc) |
|
318 |
{ |
|
319 |
Debug.WriteLine("Processing of {0}:{1}->{2} failed. Putting it back in the queue",action.Action,action.LocalFile,action.CloudFile); |
|
320 |
Debug.WriteLine(exc.ToString()); |
|
321 |
_listenerActions.Add(action); |
|
322 |
} |
|
323 |
} |
|
324 |
} |
|
325 |
); |
|
326 |
|
|
327 |
timer = new Timer(o => listener(), null, TimeSpan.Zero, TimeSpan.FromSeconds(10)); |
|
328 |
|
|
317 |
Trace.TraceInformation("[LISTENER] End Processing"); |
|
318 |
|
|
319 |
} |
|
320 |
).ContinueWith(t=> |
|
321 |
{ |
|
322 |
if (t.IsFaulted) |
|
323 |
{ |
|
324 |
Trace.TraceError("[LISTENER] Exception: {0}",t.Exception); |
|
325 |
} |
|
326 |
else |
|
327 |
{ |
|
328 |
Trace.TraceInformation("[LISTENER] Finished"); |
|
329 |
} |
|
330 |
ProcessRemoteFiles(accountPath); |
|
331 |
}); |
|
332 |
} |
|
333 |
|
|
334 |
private void ProcessListenerActions() |
|
335 |
{ |
|
336 |
foreach(var action in _listenerActions.GetConsumingEnumerable()) |
|
337 |
{ |
|
338 |
Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}",action.Action,action.LocalFile,action.CloudFile); |
|
339 |
var localFile = action.LocalFile; |
|
340 |
var cloudFile = action.CloudFile; |
|
341 |
var downloadPath = (cloudFile == null)? String.Empty |
|
342 |
: Path.Combine(Settings.PithosPath,cloudFile.Name); |
|
343 |
try |
|
344 |
{ |
|
345 |
switch (action.Action) |
|
346 |
{ |
|
347 |
case CloudActionType.UploadUnconditional: |
|
348 |
|
|
349 |
UploadCloudFile(localFile.Name,localFile.Length,localFile.FullName,action.LocalHash.Value); |
|
350 |
break; |
|
351 |
case CloudActionType.DownloadUnconditional: |
|
352 |
DownloadCloudFile("PITHOS",cloudFile.Name,downloadPath); |
|
353 |
break; |
|
354 |
case CloudActionType.Download: |
|
355 |
if (File.Exists(downloadPath)) |
|
356 |
{ |
|
357 |
if (cloudFile.Hash !=action.LocalHash.Value) |
|
358 |
{ |
|
359 |
var lastLocalTime =localFile.LastWriteTime; |
|
360 |
var lastUpTime =cloudFile.Last_Modified; |
|
361 |
if (lastUpTime <=lastLocalTime) |
|
362 |
{ |
|
363 |
//Files in conflict |
|
364 |
StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict); |
|
365 |
} |
|
366 |
else |
|
367 |
DownloadCloudFile("PITHOS",action.CloudFile.Name,downloadPath); |
|
368 |
} |
|
369 |
} |
|
370 |
else |
|
371 |
DownloadCloudFile("PITHOS",action.CloudFile.Name,downloadPath); |
|
372 |
break; |
|
373 |
} |
|
374 |
Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile); |
|
375 |
} |
|
376 |
catch (Exception exc) |
|
377 |
{ |
|
378 |
Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}", |
|
379 |
action.Action, action.LocalFile,action.CloudFile,exc); |
|
380 |
Trace.TraceError(exc.ToString()); |
|
381 |
|
|
382 |
_listenerActions.Add(action); |
|
383 |
} |
|
384 |
} |
|
329 | 385 |
} |
330 | 386 |
|
331 | 387 |
private void DownloadCloudFile(string container, string fileName, string localPath) |
... | ... | |
446 | 502 |
if ( hash != info.Hash) |
447 | 503 |
{ |
448 | 504 |
this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Synch); |
449 |
|
|
450 |
CloudClient.PutObject("PITHOS", fileName, path); |
|
451 |
|
|
505 |
|
|
506 |
CloudClient.PutObject("PITHOS", fileName, path, hash).Wait(); |
|
452 | 507 |
} |
453 | 508 |
this.StatusKeeper.SetFileStatus(path,FileStatus.Unchanged); |
454 | 509 |
this.StatusKeeper.SetFileOverlayStatus(path,FileOverlayStatus.Normal); |
... | ... | |
521 | 576 |
return state; |
522 | 577 |
|
523 | 578 |
string path = state.Path; |
524 |
string hash = CalculateHash(path); |
|
579 |
string hash = Signature.CalculateHash(path);
|
|
525 | 580 |
|
526 | 581 |
StatusKeeper.UpdateFileChecksum(path, hash); |
527 | 582 |
|
... | ... | |
529 | 584 |
return state; |
530 | 585 |
} |
531 | 586 |
|
532 |
private static string CalculateHash(string path) |
|
533 |
{ |
|
534 |
string hash; |
|
535 |
using (var hasher = MD5.Create()) |
|
536 |
using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, true)) |
|
537 |
{ |
|
538 |
var hashBytes = hasher.ComputeHash(stream); |
|
539 |
var hashBuilder = new StringBuilder(); |
|
540 |
foreach (byte b in hasher.ComputeHash(stream)) |
|
541 |
hashBuilder.Append(b.ToString("x2").ToLower()); |
|
542 |
hash = hashBuilder.ToString(); |
|
543 |
|
|
544 |
} |
|
545 |
return hash; |
|
546 |
} |
|
587 |
|
|
547 | 588 |
|
548 | 589 |
private FileSystemEventArgs CalculateSignature(FileSystemEventArgs arg) |
549 | 590 |
{ |
Also available in: Unified diff