root / trunk / Pithos.Core / Agents / PollAgent.cs @ 19265570
History | View | Annotate | Download (56 kB)
1 |
#region |
---|---|
2 |
/* ----------------------------------------------------------------------- |
3 |
* <copyright file="PollAgent.cs" company="GRNet"> |
4 |
* |
5 |
* Copyright 2011-2012 GRNET S.A. All rights reserved. |
6 |
* |
7 |
* Redistribution and use in source and binary forms, with or |
8 |
* without modification, are permitted provided that the following |
9 |
* conditions are met: |
10 |
* |
11 |
* 1. Redistributions of source code must retain the above |
12 |
* copyright notice, this list of conditions and the following |
13 |
* disclaimer. |
14 |
* |
15 |
* 2. Redistributions in binary form must reproduce the above |
16 |
* copyright notice, this list of conditions and the following |
17 |
* disclaimer in the documentation and/or other materials |
18 |
* provided with the distribution. |
19 |
* |
20 |
* |
21 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
22 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
23 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
24 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
25 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
26 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
27 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
28 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
29 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
30 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
31 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
32 |
* POSSIBILITY OF SUCH DAMAGE. |
33 |
* |
34 |
* The views and conclusions contained in the software and |
35 |
* documentation are those of the authors and should not be |
36 |
* interpreted as representing official policies, either expressed |
37 |
* or implied, of GRNET S.A. |
38 |
* </copyright> |
39 |
* ----------------------------------------------------------------------- |
40 |
*/ |
41 |
#endregion |
42 |
|
43 |
using System.Collections.Concurrent; |
44 |
using System.ComponentModel.Composition; |
45 |
using System.Diagnostics; |
46 |
using System.Diagnostics.Contracts; |
47 |
using System.IO; |
48 |
using System.Reflection; |
49 |
using System.Threading; |
50 |
using System.Threading.Tasks; |
51 |
using Pithos.Interfaces; |
52 |
using Pithos.Network; |
53 |
using log4net; |
54 |
|
55 |
namespace Pithos.Core.Agents |
56 |
{ |
57 |
using System; |
58 |
using System.Collections.Generic; |
59 |
using System.Linq; |
60 |
|
61 |
/*public class PollRequest |
62 |
{ |
63 |
public DateTime? Since { get; set; } |
64 |
public IEnumerable<string> Batch { get; set; } |
65 |
}*/ |
66 |
|
67 |
|
68 |
/// <summary> |
69 |
/// PollAgent periodically polls the server to detect object changes. The agent retrieves a listing of all |
70 |
/// objects and compares it with a previously cached version to detect differences. |
71 |
/// New files are downloaded, missing files are deleted from the local file system and common files are compared |
72 |
/// to determine the appropriate action |
73 |
/// </summary> |
74 |
[Export] |
75 |
public class PollAgent |
76 |
{ |
77 |
private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); |
78 |
|
79 |
[System.ComponentModel.Composition.Import] |
80 |
public IStatusKeeper StatusKeeper { get; set; } |
81 |
|
82 |
[System.ComponentModel.Composition.Import] |
83 |
public IPithosSettings Settings { get; set; } |
84 |
|
85 |
[System.ComponentModel.Composition.Import] |
86 |
public NetworkAgent NetworkAgent { get; set; } |
87 |
|
88 |
[System.ComponentModel.Composition.Import] |
89 |
public Selectives Selectives { get; set; } |
90 |
|
91 |
public IStatusNotification StatusNotification { get; set; } |
92 |
|
93 |
private CancellationTokenSource _currentOperationCancellation = new CancellationTokenSource(); |
94 |
|
95 |
public void CancelCurrentOperation() |
96 |
{ |
97 |
//What does it mean to cancel the current upload/download? |
98 |
//Obviously, the current operation will be cancelled by throwing |
99 |
//a cancellation exception. |
100 |
// |
101 |
//The default behavior is to retry any operations that throw. |
102 |
//Obviously this is not what we want in this situation. |
103 |
//The cancelled operation should NOT bea retried. |
104 |
// |
105 |
//This can be done by catching the cancellation exception |
106 |
//and avoiding the retry. |
107 |
// |
108 |
|
109 |
//Have to reset the cancellation source - it is not possible to reset the source |
110 |
//Have to prevent a case where an operation requests a token from the old source |
111 |
var oldSource = Interlocked.Exchange(ref _currentOperationCancellation, new CancellationTokenSource()); |
112 |
oldSource.Cancel(); |
113 |
|
114 |
} |
115 |
|
116 |
public bool Pause |
117 |
{ |
118 |
get { |
119 |
return _pause; |
120 |
} |
121 |
set { |
122 |
_pause = value; |
123 |
if (!_pause) |
124 |
_unPauseEvent.Set(); |
125 |
else |
126 |
{ |
127 |
_unPauseEvent.Reset(); |
128 |
} |
129 |
} |
130 |
} |
131 |
|
132 |
public CancellationToken CancellationToken |
133 |
{ |
134 |
get { return _currentOperationCancellation.Token; } |
135 |
} |
136 |
|
137 |
private bool _firstPoll = true; |
138 |
|
139 |
//The Sync Event signals a manual synchronisation |
140 |
private readonly AsyncManualResetEvent _syncEvent = new AsyncManualResetEvent(); |
141 |
|
142 |
private readonly AsyncManualResetEvent _unPauseEvent = new AsyncManualResetEvent(true); |
143 |
|
144 |
private readonly ConcurrentDictionary<string, DateTime> _lastSeen = new ConcurrentDictionary<string, DateTime>(); |
145 |
private readonly ConcurrentDictionary<Uri, AccountInfo> _accounts = new ConcurrentDictionary<Uri,AccountInfo>(); |
146 |
|
147 |
//private readonly ActionBlock<PollRequest> _pollAction; |
148 |
|
149 |
readonly HashSet<string> _knownContainers = new HashSet<string>(); |
150 |
|
151 |
|
152 |
/// <summary> |
153 |
/// Start a manual synchronization |
154 |
/// </summary> |
155 |
public void SynchNow(IEnumerable<string> paths=null) |
156 |
{ |
157 |
_batchQueue.Enqueue(paths); |
158 |
_syncEvent.Set(); |
159 |
|
160 |
//_pollAction.Post(new PollRequest {Batch = paths}); |
161 |
} |
162 |
|
163 |
readonly ConcurrentQueue<IEnumerable<string>> _batchQueue=new ConcurrentQueue<IEnumerable<string>>(); |
164 |
|
165 |
ConcurrentDictionary<string,MovedEventArgs> _moves=new ConcurrentDictionary<string, MovedEventArgs>(); |
166 |
|
167 |
public void PostMove(MovedEventArgs args) |
168 |
{ |
169 |
TaskEx.Run(() => _moves.AddOrUpdate(args.OldFullPath, args,(s,e)=>e)); |
170 |
} |
171 |
|
172 |
/// <summary> |
173 |
/// Remote files are polled periodically. Any changes are processed |
174 |
/// </summary> |
175 |
/// <param name="since"></param> |
176 |
/// <returns></returns> |
177 |
public async Task PollRemoteFiles(DateTimeOffset? since = null) |
178 |
{ |
179 |
if (Log.IsDebugEnabled) |
180 |
Log.DebugFormat("Polling changes after [{0}]",since); |
181 |
|
182 |
Debug.Assert(Thread.CurrentThread.IsBackground, "Polling Ended up in the main thread!"); |
183 |
|
184 |
//GC.Collect(); |
185 |
|
186 |
using (ThreadContext.Stacks["Retrieve Remote"].Push("All accounts")) |
187 |
{ |
188 |
//If this poll fails, we will retry with the same since value |
189 |
DateTimeOffset? nextSince = since; |
190 |
try |
191 |
{ |
192 |
_unPauseEvent.Wait(); |
193 |
UpdateStatus(PithosStatus.PollSyncing); |
194 |
|
195 |
var accountBatches=new Dictionary<Uri, IEnumerable<string>>(); |
196 |
IEnumerable<string> batch = null; |
197 |
if (_batchQueue.TryDequeue(out batch) && batch != null) |
198 |
foreach (var account in _accounts.Values) |
199 |
{ |
200 |
var accountBatch = batch.Where(path => path.IsAtOrBelow(account.AccountPath)); |
201 |
accountBatches[account.AccountKey] = accountBatch; |
202 |
} |
203 |
|
204 |
var moves=Interlocked.Exchange(ref _moves, new ConcurrentDictionary<string, MovedEventArgs>()); |
205 |
|
206 |
var tasks = new List<Task<DateTimeOffset?>>(); |
207 |
foreach(var accountInfo in _accounts.Values) |
208 |
{ |
209 |
IEnumerable<string> accountBatch ; |
210 |
accountBatches.TryGetValue(accountInfo.AccountKey,out accountBatch); |
211 |
var t=ProcessAccountFiles (accountInfo, accountBatch, moves,since); |
212 |
tasks.Add(t); |
213 |
} |
214 |
|
215 |
var taskList = tasks.ToList(); |
216 |
var nextTimes=await TaskEx.WhenAll(taskList).ConfigureAwait(false); |
217 |
|
218 |
_firstPoll = false; |
219 |
//Reschedule the poll with the current timestamp as a "since" value |
220 |
|
221 |
if (nextTimes.Length>0) |
222 |
nextSince = nextTimes.Min(); |
223 |
if (Log.IsDebugEnabled) |
224 |
Log.DebugFormat("Next Poll for changes since [{0}]",nextSince); |
225 |
} |
226 |
catch (Exception ex) |
227 |
{ |
228 |
Log.ErrorFormat("Error while processing accounts\r\n{0}", ex); |
229 |
//In case of failure retry with the same "since" value |
230 |
} |
231 |
|
232 |
UpdateStatus(PithosStatus.PollComplete); |
233 |
//The multiple try blocks are required because we can't have an await call |
234 |
//inside a finally block |
235 |
//TODO: Find a more elegant solution for reschedulling in the event of an exception |
236 |
try |
237 |
{ |
238 |
//Wait for the polling interval to pass or the Sync event to be signalled |
239 |
nextSince = await WaitForScheduledOrManualPoll(nextSince).ConfigureAwait(false); |
240 |
} |
241 |
finally |
242 |
{ |
243 |
//Ensure polling is scheduled even in case of error |
244 |
TaskEx.Run(()=>PollRemoteFiles(nextSince)); |
245 |
//_pollAction.Post(new PollRequest {Since = nextSince}); |
246 |
} |
247 |
} |
248 |
} |
249 |
|
250 |
/// <summary> |
251 |
/// Wait for the polling period to expire or a manual sync request |
252 |
/// </summary> |
253 |
/// <param name="since"></param> |
254 |
/// <returns></returns> |
255 |
private async Task<DateTimeOffset?> WaitForScheduledOrManualPoll(DateTimeOffset? since) |
256 |
{ |
257 |
var sync = _syncEvent.WaitAsync(); |
258 |
var delay = TimeSpan.FromSeconds(Settings.PollingInterval); |
259 |
if (Log.IsDebugEnabled) |
260 |
Log.DebugFormat("Next Poll at [{0}]", DateTime.Now.Add(delay)); |
261 |
var wait = TaskEx.Delay(delay); |
262 |
|
263 |
var signaledTask = await TaskEx.WhenAny(sync, wait).ConfigureAwait(false); |
264 |
|
265 |
//Pausing takes precedence over manual sync or awaiting |
266 |
_unPauseEvent.Wait(); |
267 |
|
268 |
//Wait for network processing to finish before polling |
269 |
var pauseTask=NetworkAgent.ProceedEvent.WaitAsync(); |
270 |
await TaskEx.WhenAll(signaledTask, pauseTask).ConfigureAwait(false); |
271 |
|
272 |
//If polling is signalled by SynchNow, ignore the since tag |
273 |
if (sync.IsCompleted) |
274 |
{ |
275 |
_syncEvent.Reset(); |
276 |
return null; |
277 |
} |
278 |
return since; |
279 |
} |
280 |
|
281 |
|
282 |
|
283 |
public async Task<DateTimeOffset?> ProcessAccountFiles(AccountInfo accountInfo, IEnumerable<string> accountBatch, ConcurrentDictionary<string, MovedEventArgs> moves, DateTimeOffset? since = null) |
284 |
{ |
285 |
if (accountInfo == null) |
286 |
throw new ArgumentNullException("accountInfo"); |
287 |
if (String.IsNullOrWhiteSpace(accountInfo.AccountPath)) |
288 |
throw new ArgumentException("The AccountInfo.AccountPath is empty", "accountInfo"); |
289 |
Contract.EndContractBlock(); |
290 |
|
291 |
|
292 |
using (ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName)) |
293 |
{ |
294 |
|
295 |
await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false); |
296 |
|
297 |
Log.Info("Scheduled"); |
298 |
var client = new CloudFilesClient(accountInfo); |
299 |
|
300 |
//We don't need to check the trash container |
301 |
var allContainers=await client.ListContainers(accountInfo.UserName).ConfigureAwait(false); |
302 |
var containers = allContainers |
303 |
.Where(c=>c.Name.ToString()!="trash") |
304 |
.ToList(); |
305 |
|
306 |
|
307 |
CreateContainerFolders(accountInfo, containers); |
308 |
|
309 |
//The nextSince time fallback time is the same as the current. |
310 |
//If polling succeeds, the next Since time will be the smallest of the maximum modification times |
311 |
//of the shared and account objects |
312 |
DateTimeOffset? nextSince = since; |
313 |
|
314 |
try |
315 |
{ |
316 |
//Wait for any deletions to finish |
317 |
await NetworkAgent.GetDeleteAwaiter().ConfigureAwait(false); |
318 |
//Get the poll time now. We may miss some deletions but it's better to keep a file that was deleted |
319 |
//than delete a file that was created while we were executing the poll |
320 |
|
321 |
//Get the list of server objects changed since the last check |
322 |
//The name of the container is passed as state in order to create a dictionary of tasks in a subsequent step |
323 |
var listObjects = (from container in containers |
324 |
select Task<IList<ObjectInfo>>.Factory.StartNew(_ => |
325 |
client.ListObjects(accountInfo.UserName, container.Name, since), container.Name)).ToList(); |
326 |
|
327 |
var listShared = Task<IList<ObjectInfo>>.Factory.StartNew(_ => |
328 |
client.ListSharedObjects(_knownContainers,since), "shared"); |
329 |
listObjects.Add(listShared); |
330 |
var listTasks = await Task.Factory.WhenAll(listObjects.ToArray()).ConfigureAwait(false); |
331 |
|
332 |
using (ThreadContext.Stacks["SCHEDULE"].Push("Process Results")) |
333 |
{ |
334 |
var dict = listTasks.ToDictionary(t => t.AsyncState); |
335 |
|
336 |
//Get all non-trash objects. Remember, the container name is stored in AsyncState |
337 |
var remoteObjects = (from objectList in listTasks |
338 |
where objectList.AsyncState.ToString() != "trash" |
339 |
from obj in objectList.Result |
340 |
orderby obj.Bytes ascending |
341 |
select obj).ToList(); |
342 |
|
343 |
//Get the latest remote object modification date, only if it is after |
344 |
//the original since date |
345 |
nextSince = GetLatestDateAfter(nextSince, remoteObjects); |
346 |
|
347 |
var sharedObjects = dict["shared"].Result; |
348 |
|
349 |
//DON'T process trashed files |
350 |
//If some files are deleted and added again to a folder, they will be deleted |
351 |
//even though they are new. |
352 |
//We would have to check file dates and hashes to ensure that a trashed file |
353 |
//can be deleted safely from the local hard drive. |
354 |
/* |
355 |
//Items with the same name, hash may be both in the container and the trash |
356 |
//Don't delete items that exist in the container |
357 |
var realTrash = from trash in trashObjects |
358 |
where |
359 |
!remoteObjects.Any( |
360 |
info => info.Name == trash.Name && info.Hash == trash.Hash) |
361 |
8 select trash; |
362 |
ProcessTrashedFiles(accountInfo, realTrash); |
363 |
*/ |
364 |
|
365 |
var cleanRemotes = (from info in remoteObjects.Union(sharedObjects) |
366 |
let name = info.Name.ToUnescapedString()??"" |
367 |
where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) && |
368 |
!name.StartsWith(FolderConstants.CacheFolder + "/", |
369 |
StringComparison.InvariantCultureIgnoreCase) |
370 |
select info).ToList(); |
371 |
|
372 |
if (_firstPoll) |
373 |
StatusKeeper.CleanupOrphanStates(); |
374 |
|
375 |
var differencer = _differencer.PostSnapshot(accountInfo, cleanRemotes); |
376 |
var currentRemotes = differencer.Current.ToList(); |
377 |
StatusKeeper.CleanupStaleStates(accountInfo, currentRemotes); |
378 |
|
379 |
//var filterUris = Selectives.SelectiveUris[accountInfo.AccountKey]; |
380 |
|
381 |
//May have to wait if the FileAgent has asked for a Pause, due to local changes |
382 |
await _unPauseEvent.WaitAsync().ConfigureAwait(false); |
383 |
|
384 |
//Get the local files here |
385 |
var agent = AgentLocator<FileAgent>.Get(accountInfo.AccountPath); |
386 |
var files = LoadLocalFileTuples(accountInfo, accountBatch); |
387 |
|
388 |
|
389 |
var states = StatusKeeper.GetAllStates(); |
390 |
|
391 |
var infos = (from remote in currentRemotes |
392 |
let path = remote.RelativeUrlToFilePath(accountInfo.UserName) |
393 |
let info=agent.GetFileSystemInfo(path) |
394 |
select Tuple.Create(info.FullName,remote)) |
395 |
.ToList(); |
396 |
|
397 |
var token = _currentOperationCancellation.Token; |
398 |
|
399 |
var tuples = MergeSources(infos, files, states,moves).ToList(); |
400 |
|
401 |
var processedPaths = new HashSet<string>(); |
402 |
//Process only the changes in the batch file, if one exists |
403 |
var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath)); |
404 |
foreach (var tuple in stateTuples.Where(s=>!s.Locked)) |
405 |
{ |
406 |
await _unPauseEvent.WaitAsync().ConfigureAwait(false); |
407 |
|
408 |
//Set the Merkle Hash |
409 |
//SetMerkleHash(accountInfo, tuple); |
410 |
|
411 |
await SyncSingleItem(accountInfo, tuple, agent, moves,processedPaths,token).ConfigureAwait(false); |
412 |
|
413 |
} |
414 |
|
415 |
|
416 |
//On the first run |
417 |
/* |
418 |
if (_firstPoll) |
419 |
{ |
420 |
MarkSuspectedDeletes(accountInfo, cleanRemotes); |
421 |
} |
422 |
*/ |
423 |
|
424 |
|
425 |
Log.Info("[LISTENER] End Processing"); |
426 |
} |
427 |
} |
428 |
catch (Exception ex) |
429 |
{ |
430 |
Log.ErrorFormat("[FAIL] ListObjects for {0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex); |
431 |
return nextSince; |
432 |
} |
433 |
|
434 |
Log.Info("[LISTENER] Finished"); |
435 |
return nextSince; |
436 |
} |
437 |
} |
438 |
/* |
439 |
|
440 |
private static void SetMerkleHash(AccountInfo accountInfo, StateTuple tuple) |
441 |
{ |
442 |
//The Merkle hash for directories is that of an empty buffer |
443 |
if (tuple.FileInfo is DirectoryInfo) |
444 |
tuple.C = MERKLE_EMPTY; |
445 |
else if (tuple.FileState != null && tuple.MD5 == tuple.FileState.ETag) |
446 |
{ |
447 |
//If there is a state whose MD5 matches, load the merkle hash from the file state |
448 |
//insteaf of calculating it |
449 |
tuple.C = tuple.FileState.Checksum; |
450 |
} |
451 |
else |
452 |
{ |
453 |
tuple.Merkle = Signature.CalculateTreeHashAsync((FileInfo)tuple.FileInfo, accountInfo.BlockSize, accountInfo.BlockHash,1,progress); |
454 |
//tuple.C=tuple.Merkle.TopHash.ToHashString(); |
455 |
} |
456 |
} |
457 |
*/ |
458 |
|
459 |
private IEnumerable<FileSystemInfo> LoadLocalFileTuples(AccountInfo accountInfo,IEnumerable<string> batch ) |
460 |
{ |
461 |
using (ThreadContext.Stacks["Account Files Hashing"].Push(accountInfo.UserName)) |
462 |
{ |
463 |
var batchPaths = (batch==null)?new List<string>():batch.ToList(); |
464 |
IEnumerable<FileSystemInfo> localInfos=AgentLocator<FileAgent>.Get(accountInfo.AccountPath) |
465 |
.EnumerateFileSystemInfos(); |
466 |
if (batchPaths.Count>0) |
467 |
localInfos= localInfos.Where(fi => batchPaths.Contains(fi.FullName)); |
468 |
|
469 |
return localInfos; |
470 |
} |
471 |
} |
472 |
|
473 |
/// <summary> |
474 |
/// Wait and Pause the agent while waiting |
475 |
/// </summary> |
476 |
/// <param name="backoff"></param> |
477 |
/// <returns></returns> |
478 |
private async Task PauseFor(int backoff) |
479 |
{ |
480 |
|
481 |
Pause = true; |
482 |
await TaskEx.Delay(backoff).ConfigureAwait(false); |
483 |
Pause = false; |
484 |
} |
485 |
|
486 |
private async Task SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary<string, MovedEventArgs> moves,HashSet<string> processedPaths, CancellationToken token) |
487 |
{ |
488 |
Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]", tuple.FilePath, tuple.C, tuple.L, tuple.S); |
489 |
|
490 |
//If the processed paths already contain the current path, exit |
491 |
if (!processedPaths.Add(tuple.FilePath)) |
492 |
return; |
493 |
|
494 |
try |
495 |
{ |
496 |
bool isInferredParent = tuple.ObjectInfo != null && tuple.ObjectInfo.UUID.StartsWith("00000000-0000-0000"); |
497 |
|
498 |
var localFilePath = tuple.FilePath; |
499 |
//Don't use the tuple info, it may have been deleted |
500 |
var localInfo = FileInfoExtensions.FromPath(localFilePath); |
501 |
|
502 |
|
503 |
var isUnselectedRootFolder = agent.IsUnselectedRootFolder(tuple.FilePath); |
504 |
|
505 |
//Unselected root folders that have not yet been uploaded should be uploaded and added to the |
506 |
//selective folders |
507 |
|
508 |
if (!Selectives.IsSelected(accountInfo, localFilePath) && |
509 |
!(isUnselectedRootFolder && tuple.ObjectInfo == null)) |
510 |
return; |
511 |
|
512 |
// Local file unchanged? If both C and L are null, make sure it's because |
513 |
//both the file is missing and the state checksum is not missing |
514 |
if (tuple.C == tuple.L /*&& (localInfo.Exists || tuple.FileState == null)*/) |
515 |
{ |
516 |
//No local changes |
517 |
//Server unchanged? |
518 |
if (tuple.S == tuple.L) |
519 |
{ |
520 |
// No server changes |
521 |
//Has the file been renamed locally? |
522 |
if (!await MoveForLocalMove(accountInfo,tuple)) |
523 |
//Has the file been renamed on the server? |
524 |
MoveForServerMove(accountInfo, tuple); |
525 |
} |
526 |
else |
527 |
{ |
528 |
//Different from server |
529 |
//Does the server file exist? |
530 |
if (tuple.S == null) |
531 |
{ |
532 |
//Server file doesn't exist |
533 |
//deleteObjectFromLocal() |
534 |
using ( |
535 |
StatusNotification.GetNotifier("Deleting local {0}", "Deleted local {0}",true, |
536 |
localInfo.Name)) |
537 |
{ |
538 |
DeleteLocalFile(agent, localFilePath); |
539 |
} |
540 |
} |
541 |
else |
542 |
{ |
543 |
//Server file exists |
544 |
//downloadServerObject() // Result: L = S |
545 |
//If the file has moved on the server, move it locally before downloading |
546 |
using ( |
547 |
StatusNotification.GetNotifier("Downloading {0}", "Downloaded {0}",true, |
548 |
localInfo.Name)) |
549 |
{ |
550 |
var targetPath = MoveForServerMove(accountInfo, tuple); |
551 |
if (targetPath != null) |
552 |
{ |
553 |
|
554 |
await DownloadCloudFile(accountInfo, tuple, token, targetPath).ConfigureAwait(false); |
555 |
|
556 |
AddOwnFolderToSelectives(accountInfo, tuple, targetPath); |
557 |
} |
558 |
} |
559 |
} |
560 |
} |
561 |
} |
562 |
else |
563 |
{ |
564 |
|
565 |
//Local changes found |
566 |
|
567 |
//Server unchanged? |
568 |
if (tuple.S == tuple.L) |
569 |
{ |
570 |
//The FileAgent selective sync checks for new root folder files |
571 |
if (!agent.Ignore(localFilePath)) |
572 |
{ |
573 |
if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null) |
574 |
{ |
575 |
//deleteObjectFromServer() |
576 |
DeleteCloudFile(accountInfo, tuple); |
577 |
//updateRecord( Remove L, S) |
578 |
} |
579 |
else |
580 |
{ |
581 |
//uploadLocalObject() // Result: S = C, L = S |
582 |
var progress = new Progress<HashProgress>(d => |
583 |
StatusNotification.Notify(new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}", d.Percentage, localInfo.Name)))); |
584 |
|
585 |
//Is it an unselected root folder |
586 |
var isCreation = isUnselectedRootFolder ||//or a new folder under a selected parent? |
587 |
(localInfo is DirectoryInfo && Selectives.IsSelected(accountInfo, localInfo) && tuple.FileState == null && tuple.ObjectInfo == null); |
588 |
|
589 |
|
590 |
//Is this a result of a FILE move with no modifications? Then try to move it, |
591 |
//to avoid an expensive hash |
592 |
if (!await MoveForLocalMove(accountInfo, tuple)) |
593 |
{ |
594 |
await UploadLocalFile(accountInfo, tuple, token, isCreation, localInfo,processedPaths, progress).ConfigureAwait(false); |
595 |
} |
596 |
|
597 |
//updateRecord( S = C ) |
598 |
//State updated by the uploader |
599 |
|
600 |
if (isCreation ) |
601 |
{ |
602 |
ProcessChildren(accountInfo, tuple, agent, moves,processedPaths,token); |
603 |
} |
604 |
} |
605 |
} |
606 |
} |
607 |
else |
608 |
{ |
609 |
if (tuple.C == tuple.S) |
610 |
{ |
611 |
// (Identical Changes) Result: L = S |
612 |
//doNothing() |
613 |
|
614 |
//Don't update anything for nonexistend server files |
615 |
if (tuple.S != null) |
616 |
{ |
617 |
//Detect server moves |
618 |
var targetPath = MoveForServerMove(accountInfo, tuple); |
619 |
if (targetPath != null) |
620 |
{ |
621 |
Debug.Assert(tuple.Merkle != null); |
622 |
StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo, tuple.Merkle); |
623 |
|
624 |
AddOwnFolderToSelectives(accountInfo, tuple, targetPath); |
625 |
} |
626 |
} |
627 |
else |
628 |
{ |
629 |
//At this point, C==S==NULL and we have a stale state (L) |
630 |
//Log the stale tuple for investigation |
631 |
Log.WarnFormat("Stale tuple detected FilePathPath:[{0}], State:[{1}], LocalFile:[{2}]", tuple.FilePath, tuple.FileState, tuple.FileInfo); |
632 |
|
633 |
//And remove it |
634 |
if (!String.IsNullOrWhiteSpace(tuple.FilePath)) |
635 |
StatusKeeper.ClearFileStatus(tuple.FilePath); |
636 |
} |
637 |
} |
638 |
else |
639 |
{ |
640 |
if ((tuple.C == null || !localInfo.Exists) && tuple.ObjectInfo != null) |
641 |
{ |
642 |
//deleteObjectFromServer() |
643 |
DeleteCloudFile(accountInfo, tuple); |
644 |
//updateRecord(Remove L, S) |
645 |
} |
646 |
//If both the local and server files are missing, the state is stale |
647 |
else if (!localInfo.Exists && (tuple.S == null || tuple.ObjectInfo == null)) |
648 |
{ |
649 |
StatusKeeper.ClearFileStatus(localInfo.FullName); |
650 |
} |
651 |
else |
652 |
{ |
653 |
ReportConflictForMismatch(localFilePath); |
654 |
//identifyAsConflict() // Manual action required |
655 |
} |
656 |
} |
657 |
} |
658 |
} |
659 |
} |
660 |
catch (Exception exc) |
661 |
{ |
662 |
//In case of error log and retry with the next poll |
663 |
Log.ErrorFormat("[SYNC] Failed for file {0}. Will Retry.\r\n{1}",tuple.FilePath,exc); |
664 |
} |
665 |
} |
666 |
|
667 |
private void DeleteLocalFile(FileAgent agent, string localFilePath) |
668 |
{ |
669 |
StatusKeeper.SetFileState(localFilePath, FileStatus.Deleted, |
670 |
FileOverlayStatus.Deleted, ""); |
671 |
using (NetworkGate.Acquire(localFilePath, NetworkOperation.Deleting)) |
672 |
{ |
673 |
agent.Delete(localFilePath); |
674 |
} |
675 |
//updateRecord(Remove C, L) |
676 |
StatusKeeper.ClearFileStatus(localFilePath); |
677 |
} |
678 |
|
679 |
private async Task DownloadCloudFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token, string targetPath) |
680 |
{ |
681 |
StatusKeeper.SetFileState(targetPath, FileStatus.Modified, FileOverlayStatus.Modified, |
682 |
""); |
683 |
|
684 |
var finalHash=await |
685 |
NetworkAgent.Downloader.DownloadCloudFile(accountInfo, tuple.ObjectInfo, targetPath, |
686 |
token) |
687 |
.ConfigureAwait(false); |
688 |
//updateRecord( L = S ) |
689 |
StatusKeeper.UpdateFileChecksum(targetPath, tuple.ObjectInfo.ETag, |
690 |
finalHash); |
691 |
|
692 |
StatusKeeper.StoreInfo(targetPath, tuple.ObjectInfo,finalHash); |
693 |
} |
694 |
|
695 |
private async Task UploadLocalFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token, |
696 |
bool isUnselectedRootFolder, FileSystemInfo localInfo, HashSet<string> processedPaths, IProgress<HashProgress> progress) |
697 |
{ |
698 |
var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState, |
699 |
accountInfo.BlockSize, accountInfo.BlockHash, |
700 |
"Poll", isUnselectedRootFolder, token, progress,tuple.Merkle); |
701 |
|
702 |
using (StatusNotification.GetNotifier("Uploading {0}", "Uploaded {0}",true, |
703 |
localInfo.Name)) |
704 |
{ |
705 |
await NetworkAgent.Uploader.UploadCloudFile(action, token).ConfigureAwait(false); |
706 |
} |
707 |
|
708 |
if (isUnselectedRootFolder) |
709 |
{ |
710 |
var dirActions =( |
711 |
from dir in ((DirectoryInfo) localInfo).EnumerateDirectories("*", SearchOption.AllDirectories) |
712 |
let subAction = new CloudUploadAction(accountInfo, dir, null, |
713 |
accountInfo.BlockSize, accountInfo.BlockHash, |
714 |
"Poll", true, token, progress) |
715 |
select subAction).ToList(); |
716 |
foreach (var dirAction in dirActions) |
717 |
{ |
718 |
processedPaths.Add(dirAction.LocalFile.FullName); |
719 |
} |
720 |
|
721 |
await TaskEx.WhenAll(dirActions.Select(a=>NetworkAgent.Uploader.UploadCloudFile(a,token)).ToArray()); |
722 |
} |
723 |
} |
724 |
|
725 |
private async Task<bool> MoveForLocalMove(AccountInfo accountInfo, StateTuple tuple) |
726 |
{ |
727 |
//Is the file a directory or previous path missing? |
728 |
if (tuple.FileInfo is DirectoryInfo) |
729 |
return false; |
730 |
//Is the previous path missing? |
731 |
if (String.IsNullOrWhiteSpace(tuple.OldFullPath)) |
732 |
return false; |
733 |
//Has the file locally, in which case it should be uploaded rather than moved? |
734 |
if (tuple.OldChecksum != tuple.Merkle.TopHash.ToHashString()) |
735 |
return false; |
736 |
|
737 |
var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName); |
738 |
var serverPath = Path.Combine(accountInfo.AccountPath, relativePath); |
739 |
//Has the file been renamed on the server? |
740 |
if (!tuple.OldFullPath.Equals(serverPath)) |
741 |
{ |
742 |
ReportConflictForDoubleRename(tuple.FilePath); |
743 |
return false; |
744 |
} |
745 |
|
746 |
try |
747 |
{ |
748 |
|
749 |
var client = new CloudFilesClient(accountInfo); |
750 |
var objectInfo = CloudAction.CreateObjectInfoFor(accountInfo, tuple.FileInfo); |
751 |
var containerPath = Path.Combine(accountInfo.AccountPath, objectInfo.Container.ToUnescapedString()); |
752 |
//TODO: SImplify these multiple conversions from and to Uris |
753 |
var oldName = tuple.OldFullPath.AsRelativeTo(containerPath); |
754 |
//Then execute a move instead of an upload |
755 |
using (StatusNotification.GetNotifier("Moving {0}", "Moved {0}", true,tuple.FileInfo.Name)) |
756 |
{ |
757 |
await client.MoveObject(objectInfo.Account, objectInfo.Container, oldName.ToEscapedUri(), |
758 |
objectInfo.Container, objectInfo.Name).ConfigureAwait(false); |
759 |
} |
760 |
return true; |
761 |
} |
762 |
catch (Exception exc) |
763 |
{ |
764 |
Log.ErrorFormat("[MOVE] Failed for [{0}],:\r\n{1}", tuple.FilePath, exc); |
765 |
//Return false to force an upload of the file |
766 |
return false; |
767 |
} |
768 |
|
769 |
} |
770 |
|
771 |
private void AddOwnFolderToSelectives(AccountInfo accountInfo, StateTuple tuple, string targetPath) |
772 |
{ |
773 |
//Not for shared folders |
774 |
if (tuple.ObjectInfo.IsShared==true) |
775 |
return; |
776 |
//Also ensure that any newly created folders are added to the selectives, if the original folder was selected |
777 |
var containerPath = Path.Combine(accountInfo.AccountPath, tuple.ObjectInfo.Container.ToUnescapedString()); |
778 |
|
779 |
//If this is a root folder encountered for the first time |
780 |
if (tuple.L == null && Directory.Exists(tuple.FileInfo.FullName) |
781 |
&& (tuple.FileInfo.FullName.IsAtOrBelow(containerPath))) |
782 |
{ |
783 |
|
784 |
var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName); |
785 |
var initialPath = Path.Combine(accountInfo.AccountPath, relativePath); |
786 |
|
787 |
//var hasMoved = true;// !initialPath.Equals(targetPath); |
788 |
//If the new path is under a selected folder, add it to the selectives as well |
789 |
if (Selectives.IsSelected(accountInfo, initialPath)) |
790 |
{ |
791 |
Selectives.AddUri(accountInfo, tuple.ObjectInfo.Uri); |
792 |
Selectives.Save(accountInfo); |
793 |
} |
794 |
} |
795 |
} |
796 |
|
797 |
private string MoveForServerMove(AccountInfo accountInfo, StateTuple tuple) |
798 |
{ |
799 |
if (tuple.ObjectInfo == null) |
800 |
return null; |
801 |
var relativePath = tuple.ObjectInfo.RelativeUrlToFilePath(accountInfo.UserName); |
802 |
var serverPath = Path.Combine(accountInfo.AccountPath, relativePath); |
803 |
|
804 |
//Compare Case Insensitive |
805 |
if (String.Equals(tuple.FilePath ,serverPath,StringComparison.InvariantCultureIgnoreCase)) |
806 |
return serverPath; |
807 |
|
808 |
//Has the file been renamed locally? |
809 |
if (!String.IsNullOrWhiteSpace(tuple.OldFullPath) && !tuple.OldFullPath.Equals(tuple.FilePath)) |
810 |
{ |
811 |
ReportConflictForDoubleRename(tuple.FilePath); |
812 |
return null; |
813 |
} |
814 |
|
815 |
tuple.FileInfo.Refresh(); |
816 |
//The file/folder may not exist if it was moved because its parent moved |
817 |
if (!tuple.FileInfo.Exists) |
818 |
{ |
819 |
var target=FileInfoExtensions.FromPath(serverPath); |
820 |
if (!target.Exists) |
821 |
{ |
822 |
Log.ErrorFormat("No source or target found while trying to move {0} to {1}", tuple.FileInfo.FullName, serverPath); |
823 |
} |
824 |
return serverPath; |
825 |
} |
826 |
|
827 |
using (StatusNotification.GetNotifier("Moving local {0}", "Moved local {0}", true,Path.GetFileName(tuple.FilePath))) |
828 |
using(NetworkGate.Acquire(tuple.FilePath,NetworkOperation.Renaming)) |
829 |
{ |
830 |
|
831 |
var fi = tuple.FileInfo as FileInfo; |
832 |
if (fi != null) |
833 |
{ |
834 |
var targetFile = new FileInfo(serverPath); |
835 |
if (!targetFile.Directory.Exists) |
836 |
targetFile.Directory.Create(); |
837 |
fi.MoveTo(serverPath); |
838 |
} |
839 |
var di = tuple.FileInfo as DirectoryInfo; |
840 |
if (di != null) |
841 |
{ |
842 |
var targetDir = new DirectoryInfo(serverPath); |
843 |
if (!targetDir.Parent.Exists) |
844 |
targetDir.Parent.Create(); |
845 |
di.MoveTo(serverPath); |
846 |
} |
847 |
} |
848 |
|
849 |
StatusKeeper.StoreInfo(serverPath, tuple.ObjectInfo); |
850 |
|
851 |
return serverPath; |
852 |
} |
853 |
|
854 |
private void DeleteCloudFile(AccountInfo accountInfo, StateTuple tuple) |
855 |
{ |
856 |
using (StatusNotification.GetNotifier("Deleting server {0}", "Deleted server {0}", true,Path.GetFileName(tuple.FilePath))) |
857 |
{ |
858 |
|
859 |
StatusKeeper.SetFileState(tuple.FilePath, FileStatus.Deleted, |
860 |
FileOverlayStatus.Deleted, ""); |
861 |
NetworkAgent.DeleteAgent.DeleteCloudFile(accountInfo, tuple.ObjectInfo); |
862 |
StatusKeeper.ClearFileStatus(tuple.FilePath); |
863 |
} |
864 |
} |
865 |
|
866 |
private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary<string, MovedEventArgs> moves,HashSet<string> processedPaths,CancellationToken token) |
867 |
{ |
868 |
|
869 |
var dirInfo = tuple.FileInfo as DirectoryInfo; |
870 |
var folderTuples = from folder in dirInfo.EnumerateDirectories("*", SearchOption.AllDirectories) |
871 |
select new StateTuple(folder){C=Signature.MERKLE_EMPTY}; |
872 |
|
873 |
var fileTuples = from file in dirInfo.EnumerateFiles("*", SearchOption.AllDirectories) |
874 |
let state=StatusKeeper.GetStateByFilePath(file.FullName) |
875 |
select new StateTuple(file){ |
876 |
Merkle=StatusAgent.CalculateTreeHash(file,accountInfo,state, |
877 |
Settings.HashingParallelism,token,null) |
878 |
}; |
879 |
|
880 |
//Process folders first, to ensure folders appear on the sever as soon as possible |
881 |
folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves, processedPaths,token).Wait()); |
882 |
|
883 |
fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves,processedPaths, token).Wait()); |
884 |
} |
885 |
|
886 |
|
887 |
|
888 |
private IEnumerable<StateTuple> MergeSources(IEnumerable<Tuple<string, ObjectInfo>> infos, IEnumerable<FileSystemInfo> files, List<FileState> states, ConcurrentDictionary<string, MovedEventArgs> moves) |
889 |
{ |
890 |
var tuplesByPath = new Dictionary<string, StateTuple>(); |
891 |
foreach (var info in files) |
892 |
{ |
893 |
var tuple = new StateTuple(info); |
894 |
//Is this the target of a move event? |
895 |
var moveArg = moves.Values.FirstOrDefault(arg => info.FullName.Equals(arg.FullPath, StringComparison.InvariantCultureIgnoreCase) |
896 |
|| info.FullName.IsAtOrBelow(arg.FullPath)); |
897 |
if (moveArg != null) |
898 |
{ |
899 |
tuple.NewFullPath = info.FullName; |
900 |
var relativePath = info.AsRelativeTo(moveArg.FullPath); |
901 |
tuple.OldFullPath = Path.Combine(moveArg.OldFullPath, relativePath); |
902 |
tuple.OldChecksum = states.FirstOrDefault(st => st.FilePath.Equals(tuple.OldFullPath, StringComparison.InvariantCultureIgnoreCase)) |
903 |
.NullSafe(st => st.Checksum); |
904 |
} |
905 |
|
906 |
tuplesByPath[tuple.FilePath] = tuple; |
907 |
} |
908 |
|
909 |
|
910 |
|
911 |
|
912 |
//For files that have state |
913 |
foreach (var state in states) |
914 |
{ |
915 |
StateTuple hashTuple; |
916 |
|
917 |
|
918 |
if (tuplesByPath.TryGetValue(state.FilePath, out hashTuple)) |
919 |
{ |
920 |
hashTuple.FileState = state; |
921 |
UpdateHashes(hashTuple); |
922 |
} |
923 |
else if (moves.ContainsKey(state.FilePath) && tuplesByPath.TryGetValue(moves[state.FilePath].FullPath, out hashTuple)) |
924 |
{ |
925 |
hashTuple.FileState = state; |
926 |
UpdateHashes(hashTuple); |
927 |
} |
928 |
else |
929 |
{ |
930 |
var fsInfo = FileInfoExtensions.FromPath(state.FilePath); |
931 |
hashTuple = new StateTuple {FileInfo = fsInfo, FileState = state}; |
932 |
|
933 |
//Is the source of a moved item? |
934 |
var moveArg = moves.Values.FirstOrDefault(arg => state.FilePath.Equals(arg.OldFullPath,StringComparison.InvariantCultureIgnoreCase) |
935 |
|| state.FilePath.IsAtOrBelow(arg.OldFullPath)); |
936 |
if (moveArg != null) |
937 |
{ |
938 |
var relativePath = state.FilePath.AsRelativeTo(moveArg.OldFullPath); |
939 |
hashTuple.NewFullPath = Path.Combine(moveArg.FullPath,relativePath); |
940 |
hashTuple.OldFullPath = state.FilePath; |
941 |
//Do we have the old MD5? |
942 |
//hashTuple.OldMD5 = state.LastMD5; |
943 |
} |
944 |
|
945 |
|
946 |
tuplesByPath[state.FilePath] = hashTuple; |
947 |
} |
948 |
} |
949 |
//for files that don't have state |
950 |
var statelessTuples = tuplesByPath.Values.Where(t => t.FileState == null).ToArray(); |
951 |
//If there are too many stateless tuples, update them in parallel |
952 |
if (statelessTuples.Length > 20) |
953 |
Parallel.ForEach(statelessTuples, UpdateHashes); |
954 |
else |
955 |
statelessTuples.ApplyAction(UpdateHashes); |
956 |
|
957 |
var tuplesByID = tuplesByPath.Values |
958 |
.Where(tuple => tuple.FileState != null && tuple.FileState.ObjectID!=null) |
959 |
.ToDictionary(tuple=>tuple.FileState.ObjectID,tuple=>tuple);//new Dictionary<Guid, StateTuple>(); |
960 |
|
961 |
foreach (var info in infos) |
962 |
{ |
963 |
StateTuple hashTuple; |
964 |
var filePath = info.Item1; |
965 |
var objectInfo = info.Item2; |
966 |
var objectID = objectInfo.UUID; |
967 |
|
968 |
if (objectID != _emptyGuid && tuplesByID.TryGetValue(objectID, out hashTuple)) |
969 |
{ |
970 |
hashTuple.ObjectInfo = objectInfo; |
971 |
} |
972 |
else if (tuplesByPath.TryGetValue(filePath, out hashTuple)) |
973 |
{ |
974 |
hashTuple.ObjectInfo = objectInfo; |
975 |
} |
976 |
else |
977 |
{ |
978 |
|
979 |
|
980 |
var fsInfo = FileInfoExtensions.FromPath(filePath); |
981 |
hashTuple= new StateTuple {FileInfo = fsInfo, ObjectInfo = objectInfo}; |
982 |
tuplesByPath[filePath] = hashTuple; |
983 |
|
984 |
if (objectInfo.UUID!=_emptyGuid) |
985 |
tuplesByID[objectInfo.UUID] = hashTuple; |
986 |
} |
987 |
} |
988 |
|
989 |
var tuples = tuplesByPath.Values; |
990 |
var brokenTuples = from tuple in tuples |
991 |
where tuple.FileState != null && tuple.FileState.Checksum == null |
992 |
&& tuple.ObjectInfo != null && (tuple.FileInfo==null || !tuple.FileInfo.Exists) |
993 |
select tuple; |
994 |
var actualTuples = tuples.Except(brokenTuples); |
995 |
Debug.Assert(actualTuples.All(t => t.HashesValid())); |
996 |
|
997 |
foreach (var tuple in brokenTuples) |
998 |
{ |
999 |
StatusKeeper.SetFileState(tuple.FileState.FilePath, |
1000 |
FileStatus.Conflict, FileOverlayStatus.Conflict, "FileState without checksum encountered for server object missing from disk"); |
1001 |
} |
1002 |
|
1003 |
return actualTuples; |
1004 |
} |
1005 |
|
1006 |
|
1007 |
/// <summary> |
1008 |
/// Update the tuple with the file's hashes, avoiding calculation if the file is unchanged |
1009 |
/// </summary> |
1010 |
/// <param name="hashTuple"></param> |
1011 |
/// <remarks> |
1012 |
/// The function first checks the file's size and last write date to see if there are any changes. If there are none, |
1013 |
/// the file's stored hashes are used. |
1014 |
/// Otherwise, MD5 is calculated first to ensure there are no changes. If MD5 is different, the Merkle hash is calculated |
1015 |
/// </remarks> |
1016 |
private void UpdateHashes(StateTuple hashTuple) |
1017 |
{ |
1018 |
|
1019 |
try |
1020 |
{ |
1021 |
var state = hashTuple.NullSafe(s => s.FileState); |
1022 |
var storedHash = state.NullSafe(s => s.Checksum); |
1023 |
var storedHashes = state.NullSafe(s => s.Hashes); |
1024 |
//var storedMD5 = state.NullSafe(s => s.LastMD5); |
1025 |
var storedDate = state.NullSafe(s => s.LastWriteDate) ?? DateTime.MinValue; |
1026 |
var storedLength = state.NullSafe(s => s.LastLength); |
1027 |
|
1028 |
//var md5Hash = Signature.MD5_EMPTY; |
1029 |
var merkle=TreeHash.Empty; |
1030 |
|
1031 |
if (hashTuple.FileInfo is FileInfo) |
1032 |
{ |
1033 |
var file = (FileInfo)hashTuple.FileInfo.WithProperCapitalization(); |
1034 |
|
1035 |
//Attributes unchanged? |
1036 |
//LastWriteTime is only accurate to the second |
1037 |
var unchangedAttributes = file.LastWriteTime - storedDate < TimeSpan.FromSeconds(1) |
1038 |
&& storedLength == file.Length; |
1039 |
|
1040 |
//Attributes appear unchanged but the file length doesn't match the stored hash ? |
1041 |
var nonEmptyMismatch = unchangedAttributes && |
1042 |
(file.Length == 0 ^ storedHash== Signature.MERKLE_EMPTY); |
1043 |
|
1044 |
//Missing hashes for NON-EMPTY hash ? |
1045 |
var missingHashes = storedHash != Signature.MERKLE_EMPTY && |
1046 |
String.IsNullOrWhiteSpace(storedHashes); |
1047 |
|
1048 |
//Unchanged attributes but changed MD5 |
1049 |
//Short-circuiting ensures MD5 is computed only if the attributes are changed |
1050 |
|
1051 |
//var md5Mismatch = (!unchangedAttributes && file.ComputeShortHash(StatusNotification) != storedMD5); |
1052 |
|
1053 |
|
1054 |
//If the attributes are unchanged but the Merkle doesn't match the size, |
1055 |
//or the attributes and the MD5 hash have changed, |
1056 |
//or the hashes are missing but the tophash is NOT empty, we need to recalculate |
1057 |
// |
1058 |
//Otherwise we load the hashes from state |
1059 |
if (!unchangedAttributes || nonEmptyMismatch || missingHashes) |
1060 |
merkle = RecalculateTreehash(file); |
1061 |
else |
1062 |
{ |
1063 |
merkle=TreeHash.Parse(hashTuple.FileState.Hashes); |
1064 |
//merkle.MD5 = storedMD5; |
1065 |
} |
1066 |
|
1067 |
|
1068 |
//md5Hash = merkle.MD5; |
1069 |
} |
1070 |
//hashTuple.MD5 = md5Hash; |
1071 |
//Setting Merkle also updates C |
1072 |
hashTuple.Merkle = merkle; |
1073 |
} |
1074 |
catch (IOException) |
1075 |
{ |
1076 |
hashTuple.Locked = true; |
1077 |
} |
1078 |
} |
1079 |
|
1080 |
/// <summary> |
1081 |
/// Recalculate a file's treehash and md5 and update the database |
1082 |
/// </summary> |
1083 |
/// <param name="file"></param> |
1084 |
/// <returns></returns> |
1085 |
private TreeHash RecalculateTreehash(FileInfo file) |
1086 |
{ |
1087 |
var progress = new Progress<HashProgress>(d =>StatusNotification.Notify( |
1088 |
new StatusNotification(String.Format("Hashing {0:p} of {1}", d.Percentage, file.Name)))); |
1089 |
var merkle = Signature.CalculateTreeHash(file, StatusKeeper.BlockSize, StatusKeeper.BlockHash, |
1090 |
Settings.HashingParallelism, CancellationToken, progress); |
1091 |
|
1092 |
StatusKeeper.UpdateFileHashes(file.FullName, merkle); |
1093 |
return merkle; |
1094 |
} |
1095 |
|
1096 |
/// <summary> |
1097 |
/// Returns the latest LastModified date from the list of objects, but only if it is before |
1098 |
/// than the threshold value |
1099 |
/// </summary> |
1100 |
/// <param name="threshold"></param> |
1101 |
/// <param name="cloudObjects"></param> |
1102 |
/// <returns></returns> |
1103 |
private static DateTimeOffset? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects) |
1104 |
{ |
1105 |
DateTimeOffset? maxDate = null; |
1106 |
if (cloudObjects!=null && cloudObjects.Count > 0) |
1107 |
maxDate = cloudObjects.Max(obj => obj.Last_Modified); |
1108 |
if (!maxDate.HasValue) |
1109 |
return threshold; |
1110 |
if (!threshold.HasValue|| threshold > maxDate) |
1111 |
return maxDate; |
1112 |
return threshold; |
1113 |
} |
1114 |
|
1115 |
/// <summary> |
1116 |
/// Returns the latest LastModified date from the list of objects, but only if it is after |
1117 |
/// the threshold value |
1118 |
/// </summary> |
1119 |
/// <param name="threshold"></param> |
1120 |
/// <param name="cloudObjects"></param> |
1121 |
/// <returns></returns> |
1122 |
private static DateTimeOffset? GetLatestDateAfter(DateTimeOffset? threshold, IList<ObjectInfo> cloudObjects) |
1123 |
{ |
1124 |
DateTimeOffset? maxDate = null; |
1125 |
if (cloudObjects!=null && cloudObjects.Count > 0) |
1126 |
maxDate = cloudObjects.Max(obj => obj.Last_Modified); |
1127 |
if (!maxDate.HasValue) |
1128 |
return threshold; |
1129 |
if (!threshold.HasValue|| threshold < maxDate) |
1130 |
return maxDate; |
1131 |
return threshold; |
1132 |
} |
1133 |
|
1134 |
readonly AccountsDifferencer _differencer = new AccountsDifferencer(); |
1135 |
private bool _pause; |
1136 |
private readonly string _emptyGuid = Guid.Empty.ToString(); |
1137 |
|
1138 |
|
1139 |
|
1140 |
private void ReportConflictForMismatch(string localFilePath) |
1141 |
{ |
1142 |
if (String.IsNullOrWhiteSpace(localFilePath)) |
1143 |
throw new ArgumentNullException("localFilePath"); |
1144 |
Contract.EndContractBlock(); |
1145 |
|
1146 |
StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File changed at the server"); |
1147 |
UpdateStatus(PithosStatus.HasConflicts); |
1148 |
var message = String.Format("Conflict detected for file {0}", localFilePath); |
1149 |
Log.Warn(message); |
1150 |
StatusNotification.NotifyChange(message, TraceLevel.Warning); |
1151 |
} |
1152 |
|
1153 |
private void ReportConflictForDoubleRename(string localFilePath) |
1154 |
{ |
1155 |
if (String.IsNullOrWhiteSpace(localFilePath)) |
1156 |
throw new ArgumentNullException("localFilePath"); |
1157 |
Contract.EndContractBlock(); |
1158 |
|
1159 |
StatusKeeper.SetFileState(localFilePath, FileStatus.Conflict, FileOverlayStatus.Conflict, "File renamed both locally and on the server"); |
1160 |
UpdateStatus(PithosStatus.HasConflicts); |
1161 |
var message = String.Format("Double rename conflict detected for file {0}", localFilePath); |
1162 |
Log.Warn(message); |
1163 |
StatusNotification.NotifyChange(message, TraceLevel.Warning); |
1164 |
} |
1165 |
|
1166 |
|
1167 |
/// <summary> |
1168 |
/// Notify the UI to update the visual status |
1169 |
/// </summary> |
1170 |
/// <param name="status"></param> |
1171 |
private void UpdateStatus(PithosStatus status) |
1172 |
{ |
1173 |
try |
1174 |
{ |
1175 |
StatusNotification.SetPithosStatus(status); |
1176 |
//StatusNotification.Notify(new Notification()); |
1177 |
} |
1178 |
catch (Exception exc) |
1179 |
{ |
1180 |
//Failure is not critical, just log it |
1181 |
Log.Warn("Error while updating status", exc); |
1182 |
} |
1183 |
} |
1184 |
|
1185 |
private static void CreateContainerFolders(AccountInfo accountInfo, IEnumerable<ContainerInfo> containers) |
1186 |
{ |
1187 |
var containerPaths = from container in containers |
1188 |
let containerPath = Path.Combine(accountInfo.AccountPath, container.Name.ToUnescapedString()) |
1189 |
where container.Name.ToString() != FolderConstants.TrashContainer && !Directory.Exists(containerPath) |
1190 |
select containerPath; |
1191 |
|
1192 |
foreach (var path in containerPaths) |
1193 |
{ |
1194 |
Directory.CreateDirectory(path); |
1195 |
} |
1196 |
} |
1197 |
|
1198 |
public void AddAccount(AccountInfo accountInfo) |
1199 |
{ |
1200 |
//Avoid adding a duplicate accountInfo |
1201 |
_accounts.TryAdd(accountInfo.AccountKey, accountInfo); |
1202 |
} |
1203 |
|
1204 |
public void RemoveAccount(AccountInfo accountInfo) |
1205 |
{ |
1206 |
AccountInfo account; |
1207 |
_accounts.TryRemove(accountInfo.AccountKey, out account); |
1208 |
|
1209 |
SnapshotDifferencer differencer; |
1210 |
_differencer.Differencers.TryRemove(accountInfo.AccountKey, out differencer); |
1211 |
} |
1212 |
|
1213 |
} |
1214 |
} |