Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Downloader.cs @ 43dd02a8

History | View | Annotate | Download (16.2 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics.Contracts;
5
using System.IO;
6
using System.Linq;
7
using System.Reflection;
8
using System.Threading;
9
using System.Threading.Tasks;
10
using Pithos.Interfaces;
11
using Pithos.Network;
12
using log4net;
13

    
14
namespace Pithos.Core.Agents
15
{
16
    [Export(typeof(Downloader))]
17
    public class Downloader
18
    {
19
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
20

    
21
        [Import]
22
        private IStatusKeeper StatusKeeper { get; set; }
23

    
24
        
25
        public IStatusNotification StatusNotification { get; set; }
26

    
27
/*
28
        private CancellationTokenSource _cts=new CancellationTokenSource();
29

    
30
        public void SignalStop()
31
        {
32
            _cts.Cancel();
33
        }
34
*/
35

    
36

    
37
        //Download a file.
38
        public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,CancellationToken cancellationToken)
39
        {
40
            if (accountInfo == null)
41
                throw new ArgumentNullException("accountInfo");
42
            if (cloudFile == null)
43
                throw new ArgumentNullException("cloudFile");
44
            if (String.IsNullOrWhiteSpace(cloudFile.Account))
45
                throw new ArgumentNullException("cloudFile");
46
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
47
                throw new ArgumentNullException("cloudFile");
48
            if (String.IsNullOrWhiteSpace(filePath))
49
                throw new ArgumentNullException("filePath");
50
            if (!Path.IsPathRooted(filePath))
51
                throw new ArgumentException("The filePath must be rooted", "filePath");
52
            Contract.EndContractBlock();
53
                using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile"))
54
                {
55
                   // var cancellationToken=_cts.Token;//  .ThrowIfCancellationRequested();
56

    
57
                    if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken))
58
                        return;
59

    
60

    
61
                    var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
62
                    var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative);
63

    
64
                    var url = relativeUrl.ToString();
65
                    if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase))
66
                        return;
67

    
68
                    if (!Selectives.IsSelected(accountInfo,cloudFile))
69
                        return;
70

    
71

    
72
                    //Are we already downloading or uploading the file? 
73
                    using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
74
                    {
75
                        if (gate.Failed)
76
                            return;
77

    
78
                        var client = new CloudFilesClient(accountInfo);
79
                        var account = cloudFile.Account;
80
                        var container = cloudFile.Container;
81

    
82
                        if (cloudFile.IsDirectory)
83
                        {
84
                            if (!Directory.Exists(localPath))
85
                                try
86
                                {
87
                                    Directory.CreateDirectory(localPath);
88
                                    if (Log.IsDebugEnabled)
89
                                        Log.DebugFormat("Created Directory [{0}]", localPath);
90
                                }
91
                                catch (IOException)
92
                                {
93
                                    var localInfo = new FileInfo(localPath);
94
                                    if (localInfo.Exists && localInfo.Length == 0)
95
                                    {
96
                                        Log.WarnFormat("Malformed directory object detected for [{0}]", localPath);
97
                                        localInfo.Delete();
98
                                        Directory.CreateDirectory(localPath);
99
                                        if (Log.IsDebugEnabled)
100
                                            Log.DebugFormat("Created Directory [{0}]", localPath);
101
                                    }
102
                                }
103
                        }
104
                        else
105
                        {
106
                            var isChanged = IsObjectChanged(cloudFile, localPath);
107
                            if (isChanged)
108
                            {
109
                                //Retrieve the hashmap from the server
110
                                var serverHash = await client.GetHashMap(account, container, url);
111
                                //If it's a small file
112
                                if (serverHash.Hashes.Count == 1)
113
                                    //Download it in one go
114
                                    await
115
                                        DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath,cancellationToken);
116
                                    //Otherwise download it block by block
117
                                else
118
                                    await
119
                                        DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath,
120
                                                           serverHash,cancellationToken);
121

    
122
                                if (!cloudFile.IsWritable(accountInfo.UserName))
123
                                {
124
                                    var attributes = File.GetAttributes(localPath);
125
                                    File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly);
126
                                }
127
                            }
128
                        }
129

    
130
                        //Now we can store the object's metadata without worrying about ghost status entries
131
                        StatusKeeper.StoreInfo(localPath, cloudFile);
132

    
133
                    }
134
                }
135
           
136
        }
137

    
138
        //Download a file asynchronously using blocks
139
        public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash, CancellationToken cancellationToken)
140
        {
141
            if (client == null)
142
                throw new ArgumentNullException("client");
143
            if (cloudFile == null)
144
                throw new ArgumentNullException("cloudFile");
145
            if (relativeUrl == null)
146
                throw new ArgumentNullException("relativeUrl");
147
            if (String.IsNullOrWhiteSpace(filePath))
148
                throw new ArgumentNullException("filePath");
149
            if (!Path.IsPathRooted(filePath))
150
                throw new ArgumentException("The filePath must be rooted", "filePath");
151
            if (serverHash == null)
152
                throw new ArgumentNullException("serverHash");
153
            if (cloudFile.IsDirectory)
154
                throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile");
155
            Contract.EndContractBlock();
156

    
157
            if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken))
158
                return;
159

    
160
            var fileAgent = GetFileAgent(accountInfo);
161
            var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
162

    
163
            //Calculate the relative file path for the new file
164
            var relativePath = relativeUrl.RelativeUriToFilePath();
165
            var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash);
166

    
167
            StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Calculating hashmap for {0} before download", Path.GetFileName(localPath)));
168
            //Calculate the file's treehash
169

    
170
            //TODO: Should pass cancellation token here
171
            var treeHash = await Signature.CalculateTreeHashAsync(localPath, (int)serverHash.BlockSize, serverHash.BlockHash, 2);
172

    
173
            //And compare it with the server's hash
174
            var upHashes = serverHash.GetHashesAsStrings();
175
            var localHashes = treeHash.HashDictionary;
176
            ReportDownloadProgress(Path.GetFileName(localPath), 0, upHashes.Length, cloudFile.Bytes);
177
            for (var i = 0; i < upHashes.Length; i++)
178
            {
179
                if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken))
180
                    return;
181

    
182
                //For every non-matching hash
183
                var upHash = upHashes[i];
184
                if (!localHashes.ContainsKey(upHash))
185
                {
186
                    StatusNotification.Notify(new CloudNotification { Data = cloudFile });
187
                    ReportDownloadProgress(Path.GetFileName(localPath), i, upHashes.Length, cloudFile.Bytes);
188

    
189
                    if (blockUpdater.UseOrphan(i, upHash))
190
                    {
191
                        Log.InfoFormat("[BLOCK GET] ORPHAN FOUND for {0} of {1} for {2}", i, upHashes.Length, localPath);
192
                        continue;
193
                    }
194
                    Log.InfoFormat("[BLOCK GET] START {0} of {1} for {2}", i, upHashes.Length, localPath);
195
                    var start = i * serverHash.BlockSize;
196
                    //To download the last block just pass a null for the end of the range
197
                    long? end = null;
198
                    if (i < upHashes.Length - 1)
199
                        end = ((i + 1) * serverHash.BlockSize);
200

    
201
                    //TODO: Pass token here
202
                    //Download the missing block
203
                    var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end,cancellationToken);
204

    
205
                    //and store it
206
                    blockUpdater.StoreBlock(i, block);
207

    
208

    
209
                    Log.InfoFormat("[BLOCK GET] FINISH {0} of {1} for {2}", i, upHashes.Length, localPath);
210
                }
211
                ReportDownloadProgress(Path.GetFileName(localPath), i, upHashes.Length, cloudFile.Bytes);
212
            }
213

    
214
            //Want to avoid notifications if no changes were made
215
            var hasChanges = blockUpdater.HasBlocks;
216
            blockUpdater.Commit();
217

    
218
            if (hasChanges)
219
                //Notify listeners that a local file has changed
220
                StatusNotification.NotifyChangedFile(localPath);
221

    
222
            Log.InfoFormat("[BLOCK GET] COMPLETE {0}", localPath);
223
        }
224

    
225
        //Download a small file with a single GET operation
226
        private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, CancellationToken cancellationToken)
227
        {
228
            if (client == null)
229
                throw new ArgumentNullException("client");
230
            if (cloudFile == null)
231
                throw new ArgumentNullException("cloudFile");
232
            if (relativeUrl == null)
233
                throw new ArgumentNullException("relativeUrl");
234
            if (String.IsNullOrWhiteSpace(filePath))
235
                throw new ArgumentNullException("filePath");
236
            if (!Path.IsPathRooted(filePath))
237
                throw new ArgumentException("The localPath must be rooted", "filePath");
238
            if (cloudFile.IsDirectory)
239
                throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile");
240
            Contract.EndContractBlock();
241

    
242
            if (await WaitOrAbort(accountInfo,cloudFile, cancellationToken))
243
                return;
244

    
245
            var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
246
            StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Downloading {0}", Path.GetFileName(localPath)));
247
            StatusNotification.Notify(new CloudNotification { Data = cloudFile });
248
            ReportDownloadProgress(Path.GetFileName(localPath), 1, 1, cloudFile.Bytes);
249

    
250
            var fileAgent = GetFileAgent(accountInfo);
251
            //Calculate the relative file path for the new file
252
            var relativePath = relativeUrl.RelativeUriToFilePath();
253
            //The file will be stored in a temporary location while downloading with an extension .download
254
            var tempPath = Path.Combine(fileAgent.CachePath, relativePath + ".download");
255
            //Make sure the target folder exists. DownloadFileTask will not create the folder
256
            var tempFolder = Path.GetDirectoryName(tempPath);
257
            if (!Directory.Exists(tempFolder))
258
                Directory.CreateDirectory(tempFolder);
259

    
260
            //TODO: Should pass the token here
261

    
262
            //Download the object to the temporary location
263
            await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath,cancellationToken);
264

    
265
            //Create the local folder if it doesn't exist (necessary for shared objects)
266
            var localFolder = Path.GetDirectoryName(localPath);
267
            if (!Directory.Exists(localFolder))
268
                try
269
                {
270
                    Directory.CreateDirectory(localFolder);
271
                }
272
                catch (IOException)
273
                {
274
                    //A file may already exist that has the same name as the new folder.
275
                    //This may be an artifact of the way Pithos handles directories
276
                    var fileInfo = new FileInfo(localFolder);
277
                    if (fileInfo.Exists && fileInfo.Length == 0)
278
                    {
279
                        Log.WarnFormat("Malformed directory object detected for [{0}]", localFolder);
280
                        fileInfo.Delete();
281
                        Directory.CreateDirectory(localFolder);
282
                    }
283
                    else
284
                        throw;
285
                }
286
            //And move it to its actual location once downloading is finished
287
            if (File.Exists(localPath))
288
                File.Replace(tempPath, localPath, null, true);
289
            else
290
                File.Move(tempPath, localPath);
291
            //Notify listeners that a local file has changed
292
            StatusNotification.NotifyChangedFile(localPath);
293

    
294

    
295
        }
296

    
297

    
298
        private void ReportDownloadProgress(string fileName, int block, int totalBlocks, long fileSize)
299
        {
300
            StatusNotification.Notify(totalBlocks == 0
301
                                          ? new ProgressNotification(fileName, "Downloading", 1, 1, fileSize)
302
                                          : new ProgressNotification(fileName, "Downloading", block, totalBlocks, fileSize));
303
        }
304

    
305
        private bool IsObjectChanged(ObjectInfo cloudFile, string localPath)
306
        {
307
            //If the target is a directory, there are no changes to download
308
            if (Directory.Exists(localPath))
309
                return false;
310
            //If the file doesn't exist, we have a chagne
311
            if (!File.Exists(localPath))
312
                return true;
313
            //If there is no stored state, we have a change
314
            var localState = StatusKeeper.GetStateByFilePath(localPath);
315
            if (localState == null)
316
                return true;
317

    
318
            var info = new FileInfo(localPath);
319
            var shortHash = info.ComputeShortHash();
320
            //If the file is different from the stored state, we have a change
321
            if (localState.ShortHash != shortHash)
322
                return true;
323
            //If the top hashes differ, we have a change
324
            return (localState.Checksum != cloudFile.Hash);
325
        }
326

    
327
        private static FileAgent GetFileAgent(AccountInfo accountInfo)
328
        {
329
            return AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
330
        }
331

    
332
        private async Task<bool> WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token)
333
        {
334
            token.ThrowIfCancellationRequested();
335
            await UnpauseEvent.WaitAsync();
336
            var shouldAbort = !Selectives.IsSelected(account,cloudFile);
337
            if (shouldAbort)
338
                Log.InfoFormat("Aborting [{0}]", cloudFile.Uri);
339
            return shouldAbort;
340
        }
341

    
342
        [Import]
343
        public Selectives Selectives { get; set; }
344

    
345
        public AsyncManualResetEvent UnpauseEvent { get; set; }
346
    }
347
}