\r
var tuples = MergeSources(infos, files, states,moves).ToList();\r
\r
+ var processedPaths = new HashSet<string>();\r
//Process only the changes in the batch file, if one exists\r
var stateTuples = accountBatch==null?tuples:tuples.Where(t => accountBatch.Contains(t.FilePath));\r
foreach (var tuple in stateTuples.Where(s=>!s.Locked))\r
//Set the Merkle Hash\r
//SetMerkleHash(accountInfo, tuple);\r
\r
- await SyncSingleItem(accountInfo, tuple, agent, moves,token).ConfigureAwait(false);\r
+ await SyncSingleItem(accountInfo, tuple, agent, moves,processedPaths,token).ConfigureAwait(false);\r
\r
}\r
\r
Pause = false;\r
}\r
\r
/// <summary>\r
/// Synchronizes the single item described by <paramref name="tuple"/>, returning at once when its path\r
/// has already been recorded in <paramref name="processedPaths"/>.\r
/// </summary>\r
/// <remarks>\r
/// Both the old (-) and the new (+) signature are shown; the new one adds <paramref name="processedPaths"/>\r
/// and forwards it to UploadLocalFile and ProcessChildren.\r
/// Parts of the body are not visible in this excerpt (the declarations of isCreation, localInfo and progress,\r
/// and the header of the catch clause), so only the visible statements are described here.\r
/// </remarks>\r
/// <param name="accountInfo">Account passed on to MoveForLocalMove, UploadLocalFile and ProcessChildren.</param>\r
/// <param name="tuple">The item to synchronize; its FilePath is the key used for de-duplication.</param>\r
/// <param name="agent">Forwarded unchanged to ProcessChildren.</param>\r
/// <param name="moves">Forwarded unchanged to ProcessChildren.</param>\r
/// <param name="processedPaths">Paths handled so far; tuple.FilePath is added on entry.</param>\r
/// <param name="token">Cancellation token forwarded to UploadLocalFile and ProcessChildren.</param>\r
- private async Task SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary<string, MovedEventArgs> moves, CancellationToken token)\r
+ private async Task SyncSingleItem(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary<string, MovedEventArgs> moves,HashSet<string> processedPaths, CancellationToken token)\r
{\r
//Trace the path together with the tuple's C, L and S values before any work is done\r
Log.DebugFormat("Sync [{0}] C:[{1}] L:[{2}] S:[{3}]", tuple.FilePath, tuple.C, tuple.L, tuple.S);\r
\r
//HashSet.Add returns false when the entry already exists, so a single call both tests and records the path.\r
//NOTE(review): with the default string comparer, paths that differ only in case are distinct entries -\r
//confirm the set is built with a suitable comparer or that FilePath casing is normalized.\r
//NOTE(review): the path is recorded before the work is attempted, so an item that fails below still counts\r
//as processed for the lifetime of this set.\r
+ //If the processed paths already contain the current path, exit\r
+ if (!processedPaths.Add(tuple.FilePath))\r
+ return;\r
+\r
try\r
{\r
//True when ObjectInfo is present and its UUID starts with the all-zero prefix\r
bool isInferredParent = tuple.ObjectInfo != null && tuple.ObjectInfo.UUID.StartsWith("00000000-0000-0000");\r
//to avoid an expensive hash\r
//The upload only runs when MoveForLocalMove returns false for this tuple\r
if (!MoveForLocalMove(accountInfo, tuple))\r
{\r
- await UploadLocalFile(accountInfo, tuple, token, isCreation, localInfo, progress).ConfigureAwait(false);\r
+ await UploadLocalFile(accountInfo, tuple, token, isCreation, localInfo,processedPaths, progress).ConfigureAwait(false);\r
}\r
\r
//updateRecord( S = C )\r
\r
//On creation the children are synchronized too, sharing the same processedPaths set\r
if (isCreation )\r
{ \r
- ProcessChildren(accountInfo, tuple, agent, moves,token);\r
+ ProcessChildren(accountInfo, tuple, agent, moves,processedPaths,token);\r
}\r
}\r
}\r
{\r
//In case of error log and retry with the next poll\r
Log.ErrorFormat("[SYNC] Failed for file {0}. Will Retry.\r\n{1}",tuple.FilePath,exc);\r
-\r
- \r
}\r
}\r
\r
}\r
\r
/// <summary>\r
/// Builds a CloudUploadAction for <paramref name="localInfo"/> and, when\r
/// <paramref name="isUnselectedRootFolder"/> is set, also uploads every sub-directory below it.\r
/// </summary>\r
/// <remarks>\r
/// Both the old (-) and the new (+) parameter list are shown; the new one adds\r
/// <paramref name="processedPaths"/>. The statement that creates the main action is truncated in this\r
/// excerpt, so what happens to that action is not described here.\r
/// NOTE(review): the visible caller passes its isCreation flag for isUnselectedRootFolder - confirm that\r
/// the two names mean the same thing.\r
/// </remarks>\r
/// <param name="processedPaths">Receives the FullName of every sub-directory before its upload starts.</param>\r
private async Task UploadLocalFile(AccountInfo accountInfo, StateTuple tuple, CancellationToken token,\r
- bool isUnselectedRootFolder, FileSystemInfo localInfo, Progress<double> progress)\r
+ bool isUnselectedRootFolder, FileSystemInfo localInfo, HashSet<string> processedPaths, Progress<double> progress)\r
{\r
var action = new CloudUploadAction(accountInfo, localInfo, tuple.FileState,\r
accountInfo.BlockSize, accountInfo.BlockHash,\r
\r
//Only directories are enumerated here (EnumerateDirectories, all levels); files are not part of this query.\r
//NOTE(review): the direct cast assumes localInfo is a DirectoryInfo whenever the flag is set; it throws\r
//InvalidCastException otherwise - confirm against the caller.\r
if (isUnselectedRootFolder)\r
{\r
//The old version selected the upload tasks straight from the query. The new version materializes the\r
//actions with ToList first, so the paths can be registered and the same list can then be turned into tasks.\r
//NOTE(review): the key stored here is LocalFile.FullName while SyncSingleItem tests tuple.FilePath -\r
//confirm both have the same form (casing, separators), otherwise the de-duplication will not match.\r
- var dirActions =\r
+ var dirActions =(\r
from dir in ((DirectoryInfo) localInfo).EnumerateDirectories("*", SearchOption.AllDirectories)\r
let subAction = new CloudUploadAction(accountInfo, dir, null,\r
accountInfo.BlockSize, accountInfo.BlockHash,\r
"Poll", true, token, progress)\r
- select NetworkAgent.Uploader.UploadCloudFile(subAction, token);\r
- await TaskEx.WhenAll(dirActions.ToArray());\r
+ select subAction).ToList();\r
+ foreach (var dirAction in dirActions)\r
+ {\r
+ processedPaths.Add(dirAction.LocalFile.FullName);\r
+ }\r
+ \r
+ await TaskEx.WhenAll(dirActions.Select(a=>NetworkAgent.Uploader.UploadCloudFile(a,token)).ToArray());\r
}\r
}\r
\r
}\r
}\r
\r
/// <summary>\r
/// Synchronizes the children of the directory behind <paramref name="tuple"/> by calling SyncSingleItem\r
/// for each folder tuple first and for each file tuple afterwards.\r
/// </summary>\r
/// <remarks>\r
/// Both the old (-) and the new (+) signature are shown; the new one adds <paramref name="processedPaths"/>\r
/// and hands it to every SyncSingleItem call. The queries that build folderTuples and fileTuples are only\r
/// partly visible in this excerpt.\r
/// NOTE(review): each SyncSingleItem task is blocked on with Wait(); blocking on async code can deadlock\r
/// when a synchronization context is present - confirm this path never runs on one.\r
/// NOTE(review): HashSet is not safe for concurrent writers - confirm ApplyAction invokes the delegate\r
/// sequentially.\r
/// </remarks>\r
- private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary<string, MovedEventArgs> moves,CancellationToken token)\r
+ private void ProcessChildren(AccountInfo accountInfo, StateTuple tuple, FileAgent agent, ConcurrentDictionary<string, MovedEventArgs> moves,HashSet<string> processedPaths,CancellationToken token)\r
{\r
\r
//The as-cast yields null when FileInfo is not a DirectoryInfo; any null handling is not visible in this excerpt\r
var dirInfo = tuple.FileInfo as DirectoryInfo;\r
select new StateTuple(file){C=file.ComputeShortHash(this.StatusNotification)};\r
\r
//Process folders first, to ensure folders appear on the server as soon as possible\r
- folderTuples.ApplyAction(t =>SyncSingleItem(accountInfo, t, agent, moves, token).Wait());\r
+ folderTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves, processedPaths,token).Wait());\r
\r
- fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves, token).Wait());\r
+ fileTuples.ApplyAction(t => SyncSingleItem(accountInfo, t, agent, moves,processedPaths, token).Wait());\r
}\r
\r
\r
public class Selectives\r
{\r
\r
//Selected URIs per account; SetSelectedUris stores them under account.AccountKey.\r
//The new (+) version holds a ConcurrentBag instead of a List. A ConcurrentBag is unordered and keeps duplicates.\r
- public ConcurrentDictionary<Uri, List<Uri>> SelectiveUris { get; private set; }\r
+ public ConcurrentDictionary<Uri, ConcurrentBag<Uri>> SelectiveUris { get; private set; }\r
\r
//Selected local paths per account, stored under the same account.AccountKey as SelectiveUris.\r
- public ConcurrentDictionary<Uri, List<string>> SelectivePaths { get; private set; }\r
+ public ConcurrentDictionary<Uri, ConcurrentBag<string>> SelectivePaths { get; private set; }\r
\r
[Import]\r
public IPithosSettings Settings { get; set; }\r
\r
/// <summary>\r
/// Creates the instance with empty SelectiveUris and SelectivePaths dictionaries.\r
/// </summary>\r
public Selectives()\r
{\r
//The new (+) lines only change the value type from List to ConcurrentBag; both dictionaries still start empty\r
- SelectiveUris = new ConcurrentDictionary<Uri, List<Uri>>();\r
- SelectivePaths = new ConcurrentDictionary<Uri, List<string>>();\r
+ SelectiveUris = new ConcurrentDictionary<Uri, ConcurrentBag<Uri>>();\r
+ SelectivePaths = new ConcurrentDictionary<Uri, ConcurrentBag<string>>();\r
}\r
\r
//NOTE(review): this is a plain Dictionary, unlike SelectiveUris and SelectivePaths, and is not safe for\r
//concurrent writes - confirm every access is single-threaded or otherwise guarded.\r
readonly Dictionary<Uri, bool> _selectiveEnabled = new Dictionary<Uri, bool>();\r
\r
/// <summary>\r
/// Replaces the selected URIs and the matching local paths stored for <paramref name="account"/>.\r
/// </summary>\r
/// <param name="account">Account whose AccountKey is the dictionary key for both entries.</param>\r
/// <param name="uris">The new selection; the paths are derived from it with UrisToFilePaths.</param>\r
public void SetSelectedUris(AccountInfo account,List<Uri> uris)\r
{\r
//The old (-) lines stored the caller's list itself; the new (+) lines copy its items into a new ConcurrentBag,\r
//so later changes to the caller's list are no longer reflected in the stored selection.\r
//NOTE(review): the two assignments are separate writes, so a concurrent reader can see the new URIs\r
//together with the old paths - confirm that is acceptable.\r
//NOTE(review): the ConcurrentBag constructor throws ArgumentNullException for a null collection - confirm\r
//callers never pass null for uris.\r
- SelectiveUris[account.AccountKey] = uris;\r
- SelectivePaths[account.AccountKey] = UrisToFilePaths(account,uris);\r
+ SelectiveUris[account.AccountKey] = new ConcurrentBag<Uri>(uris);\r
+ SelectivePaths[account.AccountKey] = new ConcurrentBag<string>(UrisToFilePaths(account,uris));\r
}\r
\r
public void AddUri(AccountInfo account,Uri uri)\r
if (!selectiveEnabled)\r
return !isShared;\r
\r
- List<Uri> filterUris;\r
+ ConcurrentBag<Uri> filterUris;\r
return SelectiveUris.TryGetValue(account.AccountKey, out filterUris) \r
/*|| filterUris.Count ==0*/\r
&& filterUris.Any(f => info.Uri.IsAtOrDirectlyBelow(f));\r
if (!selectiveEnabled)\r
return !isShared;\r
\r
- List<string> paths;\r
+ ConcurrentBag<string> paths;\r
var hasSelectives = SelectivePaths.TryGetValue(account.AccountKey, out paths);\r
var isSelected = hasSelectives && paths.Any(fullPath.IsAtOrDirectlyBelow);\r
return isSelected;\r