Modifications to enable Sync Pausing for all operations
[pithos-ms-client] / trunk / Pithos.Core / Agents / Downloader.cs
1 using System;
2 using System.Collections.Generic;
3 using System.ComponentModel.Composition;
4 using System.Diagnostics.Contracts;
5 using System.IO;
6 using System.Linq;
7 using System.Reflection;
8 using System.Threading;
9 using System.Threading.Tasks;
10 using Pithos.Interfaces;
11 using Pithos.Network;
12 using log4net;
13
14 namespace Pithos.Core.Agents
15 {
16     [Export(typeof(Downloader))]
17     public class Downloader
18     {
19         private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
20
21         [Import]
22         private IStatusKeeper StatusKeeper { get; set; }
23
24         
25         public IStatusNotification StatusNotification { get; set; }
26
27 /*
28         private CancellationTokenSource _cts=new CancellationTokenSource();
29
30         public void SignalStop()
31         {
32             _cts.Cancel();
33         }
34 */
35
36
37         //Download a file.
38         public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,CancellationToken cancellationToken)
39         {
40             if (accountInfo == null)
41                 throw new ArgumentNullException("accountInfo");
42             if (cloudFile == null)
43                 throw new ArgumentNullException("cloudFile");
44             if (String.IsNullOrWhiteSpace(cloudFile.Account))
45                 throw new ArgumentNullException("cloudFile");
46             if (String.IsNullOrWhiteSpace(cloudFile.Container))
47                 throw new ArgumentNullException("cloudFile");
48             if (String.IsNullOrWhiteSpace(filePath))
49                 throw new ArgumentNullException("filePath");
50             if (!Path.IsPathRooted(filePath))
51                 throw new ArgumentException("The filePath must be rooted", "filePath");
52             Contract.EndContractBlock();
53                 using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile"))
54                 {
55                    // var cancellationToken=_cts.Token;//  .ThrowIfCancellationRequested();
56
57                     cancellationToken.ThrowIfCancellationRequested();
58                     await UnpauseEvent.WaitAsync();
59
60
61                     var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
62                     var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative);
63
64                     var url = relativeUrl.ToString();
65                     if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase))
66                         return;
67
68                     if (!Selectives.IsSelected(cloudFile))
69                         return;
70
71
72                     //Are we already downloading or uploading the file? 
73                     using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
74                     {
75                         if (gate.Failed)
76                             return;
77
78                         var client = new CloudFilesClient(accountInfo);
79                         var account = cloudFile.Account;
80                         var container = cloudFile.Container;
81
82                         if (cloudFile.IsDirectory)
83                         {
84                             if (!Directory.Exists(localPath))
85                                 try
86                                 {
87                                     Directory.CreateDirectory(localPath);
88                                     if (Log.IsDebugEnabled)
89                                         Log.DebugFormat("Created Directory [{0}]", localPath);
90                                 }
91                                 catch (IOException)
92                                 {
93                                     var localInfo = new FileInfo(localPath);
94                                     if (localInfo.Exists && localInfo.Length == 0)
95                                     {
96                                         Log.WarnFormat("Malformed directory object detected for [{0}]", localPath);
97                                         localInfo.Delete();
98                                         Directory.CreateDirectory(localPath);
99                                         if (Log.IsDebugEnabled)
100                                             Log.DebugFormat("Created Directory [{0}]", localPath);
101                                     }
102                                 }
103                         }
104                         else
105                         {
106                             var isChanged = IsObjectChanged(cloudFile, localPath);
107                             if (isChanged)
108                             {
109                                 //Retrieve the hashmap from the server
110                                 var serverHash = await client.GetHashMap(account, container, url);
111                                 //If it's a small file
112                                 if (serverHash.Hashes.Count == 1)
113                                     //Download it in one go
114                                     await
115                                         DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath,cancellationToken);
116                                     //Otherwise download it block by block
117                                 else
118                                     await
119                                         DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath,
120                                                            serverHash,cancellationToken);
121
122                                 if (!cloudFile.IsWritable(accountInfo.UserName))
123                                 {
124                                     var attributes = File.GetAttributes(localPath);
125                                     File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly);
126                                 }
127                             }
128                         }
129
130                         //Now we can store the object's metadata without worrying about ghost status entries
131                         StatusKeeper.StoreInfo(localPath, cloudFile);
132
133                     }
134                 }
135            
136         }
137
138         //Download a file asynchronously using blocks
139         public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash, CancellationToken cancellationToken)
140         {
141             if (client == null)
142                 throw new ArgumentNullException("client");
143             if (cloudFile == null)
144                 throw new ArgumentNullException("cloudFile");
145             if (relativeUrl == null)
146                 throw new ArgumentNullException("relativeUrl");
147             if (String.IsNullOrWhiteSpace(filePath))
148                 throw new ArgumentNullException("filePath");
149             if (!Path.IsPathRooted(filePath))
150                 throw new ArgumentException("The filePath must be rooted", "filePath");
151             if (serverHash == null)
152                 throw new ArgumentNullException("serverHash");
153             if (cloudFile.IsDirectory)
154                 throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile");
155             Contract.EndContractBlock();
156
157             cancellationToken.ThrowIfCancellationRequested();
158             await UnpauseEvent.WaitAsync();
159
160             var fileAgent = GetFileAgent(accountInfo);
161             var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
162
163             //Calculate the relative file path for the new file
164             var relativePath = relativeUrl.RelativeUriToFilePath();
165             var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash);
166
167             StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Calculating hashmap for {0} before download", Path.GetFileName(localPath)));
168             //Calculate the file's treehash
169
170             //TODO: Should pass cancellation token here
171             var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash, 2);
172
173             //And compare it with the server's hash
174             var upHashes = serverHash.GetHashesAsStrings();
175             var localHashes = treeHash.HashDictionary;
176             ReportDownloadProgress(Path.GetFileName(localPath), 0, upHashes.Length, cloudFile.Bytes);
177             for (int i = 0; i < upHashes.Length; i++)
178             {
179                 cancellationToken.ThrowIfCancellationRequested();
180                 await UnpauseEvent.WaitAsync();
181
182                 //For every non-matching hash
183                 var upHash = upHashes[i];
184                 if (!localHashes.ContainsKey(upHash))
185                 {
186                     StatusNotification.Notify(new CloudNotification { Data = cloudFile });
187
188                     if (blockUpdater.UseOrphan(i, upHash))
189                     {
190                         Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
191                         continue;
192                     }
193                     Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath);
194                     var start = i * serverHash.BlockSize;
195                     //To download the last block just pass a null for the end of the range
196                     long? end = null;
197                     if (i < upHashes.Length - 1)
198                         end = ((i + 1) * serverHash.BlockSize);
199
200                     //TODO: Pass token here
201                     //Download the missing block
202                     var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end,cancellationToken);
203
204                     //and store it
205                     blockUpdater.StoreBlock(i, block);
206
207
208                     Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
209                 }
210                 ReportDownloadProgress(Path.GetFileName(localPath), i, upHashes.Length, cloudFile.Bytes);
211             }
212
213             //Want to avoid notifications if no changes were made
214             var hasChanges = blockUpdater.HasBlocks;
215             blockUpdater.Commit();
216
217             if (hasChanges)
218                 //Notify listeners that a local file has changed
219                 StatusNotification.NotifyChangedFile(localPath);
220
221             Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath);
222         }
223
224         //Download a small file with a single GET operation
225         private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, CancellationToken cancellationToken)
226         {
227             if (client == null)
228                 throw new ArgumentNullException("client");
229             if (cloudFile == null)
230                 throw new ArgumentNullException("cloudFile");
231             if (relativeUrl == null)
232                 throw new ArgumentNullException("relativeUrl");
233             if (String.IsNullOrWhiteSpace(filePath))
234                 throw new ArgumentNullException("filePath");
235             if (!Path.IsPathRooted(filePath))
236                 throw new ArgumentException("The localPath must be rooted", "filePath");
237             if (cloudFile.IsDirectory)
238                 throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile");
239             Contract.EndContractBlock();
240
241             cancellationToken.ThrowIfCancellationRequested();
242             await UnpauseEvent.WaitAsync();
243
244             var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
245             StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Downloading {0}", Path.GetFileName(localPath)));
246             StatusNotification.Notify(new CloudNotification { Data = cloudFile });
247
248             var fileAgent = GetFileAgent(accountInfo);
249             //Calculate the relative file path for the new file
250             var relativePath = relativeUrl.RelativeUriToFilePath();
251             //The file will be stored in a temporary location while downloading with an extension .download
252             var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download");
253             //Make sure the target folder exists. DownloadFileTask will not create the folder
254             var tempFolder = Path.GetDirectoryName(tempPath);
255             if (!Directory.Exists(tempFolder))
256                 Directory.CreateDirectory(tempFolder);
257
258             //TODO: Should pass the token here
259
260             //Download the object to the temporary location
261             await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath,cancellationToken);
262
263             //Create the local folder if it doesn't exist (necessary for shared objects)
264             var localFolder = Path.GetDirectoryName(localPath);
265             if (!Directory.Exists(localFolder))
266                 try
267                 {
268                     Directory.CreateDirectory(localFolder);
269                 }
270                 catch (IOException)
271                 {
272                     //A file may already exist that has the same name as the new folder.
273                     //This may be an artifact of the way Pithos handles directories
274                     var fileInfo = new FileInfo(localFolder);
275                     if (fileInfo.Exists && fileInfo.Length == 0)
276                     {
277                         Log.WarnFormat("Malformed directory object detected for [{0}]", localFolder);
278                         fileInfo.Delete();
279                         Directory.CreateDirectory(localFolder);
280                     }
281                     else
282                         throw;
283                 }
284             //And move it to its actual location once downloading is finished
285             if (File.Exists(localPath))
286                 File.Replace(tempPath, localPath, null, true);
287             else
288                 File.Move(tempPath, localPath);
289             //Notify listeners that a local file has changed
290             StatusNotification.NotifyChangedFile(localPath);
291
292
293         }
294
295
296         private void ReportDownloadProgress(string fileName, int block, int totalBlocks, long fileSize)
297         {
298             StatusNotification.Notify(totalBlocks == 0
299                                           ? new ProgressNotification(fileName, "Downloading", 1, 1, fileSize)
300                                           : new ProgressNotification(fileName, "Downloading", block, totalBlocks, fileSize));
301         }
302
303         private bool IsObjectChanged(ObjectInfo cloudFile, string localPath)
304         {
305             //If the target is a directory, there are no changes to download
306             if (Directory.Exists(localPath))
307                 return false;
308             //If the file doesn't exist, we have a chagne
309             if (!File.Exists(localPath))
310                 return true;
311             //If there is no stored state, we have a change
312             var localState = StatusKeeper.GetStateByFilePath(localPath);
313             if (localState == null)
314                 return true;
315
316             var info = new FileInfo(localPath);
317             var shortHash = info.ComputeShortHash();
318             //If the file is different from the stored state, we have a change
319             if (localState.ShortHash != shortHash)
320                 return true;
321             //If the top hashes differ, we have a change
322             return (localState.Checksum != cloudFile.Hash);
323         }
324
325         private static FileAgent GetFileAgent(AccountInfo accountInfo)
326         {
327             return AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
328         }
329
330         [Import]
331         public Selectives Selectives { get; set; }
332
333         public AsyncManualResetEvent UnpauseEvent { get; set; }
334     }
335 }