Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Downloader.cs @ bf4471a3

History | View | Annotate | Download (17.8 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics.Contracts;
5
using System.IO;
6
using System.Linq;
7
using System.Reflection;
8
using System.Threading;
9
using System.Threading.Tasks;
10
using Pithos.Interfaces;
11
using Pithos.Network;
12
using log4net;
13

    
14
namespace Pithos.Core.Agents
15
{
16
    [Export(typeof(Downloader))]
17
    public class Downloader
18
    {
19
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
20

    
21
        [Import]
22
        private IStatusKeeper StatusKeeper { get; set; }
23

    
24
        
25
        public IStatusNotification StatusNotification { get; set; }
26

    
27
/*
28
        private CancellationTokenSource _cts=new CancellationTokenSource();
29

    
30
        public void SignalStop()
31
        {
32
            _cts.Cancel();
33
        }
34
*/
35

    
36

    
37
        //Download a file.
38
        public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,CancellationToken cancellationToken)
39
        {
40
            if (accountInfo == null)
41
                throw new ArgumentNullException("accountInfo");
42
            if (cloudFile == null)
43
                throw new ArgumentNullException("cloudFile");
44
            if (String.IsNullOrWhiteSpace(cloudFile.Account))
45
                throw new ArgumentNullException("cloudFile");
46
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
47
                throw new ArgumentNullException("cloudFile");
48
            if (String.IsNullOrWhiteSpace(filePath))
49
                throw new ArgumentNullException("filePath");
50
            if (!Path.IsPathRooted(filePath))
51
                throw new ArgumentException("The filePath must be rooted", "filePath");
52
            Contract.EndContractBlock();
53
                using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile"))
54
                {
55
                   // var cancellationToken=_cts.Token;//  .ThrowIfCancellationRequested();
56

    
57
                    if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken).ConfigureAwait(false))
58
                        return;
59

    
60
                    var fileName = Path.GetFileName(filePath);
61
                    var progress = new Progress<double>(d =>
62
                        StatusNotification.Notify(new StatusNotification(String.Format("Hashing for Download {0} of {1}", d, fileName))));
63

    
64

    
65
                    TreeHash localTreeHash;
66
                    
67
                    using (StatusNotification.GetNotifier("Hashing for Download {0}", "Hashed for Download {0}", fileName))
68
                    {
69

    
70
                        localTreeHash = Signature.CalculateTreeHashAsync(filePath,
71
                                                                            accountInfo.BlockSize,
72
                                                                            accountInfo.BlockHash, 1,progress);
73
                    }
74

    
75
                    var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
76
                    var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative);
77

    
78
                    var url = relativeUrl.ToString();
79
                    if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase))
80
                        return;
81

    
82
                    if (!Selectives.IsSelected(accountInfo,cloudFile))
83
                        return;
84

    
85

    
86
                    //Are we already downloading or uploading the file? 
87
                    using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
88
                    {
89
                        if (gate.Failed)
90
                            return;
91

    
92
                        var client = new CloudFilesClient(accountInfo);
93
                        var account = cloudFile.Account;
94
                        var container = cloudFile.Container;
95

    
96
                        if (cloudFile.IsDirectory)
97
                        {
98
                            if (!Directory.Exists(localPath))
99
                                try
100
                                {
101
                                    Directory.CreateDirectory(localPath);
102
                                    if (Log.IsDebugEnabled)
103
                                        Log.DebugFormat("Created Directory [{0}]", localPath);
104
                                }
105
                                catch (IOException)
106
                                {
107
                                    var localInfo = new FileInfo(localPath);
108
                                    if (localInfo.Exists && localInfo.Length == 0)
109
                                    {
110
                                        Log.WarnFormat("Malformed directory object detected for [{0}]", localPath);
111
                                        localInfo.Delete();
112
                                        Directory.CreateDirectory(localPath);
113
                                        if (Log.IsDebugEnabled)
114
                                            Log.DebugFormat("Created Directory [{0}]", localPath);
115
                                    }
116
                                }
117
                        }
118
                        else
119
                        {
120
                            var isChanged = IsObjectChanged(cloudFile, localPath);
121
                            if (isChanged)
122
                            {
123
                                //Retrieve the hashmap from the server
124
                                var serverHash = await client.GetHashMap(account, container, url).ConfigureAwait(false);
125
                                //If it's a small file
126
                                if (serverHash.Hashes.Count == 1)
127
                                    //Download it in one go
128
                                    await
129
                                        DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath,cancellationToken);
130
                                    //Otherwise download it block by block
131
                                else
132
                                    await
133
                                        DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath,localTreeHash,
134
                                                           serverHash,cancellationToken);
135

    
136
                                if (!cloudFile.IsWritable(accountInfo.UserName))
137
                                {
138
                                    var attributes = File.GetAttributes(localPath);
139
                                    File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly);
140
                                }
141
                            }
142
                        }
143

    
144
                        //Now we can store the object's metadata without worrying about ghost status entries
145
                        StatusKeeper.StoreInfo(localPath, cloudFile);
146

    
147
                    }
148
                }
149
           
150
        }
151

    
152
        //Download a file asynchronously using blocks
153
        public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash localTreeHash,TreeHash serverHash, CancellationToken cancellationToken)
154
        {
155
            if (client == null)
156
                throw new ArgumentNullException("client");
157
            if (cloudFile == null)
158
                throw new ArgumentNullException("cloudFile");
159
            if (relativeUrl == null)
160
                throw new ArgumentNullException("relativeUrl");
161
            if (String.IsNullOrWhiteSpace(filePath))
162
                throw new ArgumentNullException("filePath");
163
            if (!Path.IsPathRooted(filePath))
164
                throw new ArgumentException("The filePath must be rooted", "filePath");
165
            if (serverHash == null)
166
                throw new ArgumentNullException("serverHash");
167
            if (cloudFile.IsDirectory)
168
                throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile");
169
            Contract.EndContractBlock();
170

    
171
            if (await WaitOrAbort(accountInfo, cloudFile, cancellationToken).ConfigureAwait(false))
172
                return;
173

    
174
            var fileAgent = GetFileAgent(accountInfo);
175
            var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
176

    
177
            //Calculate the relative file path for the new file
178
            var relativePath = relativeUrl.RelativeUriToFilePath();
179
            var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash);
180

    
181
            StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Calculating hashmap for {0} before download", Path.GetFileName(localPath)));
182
            //Calculate the file's treehash
183

    
184
            var fileName = Path.GetFileName(localPath);
185
            var progress = new Progress<double>(d =>
186
                StatusNotification.Notify(new StatusNotification(String.Format("Hashing for Download {0} of {1}", d, fileName))));
187

    
188
            //TODO: Should pass cancellation token here
189
            var treeHash = localTreeHash ?? Signature.CalculateTreeHashAsync(localPath, (int)serverHash.BlockSize, serverHash.BlockHash, 2,progress);
190

    
191
            //And compare it with the server's hash
192
            var upHashes = serverHash.GetHashesAsStrings();
193
            var localHashes = treeHash.HashDictionary;
194
            ReportDownloadProgress(Path.GetFileName(localPath), 0,0, upHashes.Length, cloudFile.Bytes);
195

    
196

    
197
            long i = 0;
198
            client.DownloadProgressChanged += (sender, args) =>
199
                                ReportDownloadProgress(Path.GetFileName(localPath), i, args.ProgressPercentage, upHashes.Length, cloudFile.Bytes);
200

    
201

    
202
            for (i = 0; i < upHashes.Length; i++)
203
            {
204
                if (await WaitOrAbort(accountInfo, cloudFile, cancellationToken).ConfigureAwait(false))
205
                    return;
206

    
207
                //For every non-matching hash
208
                var upHash = upHashes[i];
209
                if (!localHashes.ContainsKey(upHash))
210
                {
211
                    StatusNotification.Notify(new CloudNotification { Data = cloudFile });
212
                    ReportDownloadProgress(Path.GetFileName(localPath), i, 0,upHashes.Length, cloudFile.Bytes);
213

    
214
                    if (blockUpdater.UseOrphan(i, upHash))
215
                    {
216
                        Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
217
                        continue;
218
                    }
219
                    Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath);
220
                    long start = i * serverHash.BlockSize;
221
                    //To download the last block just pass a null for the end of the range
222
                    long? end = null;
223
                    if (i < upHashes.Length - 1)
224
                        end = ((i + 1) * serverHash.BlockSize);
225

    
226
                    //TODO: Pass token here
227
                    //Download the missing block
228
                    byte[] block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end, cancellationToken).ConfigureAwait(false);
229

    
230
                    //and store it
231
                    blockUpdater.StoreBlock(i, block);
232

    
233

    
234
                    Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
235
                }
236
                ReportDownloadProgress(Path.GetFileName(localPath), i, 100,upHashes.Length, cloudFile.Bytes);
237
            }
238

    
239
            //Want to avoid notifications if no changes were made
240
            var hasChanges = blockUpdater.HasBlocks;
241
            blockUpdater.Commit();
242

    
243
            if (hasChanges)
244
                //Notify listeners that a local file has changed
245
                StatusNotification.NotifyChangedFile(localPath);
246

    
247
            Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath);
248
        }
249

    
250
        //Download a small file with a single GET operation
251
        private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, CancellationToken cancellationToken)
252
        {
253
            if (client == null)
254
                throw new ArgumentNullException("client");
255
            if (cloudFile == null)
256
                throw new ArgumentNullException("cloudFile");
257
            if (relativeUrl == null)
258
                throw new ArgumentNullException("relativeUrl");
259
            if (String.IsNullOrWhiteSpace(filePath))
260
                throw new ArgumentNullException("filePath");
261
            if (!Path.IsPathRooted(filePath))
262
                throw new ArgumentException("The localPath must be rooted", "filePath");
263
            if (cloudFile.IsDirectory)
264
                throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile");
265
            Contract.EndContractBlock();
266

    
267
            if (await WaitOrAbort(accountInfo, cloudFile, cancellationToken).ConfigureAwait(false))
268
                return;
269

    
270
            var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
271
            StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Downloading {0}", Path.GetFileName(localPath)));
272
            StatusNotification.Notify(new CloudNotification { Data = cloudFile });
273
            ReportDownloadProgress(Path.GetFileName(localPath), 1, 0,1, cloudFile.Bytes);
274

    
275
            var fileAgent = GetFileAgent(accountInfo);
276
            //Calculate the relative file path for the new file
277
            var relativePath = relativeUrl.RelativeUriToFilePath();
278
            //The file will be stored in a temporary location while downloading with an extension .download
279
            var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download");
280
            //Make sure the target folder exists. DownloadFileTask will not create the folder
281
            var tempFolder = Path.GetDirectoryName(tempPath);
282
            if (!Directory.Exists(tempFolder))
283
                Directory.CreateDirectory(tempFolder);
284

    
285
            //TODO: Should pass the token here
286

    
287
            //Download the object to the temporary location
288
            await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath, cancellationToken).ConfigureAwait(false);
289

    
290
            //Create the local folder if it doesn't exist (necessary for shared objects)
291
            var localFolder = Path.GetDirectoryName(localPath);
292
            if (!Directory.Exists(localFolder))
293
                try
294
                {
295
                    Directory.CreateDirectory(localFolder);
296
                }
297
                catch (IOException)
298
                {
299
                    //A file may already exist that has the same name as the new folder.
300
                    //This may be an artifact of the way Pithos handles directories
301
                    var fileInfo = new FileInfo(localFolder);
302
                    if (fileInfo.Exists && fileInfo.Length == 0)
303
                    {
304
                        Log.WarnFormat("Malformed directory object detected for [{0}]", localFolder);
305
                        fileInfo.Delete();
306
                        Directory.CreateDirectory(localFolder);
307
                    }
308
                    else
309
                        throw;
310
                }
311
            //And move it to its actual location once downloading is finished
312
            if (File.Exists(localPath))
313
                File.Replace(tempPath, localPath, null, true);
314
            else
315
                File.Move(tempPath, localPath);
316
            //Notify listeners that a local file has changed
317
            StatusNotification.NotifyChangedFile(localPath);
318

    
319

    
320
        }
321

    
322

    
323
        private void ReportDownloadProgress(string fileName, long block, int blockPercentage,int totalBlocks, long fileSize)
324
        {
325
            StatusNotification.Notify(totalBlocks == 0
326
                                          ? new ProgressNotification(fileName, "Downloading", 1, blockPercentage,1, fileSize)
327
                                          : new ProgressNotification(fileName, "Downloading", block, blockPercentage, totalBlocks, fileSize));
328
        }
329

    
330
        private bool IsObjectChanged(ObjectInfo cloudFile, string localPath)
331
        {
332
            //If the target is a directory, there are no changes to download
333
            if (Directory.Exists(localPath))
334
                return false;
335
            //If the file doesn't exist, we have a chagne
336
            if (!File.Exists(localPath))
337
                return true;
338
            //If there is no stored state, we have a change
339
            var localState = StatusKeeper.GetStateByFilePath(localPath);
340
            if (localState == null)
341
                return true;
342

    
343
            var info = new FileInfo(localPath);
344
            var etag = info.ComputeShortHash(StatusNotification);
345
            //If the file is different from the stored state, we have a change
346
            if (localState.ETag != etag)
347
                return true;
348
            //If the top hashes differ, we have a change
349
            return (localState.Checksum != cloudFile.X_Object_Hash);
350
        }
351

    
352
        private static FileAgent GetFileAgent(AccountInfo accountInfo)
353
        {
354
            return AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
355
        }
356

    
357
        private async Task<bool> WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token)
358
        {
359
            token.ThrowIfCancellationRequested();
360
            await UnpauseEvent.WaitAsync().ConfigureAwait(false);
361
            var shouldAbort = !Selectives.IsSelected(account,cloudFile);
362
            if (shouldAbort)
363
                Log.InfoFormat("Aborting [{0}]", cloudFile.Uri);
364
            return shouldAbort;
365
        }
366

    
367
        [Import]
368
        public Selectives Selectives { get; set; }
369

    
370
        public AsyncManualResetEvent UnpauseEvent { get; set; }
371
    }
372
}