Revision 6f03d6e1 trunk/Pithos.Core/Agents/PollAgent.cs
b/trunk/Pithos.Core/Agents/PollAgent.cs | ||
---|---|---|
119 | 119 |
{ |
120 | 120 |
UpdateStatus(PithosStatus.PollSyncing); |
121 | 121 |
|
122 |
//Next time we will check for all changes since the current check minus 1 second |
|
123 |
//This is done to ensure there are no discrepancies due to clock differences |
|
124 |
var current = DateTime.Now.AddSeconds(-1); |
|
125 |
|
|
126 | 122 |
var tasks = from accountInfo in _accounts.Values |
127 | 123 |
select ProcessAccountFiles(accountInfo, since); |
128 | 124 |
|
129 |
await TaskEx.WhenAll(tasks.ToList()); |
|
125 |
var nextTimes=await TaskEx.WhenAll(tasks.ToList());
|
|
130 | 126 |
|
131 | 127 |
_firstPoll = false; |
132 | 128 |
//Reschedule the poll with the current timestamp as a "since" value |
133 |
nextSince = current; |
|
129 |
|
|
130 |
if (nextTimes.Length>0) |
|
131 |
nextSince = nextTimes.Min(); |
|
132 |
if (Log.IsDebugEnabled) |
|
133 |
Log.DebugFormat("Next Poll at [{0}]",nextSince); |
|
134 | 134 |
} |
135 | 135 |
catch (Exception ex) |
136 | 136 |
{ |
... | ... | |
180 | 180 |
return since; |
181 | 181 |
} |
182 | 182 |
|
183 |
public async Task ProcessAccountFiles(AccountInfo accountInfo, DateTime? since = null) |
|
183 |
public async Task<DateTime?> ProcessAccountFiles(AccountInfo accountInfo, DateTime? since = null)
|
|
184 | 184 |
{ |
185 | 185 |
if (accountInfo == null) |
186 | 186 |
throw new ArgumentNullException("accountInfo"); |
... | ... | |
191 | 191 |
|
192 | 192 |
using (log4net.ThreadContext.Stacks["Retrieve Remote"].Push(accountInfo.UserName)) |
193 | 193 |
{ |
194 |
|
|
194 | 195 |
await NetworkAgent.GetDeleteAwaiter(); |
195 | 196 |
|
196 | 197 |
Log.Info("Scheduled"); |
... | ... | |
204 | 205 |
|
205 | 206 |
CreateContainerFolders(accountInfo, containers); |
206 | 207 |
|
208 |
//The nextSince time fallback time is the same as the current. |
|
209 |
//If polling succeeds, the next Since time will be the smallest of the maximum modification times |
|
210 |
//of the shared and account objects |
|
211 |
var nextSince = since; |
|
212 |
|
|
207 | 213 |
try |
208 | 214 |
{ |
209 | 215 |
//Wait for any deletions to finish |
... | ... | |
227 | 233 |
var dict = listTasks.ToDictionary(t => t.AsyncState); |
228 | 234 |
|
229 | 235 |
//Get all non-trash objects. Remember, the container name is stored in AsyncState |
230 |
var remoteObjects = from objectList in listTasks |
|
236 |
var remoteObjects = (from objectList in listTasks
|
|
231 | 237 |
where (string)objectList.AsyncState != "trash" |
232 | 238 |
from obj in objectList.Result |
233 |
select obj; |
|
239 |
select obj).ToList(); |
|
240 |
|
|
241 |
//Get the latest remote object modification date, only if it is after |
|
242 |
//the original since date |
|
243 |
nextSince = GetLatestDateAfter(nextSince, remoteObjects); |
|
234 | 244 |
|
235 | 245 |
var sharedObjects = dict["shared"].Result; |
246 |
nextSince = GetLatestDateBefore(nextSince, sharedObjects); |
|
236 | 247 |
|
237 | 248 |
//DON'T process trashed files |
238 | 249 |
//If some files are deleted and added again to a folder, they will be deleted |
... | ... | |
289 | 300 |
catch (Exception ex) |
290 | 301 |
{ |
291 | 302 |
Log.ErrorFormat("[FAIL] ListObjects for{0} in ProcessRemoteFiles with {1}", accountInfo.UserName, ex); |
292 |
return; |
|
303 |
return nextSince;
|
|
293 | 304 |
} |
294 | 305 |
|
295 | 306 |
Log.Info("[LISTENER] Finished"); |
296 |
|
|
307 |
return nextSince; |
|
297 | 308 |
} |
298 | 309 |
} |
299 | 310 |
|
311 |
/// <summary> |
|
312 |
/// Returns the latest LastModified date from the list of objects, but only if it is before |
|
313 |
/// than the threshold value |
|
314 |
/// </summary> |
|
315 |
/// <param name="threshold"></param> |
|
316 |
/// <param name="cloudObjects"></param> |
|
317 |
/// <returns></returns> |
|
318 |
private static DateTime? GetLatestDateBefore(DateTime? threshold, IList<ObjectInfo> cloudObjects) |
|
319 |
{ |
|
320 |
DateTime? maxDate = null; |
|
321 |
if (cloudObjects!=null && cloudObjects.Count > 0) |
|
322 |
maxDate = cloudObjects.Max(obj => obj.Last_Modified); |
|
323 |
if (maxDate == null || maxDate == DateTime.MinValue) |
|
324 |
return threshold; |
|
325 |
if (threshold == null || threshold == DateTime.MinValue || threshold > maxDate) |
|
326 |
return maxDate; |
|
327 |
return threshold; |
|
328 |
} |
|
329 |
|
|
330 |
/// <summary> |
|
331 |
/// Returns the latest LastModified date from the list of objects, but only if it is after |
|
332 |
/// the threshold value |
|
333 |
/// </summary> |
|
334 |
/// <param name="threshold"></param> |
|
335 |
/// <param name="cloudObjects"></param> |
|
336 |
/// <returns></returns> |
|
337 |
private static DateTime? GetLatestDateAfter(DateTime? threshold, IList<ObjectInfo> cloudObjects) |
|
338 |
{ |
|
339 |
DateTime? maxDate = null; |
|
340 |
if (cloudObjects!=null && cloudObjects.Count > 0) |
|
341 |
maxDate = cloudObjects.Max(obj => obj.Last_Modified); |
|
342 |
if (maxDate == null || maxDate == DateTime.MinValue) |
|
343 |
return threshold; |
|
344 |
if (threshold == null || threshold == DateTime.MinValue || threshold < maxDate) |
|
345 |
return maxDate; |
|
346 |
return threshold; |
|
347 |
} |
|
348 |
|
|
300 | 349 |
readonly AccountsDifferencer _differencer = new AccountsDifferencer(); |
301 | 350 |
private List<Uri> _selectiveUris=new List<Uri>(); |
302 | 351 |
|
Also available in: Unified diff