root / trunk / Pithos.Core / Agents / NetworkAgent.cs @ a27aa447
History | View | Annotate | Download (31 kB)
1 |
using System; |
---|---|
2 |
using System.Collections.Generic; |
3 |
using System.ComponentModel.Composition; |
4 |
using System.Diagnostics; |
5 |
using System.Diagnostics.Contracts; |
6 |
using System.IO; |
7 |
using System.Linq; |
8 |
using System.Text; |
9 |
using System.Threading; |
10 |
using System.Threading.Tasks; |
11 |
using Pithos.Interfaces; |
12 |
using Pithos.Network; |
13 |
|
14 |
namespace Pithos.Core.Agents |
15 |
{ |
16 |
[Export] |
17 |
public class NetworkAgent |
18 |
{ |
19 |
private Agent<CloudAction> _agent; |
20 |
|
21 |
[Import] |
22 |
public IStatusKeeper StatusKeeper { get; set; } |
23 |
|
24 |
public IStatusNotification StatusNotification { get; set; } |
25 |
[Import] |
26 |
public ICloudClient CloudClient { get; set; } |
27 |
|
28 |
[Import] |
29 |
public FileAgent FileAgent {get;set;} |
30 |
|
31 |
/* |
32 |
[Import] |
33 |
public IPithosWorkflow Workflow { get; set; } |
34 |
*/ |
35 |
|
36 |
|
37 |
public string PithosContainer { get; set; } |
38 |
public string TrashContainer { get; private set; } |
39 |
|
40 |
public int BlockSize { get; set; } |
41 |
public string BlockHash { get; set; } |
42 |
|
43 |
|
44 |
public void Start(string pithosContainer, string trashContainer, int blockSize, string blockHash) |
45 |
{ |
46 |
if (String.IsNullOrWhiteSpace(pithosContainer)) |
47 |
throw new ArgumentNullException("pithosContainer"); |
48 |
if (String.IsNullOrWhiteSpace(trashContainer)) |
49 |
throw new ArgumentNullException("trashContainer"); |
50 |
Contract.EndContractBlock(); |
51 |
|
52 |
PithosContainer = pithosContainer; |
53 |
TrashContainer = trashContainer; |
54 |
BlockSize = blockSize; |
55 |
BlockHash = blockHash; |
56 |
|
57 |
|
58 |
_agent = Agent<CloudAction>.Start(inbox => |
59 |
{ |
60 |
Action loop = null; |
61 |
loop = () => |
62 |
{ |
63 |
var message = inbox.Receive(); |
64 |
|
65 |
/* |
66 |
var process=Process(message); |
67 |
inbox.LoopAsync(process, loop); |
68 |
*/ |
69 |
|
70 |
/* |
71 |
process1.ContinueWith(t => |
72 |
{ |
73 |
inbox.DoAsync(loop); |
74 |
if (t.IsFaulted) |
75 |
{ |
76 |
var ex = t.Exception.InnerException; |
77 |
if (ex is OperationCanceledException) |
78 |
inbox.Stop(); |
79 |
} |
80 |
}); |
81 |
*/ |
82 |
//inbox.DoAsync(loop); |
83 |
|
84 |
|
85 |
var process = message.ContinueWith(t => |
86 |
{ |
87 |
var action = t.Result; |
88 |
|
89 |
Process(action); |
90 |
inbox.DoAsync(loop); |
91 |
}); |
92 |
|
93 |
process.ContinueWith(t => |
94 |
{ |
95 |
inbox.DoAsync(loop); |
96 |
if (t.IsFaulted) |
97 |
{ |
98 |
var ex = t.Exception.InnerException; |
99 |
if (ex is OperationCanceledException) |
100 |
inbox.Stop(); |
101 |
} |
102 |
}); |
103 |
|
104 |
|
105 |
}; |
106 |
loop(); |
107 |
}); |
108 |
} |
109 |
|
110 |
|
111 |
public void Post(CloudAction cloudAction) |
112 |
{ |
113 |
if (cloudAction == null) |
114 |
throw new ArgumentNullException("cloudAction"); |
115 |
Contract.EndContractBlock(); |
116 |
|
117 |
//If the action targets a local file, add a treehash calculation |
118 |
if (cloudAction.LocalFile != null) |
119 |
{ |
120 |
cloudAction.TopHash = new Lazy<string>(() => Signature.CalculateTreeHashAsync(cloudAction.LocalFile, |
121 |
BlockSize, BlockHash).Result |
122 |
.TopHash.ToHashString()); |
123 |
|
124 |
} |
125 |
_agent.Post(cloudAction); |
126 |
} |
127 |
|
128 |
class ObjectInfoByNameComparer:IEqualityComparer<ObjectInfo> |
129 |
{ |
130 |
public bool Equals(ObjectInfo x, ObjectInfo y) |
131 |
{ |
132 |
return x.Name.Equals(y.Name,StringComparison.InvariantCultureIgnoreCase); |
133 |
} |
134 |
|
135 |
public int GetHashCode(ObjectInfo obj) |
136 |
{ |
137 |
return obj.Name.ToLower().GetHashCode(); |
138 |
} |
139 |
} |
140 |
|
141 |
public Task ProcessRemoteFiles(string accountPath,DateTime? since=null) |
142 |
{ |
143 |
|
144 |
Trace.CorrelationManager.StartLogicalOperation(); |
145 |
Trace.TraceInformation("[LISTENER] Scheduled"); |
146 |
var listObjects = Task.Factory.StartNewDelayed(10000).ContinueWith(t => |
147 |
CloudClient.ListObjects(PithosContainer,since)); |
148 |
|
149 |
DateTime nextSince = DateTime.Now.AddSeconds(-1); |
150 |
|
151 |
var enqueueFiles = listObjects.ContinueWith(task => |
152 |
{ |
153 |
if (task.IsFaulted) |
154 |
{ |
155 |
//ListObjects failed at this point, need to reschedule |
156 |
Trace.TraceError("[FAIL] ListObjects in ProcessRemoteFiles with {0}", task.Exception); |
157 |
ProcessRemoteFiles(accountPath, since); |
158 |
return; |
159 |
} |
160 |
Trace.CorrelationManager.StartLogicalOperation("Listener"); |
161 |
Trace.TraceInformation("[LISTENER] Start Processing"); |
162 |
|
163 |
var remoteObjects = task.Result; |
164 |
|
165 |
var remote=from info in remoteObjects |
166 |
let name=info.Name |
167 |
where !name.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase) && |
168 |
!name.StartsWith("fragments/",StringComparison.InvariantCultureIgnoreCase) |
169 |
select info; |
170 |
|
171 |
var commonObjects = new List<Tuple<ObjectInfo, FileInfo,FileState>>(); |
172 |
var remoteOnly = new List<ObjectInfo>(); |
173 |
|
174 |
//In order to avoid multiple iterations over the files, we iterate only once |
175 |
//over the remote files |
176 |
foreach (var objectInfo in remote) |
177 |
{ |
178 |
var relativePath= objectInfo.Name.RelativeUrlToFilePath();// fileInfo.AsRelativeUrlTo(FileAgent.RootPath); |
179 |
//and remove any matching objects from the list, adding them to the commonObjects list |
180 |
if (FileAgent.Exists(relativePath)) |
181 |
{ |
182 |
var localFile = FileAgent.GetFileInfo(relativePath); |
183 |
var state=FileState.FindByFilePath(localFile.FullName); |
184 |
commonObjects.Add(Tuple.Create(objectInfo, localFile,state)); |
185 |
} |
186 |
else |
187 |
//If there is no match we add them to the localFiles list |
188 |
remoteOnly.Add(objectInfo); |
189 |
} |
190 |
|
191 |
//At the end of the iteration, the *remote* list will contain the files that exist |
192 |
//only on the server |
193 |
|
194 |
//Remote files should be downloaded |
195 |
var actionsForRemote = from upFile in remoteOnly |
196 |
select new CloudAction(CloudActionType.DownloadUnconditional,upFile); |
197 |
|
198 |
//Common files should be checked on a per-case basis to detect differences, which is newer |
199 |
var actionsForCommon = from pair in commonObjects |
200 |
let objectInfo = pair.Item1 |
201 |
let localFile = pair.Item2 |
202 |
let state=pair.Item3 |
203 |
select new CloudAction(CloudActionType.MustSynch, |
204 |
localFile, objectInfo,state); |
205 |
|
206 |
|
207 |
//Collect all the actions |
208 |
var allActions = actionsForRemote.Union(actionsForCommon); |
209 |
|
210 |
//And remove those that are already being processed by the agent |
211 |
var distinctActions =allActions |
212 |
.Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer()) |
213 |
.ToList(); |
214 |
|
215 |
//Queue all the actions |
216 |
foreach (var message in distinctActions) |
217 |
{ |
218 |
Post(message); |
219 |
} |
220 |
|
221 |
|
222 |
if(remoteOnly.Count>0) |
223 |
StatusNotification.NotifyChange(String.Format("Processing {0} new files", remoteOnly.Count)); |
224 |
|
225 |
Trace.TraceInformation("[LISTENER] End Processing"); |
226 |
Trace.CorrelationManager.StopLogicalOperation(); |
227 |
|
228 |
}); |
229 |
|
230 |
var loop = enqueueFiles.ContinueWith(t => |
231 |
{ |
232 |
if (t.IsFaulted) |
233 |
{ |
234 |
Trace.TraceError("[LISTENER] Exception: {0}", t.Exception); |
235 |
} |
236 |
else |
237 |
{ |
238 |
Trace.TraceInformation("[LISTENER] Finished"); |
239 |
} |
240 |
ProcessRemoteFiles(accountPath, nextSince); |
241 |
|
242 |
}); |
243 |
return loop; |
244 |
} |
245 |
|
246 |
|
247 |
private Task Process(Task<CloudAction> action) |
248 |
{ |
249 |
return action.ContinueWith(t=> Process(t.Result)); |
250 |
} |
251 |
|
252 |
|
253 |
private void Process(CloudAction action) |
254 |
{ |
255 |
if (action==null) |
256 |
throw new ArgumentNullException("action"); |
257 |
Contract.EndContractBlock(); |
258 |
|
259 |
Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name); |
260 |
var localFile = action.LocalFile; |
261 |
var cloudFile = action.CloudFile; |
262 |
var downloadPath = (cloudFile == null) ? String.Empty |
263 |
: Path.Combine(FileAgent.RootPath, cloudFile.Name.RelativeUrlToFilePath()); |
264 |
|
265 |
try |
266 |
{ |
267 |
switch (action.Action) |
268 |
{ |
269 |
case CloudActionType.UploadUnconditional: |
270 |
UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value); |
271 |
break; |
272 |
case CloudActionType.DownloadUnconditional: |
273 |
DownloadCloudFile(PithosContainer, new Uri(cloudFile.Name,UriKind.Relative), downloadPath); |
274 |
break; |
275 |
case CloudActionType.DeleteCloud: |
276 |
DeleteCloudFile(cloudFile.Name); |
277 |
break; |
278 |
case CloudActionType.RenameCloud: |
279 |
RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName); |
280 |
break; |
281 |
case CloudActionType.MustSynch: |
282 |
if (File.Exists(downloadPath)) |
283 |
{ |
284 |
var cloudHash = cloudFile.Hash; |
285 |
var localHash = action.LocalHash.Value; |
286 |
var topHash = action.TopHash.Value; |
287 |
//Not enough to compare only the local hashes, also have to compare the tophashes |
288 |
if (!cloudHash.Equals(localHash, StringComparison.InvariantCultureIgnoreCase) && |
289 |
!cloudHash.Equals(topHash, StringComparison.InvariantCultureIgnoreCase)) |
290 |
{ |
291 |
var lastLocalTime = localFile.LastWriteTime; |
292 |
var lastUpTime = cloudFile.Last_Modified; |
293 |
if (lastUpTime <= lastLocalTime) |
294 |
{ |
295 |
//Local change while the app was down or Files in conflict |
296 |
//Maybe need to store version as well, to check who has the latest version |
297 |
|
298 |
//StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict); |
299 |
UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value); |
300 |
} |
301 |
else |
302 |
{ |
303 |
var status = StatusKeeper.GetFileStatus(downloadPath); |
304 |
switch (status) |
305 |
{ |
306 |
case FileStatus.Unchanged: |
307 |
//It he cloud file has a later date, it was modified by another user or computer. |
308 |
//If the local file's status is Unchanged, we should go on and download the cloud file |
309 |
DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath); |
310 |
break; |
311 |
case FileStatus.Modified: |
312 |
//If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict |
313 |
//We can't ensure that a file modified online since the last time will appear as Modified, unless we |
314 |
//index all files before we start listening. |
315 |
StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict); |
316 |
break; |
317 |
case FileStatus.Created: |
318 |
//If the local file is Created, it means that the local and cloud files aren't related yet have the same name |
319 |
//In this case we must mark the file as in conflict |
320 |
//Other cases should never occur. Mark them as Conflict as well but log a warning |
321 |
StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict); |
322 |
break; |
323 |
default: |
324 |
//If the local file is Created, it means that the local and cloud files aren't related yet have the same name |
325 |
//In this case we must mark the file as in conflict |
326 |
//Other cases should never occur. Mark them as Conflict as well but log a warning |
327 |
StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict); |
328 |
Trace.TraceWarning("Unexcepted status {0} for file {1}->{2}",status,downloadPath,action.CloudFile.Name); |
329 |
break; |
330 |
} |
331 |
} |
332 |
} |
333 |
} |
334 |
else |
335 |
DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath); |
336 |
break; |
337 |
} |
338 |
Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name); |
339 |
} |
340 |
catch (OperationCanceledException) |
341 |
{ |
342 |
throw; |
343 |
} |
344 |
catch (Exception exc) |
345 |
{ |
346 |
Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}", |
347 |
action.Action, action.LocalFile, action.CloudFile, exc); |
348 |
|
349 |
_agent.Post(action); |
350 |
} |
351 |
|
352 |
} |
353 |
|
354 |
|
355 |
|
356 |
private void RenameCloudFile(string oldFileName, string newPath, string newFileName) |
357 |
{ |
358 |
if (String.IsNullOrWhiteSpace(oldFileName)) |
359 |
throw new ArgumentNullException("oldFileName"); |
360 |
if (String.IsNullOrWhiteSpace(oldFileName)) |
361 |
throw new ArgumentNullException("newPath"); |
362 |
if (String.IsNullOrWhiteSpace(oldFileName)) |
363 |
throw new ArgumentNullException("newFileName"); |
364 |
Contract.EndContractBlock(); |
365 |
//The local file is already renamed |
366 |
this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Modified); |
367 |
|
368 |
CloudClient.MoveObject(PithosContainer, oldFileName, PithosContainer, newFileName); |
369 |
|
370 |
this.StatusKeeper.SetFileStatus(newPath, FileStatus.Unchanged); |
371 |
this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Normal); |
372 |
NativeMethods.RaiseChangeNotification(newPath); |
373 |
} |
374 |
|
375 |
private void DeleteCloudFile(string fileName) |
376 |
{ |
377 |
if (String.IsNullOrWhiteSpace(fileName)) |
378 |
throw new ArgumentNullException("fileName"); |
379 |
if (Path.IsPathRooted(fileName)) |
380 |
throw new ArgumentException("The fileName should not be rooted","fileName"); |
381 |
Contract.EndContractBlock(); |
382 |
|
383 |
this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified); |
384 |
|
385 |
CloudClient.MoveObject(PithosContainer, fileName, TrashContainer, fileName); |
386 |
|
387 |
this.StatusKeeper.ClearFileStatus(fileName); |
388 |
this.StatusKeeper.RemoveFileOverlayStatus(fileName); |
389 |
} |
390 |
|
391 |
//Download a file. |
392 |
private void DownloadCloudFile(string container, Uri relativeUrl, string localPath) |
393 |
{ |
394 |
if (String.IsNullOrWhiteSpace(container)) |
395 |
throw new ArgumentNullException("container"); |
396 |
if (relativeUrl==null) |
397 |
throw new ArgumentNullException("relativeUrl"); |
398 |
if (String.IsNullOrWhiteSpace(localPath)) |
399 |
throw new ArgumentNullException("localPath"); |
400 |
if (!Path.IsPathRooted(localPath)) |
401 |
throw new ArgumentException("The localPath must be rooted", "localPath"); |
402 |
Contract.EndContractBlock(); |
403 |
|
404 |
var url = relativeUrl.ToString(); |
405 |
if (url.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase)) |
406 |
return; |
407 |
|
408 |
//Are we already downloading or uploading the file? |
409 |
using (var gate=NetworkGate.Acquire(localPath, NetworkOperation.Downloading)) |
410 |
{ |
411 |
if (gate.Failed) |
412 |
return; |
413 |
//The file's hashmap will be stored in the same location with the extension .hashmap |
414 |
//var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap"); |
415 |
|
416 |
//Retrieve the hashmap from the server |
417 |
var getHashMap = CloudClient.GetHashMap(container,url); |
418 |
var downloadTask= getHashMap.ContinueWith(t => |
419 |
{ |
420 |
var serverHash=t.Result; |
421 |
//If it's a small file |
422 |
return serverHash.Hashes.Count == 1 |
423 |
//Download it in one go |
424 |
? DownloadEntireFile(container, relativeUrl, localPath) |
425 |
//Otherwise download it block by block |
426 |
: DownloadWithBlocks(container, relativeUrl, localPath, serverHash); |
427 |
}); |
428 |
|
429 |
|
430 |
|
431 |
//Retrieve the object's metadata |
432 |
var getInfo = downloadTask.ContinueWith(t => |
433 |
CloudClient.GetObjectInfo(container, url)); |
434 |
//And store it |
435 |
var storeInfo = getInfo.ContinueWith(t => |
436 |
StatusKeeper.StoreInfo(localPath, t.Result)); |
437 |
|
438 |
storeInfo.Wait(); |
439 |
StatusNotification.NotifyChangedFile(localPath); |
440 |
|
441 |
} |
442 |
} |
443 |
|
444 |
//Download a small file with a single GET operation |
445 |
private Task DownloadEntireFile(string container, Uri relativeUrl, string localPath) |
446 |
{ |
447 |
if (String.IsNullOrWhiteSpace(container)) |
448 |
throw new ArgumentNullException("container"); |
449 |
if (relativeUrl == null) |
450 |
throw new ArgumentNullException("relativeUrl"); |
451 |
if (String.IsNullOrWhiteSpace(localPath)) |
452 |
throw new ArgumentNullException("localPath"); |
453 |
if (!Path.IsPathRooted(localPath)) |
454 |
throw new ArgumentException("The localPath must be rooted", "localPath"); |
455 |
Contract.EndContractBlock(); |
456 |
|
457 |
//Calculate the relative file path for the new file |
458 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
459 |
//The file will be stored in a temporary location while downloading with an extension .download |
460 |
var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download"); |
461 |
//Make sure the target folder exists. DownloadFileTask will not create the folder |
462 |
var directoryPath = Path.GetDirectoryName(tempPath); |
463 |
if (!Directory.Exists(directoryPath)) |
464 |
Directory.CreateDirectory(directoryPath); |
465 |
|
466 |
//Download the object to the temporary location |
467 |
var getObject = CloudClient.GetObject(container, relativeUrl.ToString(), tempPath).ContinueWith(t => |
468 |
{ |
469 |
//And move it to its actual location once downloading is finished |
470 |
if (File.Exists(localPath)) |
471 |
File.Replace(tempPath,localPath,null,true); |
472 |
else |
473 |
File.Move(tempPath,localPath); |
474 |
}); |
475 |
return getObject; |
476 |
} |
477 |
|
478 |
public Task DownloadWithBlocks(string container,Uri relativeUrl, string localPath,TreeHash serverHash) |
479 |
{ |
480 |
if (String.IsNullOrWhiteSpace(container)) |
481 |
throw new ArgumentNullException("container"); |
482 |
if (relativeUrl == null) |
483 |
throw new ArgumentNullException("relativeUrl"); |
484 |
if (String.IsNullOrWhiteSpace(localPath)) |
485 |
throw new ArgumentNullException("localPath"); |
486 |
if (!Path.IsPathRooted(localPath)) |
487 |
throw new ArgumentException("The localPath must be rooted", "localPath"); |
488 |
if(serverHash==null) |
489 |
throw new ArgumentNullException("serverHash"); |
490 |
Contract.EndContractBlock(); |
491 |
|
492 |
//Calculate the relative file path for the new file |
493 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
494 |
//The file will be stored in a temporary location while downloading with an extension .download |
495 |
var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download"); |
496 |
var directoryPath = Path.GetDirectoryName(tempPath); |
497 |
if (!Directory.Exists(directoryPath)) |
498 |
Directory.CreateDirectory(directoryPath); |
499 |
|
500 |
//If the local file exists we should make a copy of it to the |
501 |
//fragments folder, unless a newer temp copy already exists, which |
502 |
//means there is an interrupted download |
503 |
if (ShouldCopy(localPath, tempPath)) |
504 |
File.Copy(localPath, tempPath, true); |
505 |
|
506 |
//Set the size of the file to the size specified in the treehash |
507 |
//This will also create an empty file if the file doesn't exist |
508 |
SetFileSize(tempPath, serverHash.Bytes); |
509 |
|
510 |
return Task.Factory.StartNew(() => |
511 |
{ |
512 |
//Calculate the temp file's treehash |
513 |
var treeHash = Signature.CalculateTreeHashAsync(tempPath, this.BlockSize,BlockHash).Result; |
514 |
|
515 |
//And compare it with the server's hash |
516 |
var upHashes = serverHash.GetHashesAsStrings(); |
517 |
var localHashes = treeHash.HashDictionary; |
518 |
for (int i = 0; i < upHashes.Length; i++) |
519 |
{ |
520 |
//For every non-matching hash |
521 |
if (!localHashes.ContainsKey(upHashes[i])) |
522 |
{ |
523 |
Trace.TraceInformation("[BLOCK GET] START {0} of {1} for {2}",i,upHashes.Length,localPath); |
524 |
var start = i*BlockSize; |
525 |
long? end = null; |
526 |
if (i < upHashes.Length - 1 ) |
527 |
end= ((i + 1)*BlockSize) ; |
528 |
|
529 |
//Get its block |
530 |
var blockTask = CloudClient.GetBlock(container, relativeUrl, |
531 |
start, end); |
532 |
|
533 |
blockTask.ContinueWith(b => |
534 |
{ |
535 |
//And apply it to the temp file |
536 |
var buffer = b.Result; |
537 |
var stream =FileAsync.OpenWrite(tempPath); |
538 |
stream.Seek(start,SeekOrigin.Begin); |
539 |
return stream.WriteAsync(buffer, 0, buffer.Length) |
540 |
.ContinueWith(s => stream.Close()); |
541 |
|
542 |
}).Unwrap() |
543 |
.Wait(); |
544 |
Trace.TraceInformation("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath); |
545 |
} |
546 |
|
547 |
} |
548 |
|
549 |
|
550 |
//Replace the existing file with the temp |
551 |
if (File.Exists(localPath)) |
552 |
File.Replace(tempPath, localPath, null, true); |
553 |
else |
554 |
File.Move(tempPath, localPath); |
555 |
Trace.TraceInformation("[BLOCK GET] COMPLETE {0}", localPath); |
556 |
}); |
557 |
} |
558 |
|
559 |
//Change the file's size, possibly truncating or adding to it |
560 |
private static void SetFileSize(string filePath, long fileSize) |
561 |
{ |
562 |
if (String.IsNullOrWhiteSpace(filePath)) |
563 |
throw new ArgumentNullException("filePath"); |
564 |
if (!Path.IsPathRooted(filePath)) |
565 |
throw new ArgumentException("The filePath must be rooted", "filePath"); |
566 |
if (fileSize<0) |
567 |
throw new ArgumentOutOfRangeException("fileSize"); |
568 |
Contract.EndContractBlock(); |
569 |
|
570 |
using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write)) |
571 |
{ |
572 |
stream.SetLength(fileSize); |
573 |
} |
574 |
} |
575 |
|
576 |
//Check whether we should copy the local file to a temp path |
577 |
private static bool ShouldCopy(string localPath, string tempPath) |
578 |
{ |
579 |
//No need to copy if there is no file |
580 |
if (!File.Exists(localPath)) |
581 |
return false; |
582 |
|
583 |
//If there is no temp file, go ahead and copy |
584 |
if (!File.Exists(tempPath)) |
585 |
return true; |
586 |
|
587 |
//If there is a temp file and is newer than the actual file, don't copy |
588 |
var localLastWrite = File.GetLastWriteTime(localPath); |
589 |
var tempLastWrite = File.GetLastWriteTime(tempPath); |
590 |
|
591 |
//This could mean there is an interrupted download in progress |
592 |
return (tempLastWrite < localLastWrite); |
593 |
} |
594 |
|
595 |
private void UploadCloudFile(FileInfo fileInfo, string hash,string topHash) |
596 |
{ |
597 |
if (fileInfo==null) |
598 |
throw new ArgumentNullException("fileInfo"); |
599 |
if (String.IsNullOrWhiteSpace(hash)) |
600 |
throw new ArgumentNullException("hash"); |
601 |
Contract.EndContractBlock(); |
602 |
|
603 |
if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase)) |
604 |
return; |
605 |
|
606 |
var url = fileInfo.AsRelativeUrlTo(FileAgent.RootPath); |
607 |
|
608 |
var fullFileName = fileInfo.FullName; |
609 |
using(var gate=NetworkGate.Acquire(fullFileName,NetworkOperation.Uploading)) |
610 |
{ |
611 |
//Abort if the file is already being uploaded or downloaded |
612 |
if (gate.Failed) |
613 |
return; |
614 |
|
615 |
|
616 |
//Even if GetObjectInfo times out, we can proceed with the upload |
617 |
var info = CloudClient.GetObjectInfo(PithosContainer, url); |
618 |
|
619 |
//If the file hashes match, abort the upload |
620 |
if (hash.Equals(info.Hash, StringComparison.InvariantCultureIgnoreCase) || |
621 |
topHash.Equals(info.Hash, StringComparison.InvariantCultureIgnoreCase)) |
622 |
{ |
623 |
//but store any metadata changes |
624 |
this.StatusKeeper.StoreInfo(fullFileName, info); |
625 |
Trace.TraceInformation("Skip upload of {0}, hashes match", fullFileName); |
626 |
return; |
627 |
} |
628 |
|
629 |
//Mark the file as modified while we upload it |
630 |
var setStatus = Task.Factory.StartNew(() => |
631 |
StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified)); |
632 |
//And then upload it |
633 |
|
634 |
//If the file is larger than the block size, try a hashmap PUT |
635 |
if (fileInfo.Length > BlockSize ) |
636 |
{ |
637 |
//To upload using a hashmap |
638 |
//First, calculate the tree hash |
639 |
var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, BlockSize, BlockHash); |
640 |
|
641 |
var putHashMap = setStatus.ContinueWith(t=> |
642 |
UploadWithHashMap(fileInfo,url,treeHash)); |
643 |
|
644 |
putHashMap.Wait(); |
645 |
} |
646 |
else |
647 |
{ |
648 |
//Otherwise do a regular PUT |
649 |
var put = setStatus.ContinueWith(t => |
650 |
CloudClient.PutObject(PithosContainer,url,fullFileName,hash)); |
651 |
put.Wait(); |
652 |
} |
653 |
//If everything succeeds, change the file and overlay status to normal |
654 |
this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal); |
655 |
} |
656 |
//Notify the Shell to update the overlays |
657 |
NativeMethods.RaiseChangeNotification(fullFileName); |
658 |
StatusNotification.NotifyChangedFile(fullFileName); |
659 |
} |
660 |
|
661 |
public void UploadWithHashMap(FileInfo fileInfo,string url,Task<TreeHash> treeHash) |
662 |
{ |
663 |
var fullFileName = fileInfo.FullName; |
664 |
|
665 |
//Send the hashmap to the server |
666 |
var hashPut = CloudClient.PutHashMap(PithosContainer, url, treeHash.Result); |
667 |
var missingHashes = hashPut.Result; |
668 |
if (missingHashes.Count == 0) |
669 |
return; |
670 |
|
671 |
var buffer = new byte[BlockSize]; |
672 |
foreach (var missingHash in missingHashes) |
673 |
{ |
674 |
int blockIndex = -1; |
675 |
try |
676 |
{ |
677 |
//Find the proper block |
678 |
blockIndex = treeHash.Result.HashDictionary[missingHash]; |
679 |
var offset = blockIndex*BlockSize; |
680 |
|
681 |
var read = fileInfo.Read(buffer, offset, BlockSize); |
682 |
if (read > 0) |
683 |
{ |
684 |
//Copy the actual block data out of the buffer |
685 |
var data = new byte[read]; |
686 |
Buffer.BlockCopy(buffer, 0, data, 0, read); |
687 |
|
688 |
//And POST them |
689 |
CloudClient.PostBlock(PithosContainer, data).Wait(); |
690 |
Trace.TraceInformation("[BLOCK] Block {0} of {1} uploaded", blockIndex, |
691 |
fullFileName); |
692 |
} |
693 |
} |
694 |
catch (Exception exc) |
695 |
{ |
696 |
Trace.TraceError("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc); |
697 |
} |
698 |
} |
699 |
|
700 |
UploadWithHashMap(fileInfo, url, treeHash); |
701 |
|
702 |
} |
703 |
|
704 |
|
705 |
} |
706 |
|
707 |
|
708 |
|
709 |
|
710 |
} |