Revision a64c87c8 trunk/Pithos.Core/Agents/NetworkAgent.cs

b/trunk/Pithos.Core/Agents/NetworkAgent.cs
61 61
                loop = () =>
62 62
                {
63 63
                    var message = inbox.Receive();
64

  
65
/*
66
                    var process=Process(message);
64
                    var process=message.Then(Process,inbox.CancellationToken);
67 65
                    inbox.LoopAsync(process, loop);
68
*/
69

  
70
/*
71
                    process1.ContinueWith(t =>
72
                    {
73
                        inbox.DoAsync(loop);
74
                        if (t.IsFaulted)
75
                        {
76
                            var ex = t.Exception.InnerException;
77
                            if (ex is OperationCanceledException)
78
                                inbox.Stop();
79
                        }
80
                    });
81
*/
82
                    //inbox.DoAsync(loop);
66
                };
67
                loop();
68
            });
69
        }
83 70

  
71
        private Task<object> Process(CloudAction action)
72
        {
73
            if (action == null)
74
                throw new ArgumentNullException("action");
75
            Contract.EndContractBlock();
84 76

  
85
                    var process = message.ContinueWith(t =>
86
                    {
87
                        var action = t.Result;
88
                        
89
                        Process(action);
90
                        inbox.DoAsync(loop);
91
                    });
77
            Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
78
            var localFile = action.LocalFile;
79
            var cloudFile = action.CloudFile;
80
            var downloadPath = (cloudFile == null) ? String.Empty
81
                                    : Path.Combine(FileAgent.RootPath, cloudFile.Name.RelativeUrlToFilePath());
92 82

  
93
                    process.ContinueWith(t =>
94
                    {
95
                        inbox.DoAsync(loop);
96
                        if (t.IsFaulted)
83
            try
84
            {
85
                switch (action.Action)
86
                {
87
                    case CloudActionType.UploadUnconditional:
88
                        UploadCloudFile(localFile, action.LocalHash.Value, action.TopHash.Value);
89
                        break;
90
                    case CloudActionType.DownloadUnconditional:
91
                        DownloadCloudFile(PithosContainer, new Uri(cloudFile.Name, UriKind.Relative), downloadPath);
92
                        break;
93
                    case CloudActionType.DeleteCloud:
94
                        DeleteCloudFile(cloudFile.Name);
95
                        break;
96
                    case CloudActionType.RenameCloud:
97
                        RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName);
98
                        break;
99
                    case CloudActionType.MustSynch:
100
                        if (File.Exists(downloadPath))
97 101
                        {
98
                            var ex = t.Exception.InnerException;
99
                            if (ex is OperationCanceledException)
100
                                inbox.Stop();
101
                        }
102
                    });
102
                            var cloudHash = cloudFile.Hash;
103
                            var localHash = action.LocalHash.Value;
104
                            var topHash = action.TopHash.Value;
105
                            //Not enough to compare only the local hashes, also have to compare the tophashes
106
                            if (!cloudHash.Equals(localHash, StringComparison.InvariantCultureIgnoreCase) &&
107
                                !cloudHash.Equals(topHash, StringComparison.InvariantCultureIgnoreCase))
108
                            {
109
                                var lastLocalTime = localFile.LastWriteTime;
110
                                var lastUpTime = cloudFile.Last_Modified;
111
                                if (lastUpTime <= lastLocalTime)
112
                                {
113
                                    //Local change while the app was down or Files in conflict
114
                                    //Maybe need to store version as well, to check who has the latest version
103 115

  
116
                                    //StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
117
                                    UploadCloudFile(localFile, action.LocalHash.Value, action.TopHash.Value);
118
                                }
119
                                else
120
                                {
121
                                    var status = StatusKeeper.GetFileStatus(downloadPath);
122
                                    switch (status)
123
                                    {
124
                                        case FileStatus.Unchanged:
125
                                            //It he cloud file has a later date, it was modified by another user or computer.
126
                                            //If the local file's status is Unchanged, we should go on and download the cloud file
127
                                            DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name, UriKind.Relative), downloadPath);
128
                                            break;
129
                                        case FileStatus.Modified:
130
                                            //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict
131
                                            //We can't ensure that a file modified online since the last time will appear as Modified, unless we 
132
                                            //index all files before we start listening.
133
                                            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
134
                                            break;
135
                                        case FileStatus.Created:
136
                                            //If the local file is Created, it means that the local and cloud files aren't related yet have the same name
137
                                            //In this case we must mark the file as in conflict
138
                                            //Other cases should never occur. Mark them as Conflict as well but log a warning
139
                                            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
140
                                            break;
141
                                        default:
142
                                            //If the local file is Created, it means that the local and cloud files aren't related yet have the same name
143
                                            //In this case we must mark the file as in conflict
144
                                            //Other cases should never occur. Mark them as Conflict as well but log a warning
145
                                            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
146
                                            Trace.TraceWarning("Unexcepted status {0} for file {1}->{2}", status, downloadPath, action.CloudFile.Name);
147
                                            break;
148
                                    }
149
                                }
150
                            }
151
                        }
152
                        else
153
                            DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name, UriKind.Relative), downloadPath);
154
                        break;
155
                }
156
                Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
157
            }
158
            catch (OperationCanceledException)
159
            {
160
                throw;
161
            }
162
            catch (Exception exc)
163
            {
164
                Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
165
                                action.Action, action.LocalFile, action.CloudFile, exc);
104 166

  
105
                };
106
                loop();
107
            });
167
                _agent.Post(action);
168
            }
169
            return CompletedTask<object>.Default;
108 170
        }
109 171

  
110 172

  
......
138 200
            }
139 201
        }
140 202

  
203
        //Remote files are polled periodically. Any changes are processed
141 204
        public Task ProcessRemoteFiles(string accountPath,DateTime? since=null)
142
        {            
205
        {   
206
            if (String.IsNullOrWhiteSpace(accountPath))
207
                throw new ArgumentNullException(accountPath);
208
            Contract.EndContractBlock();
143 209

  
144 210
            Trace.CorrelationManager.StartLogicalOperation();
145 211
            Trace.TraceInformation("[LISTENER] Scheduled");
146
            var listObjects = Task.Factory.StartNewDelayed(10000).ContinueWith(t =>
147
                CloudClient.ListObjects(PithosContainer,since));
212
            
213
            //Get the list of server objects changed since the last check
214
            var listObjects = Task<IList<ObjectInfo>>.Factory.StartNewDelayed(10000,()=>
215
                            CloudClient.ListObjects(PithosContainer,since));
148 216

  
217
            //Next time we will check for all changes since the current check minus 1 second
218
            //This is done to ensure there are no discrepancies due to clock differences
149 219
            DateTime nextSince = DateTime.Now.AddSeconds(-1);
220
            
221
            
150 222

  
151 223
            var enqueueFiles = listObjects.ContinueWith(task =>
152 224
            {
......
244 316
        }
245 317

  
246 318
        
247
        private Task Process(Task<CloudAction> action)
248
        {
249
            return action.ContinueWith(t=> Process(t.Result));
250
        }
251

  
252

  
253
        private void Process(CloudAction action)
254
        {
255
            if (action==null)
256
                throw new ArgumentNullException("action");
257
            Contract.EndContractBlock();
258

  
259
            Trace.TraceInformation("[ACTION] Start Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
260
            var localFile = action.LocalFile;
261
            var cloudFile = action.CloudFile;
262
            var downloadPath = (cloudFile == null) ? String.Empty
263
                                    : Path.Combine(FileAgent.RootPath, cloudFile.Name.RelativeUrlToFilePath());
264
            
265
            try
266
            {
267
                switch (action.Action)
268
                {
269
                    case CloudActionType.UploadUnconditional:
270
                        UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value);
271
                        break;
272
                    case CloudActionType.DownloadUnconditional:
273
                        DownloadCloudFile(PithosContainer, new Uri(cloudFile.Name,UriKind.Relative), downloadPath);
274
                        break;
275
                    case CloudActionType.DeleteCloud:
276
                        DeleteCloudFile(cloudFile.Name);
277
                        break;
278
                    case CloudActionType.RenameCloud:
279
                        RenameCloudFile(action.OldFileName, action.NewPath, action.NewFileName);
280
                        break;
281
                    case CloudActionType.MustSynch:
282
                        if (File.Exists(downloadPath))
283
                        {                            
284
                            var cloudHash = cloudFile.Hash;
285
                            var localHash = action.LocalHash.Value;
286
                            var topHash = action.TopHash.Value;
287
                            //Not enough to compare only the local hashes, also have to compare the tophashes
288
                            if (!cloudHash.Equals(localHash, StringComparison.InvariantCultureIgnoreCase) &&
289
                                !cloudHash.Equals(topHash, StringComparison.InvariantCultureIgnoreCase))
290
                            {
291
                                var lastLocalTime = localFile.LastWriteTime;
292
                                var lastUpTime = cloudFile.Last_Modified;
293
                                if (lastUpTime <= lastLocalTime)
294
                                {
295
                                    //Local change while the app was down or Files in conflict
296
                                    //Maybe need to store version as well, to check who has the latest version
297

  
298
                                    //StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
299
                                    UploadCloudFile(localFile, action.LocalHash.Value,action.TopHash.Value);
300
                                }
301
                                else
302
                                {
303
                                    var status = StatusKeeper.GetFileStatus(downloadPath);
304
                                    switch (status)
305
                                    {
306
                                        case FileStatus.Unchanged:
307
                                            //It he cloud file has a later date, it was modified by another user or computer.
308
                                            //If the local file's status is Unchanged, we should go on and download the cloud file
309
                                            DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
310
                                            break;
311
                                        case FileStatus.Modified:
312
                                            //If the local file is Modified, we may have a conflict. In this case we should mark the file as Conflict
313
                                            //We can't ensure that a file modified online since the last time will appear as Modified, unless we 
314
                                            //index all files before we start listening.
315
                                            StatusKeeper.SetFileOverlayStatus(downloadPath, FileOverlayStatus.Conflict);
316
                                            break;
317
                                        case FileStatus.Created:
318
                                            //If the local file is Created, it means that the local and cloud files aren't related yet have the same name
319
                                            //In this case we must mark the file as in conflict
320
                                            //Other cases should never occur. Mark them as Conflict as well but log a warning
321
                                            StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
322
                                            break;
323
                                        default:
324
                                            //If the local file is Created, it means that the local and cloud files aren't related yet have the same name
325
                                            //In this case we must mark the file as in conflict
326
                                            //Other cases should never occur. Mark them as Conflict as well but log a warning
327
                                            StatusKeeper.SetFileOverlayStatus(downloadPath,FileOverlayStatus.Conflict);
328
                                            Trace.TraceWarning("Unexcepted status {0} for file {1}->{2}",status,downloadPath,action.CloudFile.Name);
329
                                            break;
330
                                    }
331
                                }
332
                            }
333
                        }
334
                        else
335
                            DownloadCloudFile(PithosContainer, new Uri(action.CloudFile.Name,UriKind.Relative), downloadPath);
336
                        break;
337
                }
338
                Trace.TraceInformation("[ACTION] End Processing {0}:{1}->{2}", action.Action, action.LocalFile, action.CloudFile.Name);
339
            }
340
            catch (OperationCanceledException)
341
            {
342
                throw;
343
            }
344
            catch (Exception exc)
345
            {
346
                Trace.TraceError("[REQUEUE] {0} : {1} -> {2} due to exception\r\n{3}",
347
                                action.Action, action.LocalFile, action.CloudFile, exc);
348

  
349
                _agent.Post(action);
350
            }
351

  
352
        }
353

  
354
        
319
       
355 320

  
356 321
        private void RenameCloudFile(string oldFileName, string newPath, string newFileName)
357 322
        {
......
489 454
                throw new ArgumentNullException("serverHash");
490 455
            Contract.EndContractBlock();
491 456

  
457
            
492 458
            //Calculate the relative file path for the new file
493 459
            var relativePath = relativeUrl.RelativeUriToFilePath();
494
            //The file will be stored in a temporary location while downloading with an extension .download
495
            var tempPath = Path.Combine(FileAgent.FragmentsPath, relativePath + ".download");
496
            var directoryPath = Path.GetDirectoryName(tempPath);
497
            if (!Directory.Exists(directoryPath))
498
                Directory.CreateDirectory(directoryPath);
499
            
500
            //If the local file exists we should make a copy of it to the
501
            //fragments folder, unless a newer temp copy already exists, which
502
            //means there is an interrupted download
503
            if (ShouldCopy(localPath, tempPath))
504
                File.Copy(localPath, tempPath, true);    
505

  
506
            //Set the size of the file to the size specified in the treehash
507
            //This will also create an empty file if the file doesn't exist            
508
            SetFileSize(tempPath, serverHash.Bytes);
460
            var blockUpdater = new BlockUpdater(FileAgent.FragmentsPath, localPath, relativePath, serverHash);
509 461

  
462
            
510 463
            return Task.Factory.StartNew(() =>
511 464
            {
512 465
                //Calculate the temp file's treehash
513
                var treeHash = Signature.CalculateTreeHashAsync(tempPath, this.BlockSize,BlockHash).Result;
466
                var treeHash = Signature.CalculateTreeHashAsync(localPath, this.BlockSize,BlockHash).Result;
514 467
                
515 468
                //And compare it with the server's hash
516 469
                var upHashes = serverHash.GetHashesAsStrings();
......
527 480
                            end= ((i + 1)*BlockSize) ;
528 481
                            
529 482
                        //Get its block
530
                        var blockTask = CloudClient.GetBlock(container, relativeUrl,
531
                                                            start, end);
483
                        var block= CloudClient.GetBlock(container, relativeUrl,start, end);
484

  
485
                        var store=block.Then(b => blockUpdater.StoreBlock(i, b));
486
                        store.Wait();
532 487

  
533
                        blockTask.ContinueWith(b =>
534
                        {
535
                            //And apply it to the temp file
536
                            var buffer = b.Result;
537
                            var stream =FileAsync.OpenWrite(tempPath);
538
                            stream.Seek(start,SeekOrigin.Begin);
539
                            return stream.WriteAsync(buffer, 0, buffer.Length)
540
                                .ContinueWith(s => stream.Close());
541

  
542
                        }).Unwrap()
543
                        .Wait();
544 488
                        Trace.TraceInformation("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
545 489
                    }
546

  
547 490
                }
548 491
                
549

  
550
                //Replace the existing file with the temp
551
                if (File.Exists(localPath))
552
                    File.Replace(tempPath, localPath, null, true);
553
                else
554
                    File.Move(tempPath, localPath);
492
                blockUpdater.Commit();
555 493
                Trace.TraceInformation("[BLOCK GET] COMPLETE {0}", localPath);
556 494
            });
557 495
        }
558 496

  
559
        //Change the file's size, possibly truncating or adding to it
560
        private static void SetFileSize(string filePath, long fileSize)
561
        {
562
            if (String.IsNullOrWhiteSpace(filePath))
563
                throw new ArgumentNullException("filePath");
564
            if (!Path.IsPathRooted(filePath))
565
                throw new ArgumentException("The filePath must be rooted", "filePath");
566
            if (fileSize<0)
567
                throw new ArgumentOutOfRangeException("fileSize");
568
            Contract.EndContractBlock();
569

  
570
            using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write))
571
            {
572
                stream.SetLength(fileSize);
573
            }
574
        }
575

  
576
        //Check whether we should copy the local file to a temp path        
577
        private static bool ShouldCopy(string localPath, string tempPath)
578
        {
579
            //No need to copy if there is no file
580
            if (!File.Exists(localPath))            
581
                return false;
582 497

  
583
            //If there is no temp file, go ahead and copy
584
            if (!File.Exists(tempPath))                
585
                return true;
586

  
587
            //If there is a temp file and is newer than the actual file, don't copy
588
            var localLastWrite = File.GetLastWriteTime(localPath);
589
            var tempLastWrite = File.GetLastWriteTime(tempPath);
590
            
591
            //This could mean there is an interrupted download in progress
592
            return (tempLastWrite < localLastWrite);
593
        }
594 498

  
595 499
        private void UploadCloudFile(FileInfo fileInfo, string hash,string topHash)
596 500
        {

Also available in: Unified diff