Revision f3d080df trunk/Pithos.Core/Agents/NetworkAgent.cs
b/trunk/Pithos.Core/Agents/NetworkAgent.cs | ||
---|---|---|
1 |
using System; |
|
1 |
// ----------------------------------------------------------------------- |
|
2 |
// <copyright file="NetworkAgent.cs" company="GRNET"> |
|
3 |
// Copyright 2011 GRNET S.A. All rights reserved. |
|
4 |
// |
|
5 |
// Redistribution and use in source and binary forms, with or |
|
6 |
// without modification, are permitted provided that the following |
|
7 |
// conditions are met: |
|
8 |
// |
|
9 |
// 1. Redistributions of source code must retain the above |
|
10 |
// copyright notice, this list of conditions and the following |
|
11 |
// disclaimer. |
|
12 |
// |
|
13 |
// 2. Redistributions in binary form must reproduce the above |
|
14 |
// copyright notice, this list of conditions and the following |
|
15 |
// disclaimer in the documentation and/or other materials |
|
16 |
// provided with the distribution. |
|
17 |
// |
|
18 |
// THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
19 |
// OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
20 |
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
21 |
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
22 |
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
23 |
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
24 |
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
25 |
// USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
26 |
// AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
27 |
// LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
28 |
// ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
29 |
// POSSIBILITY OF SUCH DAMAGE. |
|
30 |
// |
|
31 |
// The views and conclusions contained in the software and |
|
32 |
// documentation are those of the authors and should not be |
|
33 |
// interpreted as representing official policies, either expressed |
|
34 |
// or implied, of GRNET S.A. |
|
35 |
// </copyright> |
|
36 |
// ----------------------------------------------------------------------- |
|
37 |
|
|
38 |
using System; |
|
2 | 39 |
using System.Collections.Concurrent; |
3 | 40 |
using System.Collections.Generic; |
4 | 41 |
using System.ComponentModel.Composition; |
... | ... | |
14 | 51 |
|
15 | 52 |
namespace Pithos.Core.Agents |
16 | 53 |
{ |
54 |
//TODO: Ensure all network operations use exact casing. Pithos is case sensitive |
|
17 | 55 |
[Export] |
18 | 56 |
public class NetworkAgent |
19 | 57 |
{ |
20 | 58 |
private Agent<CloudAction> _agent; |
21 | 59 |
|
60 |
//A separate agent is used to execute delete actions immediatelly; |
|
61 |
private Agent<CloudDeleteAction> _deleteAgent; |
|
62 |
|
|
22 | 63 |
|
23 | 64 |
[Import] |
24 | 65 |
public IStatusKeeper StatusKeeper { get; set; } |
... | ... | |
42 | 83 |
inbox.LoopAsync(process, loop); |
43 | 84 |
}; |
44 | 85 |
loop(); |
45 |
}); |
|
86 |
}); |
|
87 |
|
|
88 |
_deleteAgent = Agent<CloudDeleteAction>.Start(inbox => |
|
89 |
{ |
|
90 |
Action loop = null; |
|
91 |
loop = () => |
|
92 |
{ |
|
93 |
var message = inbox.Receive(); |
|
94 |
var process = message.Then(ProcessDelete,inbox.CancellationToken); |
|
95 |
inbox.LoopAsync(process, loop); |
|
96 |
}; |
|
97 |
loop(); |
|
98 |
|
|
99 |
}); |
|
46 | 100 |
} |
47 | 101 |
|
48 | 102 |
private async Task Process(CloudAction action) |
... | ... | |
75 | 129 |
await DownloadCloudFile(accountInfo, cloudFile, downloadPath); |
76 | 130 |
break; |
77 | 131 |
case CloudActionType.DeleteCloud: |
78 |
DeleteCloudFile(accountInfo, cloudFile); |
|
132 |
//Redirect deletes to the delete agent |
|
133 |
_deleteAgent.Post((CloudDeleteAction)action); |
|
79 | 134 |
break; |
80 | 135 |
case CloudActionType.RenameCloud: |
81 | 136 |
var moveAction = (CloudMoveAction)action; |
... | ... | |
127 | 182 |
} |
128 | 183 |
} |
129 | 184 |
|
185 |
/// <summary> |
|
186 |
/// Processes cloud delete actions |
|
187 |
/// </summary> |
|
188 |
/// <param name="action">The delete action to execute</param> |
|
189 |
/// <returns></returns> |
|
190 |
/// <remarks> |
|
191 |
/// When a file/folder is deleted locally, we must delete it ASAP from the server and block any download |
|
192 |
/// operations that may be in progress. |
|
193 |
/// <para> |
|
194 |
/// A separate agent is used to process deletes because the main agent may be busy with a long operation. |
|
195 |
/// </para> |
|
196 |
/// </remarks> |
|
197 |
private async Task ProcessDelete(CloudDeleteAction action) |
|
198 |
{ |
|
199 |
if (action == null) |
|
200 |
throw new ArgumentNullException("action"); |
|
201 |
if (action.AccountInfo==null) |
|
202 |
throw new ArgumentException("The action.AccountInfo is empty","action"); |
|
203 |
Contract.EndContractBlock(); |
|
204 |
|
|
205 |
var accountInfo = action.AccountInfo; |
|
206 |
|
|
207 |
using (log4net.ThreadContext.Stacks["NETWORK"].Push("PROCESS")) |
|
208 |
{ |
|
209 |
Log.InfoFormat("[ACTION] Start Processing {0}", action); |
|
210 |
|
|
211 |
var cloudFile = action.CloudFile; |
|
212 |
|
|
213 |
try |
|
214 |
{ |
|
215 |
//Acquire a lock on the deleted file to prevent uploading/downloading operations from the normal |
|
216 |
//agent |
|
217 |
using (var gate = NetworkGate.Acquire(action.LocalFile.FullName, NetworkOperation.Deleting)) |
|
218 |
{ |
|
219 |
// Remove any related actions from the normal agent |
|
220 |
_agent.Remove(queuedAction => |
|
221 |
queuedAction.CloudFile.Container == action.CloudFile.Container && |
|
222 |
queuedAction.CloudFile.Name == action.CloudFile.Name); |
|
223 |
// and then delete the file from the server |
|
224 |
DeleteCloudFile(accountInfo, cloudFile); |
|
225 |
Log.InfoFormat("[ACTION] End Delete {0}:{1}->{2}", action.Action, action.LocalFile, |
|
226 |
action.CloudFile.Name); |
|
227 |
} |
|
228 |
} |
|
229 |
catch (WebException exc) |
|
230 |
{ |
|
231 |
Log.ErrorFormat("[WEB ERROR] {0} : {1} -> {2} due to exception\r\n{3}", action.Action, action.LocalFile, action.CloudFile, exc); |
|
232 |
} |
|
233 |
catch (OperationCanceledException) |
|
234 |
{ |
|
235 |
throw; |
|
236 |
} |
|
237 |
catch (DirectoryNotFoundException) |
|
238 |
{ |
|
239 |
Log.ErrorFormat("{0} : {1} -> {2} failed because the directory was not found.\n Rescheduling a delete", |
|
240 |
action.Action, action.LocalFile, action.CloudFile); |
|
241 |
//Repost a delete action for the missing file |
|
242 |
_deleteAgent.Post(action); |
|
243 |
} |
|
244 |
catch (FileNotFoundException) |
|
245 |
{ |
|
246 |
Log.ErrorFormat("{0} : {1} -> {2} failed because the file was not found.\n Rescheduling a delete", |
|
247 |
action.Action, action.LocalFile, action.CloudFile); |
|
248 |
//Post a delete action for the missing file |
|
249 |
_deleteAgent.Post(action); |
|
250 |
} |
|
251 |
catch (Exception exc) |
|
252 |
{ |
|
253 |
Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}", |
|
254 |
action.Action, action.LocalFile, action.CloudFile, exc); |
|
255 |
|
|
256 |
_deleteAgent.Post(action); |
|
257 |
} |
|
258 |
} |
|
259 |
} |
|
260 |
|
|
130 | 261 |
private async Task SyncFiles(AccountInfo accountInfo,CloudAction action) |
131 | 262 |
{ |
132 | 263 |
if (accountInfo == null) |
... | ... | |
143 | 274 |
|
144 | 275 |
var localFile = action.LocalFile; |
145 | 276 |
var cloudFile = action.CloudFile; |
146 |
var downloadPath=action.LocalFile.FullName.ToLower();
|
|
277 |
var downloadPath=action.LocalFile.GetProperCapitalization();
|
|
147 | 278 |
|
148 | 279 |
var cloudHash = cloudFile.Hash.ToLower(); |
149 | 280 |
var localHash = action.LocalHash.Value.ToLower(); |
... | ... | |
219 | 350 |
if (cloudAction.AccountInfo==null) |
220 | 351 |
throw new ArgumentException("The CloudAction.AccountInfo is empty","cloudAction"); |
221 | 352 |
Contract.EndContractBlock(); |
222 |
|
|
353 |
|
|
223 | 354 |
//If the action targets a local file, add a treehash calculation |
224 | 355 |
if (cloudAction.LocalFile as FileInfo != null) |
225 | 356 |
{ |
... | ... | |
241 | 372 |
//The hash for a directory is the empty string |
242 | 373 |
cloudAction.TopHash = new Lazy<string>(() => String.Empty); |
243 | 374 |
} |
244 |
_agent.Post(cloudAction); |
|
375 |
|
|
376 |
if (cloudAction is CloudDeleteAction) |
|
377 |
_deleteAgent.Post((CloudDeleteAction)cloudAction); |
|
378 |
else |
|
379 |
_agent.Post(cloudAction); |
|
245 | 380 |
} |
246 | 381 |
|
247 | 382 |
/* class ObjectInfoByNameComparer:IEqualityComparer<ObjectInfo> |
... | ... | |
410 | 545 |
if (fileAgent.Exists(relativePath)) |
411 | 546 |
{ |
412 | 547 |
//If a directory object already exists, we don't need to perform any other action |
413 |
var localFile = fileAgent.GetFileInfo(relativePath); |
|
548 |
var localFile = fileAgent.GetFileSystemInfo(relativePath);
|
|
414 | 549 |
if (objectInfo.Content_Type == @"application/directory" && localFile is DirectoryInfo) |
415 | 550 |
continue; |
416 | 551 |
var state = FileState.FindByFilePath(localFile.FullName); |
... | ... | |
470 | 605 |
throw new ArgumentException("OldCloudFile","action"); |
471 | 606 |
Contract.EndContractBlock(); |
472 | 607 |
|
473 |
var newFilePath = action.LocalFile.FullName; |
|
608 |
|
|
609 |
var newFilePath = action.LocalFile.FullName; |
|
610 |
|
|
611 |
//How do we handle concurrent renames and deletes/uploads/downloads? |
|
612 |
//* A conflicting upload means that a file was renamed before it had a chance to finish uploading |
|
613 |
// This should never happen as the network agent executes only one action at a time |
|
614 |
//* A conflicting download means that the file was modified on the cloud. While we can go on and complete |
|
615 |
// the rename, there may be a problem if the file is downloaded in blocks, as subsequent block requests for the |
|
616 |
// same name will fail. |
|
617 |
// This should never happen as the network agent executes only one action at a time. |
|
618 |
//* A conflicting delete can happen if the rename was followed by a delete action that didn't have the chance |
|
619 |
// to remove the rename from the queue. |
|
620 |
// We can probably ignore this case. It will result in an error which should be ignored |
|
621 |
|
|
622 |
|
|
474 | 623 |
//The local file is already renamed |
475 | 624 |
StatusKeeper.SetFileOverlayStatus(newFilePath, FileOverlayStatus.Modified); |
476 | 625 |
|
... | ... | |
479 | 628 |
var container = action.CloudFile.Container; |
480 | 629 |
|
481 | 630 |
var client = new CloudFilesClient(accountInfo); |
631 |
//TODO: What code is returned when the source file doesn't exist? |
|
482 | 632 |
client.MoveObject(account, container, action.OldCloudFile.Name, container, action.CloudFile.Name); |
483 | 633 |
|
484 | 634 |
StatusKeeper.SetFileStatus(newFilePath, FileStatus.Unchanged); |
... | ... | |
502 | 652 |
using ( log4net.ThreadContext.Stacks["DeleteCloudFile"].Push("Delete")) |
503 | 653 |
{ |
504 | 654 |
var fileName= cloudFile.RelativeUrlToFilePath(accountInfo.UserName); |
505 |
var info = fileAgent.GetFileInfo(fileName); |
|
655 |
var info = fileAgent.GetFileSystemInfo(fileName);
|
|
506 | 656 |
var fullPath = info.FullName.ToLower(); |
507 | 657 |
|
508 | 658 |
StatusKeeper.SetFileOverlayStatus(fullPath, FileOverlayStatus.Modified); |
... | ... | |
518 | 668 |
} |
519 | 669 |
|
520 | 670 |
//Download a file. |
521 |
private async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile , string localPath)
|
|
671 |
private async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile , string filePath)
|
|
522 | 672 |
{ |
523 | 673 |
if (accountInfo == null) |
524 | 674 |
throw new ArgumentNullException("accountInfo"); |
... | ... | |
528 | 678 |
throw new ArgumentNullException("cloudFile"); |
529 | 679 |
if (String.IsNullOrWhiteSpace(cloudFile.Container)) |
530 | 680 |
throw new ArgumentNullException("cloudFile"); |
531 |
if (String.IsNullOrWhiteSpace(localPath))
|
|
532 |
throw new ArgumentNullException("localPath");
|
|
533 |
if (!Path.IsPathRooted(localPath))
|
|
534 |
throw new ArgumentException("The localPath must be rooted", "localPath");
|
|
681 |
if (String.IsNullOrWhiteSpace(filePath))
|
|
682 |
throw new ArgumentNullException("filePath");
|
|
683 |
if (!Path.IsPathRooted(filePath))
|
|
684 |
throw new ArgumentException("The filePath must be rooted", "filePath");
|
|
535 | 685 |
Contract.EndContractBlock(); |
536 |
|
|
686 |
|
|
687 |
var localPath = Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
|
537 | 688 |
var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative); |
538 | 689 |
|
539 | 690 |
var url = relativeUrl.ToString(); |
... | ... | |
584 | 735 |
} |
585 | 736 |
|
586 | 737 |
//Download a small file with a single GET operation |
587 |
private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string localPath,TreeHash serverHash)
|
|
738 |
private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath,TreeHash serverHash)
|
|
588 | 739 |
{ |
589 | 740 |
if (client == null) |
590 | 741 |
throw new ArgumentNullException("client"); |
... | ... | |
592 | 743 |
throw new ArgumentNullException("cloudFile"); |
593 | 744 |
if (relativeUrl == null) |
594 | 745 |
throw new ArgumentNullException("relativeUrl"); |
595 |
if (String.IsNullOrWhiteSpace(localPath))
|
|
596 |
throw new ArgumentNullException("localPath");
|
|
597 |
if (!Path.IsPathRooted(localPath))
|
|
598 |
throw new ArgumentException("The localPath must be rooted", "localPath");
|
|
746 |
if (String.IsNullOrWhiteSpace(filePath))
|
|
747 |
throw new ArgumentNullException("filePath");
|
|
748 |
if (!Path.IsPathRooted(filePath))
|
|
749 |
throw new ArgumentException("The localPath must be rooted", "filePath");
|
|
599 | 750 |
Contract.EndContractBlock(); |
600 | 751 |
|
752 |
var localPath = Pithos.Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
|
601 | 753 |
//If the file already exists |
602 | 754 |
if (File.Exists(localPath)) |
603 | 755 |
{ |
... | ... | |
624 | 776 |
Directory.CreateDirectory(tempFolder); |
625 | 777 |
|
626 | 778 |
//Download the object to the temporary location |
627 |
await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath).ContinueWith(t => |
|
628 |
{ |
|
629 |
t.PropagateExceptions(); |
|
630 |
//Create the local folder if it doesn't exist (necessary for shared objects) |
|
631 |
var localFolder = Path.GetDirectoryName(localPath); |
|
632 |
if (!Directory.Exists(localFolder)) |
|
633 |
Directory.CreateDirectory(localFolder); |
|
634 |
//And move it to its actual location once downloading is finished |
|
635 |
if (File.Exists(localPath)) |
|
636 |
File.Replace(tempPath,localPath,null,true); |
|
637 |
else |
|
638 |
File.Move(tempPath,localPath); |
|
639 |
//Notify listeners that a local file has changed |
|
640 |
StatusNotification.NotifyChangedFile(localPath); |
|
779 |
await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath); |
|
641 | 780 |
|
642 |
}); |
|
781 |
//Create the local folder if it doesn't exist (necessary for shared objects) |
|
782 |
var localFolder = Path.GetDirectoryName(localPath); |
|
783 |
if (!Directory.Exists(localFolder)) |
|
784 |
Directory.CreateDirectory(localFolder); |
|
785 |
//And move it to its actual location once downloading is finished |
|
786 |
if (File.Exists(localPath)) |
|
787 |
File.Replace(tempPath,localPath,null,true); |
|
788 |
else |
|
789 |
File.Move(tempPath,localPath); |
|
790 |
//Notify listeners that a local file has changed |
|
791 |
StatusNotification.NotifyChangedFile(localPath); |
|
792 |
|
|
793 |
|
|
643 | 794 |
} |
644 | 795 |
|
645 | 796 |
//Download a file asynchronously using blocks |
646 |
public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string localPath, TreeHash serverHash)
|
|
797 |
public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash)
|
|
647 | 798 |
{ |
648 | 799 |
if (client == null) |
649 | 800 |
throw new ArgumentNullException("client"); |
... | ... | |
651 | 802 |
throw new ArgumentNullException("cloudFile"); |
652 | 803 |
if (relativeUrl == null) |
653 | 804 |
throw new ArgumentNullException("relativeUrl"); |
654 |
if (String.IsNullOrWhiteSpace(localPath))
|
|
655 |
throw new ArgumentNullException("localPath");
|
|
656 |
if (!Path.IsPathRooted(localPath))
|
|
657 |
throw new ArgumentException("The localPath must be rooted", "localPath");
|
|
805 |
if (String.IsNullOrWhiteSpace(filePath))
|
|
806 |
throw new ArgumentNullException("filePath");
|
|
807 |
if (!Path.IsPathRooted(filePath))
|
|
808 |
throw new ArgumentException("The filePath must be rooted", "filePath");
|
|
658 | 809 |
if (serverHash == null) |
659 | 810 |
throw new ArgumentNullException("serverHash"); |
660 | 811 |
Contract.EndContractBlock(); |
661 | 812 |
|
662 | 813 |
var fileAgent = GetFileAgent(accountInfo); |
814 |
var localPath = Interfaces.FileInfoExtensions.GetProperFilePathCapitalization(filePath); |
|
663 | 815 |
|
664 | 816 |
//Calculate the relative file path for the new file |
665 | 817 |
var relativePath = relativeUrl.RelativeUriToFilePath(); |
... | ... | |
751 | 903 |
} |
752 | 904 |
|
753 | 905 |
|
754 |
var fullFileName = fileInfo.FullName;
|
|
906 |
var fullFileName = fileInfo.GetProperCapitalization();
|
|
755 | 907 |
using (var gate = NetworkGate.Acquire(fullFileName, NetworkOperation.Uploading)) |
756 | 908 |
{ |
757 | 909 |
//Abort if the file is already being uploaded or downloaded |
... | ... | |
768 | 920 |
//If this is a read-only file, do not upload changes |
769 | 921 |
if (info.AllowedTo == "read") |
770 | 922 |
return; |
771 |
|
|
772 |
//WRONG: If this is a directory, there is no hash to check. ???? |
|
773 |
//TODO: Check how a directory hash is calculated |
|
923 |
|
|
924 |
//TODO: Check how a directory hash is calculated -> All dirs seem to have the same hash |
|
774 | 925 |
if (fileInfo is DirectoryInfo) |
775 | 926 |
{ |
776 | 927 |
//If the directory doesn't exist the Hash property will be empty |
777 | 928 |
if (String.IsNullOrWhiteSpace(info.Hash)) |
778 | 929 |
//Go on and create the directory |
779 |
client.PutObject(account, cloudFile.Container, cloudFile.Name, fileInfo.FullName, String.Empty, "application/directory");
|
|
930 |
client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName, String.Empty, "application/directory");
|
|
780 | 931 |
} |
781 | 932 |
else |
782 | 933 |
{ |
... | ... | |
804 | 955 |
//the relevant block |
805 | 956 |
|
806 | 957 |
//First, calculate the tree hash |
807 |
var treeHash = await Signature.CalculateTreeHashAsync(fileInfo.FullName, accountInfo.BlockSize,
|
|
958 |
var treeHash = await Signature.CalculateTreeHashAsync(fullFileName, accountInfo.BlockSize,
|
|
808 | 959 |
accountInfo.BlockHash); |
809 | 960 |
|
810 | 961 |
await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash); |
... | ... | |
871 | 1022 |
throw new ArgumentException("Invalid container","cloudFile"); |
872 | 1023 |
Contract.EndContractBlock(); |
873 | 1024 |
|
874 |
var fullFileName = fileInfo.FullName;
|
|
1025 |
var fullFileName = fileInfo.GetProperCapitalization();
|
|
875 | 1026 |
|
876 | 1027 |
var account = cloudFile.Account ?? accountInfo.UserName; |
877 | 1028 |
var container = cloudFile.Container ; |
Also available in: Unified diff