Revision cfed7823 trunk/Pithos.Core/Agents/NetworkAgent.cs
b/trunk/Pithos.Core/Agents/NetworkAgent.cs | ||
---|---|---|
11 | 11 |
using System.Threading.Tasks; |
12 | 12 |
using Pithos.Interfaces; |
13 | 13 |
using Pithos.Network; |
14 |
using log4net; |
|
14 | 15 |
|
15 | 16 |
namespace Pithos.Core.Agents |
16 | 17 |
{ |
... | ... | |
37 | 38 |
|
38 | 39 |
public string PithosContainer { get; set; } |
39 | 40 |
public string TrashContainer { get; private set; } |
41 |
public IList<string> Containers { get; private set; } |
|
40 | 42 |
|
41 | 43 |
public int BlockSize { get; set; } |
42 | 44 |
public string BlockHash { get; set; } |
43 | 45 |
|
46 |
private static readonly ILog Log = LogManager.GetLogger(typeof(NetworkAgent)); |
|
47 |
|
|
44 | 48 |
|
45 | 49 |
public void Start(string pithosContainer, string trashContainer, int blockSize, string blockHash) |
46 | 50 |
{ |
... | ... | |
75 | 79 |
throw new ArgumentNullException("action"); |
76 | 80 |
Contract.EndContractBlock(); |
77 | 81 |
|
78 |
Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name); |
|
82 |
using (log4net.LogicalThreadContext.Stacks["NETWORK"].Push("PROCESS")) |
|
83 |
{ |
|
84 |
Log.InfoFormat("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, |
|
85 |
action.CloudFile.Name); |
|
86 |
|
|
87 |
var localFile = action.LocalFile; |
|
88 |
var cloudFile = action.CloudFile; |
|
89 |
var downloadPath = (cloudFile == null) |
|
90 |
? String.Empty |
|
91 |
: Path.Combine(FileAgent.RootPath, cloudFile.RelativeUrlToFilePath(CloudClient.UserName)); |
|
92 |
|
|
93 |
try |
|
94 |
{ |
|
95 |
var account = action.CloudFile.Account ?? CloudClient.UserName; |
|
96 |
var container = action.CloudFile.Container ?? PithosContainer; |
|
97 |
|
|
98 |
switch (action.Action) |
|
99 |
{ |
|
100 |
case CloudActionType.UploadUnconditional: |
|
101 |
UploadCloudFile(account, container, localFile, action.LocalHash.Value, action.TopHash.Value); |
|
102 |
break; |
|
103 |
case CloudActionType.DownloadUnconditional: |
|
104 |
|
|
105 |
DownloadCloudFile(account, container, new Uri(cloudFile.Name, UriKind.Relative), |
|
106 |
downloadPath); |
|
107 |
break; |
|
108 |
case CloudActionType.DeleteCloud: |
|
109 |
DeleteCloudFile(account, container, cloudFile.Name); |
|
110 |
break; |
|
111 |
case CloudActionType.RenameCloud: |
|
112 |
var moveAction = (CloudMoveAction)action; |
|
113 |
RenameCloudFile(account, container, moveAction.OldFileName, moveAction.NewPath, |
|
114 |
moveAction.NewFileName); |
|
115 |
break; |
|
116 |
case CloudActionType.MustSynch: |
|
117 |
|
|
118 |
if (!File.Exists(downloadPath)) |
|
119 |
{ |
|
120 |
var cloudUri = new Uri(action.CloudFile.Name, UriKind.Relative); |
|
121 |
DownloadCloudFile(account, container, cloudUri, downloadPath); |
|
122 |
} |
|
123 |
else |
|
124 |
{ |
|
125 |
SyncFiles(action); |
|
126 |
} |
|
127 |
break; |
|
128 |
} |
|
129 |
Log.InfoFormat("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, |
|
130 |
action.CloudFile.Name); |
|
131 |
} |
|
132 |
catch (OperationCanceledException) |
|
133 |
{ |
|
134 |
throw; |
|
135 |
} |
|
136 |
catch (System.IO.FileNotFoundException exc) |
|
137 |
{ |
|
138 |
Log.ErrorFormat("{0} : {1} -> {2} failed because the file was not found.\n Rescheduling a delete", |
|
139 |
action.Action, action.LocalFile, action.CloudFile, exc); |
|
140 |
Post(new CloudAction(CloudActionType.DeleteCloud,action.CloudFile)); |
|
141 |
} |
|
142 |
catch (Exception exc) |
|
143 |
{ |
|
144 |
Log.ErrorFormat("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}", |
|
145 |
action.Action, action.LocalFile, action.CloudFile, exc); |
|
146 |
|
|
147 |
_agent.Post(action); |
|
148 |
} |
|
149 |
return CompletedTask<object>.Default; |
|
150 |
} |
|
151 |
} |
|
152 |
|
|
153 |
private void SyncFiles(CloudAction action) |
|
154 |
{ |
|
155 |
if (action==null) |
|
156 |
throw new ArgumentNullException("action"); |
|
157 |
if (action.LocalFile==null) |
|
158 |
throw new ArgumentException("The action's local file is not specified","action"); |
|
159 |
if (!Path.IsPathRooted(action.LocalFile.FullName)) |
|
160 |
throw new ArgumentException("The action's local file path must be absolute","action"); |
|
161 |
if (action.CloudFile== null) |
|
162 |
throw new ArgumentException("The action's cloud file is not specified", "action"); |
|
163 |
Contract.EndContractBlock(); |
|
164 |
|
|
79 | 165 |
var localFile = action.LocalFile; |
80 | 166 |
var cloudFile = action.CloudFile; |
81 |
var downloadPath = (cloudFile == null) ? String.Empty |
|
82 |
: Path.Combine(FileAgent.RootPath, cloudFile.Name.RelativeUrlToFilePath()); |
|
167 |
var downloadPath=action.LocalFile.FullName.ToLower(); |
|
83 | 168 |
|
84 |
try |
|
169 |
var account = cloudFile.Account; |
|
170 |
//Use "pithos" by default if no container is specified |
|
171 |
var container = cloudFile.Container ?? PithosContainer; |
|
172 |
|
|
173 |
var cloudUri = new Uri(cloudFile.Name, UriKind.Relative); |
|
174 |
var cloudHash = cloudFile.Hash.ToLower(); |
|
175 |
var localHash = action.LocalHash.Value.ToLower(); |
|
176 |
var topHash = action.TopHash.Value.ToLower(); |
|
177 |
|
|
178 |
//Not enough to compare only the local hashes, also have to compare the tophashes |
|
179 |
|
|
180 |
//If any of the hashes match, we are done |
|
181 |
if ((cloudHash == localHash || cloudHash == topHash)) |
|
85 | 182 |
{ |
86 |
switch (action.Action) |
|
183 |
Log.InfoFormat("Skipping {0}, hashes match",downloadPath); |
|
184 |
return; |
|
185 |
} |
|
186 |
|
|
187 |
//The hashes DON'T match. We need to sync |
|
188 |
var lastLocalTime = localFile.LastWriteTime; |
|
189 |
var lastUpTime = cloudFile.Last_Modified; |
|
190 |
|
|
191 |
//If the local file is newer upload it |
|
192 |
if (lastUpTime <= lastLocalTime) |
|
193 |
{ |
|
194 |
//It probably means it was changed while the app was down |
|
195 |
UploadCloudFile(account, container, localFile, action.LocalHash.Value, |
|
196 |
action.TopHash.Value); |
|
197 |
} |
|
198 |
else |
|
199 |
{ |
|
200 |
//It the cloud file has a later date, it was modified by another user or computer. |
|
201 |
//We need to check the local file's status |
|
202 |
var status = StatusKeeper.GetFileStatus(downloadPath); |
|
203 |
switch (status) |
|
87 | 204 |
{ |
88 |
case CloudActionType.UploadUnconditional: |
|
89 |
UploadCloudFile(localFile, action.LocalHash.Value, action.TopHash.Value); |
|
205 |
case FileStatus.Unchanged: |
|
206 |
//If the local file's status is Unchanged, we can go on and download the newer cloud file |
|
207 |
DownloadCloudFile(account, container,cloudUri,downloadPath); |
|
90 | 208 |
break; |
91 |
case CloudActionType.DownloadUnconditional: |
|
92 |
DownloadCloudFile(PithosContainer, new Uri(cloudFile.Name, UriKind.Relative), downloadPath); |
|
209 |
case FileStatus.Modified: |
|
210 |
//If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict |
|
211 |
//We can't ensure that a file modified online since the last time will appear as Modified, unless we |
|
212 |
//index all files before we start listening. |
|
213 |
case FileStatus.Created: |
|
214 |
//If the local file is Created, it means that the local and cloud files aren't related, |
|
215 |
// yet they have the same name. |
|
216 |
|
|
217 |
//In both cases we must mark the file as in conflict |
|
218 |
ReportConflict(downloadPath); |
|
93 | 219 |
break; |
94 |
case CloudActionType.DeleteCloud: |
|
95 |
DeleteCloudFile(cloudFile.Name); |
|
96 |
break; |
|
97 |
case CloudActionType.RenameCloud: |
|
98 |
RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName); |
|
99 |
break; |
|
100 |
case CloudActionType.MustSynch: |
|
101 |
if (File.Exists(downloadPath)) |
|
102 |
{ |
|
103 |
var cloudHash = cloudFile.Hash; |
|
104 |
var localHash = action.LocalHash.Value; |
|
105 |
var topHash = action.TopHash.Value; |
|
106 |
//Not enough to compare only the local hashes, also have to compare the tophashes |
|
107 |
if (!cloudHash.Equals(localHash, StringComparison.InvariantCultureIgnoreCase) && |
|
108 |
!cloudHash.Equals(topHash, StringComparison.InvariantCultureIgnoreCase)) |
|
109 |
{ |
|
110 |
var lastLocalTime = localFile.LastWriteTime; |
|
111 |
var lastUpTime = cloudFile.Last_Modified; |
|
112 |
if (lastUpTime <= lastLocalTime) |
|
113 |
{ |
|
114 |
//Local change while the app was down or Files in conflict |
|
115 |
//Maybe need to store version as well, to check who has the latest version |
|
116 |
|
|
117 |
//StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict); |
|
118 |
UploadCloudFile(localFile, action.LocalHash.Value, action.TopHash.Value); |
|
119 |
} |
|
120 |
else |
|
121 |
{ |
|
122 |
var status = StatusKeeper.GetFileStatus(downloadPath); |
|
123 |
switch (status) |
|
124 |
{ |
|
125 |
case FileStatus.Unchanged: |
|
126 |
//It he cloud file has a later date, it was modified by another user or computer. |
|
127 |
//If the local file's status is Unchanged, we should go on and download the cloud file |
|
128 |
DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name, UriKind.Relative), downloadPath); |
|
129 |
break; |
|
130 |
case FileStatus.Modified: |
|
131 |
//If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict |
|
132 |
//We can't ensure that a file modified online since the last time will appear as Modified, unless we |
|
133 |
//index all files before we start listening. |
|
134 |
StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict); |
|
135 |
break; |
|
136 |
case FileStatus.Created: |
|
137 |
//If the local file is Created, it means that the local and cloud files aren't related yet have the same name |
|
138 |
//In this case we must mark the file as in conflict |
|
139 |
//Other cases should never occur. Mark them as Conflict as well but log a warning |
|
140 |
StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict); |
|
141 |
break; |
|
142 |
default: |
|
143 |
//If the local file is Created, it means that the local and cloud files aren't related yet have the same name |
|
144 |
//In this case we must mark the file as in conflict |
|
145 |
//Other cases should never occur. Mark them as Conflict as well but log a warning |
|
146 |
StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict); |
|
147 |
Trace.TraceWarning("Unexcepted status {0} for file {1}->{2}", status, downloadPath, action.CloudFile.Name); |
|
148 |
break; |
|
149 |
} |
|
150 |
} |
|
151 |
} |
|
152 |
} |
|
153 |
else |
|
154 |
DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name, UriKind.Relative), downloadPath); |
|
220 |
default: |
|
221 |
//Other cases should never occur. Mark them as Conflict as well but log a warning |
|
222 |
ReportConflict(downloadPath); |
|
223 |
Log.WarnFormat("Unexcepted status {0} for file {1}->{2}", status, |
|
224 |
downloadPath, action.CloudFile.Name); |
|
155 | 225 |
break; |
156 | 226 |
} |
227 |
} |
|
228 |
} |
|
229 |
|
|
230 |
private void ReportConflict(string downloadPath) |
|
231 |
{ |
|
232 |
if (String.IsNullOrWhiteSpace(downloadPath)) |
|
233 |
throw new ArgumentNullException("downloadPath"); |
|
234 |
Contract.EndContractBlock(); |
|
235 |
|
|
236 |
StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict); |
|
237 |
var message = String.Format("Conflict detected for file {0}", downloadPath); |
|
238 |
Log.Warn(message); |
|
239 |
StatusNotification.NotifyChange(message, TraceLevel.Warning); |
|
240 |
} |
|
241 |
|
|
242 |
/* |
|
243 |
private Task<object> Process(CloudMoveAction action) |
|
244 |
{ |
|
245 |
if (action == null) |
|
246 |
throw new ArgumentNullException("action"); |
|
247 |
Contract.EndContractBlock(); |
|
248 |
|
|
249 |
Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name); |
|
250 |
|
|
251 |
try |
|
252 |
{ |
|
253 |
RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName); |
|
157 | 254 |
Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name); |
158 | 255 |
} |
159 | 256 |
catch (OperationCanceledException) |
... | ... | |
163 | 260 |
catch (Exception exc) |
164 | 261 |
{ |
165 | 262 |
Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}", |
166 |
action.Action, action.LocalFile, action.CloudFile, exc);
|
|
263 |
action.Action, action.OldFileName, action.NewFileName, exc);
|
|
167 | 264 |
|
168 | 265 |
_agent.Post(action); |
169 | 266 |
} |
170 | 267 |
return CompletedTask<object>.Default; |
171 | 268 |
} |
269 |
*/ |
|
172 | 270 |
|
173 | 271 |
|
174 | 272 |
public void Post(CloudAction cloudAction) |
... | ... | |
180 | 278 |
//If the action targets a local file, add a treehash calculation |
181 | 279 |
if (cloudAction.LocalFile != null) |
182 | 280 |
{ |
183 |
cloudAction.TopHash = new Lazy<string>(() => Signature.CalculateTreeHashAsync(cloudAction.LocalFile, |
|
281 |
|
|
282 |
if (cloudAction.LocalFile.Length>BlockSize) |
|
283 |
cloudAction.TopHash = new Lazy<string>(() => Signature.CalculateTreeHashAsync(cloudAction.LocalFile, |
|
184 | 284 |
BlockSize, BlockHash).Result |
185 | 285 |
.TopHash.ToHashString()); |
286 |
else |
|
287 |
{ |
|
288 |
cloudAction.TopHash=new Lazy<string>(()=> cloudAction.LocalHash.Value); |
|
289 |
} |
|
186 | 290 |
|
187 | 291 |
} |
188 | 292 |
_agent.Post(cloudAction); |
... | ... | |
208 | 312 |
throw new ArgumentNullException(accountPath); |
209 | 313 |
Contract.EndContractBlock(); |
210 | 314 |
|
211 |
Trace.CorrelationManager.StartLogicalOperation(); |
|
212 |
Trace.TraceInformation("[LISTENER] Scheduled"); |
|
213 |
|
|
214 |
//Get the list of server objects changed since the last check |
|
215 |
var listObjects = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000,()=> |
|
216 |
CloudClient.ListObjects(PithosContainer,since)); |
|
217 |
//Get the list of deleted objects since the last check |
|
218 |
var listTrash= Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000,()=> |
|
219 |
CloudClient.ListObjects(TrashContainer,since)); |
|
315 |
using (log4net.LogicalThreadContext.Stacks["SCHEDULE"].Push("Retrieve Remote")) |
|
316 |
{ |
|
317 |
Log.Info("[LISTENER] Scheduled"); |
|
220 | 318 |
|
221 |
var listAll = Task.Factory.TrackedSequence(() => listObjects, () => listTrash); |
|
319 |
//Get the list of server objects changed since the last check |
|
320 |
var listObjects = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000, () => |
|
321 |
CloudClient.ListObjects(CloudClient.UserName, PithosContainer, since)); |
|
322 |
//Get the list of deleted objects since the last check |
|
323 |
var listTrash = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000, () => |
|
324 |
CloudClient.ListObjects(CloudClient.UserName, TrashContainer, since)); |
|
222 | 325 |
|
223 |
//Next time we will check for all changes since the current check minus 1 second |
|
224 |
//This is done to ensure there are no discrepancies due to clock differences |
|
225 |
DateTime nextSince = DateTime.Now.AddSeconds(-1); |
|
326 |
var listShared = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000, () => |
|
327 |
CloudClient.ListSharedObjects(since)); |
|
226 | 328 |
|
329 |
var listAll = Task.Factory.TrackedSequence( |
|
330 |
() => listObjects, |
|
331 |
() => listTrash, |
|
332 |
() => listShared); |
|
227 | 333 |
|
334 |
//Next time we will check for all changes since the current check minus 1 second |
|
335 |
//This is done to ensure there are no discrepancies due to clock differences |
|
336 |
DateTime nextSince = DateTime.Now.AddSeconds(-1); |
|
228 | 337 |
|
229 |
|
|
230 | 338 |
|
231 |
|
|
232 |
var enqueueFiles = listAll.ContinueWith(task => |
|
233 |
{ |
|
234 |
if (task.IsFaulted) |
|
339 |
var enqueueFiles = listAll.ContinueWith(task => |
|
235 | 340 |
{ |
236 |
//ListObjects failed at this point, need to reschedule |
|
237 |
Trace.TraceError("[FAIL] ListObjects in ProcessRemoteFiles with {0}", task.Exception); |
|
238 |
ProcessRemoteFiles(accountPath, since); |
|
239 |
return; |
|
240 |
} |
|
241 |
Trace.CorrelationManager.StartLogicalOperation("Listener"); |
|
242 |
Trace.TraceInformation("[LISTENER] Start Processing"); |
|
341 |
if (task.IsFaulted) |
|
342 |
{ |
|
343 |
//ListObjects failed at this point, need to reschedule |
|
344 |
Log.ErrorFormat("[FAIL] ListObjects in ProcessRemoteFiles with {0}", task.Exception); |
|
345 |
ProcessRemoteFiles(accountPath, since); |
|
346 |
return; |
|
347 |
} |
|
348 |
using (log4net.LogicalThreadContext.Stacks["SCHEDULE"].Push("Process Results")) |
|
349 |
{ |
|
350 |
var remoteObjects = ((Task<IList<ObjectInfo>>) task.Result[0]).Result; |
|
351 |
var trashObjects = ((Task<IList<ObjectInfo>>) task.Result[1]).Result; |
|
352 |
var sharedObjects = ((Task<IList<ObjectInfo>>) task.Result[2]).Result; |
|
353 |
|
|
354 |
//Items with the same name, hash may be both in the container and the trash |
|
355 |
//Don't delete items that exist in the container |
|
356 |
var realTrash = from trash in trashObjects |
|
357 |
where !remoteObjects.Any(info => info.Hash == trash.Hash) |
|
358 |
select trash; |
|
359 |
ProcessDeletedFiles(realTrash); |
|
360 |
|
|
361 |
|
|
362 |
var remote = from info in remoteObjects.Union(sharedObjects) |
|
363 |
let name = info.Name |
|
364 |
where !name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase) && |
|
365 |
!name.StartsWith("fragments/", StringComparison.InvariantCultureIgnoreCase) |
|
366 |
select info; |
|
367 |
|
|
368 |
//Create a list of actions from the remote files |
|
369 |
var allActions = ObjectsToActions(remote); |
|
370 |
|
|
371 |
//And remove those that are already being processed by the agent |
|
372 |
var distinctActions = allActions |
|
373 |
.Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer()) |
|
374 |
.ToList(); |
|
375 |
|
|
376 |
//Queue all the actions |
|
377 |
foreach (var message in distinctActions) |
|
378 |
{ |
|
379 |
Post(message); |
|
380 |
} |
|
243 | 381 |
|
244 |
var trashObjects = ((Task<IList<ObjectInfo>>)task.Result[1]).Result; |
|
245 |
var remoteObjects = ((Task<IList<ObjectInfo>>)task.Result[0]).Result; |
|
382 |
//Report the number of new files |
|
383 |
var remoteCount = distinctActions.Count(action=> |
|
384 |
action.Action==CloudActionType.DownloadUnconditional); |
|
385 |
if ( remoteCount > 0) |
|
386 |
StatusNotification.NotifyChange(String.Format("Processing {0} new files", remoteCount)); |
|
246 | 387 |
|
247 |
|
|
248 |
//Items with the same name, hash may be both in the container and the trash |
|
249 |
//Don't delete items that exist in the container |
|
250 |
var realTrash = from trash in trashObjects |
|
251 |
where !remoteObjects.Any(info => info.Hash == trash.Hash) |
|
252 |
select trash; |
|
253 |
ProcessDeletedFiles(realTrash); |
|
254 |
|
|
255 |
|
|
256 |
var remote=from info in remoteObjects |
|
257 |
let name=info.Name |
|
258 |
where !name.EndsWith(".ignore",StringComparison.InvariantCultureIgnoreCase) && |
|
259 |
!name.StartsWith("fragments/",StringComparison.InvariantCultureIgnoreCase) |
|
260 |
select info; |
|
261 |
|
|
262 |
var commonObjects = new List<Tuple<ObjectInfo, FileInfo,FileState>>(); |
|
263 |
var remoteOnly = new List<ObjectInfo>(); |
|
264 |
|
|
265 |
//In order to avoid multiple iterations over the files, we iterate only once |
|
266 |
//over the remote files |
|
267 |
foreach (var objectInfo in remote) |
|
268 |
{ |
|
269 |
var relativePath= objectInfo.Name.RelativeUrlToFilePath(); |
|
270 |
//and remove any matching objects from the list, adding them to the commonObjects list |
|
271 |
if (FileAgent.Exists(relativePath)) |
|
388 |
Log.Info("[LISTENER] End Processing"); |
|
389 |
} |
|
390 |
}); |
|
391 |
|
|
392 |
var loop = enqueueFiles.ContinueWith(t => |
|
393 |
{ |
|
394 |
if (t.IsFaulted) |
|
272 | 395 |
{ |
273 |
var localFile = FileAgent.GetFileInfo(relativePath); |
|
274 |
var state = FileState.FindByFilePath(localFile.FullName); |
|
275 |
commonObjects.Add(Tuple.Create(objectInfo, localFile, state)); |
|
396 |
Log.Error("[LISTENER] Exception", t.Exception); |
|
276 | 397 |
} |
277 | 398 |
else |
278 | 399 |
{ |
279 |
//If there is no match we add them to the localFiles list |
|
280 |
//but only if the file is not marked for deletion |
|
281 |
var targetFile = Path.Combine(FileAgent.RootPath, relativePath); |
|
282 |
var fileStatus = StatusKeeper.GetFileStatus(targetFile); |
|
283 |
if (fileStatus!=FileStatus.Deleted) |
|
284 |
remoteOnly.Add(objectInfo); |
|
285 |
|
|
286 |
|
|
400 |
Log.Info("[LISTENER] Finished"); |
|
287 | 401 |
} |
288 |
} |
|
289 |
|
|
290 |
//At the end of the iteration, the *remote* list will contain the files that exist |
|
291 |
//only on the server |
|
292 |
|
|
293 |
//Remote files should be downloaded |
|
294 |
var actionsForRemote = from upFile in remoteOnly |
|
295 |
select new CloudAction(CloudActionType.DownloadUnconditional,upFile); |
|
296 |
|
|
297 |
//Common files should be checked on a per-case basis to detect differences, which is newer |
|
298 |
var actionsForCommon = from pair in commonObjects |
|
299 |
let objectInfo = pair.Item1 |
|
300 |
let localFile = pair.Item2 |
|
301 |
let state=pair.Item3 |
|
302 |
select new CloudAction(CloudActionType.MustSynch, |
|
303 |
localFile, objectInfo,state,BlockSize,BlockHash); |
|
304 |
|
|
305 |
|
|
306 |
|
|
307 |
|
|
308 |
|
|
309 |
//Collect all the actions |
|
310 |
var allActions = actionsForRemote.Union(actionsForCommon); |
|
311 |
|
|
312 |
//And remove those that are already being processed by the agent |
|
313 |
var distinctActions =allActions |
|
314 |
.Except(_agent.GetEnumerable(), new PithosMonitor.LocalFileComparer()) |
|
315 |
.ToList(); |
|
316 |
|
|
317 |
//Queue all the actions |
|
318 |
foreach (var message in distinctActions) |
|
319 |
{ |
|
320 |
Post(message); |
|
321 |
} |
|
322 |
|
|
323 |
|
|
324 |
if(remoteOnly.Count>0) |
|
325 |
StatusNotification.NotifyChange(String.Format("Processing {0} new files", remoteOnly.Count)); |
|
402 |
ProcessRemoteFiles(accountPath, nextSince); |
|
326 | 403 |
|
327 |
Trace.TraceInformation("[LISTENER] End Processing"); |
|
328 |
Trace.CorrelationManager.StopLogicalOperation(); |
|
404 |
}); |
|
405 |
return loop; |
|
406 |
} |
|
407 |
} |
|
329 | 408 |
|
330 |
}); |
|
409 |
//Creates an appropriate action for each server file |
|
410 |
private IEnumerable<CloudAction> ObjectsToActions(IEnumerable<ObjectInfo> remote) |
|
411 |
{ |
|
412 |
if (remote==null) |
|
413 |
throw new ArgumentNullException(); |
|
414 |
Contract.EndContractBlock(); |
|
331 | 415 |
|
332 |
var loop = enqueueFiles.ContinueWith(t => |
|
416 |
//In order to avoid multiple iterations over the files, we iterate only once |
|
417 |
//over the remote files |
|
418 |
foreach (var objectInfo in remote) |
|
333 | 419 |
{ |
334 |
if (t.IsFaulted) |
|
420 |
var relativePath = objectInfo.RelativeUrlToFilePath(CloudClient.UserName); |
|
421 |
//and remove any matching objects from the list, adding them to the commonObjects list |
|
422 |
if (FileAgent.Exists(relativePath)) |
|
335 | 423 |
{ |
336 |
Trace.TraceError("[LISTENER] Exception: {0}", t.Exception); |
|
424 |
var localFile = FileAgent.GetFileInfo(relativePath); |
|
425 |
var state = FileState.FindByFilePath(localFile.FullName); |
|
426 |
//Common files should be checked on a per-case basis to detect differences, which is newer |
|
427 |
|
|
428 |
yield return new CloudAction(CloudActionType.MustSynch, |
|
429 |
localFile, objectInfo, state, BlockSize, BlockHash); |
|
337 | 430 |
} |
338 | 431 |
else |
339 | 432 |
{ |
340 |
Trace.TraceInformation("[LISTENER] Finished"); |
|
433 |
//If there is no match we add them to the localFiles list |
|
434 |
//but only if the file is not marked for deletion |
|
435 |
var targetFile = Path.Combine(FileAgent.RootPath, relativePath); |
|
436 |
var fileStatus = StatusKeeper.GetFileStatus(targetFile); |
|
437 |
if (fileStatus != FileStatus.Deleted) |
|
438 |
{ |
|
439 |
//Remote files should be downloaded |
|
440 |
yield return new CloudAction(CloudActionType.DownloadUnconditional, objectInfo); |
|
441 |
} |
|
341 | 442 |
} |
342 |
ProcessRemoteFiles(accountPath, nextSince); |
|
343 |
|
|
344 |
}); |
|
345 |
return loop; |
|
443 |
} |
|
346 | 444 |
} |
347 | 445 |
|
348 | 446 |
private void ProcessDeletedFiles(IEnumerable<ObjectInfo> trashObjects) |
349 | 447 |
{ |
350 | 448 |
foreach (var trashObject in trashObjects) |
351 | 449 |
{ |
352 |
var relativePath = trashObject.Name.RelativeUrlToFilePath();
|
|
450 |
var relativePath = trashObject.RelativeUrlToFilePath(CloudClient.UserName);
|
|
353 | 451 |
//and remove any matching objects from the list, adding them to the commonObjects list |
354 | 452 |
FileAgent.Delete(relativePath); |
355 | 453 |
} |
356 | 454 |
} |
357 | 455 |
|
358 | 456 |
|
359 |
private void RenameCloudFile(string oldFileName, string newPath, string newFileName) |
|
457 |
private void RenameCloudFile(string account, string container,string oldFileName, string newPath, string newFileName)
|
|
360 | 458 |
{ |
459 |
if (String.IsNullOrWhiteSpace(account)) |
|
460 |
throw new ArgumentNullException("account"); |
|
461 |
if (String.IsNullOrWhiteSpace(container)) |
|
462 |
throw new ArgumentNullException("container"); |
|
361 | 463 |
if (String.IsNullOrWhiteSpace(oldFileName)) |
362 | 464 |
throw new ArgumentNullException("oldFileName"); |
363 | 465 |
if (String.IsNullOrWhiteSpace(oldFileName)) |
... | ... | |
368 | 470 |
//The local file is already renamed |
369 | 471 |
this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Modified); |
370 | 472 |
|
371 |
CloudClient.MoveObject(PithosContainer, oldFileName, PithosContainer, newFileName);
|
|
473 |
CloudClient.MoveObject(account, container, oldFileName, container, newFileName);
|
|
372 | 474 |
|
373 | 475 |
this.StatusKeeper.SetFileStatus(newPath, FileStatus.Unchanged); |
374 | 476 |
this.StatusKeeper.SetFileOverlayStatus(newPath, FileOverlayStatus.Normal); |
375 | 477 |
NativeMethods.RaiseChangeNotification(newPath); |
376 | 478 |
} |
377 | 479 |
|
378 |
private void DeleteCloudFile(string fileName) |
|
379 |
{ |
|
480 |
private void DeleteCloudFile(string account,string container, string fileName) |
|
481 |
{ |
|
482 |
if (String.IsNullOrWhiteSpace(account)) |
|
483 |
throw new ArgumentNullException("account"); |
|
484 |
if (String.IsNullOrWhiteSpace(container)) |
|
485 |
throw new ArgumentNullException("container"); |
|
486 |
if (String.IsNullOrWhiteSpace(container)) |
|
487 |
throw new ArgumentNullException("container"); |
|
488 |
|
|
380 | 489 |
if (String.IsNullOrWhiteSpace(fileName)) |
381 | 490 |
throw new ArgumentNullException("fileName"); |
382 | 491 |
if (Path.IsPathRooted(fileName)) |
383 | 492 |
throw new ArgumentException("The fileName should not be rooted","fileName"); |
384 | 493 |
Contract.EndContractBlock(); |
385 | 494 |
|
386 |
this.StatusKeeper.SetFileOverlayStatus(fileName, FileOverlayStatus.Modified); |
|
387 |
CloudClient.DeleteObject(PithosContainer, fileName, TrashContainer); |
|
495 |
using ( log4net.LogicalThreadContext.Stacks["DeleteCloudFile"].Push("Delete")) |
|
496 |
{ |
|
497 |
var info = FileAgent.GetFileInfo(fileName); |
|
498 |
var path = info.FullName.ToLower(); |
|
499 |
this.StatusKeeper.SetFileOverlayStatus(path, FileOverlayStatus.Modified); |
|
500 |
|
|
501 |
CloudClient.DeleteObject(account, container, fileName, TrashContainer); |
|
388 | 502 |
|
389 |
this.StatusKeeper.ClearFileStatus(fileName);
|
|
390 |
this.StatusKeeper.RemoveFileOverlayStatus(fileName);
|
|
503 |
this.StatusKeeper.ClearFileStatus(path);
|
|
504 |
}
|
|
391 | 505 |
} |
392 | 506 |
|
393 | 507 |
//Download a file. |
394 |
private void DownloadCloudFile(string container, Uri relativeUrl, string localPath) |
|
508 |
private void DownloadCloudFile(string account,string container, Uri relativeUrl, string localPath)
|
|
395 | 509 |
{ |
510 |
if (String.IsNullOrWhiteSpace(account)) |
|
511 |
throw new ArgumentNullException("account"); |
|
396 | 512 |
if (String.IsNullOrWhiteSpace(container)) |
397 | 513 |
throw new ArgumentNullException("container"); |
398 | 514 |
if (relativeUrl == null) |
... | ... | |
403 | 519 |
throw new ArgumentException("The localPath must be rooted", "localPath"); |
404 | 520 |
Contract.EndContractBlock(); |
405 | 521 |
|
406 |
var download=Task.Factory.Iterate(DownloadIterator(container, relativeUrl, localPath)); |
|
522 |
var download=Task.Factory.Iterate(DownloadIterator(account,container, relativeUrl, localPath));
|
|
407 | 523 |
download.Wait(); |
408 | 524 |
} |
409 | 525 |
|
410 |
private IEnumerable<Task> DownloadIterator(string container, Uri relativeUrl, string localPath) |
|
526 |
private IEnumerable<Task> DownloadIterator(string account,string container, Uri relativeUrl, string localPath)
|
|
411 | 527 |
{ |
528 |
if (String.IsNullOrWhiteSpace(account)) |
|
529 |
throw new ArgumentNullException("account"); |
|
412 | 530 |
if (String.IsNullOrWhiteSpace(container)) |
413 | 531 |
throw new ArgumentNullException("container"); |
414 | 532 |
if (relativeUrl==null) |
... | ... | |
432 | 550 |
//var hashPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".hashmap"); |
433 | 551 |
|
434 | 552 |
//Retrieve the hashmap from the server |
435 |
var getHashMap = CloudClient.GetHashMap(container,url);
|
|
553 |
var getHashMap = CloudClient.GetHashMap(account, container, url);
|
|
436 | 554 |
yield return getHashMap; |
437 | 555 |
|
438 | 556 |
var serverHash=getHashMap.Result; |
439 | 557 |
//If it's a small file |
440 | 558 |
var downloadTask=(serverHash.Hashes.Count == 1 ) |
441 | 559 |
//Download it in one go |
442 |
? DownloadEntireFile(container, relativeUrl, localPath) |
|
560 |
? DownloadEntireFile(account,container, relativeUrl, localPath)
|
|
443 | 561 |
//Otherwise download it block by block |
444 |
: DownloadWithBlocks(container, relativeUrl, localPath, serverHash); |
|
562 |
: DownloadWithBlocks(account,container, relativeUrl, localPath, serverHash);
|
|
445 | 563 |
|
446 | 564 |
yield return downloadTask; |
447 | 565 |
|
448 | 566 |
|
449 | 567 |
//Retrieve the object's metadata |
450 |
var info=CloudClient.GetObjectInfo(container, url); |
|
568 |
var info=CloudClient.GetObjectInfo(account, container, url);
|
|
451 | 569 |
//And store it |
452 | 570 |
StatusKeeper.StoreInfo(localPath, info); |
453 | 571 |
|
... | ... | |
458 | 576 |
} |
459 | 577 |
|
460 | 578 |
//Download a small file with a single GET operation |
461 |
private Task DownloadEntireFile(string container, Uri relativeUrl, string localPath) |
|
579 |
private Task DownloadEntireFile(string account,string container, Uri relativeUrl, string localPath)
|
|
462 | 580 |
{ |
581 |
if (String.IsNullOrWhiteSpace(account)) |
|
582 |
throw new ArgumentNullException("account"); |
|
463 | 583 |
if (String.IsNullOrWhiteSpace(container)) |
464 | 584 |
throw new ArgumentNullException("container"); |
465 | 585 |
if (relativeUrl == null) |
... | ... | |
480 | 600 |
Directory.CreateDirectory(directoryPath); |
481 | 601 |
|
482 | 602 |
//Download the object to the temporary location |
483 |
var getObject = CloudClient.GetObject(container, relativeUrl.ToString(), tempPath).ContinueWith(t => |
|
603 |
var getObject = CloudClient.GetObject(account, container, relativeUrl.ToString(), tempPath).ContinueWith(t =>
|
|
484 | 604 |
{ |
485 | 605 |
t.PropagateExceptions(); |
486 | 606 |
//And move it to its actual location once downloading is finished |
... | ... | |
493 | 613 |
} |
494 | 614 |
|
495 | 615 |
//Download a file asynchronously using blocks |
496 |
public Task DownloadWithBlocks(string container, Uri relativeUrl, string localPath, TreeHash serverHash) |
|
616 |
public Task DownloadWithBlocks(string account,string container, Uri relativeUrl, string localPath, TreeHash serverHash)
|
|
497 | 617 |
{ |
618 |
if (String.IsNullOrWhiteSpace(account)) |
|
619 |
throw new ArgumentNullException("account"); |
|
498 | 620 |
if (String.IsNullOrWhiteSpace(container)) |
499 | 621 |
throw new ArgumentNullException("container"); |
500 | 622 |
if (relativeUrl == null) |
... | ... | |
507 | 629 |
throw new ArgumentNullException("serverHash"); |
508 | 630 |
Contract.EndContractBlock(); |
509 | 631 |
|
510 |
return Task.Factory.Iterate(BlockDownloadIterator(container, relativeUrl, localPath, serverHash)); |
|
632 |
return Task.Factory.Iterate(BlockDownloadIterator(account,container, relativeUrl, localPath, serverHash));
|
|
511 | 633 |
} |
512 | 634 |
|
513 |
private IEnumerable<Task> BlockDownloadIterator(string container,Uri relativeUrl, string localPath,TreeHash serverHash) |
|
635 |
private IEnumerable<Task> BlockDownloadIterator(string account,string container,Uri relativeUrl, string localPath,TreeHash serverHash)
|
|
514 | 636 |
{ |
637 |
if (String.IsNullOrWhiteSpace(account)) |
|
638 |
throw new ArgumentNullException("account"); |
|
515 | 639 |
if (String.IsNullOrWhiteSpace(container)) |
516 | 640 |
throw new ArgumentNullException("container"); |
517 | 641 |
if (relativeUrl == null) |
... | ... | |
547 | 671 |
{ |
548 | 672 |
if (blockUpdater.UseOrphan(i, upHash)) |
549 | 673 |
{ |
550 |
Trace.TraceInformation("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
|
|
674 |
Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
|
|
551 | 675 |
continue; |
552 | 676 |
} |
553 |
Trace.TraceInformation("[BLOCK GET] START {0} of {1} for {2}",i,upHashes.Length,localPath);
|
|
677 |
Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath);
|
|
554 | 678 |
var start = i*BlockSize; |
555 | 679 |
//To download the last block just pass a null for the end of the range |
556 | 680 |
long? end = null; |
... | ... | |
558 | 682 |
end= ((i + 1)*BlockSize) ; |
559 | 683 |
|
560 | 684 |
//Download the missing block |
561 |
var getBlock = CloudClient.GetBlock(container, relativeUrl, start, end); |
|
685 |
var getBlock = CloudClient.GetBlock(account, container, relativeUrl, start, end);
|
|
562 | 686 |
yield return getBlock; |
563 | 687 |
var block = getBlock.Result; |
564 | 688 |
|
565 | 689 |
//and store it |
566 | 690 |
yield return blockUpdater.StoreBlock(i, block); |
567 |
|
|
568 | 691 |
|
569 |
Trace.TraceInformation("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath); |
|
692 |
|
|
693 |
Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath); |
|
570 | 694 |
} |
571 | 695 |
} |
572 | 696 |
|
573 | 697 |
blockUpdater.Commit(); |
574 |
Trace.TraceInformation("[BLOCK GET] COMPLETE {0}", localPath);
|
|
698 |
Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath);
|
|
575 | 699 |
} |
576 | 700 |
|
577 | 701 |
|
578 |
private void UploadCloudFile(FileInfo fileInfo, string hash,string topHash) |
|
702 |
private void UploadCloudFile(string account,string container,FileInfo fileInfo, string hash,string topHash)
|
|
579 | 703 |
{ |
704 |
if (String.IsNullOrWhiteSpace(account)) |
|
705 |
throw new ArgumentNullException("account"); |
|
706 |
if (String.IsNullOrWhiteSpace(container)) |
|
707 |
throw new ArgumentNullException("container"); |
|
580 | 708 |
if (fileInfo == null) |
581 | 709 |
throw new ArgumentNullException("fileInfo"); |
582 | 710 |
if (String.IsNullOrWhiteSpace(hash)) |
... | ... | |
585 | 713 |
throw new ArgumentNullException("topHash"); |
586 | 714 |
Contract.EndContractBlock(); |
587 | 715 |
|
588 |
var upload = Task.Factory.Iterate(UploadIterator(fileInfo, hash, topHash)); |
|
716 |
var upload = Task.Factory.Iterate(UploadIterator(account,container,fileInfo, hash, topHash));
|
|
589 | 717 |
upload.Wait(); |
590 | 718 |
} |
591 | 719 |
|
592 |
private IEnumerable<Task> UploadIterator(FileInfo fileInfo, string hash,string topHash) |
|
720 |
private IEnumerable<Task> UploadIterator(string account,string container,FileInfo fileInfo, string hash,string topHash)
|
|
593 | 721 |
{ |
594 |
if (fileInfo==null) |
|
722 |
if (String.IsNullOrWhiteSpace(account)) |
|
723 |
throw new ArgumentNullException("account"); |
|
724 |
if (String.IsNullOrWhiteSpace(container)) |
|
725 |
throw new ArgumentNullException("container"); |
|
726 |
if (fileInfo == null) |
|
595 | 727 |
throw new ArgumentNullException("fileInfo"); |
596 | 728 |
if (String.IsNullOrWhiteSpace(hash)) |
597 | 729 |
throw new ArgumentNullException("hash"); |
... | ... | |
613 | 745 |
|
614 | 746 |
|
615 | 747 |
//Even if GetObjectInfo times out, we can proceed with the upload |
616 |
var info = CloudClient.GetObjectInfo(PithosContainer, url);
|
|
748 |
var info = CloudClient.GetObjectInfo(account, container, url);
|
|
617 | 749 |
|
618 | 750 |
//If the file hashes match, abort the upload |
619 | 751 |
if (hash.Equals(info.Hash, StringComparison.InvariantCultureIgnoreCase) || |
... | ... | |
621 | 753 |
{ |
622 | 754 |
//but store any metadata changes |
623 | 755 |
this.StatusKeeper.StoreInfo(fullFileName, info); |
624 |
Trace.TraceInformation("Skip upload of {0}, hashes match", fullFileName);
|
|
756 |
Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
|
|
625 | 757 |
yield break; |
626 | 758 |
} |
627 | 759 |
|
... | ... | |
637 | 769 |
var treeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName, BlockSize, BlockHash); |
638 | 770 |
yield return treeHash; |
639 | 771 |
|
640 |
yield return Task.Factory.Iterate(UploadWithHashMap(fileInfo,url,treeHash)); |
|
772 |
yield return Task.Factory.Iterate(UploadWithHashMap(account,container,fileInfo,url,treeHash));
|
|
641 | 773 |
|
642 | 774 |
} |
643 | 775 |
else |
644 | 776 |
{ |
645 | 777 |
//Otherwise do a regular PUT |
646 |
yield return CloudClient.PutObject(PithosContainer,url,fullFileName,hash);
|
|
778 |
yield return CloudClient.PutObject(account, container, url, fullFileName, hash);
|
|
647 | 779 |
} |
648 | 780 |
//If everything succeeds, change the file and overlay status to normal |
649 | 781 |
this.StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal); |
... | ... | |
653 | 785 |
StatusNotification.NotifyChangedFile(fullFileName); |
654 | 786 |
} |
655 | 787 |
|
656 |
public IEnumerable<Task> UploadWithHashMap(FileInfo fileInfo,string url,Task<TreeHash> treeHash) |
|
788 |
public IEnumerable<Task> UploadWithHashMap(string account,string container,FileInfo fileInfo,string url,Task<TreeHash> treeHash)
|
|
657 | 789 |
{ |
658 |
if(fileInfo==null) |
|
790 |
if (String.IsNullOrWhiteSpace(account)) |
|
791 |
throw new ArgumentNullException("account"); |
|
792 |
if (String.IsNullOrWhiteSpace(container)) |
|
793 |
throw new ArgumentNullException("container"); |
|
794 |
if (fileInfo == null) |
|
659 | 795 |
throw new ArgumentNullException("fileInfo"); |
660 | 796 |
if (String.IsNullOrWhiteSpace(url)) |
661 | 797 |
throw new ArgumentNullException(url); |
... | ... | |
666 | 802 |
var fullFileName = fileInfo.FullName; |
667 | 803 |
|
668 | 804 |
//Send the hashmap to the server |
669 |
var hashPut = CloudClient.PutHashMap(PithosContainer, url, treeHash.Result);
|
|
805 |
var hashPut = CloudClient.PutHashMap(account, container, url, treeHash.Result);
|
|
670 | 806 |
yield return hashPut; |
671 | 807 |
|
672 | 808 |
var missingHashes = hashPut.Result; |
... | ... | |
684 | 820 |
var read = fileInfo.Read(buffer, offset, BlockSize); |
685 | 821 |
|
686 | 822 |
//And upload the block |
687 |
var postBlock = CloudClient.PostBlock(PithosContainer, buffer, 0, read);
|
|
823 |
var postBlock = CloudClient.PostBlock(account, container, buffer, 0, read);
|
|
688 | 824 |
|
689 | 825 |
//We have to handle possible exceptions in a continuation because |
690 | 826 |
//*yield return* can't appear inside a try block |
691 | 827 |
yield return postBlock.ContinueWith(t => |
692 | 828 |
t.ReportExceptions( |
693 |
exc=>Trace.TraceError("[ERROR] uploading block {0} of {1}\n{2}",blockIndex, fullFileName, exc),
|
|
694 |
()=>Trace.TraceInformation("[BLOCK] Block {0} of {1} uploaded", blockIndex,fullFileName)));
|
|
829 |
exc => Log.ErrorFormat("[ERROR] uploading block {0} of {1}\n{2}", blockIndex, fullFileName, exc),
|
|
830 |
()=>Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex,fullFileName)));
|
|
695 | 831 |
} |
696 | 832 |
|
697 | 833 |
//Repeat until there are no more missing hashes |
698 |
hashPut = CloudClient.PutHashMap(PithosContainer, url, treeHash.Result);
|
|
834 |
hashPut = CloudClient.PutHashMap(account, container, url, treeHash.Result);
|
|
699 | 835 |
yield return hashPut; |
700 | 836 |
missingHashes = hashPut.Result; |
701 | 837 |
} |
Also available in: Unified diff