Revision 2341c603 trunk/Pithos.Core/Agents/Downloader.cs

b/trunk/Pithos.Core/Agents/Downloader.cs
1 1
using System;
2
using System.Collections.Generic;
2 3
using System.ComponentModel.Composition;
3 4
using System.Diagnostics.Contracts;
4 5
using System.IO;
6
using System.Linq;
5 7
using System.Reflection;
8
using System.Threading;
6 9
using System.Threading.Tasks;
7 10
using Pithos.Interfaces;
8 11
using Pithos.Network;
......
21 24
        
22 25
        public IStatusNotification StatusNotification { get; set; }
23 26

  
27
/*
28
        private CancellationTokenSource _cts=new CancellationTokenSource();
29

  
30
        public void SignalStop()
31
        {
32
            _cts.Cancel();
33
        }
34
*/
35

  
36

  
24 37
        //Download a file.
25
        public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath)
38
        public async Task DownloadCloudFile(AccountInfo accountInfo, ObjectInfo cloudFile, string filePath,CancellationToken cancellationToken)
26 39
        {
27 40
            if (accountInfo == null)
28 41
                throw new ArgumentNullException("accountInfo");
......
37 50
            if (!Path.IsPathRooted(filePath))
38 51
                throw new ArgumentException("The filePath must be rooted", "filePath");
39 52
            Contract.EndContractBlock();
53
                using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile"))
54
                {
55
                   // var cancellationToken=_cts.Token;//  .ThrowIfCancellationRequested();
40 56

  
41
            using (ThreadContext.Stacks["Operation"].Push("DownloadCloudFile"))
42
            {
57
                    cancellationToken.ThrowIfCancellationRequested();
58
                    await UnpauseEvent.WaitAsync();
43 59

  
44
                var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
45
                var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative);
46 60

  
47
                var url = relativeUrl.ToString();
48
                if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase))
49
                    return;
61
                    var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
62
                    var relativeUrl = new Uri(cloudFile.Name, UriKind.Relative);
50 63

  
64
                    var url = relativeUrl.ToString();
65
                    if (cloudFile.Name.EndsWith(".ignore", StringComparison.InvariantCultureIgnoreCase))
66
                        return;
51 67

  
52
                //Are we already downloading or uploading the file? 
53
                using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
54
                {
55
                    if (gate.Failed)
68
                    if (!Selectives.IsSelected(cloudFile))
56 69
                        return;
57 70

  
58
                    var client = new CloudFilesClient(accountInfo);
59
                    var account = cloudFile.Account;
60
                    var container = cloudFile.Container;
61 71

  
62
                    if (cloudFile.IsDirectory)
72
                    //Are we already downloading or uploading the file? 
73
                    using (var gate = NetworkGate.Acquire(localPath, NetworkOperation.Downloading))
63 74
                    {
64
                        if (!Directory.Exists(localPath))
65
                            try
66
                            {
67
                                Directory.CreateDirectory(localPath);
68
                                if (Log.IsDebugEnabled)
69
                                    Log.DebugFormat("Created Directory [{0}]", localPath);
70
                            }
71
                            catch (IOException)
72
                            {
73
                                var localInfo = new FileInfo(localPath);
74
                                if (localInfo.Exists && localInfo.Length == 0)
75
                        if (gate.Failed)
76
                            return;
77

  
78
                        var client = new CloudFilesClient(accountInfo);
79
                        var account = cloudFile.Account;
80
                        var container = cloudFile.Container;
81

  
82
                        if (cloudFile.IsDirectory)
83
                        {
84
                            if (!Directory.Exists(localPath))
85
                                try
75 86
                                {
76
                                    Log.WarnFormat("Malformed directory object detected for [{0}]", localPath);
77
                                    localInfo.Delete();
78 87
                                    Directory.CreateDirectory(localPath);
79 88
                                    if (Log.IsDebugEnabled)
80 89
                                        Log.DebugFormat("Created Directory [{0}]", localPath);
81 90
                                }
82
                            }
83
                    }
84
                    else
85
                    {
86
                        var isChanged = IsObjectChanged(cloudFile, localPath);
87
                        if (isChanged)
91
                                catch (IOException)
92
                                {
93
                                    var localInfo = new FileInfo(localPath);
94
                                    if (localInfo.Exists && localInfo.Length == 0)
95
                                    {
96
                                        Log.WarnFormat("Malformed directory object detected for [{0}]", localPath);
97
                                        localInfo.Delete();
98
                                        Directory.CreateDirectory(localPath);
99
                                        if (Log.IsDebugEnabled)
100
                                            Log.DebugFormat("Created Directory [{0}]", localPath);
101
                                    }
102
                                }
103
                        }
104
                        else
88 105
                        {
89
                            //Retrieve the hashmap from the server
90
                            var serverHash = await client.GetHashMap(account, container, url);
91
                            //If it's a small file
92
                            if (serverHash.Hashes.Count == 1)
93
                                //Download it in one go
94
                                await
95
                                    DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath);
96
                            //Otherwise download it block by block
97
                            else
98
                                await
99
                                    DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath,
100
                                                       serverHash);
101

  
102
                            if (!cloudFile.IsWritable(accountInfo.UserName))
106
                            var isChanged = IsObjectChanged(cloudFile, localPath);
107
                            if (isChanged)
103 108
                            {
104
                                var attributes = File.GetAttributes(localPath);
105
                                File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly);
109
                                //Retrieve the hashmap from the server
110
                                var serverHash = await client.GetHashMap(account, container, url);
111
                                //If it's a small file
112
                                if (serverHash.Hashes.Count == 1)
113
                                    //Download it in one go
114
                                    await
115
                                        DownloadEntireFileAsync(accountInfo, client, cloudFile, relativeUrl, localPath,cancellationToken);
116
                                    //Otherwise download it block by block
117
                                else
118
                                    await
119
                                        DownloadWithBlocks(accountInfo, client, cloudFile, relativeUrl, localPath,
120
                                                           serverHash,cancellationToken);
121

  
122
                                if (!cloudFile.IsWritable(accountInfo.UserName))
123
                                {
124
                                    var attributes = File.GetAttributes(localPath);
125
                                    File.SetAttributes(localPath, attributes | FileAttributes.ReadOnly);
126
                                }
106 127
                            }
107 128
                        }
108
                    }
109 129

  
110
                    //Now we can store the object's metadata without worrying about ghost status entries
111
                    StatusKeeper.StoreInfo(localPath, cloudFile);
130
                        //Now we can store the object's metadata without worrying about ghost status entries
131
                        StatusKeeper.StoreInfo(localPath, cloudFile);
112 132

  
133
                    }
113 134
                }
114
            }
135
           
115 136
        }
116 137

  
117 138
        //Download a file asynchronously using blocks
118
        public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash)
139
        public async Task DownloadWithBlocks(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, TreeHash serverHash, CancellationToken cancellationToken)
119 140
        {
120 141
            if (client == null)
121 142
                throw new ArgumentNullException("client");
......
133 154
                throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile");
134 155
            Contract.EndContractBlock();
135 156

  
157
            cancellationToken.ThrowIfCancellationRequested();
158
            await UnpauseEvent.WaitAsync();
159

  
136 160
            var fileAgent = GetFileAgent(accountInfo);
137 161
            var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
138 162

  
......
140 164
            var relativePath = relativeUrl.RelativeUriToFilePath();
141 165
            var blockUpdater = new BlockUpdater(fileAgent.CachePath, localPath, relativePath, serverHash);
142 166

  
143

  
144 167
            StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Calculating hashmap for {0} before download", Path.GetFileName(localPath)));
145 168
            //Calculate the file's treehash
169

  
170
            //TODO: Should pass cancellation token here
146 171
            var treeHash = await Signature.CalculateTreeHashAsync(localPath, serverHash.BlockSize, serverHash.BlockHash, 2);
147 172

  
148 173
            //And compare it with the server's hash
......
151 176
            ReportDownloadProgress(Path.GetFileName(localPath), 0, upHashes.Length, cloudFile.Bytes);
152 177
            for (int i = 0; i < upHashes.Length; i++)
153 178
            {
179
                cancellationToken.ThrowIfCancellationRequested();
180
                await UnpauseEvent.WaitAsync();
181

  
154 182
                //For every non-matching hash
155 183
                var upHash = upHashes[i];
156 184
                if (!localHashes.ContainsKey(upHash))
......
169 197
                    if (i < upHashes.Length - 1)
170 198
                        end = ((i + 1) * serverHash.BlockSize);
171 199

  
200
                    //TODO: Pass token here
172 201
                    //Download the missing block
173
                    var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end);
202
                    var block = await client.GetBlock(cloudFile.Account, cloudFile.Container, relativeUrl, start, end,cancellationToken);
174 203

  
175 204
                    //and store it
176 205
                    blockUpdater.StoreBlock(i, block);
......
193 222
        }
194 223

  
195 224
        //Download a small file with a single GET operation
196
        private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath)
225
        private async Task DownloadEntireFileAsync(AccountInfo accountInfo, CloudFilesClient client, ObjectInfo cloudFile, Uri relativeUrl, string filePath, CancellationToken cancellationToken)
197 226
        {
198 227
            if (client == null)
199 228
                throw new ArgumentNullException("client");
......
209 238
                throw new ArgumentException("cloudFile is a directory, not a file", "cloudFile");
210 239
            Contract.EndContractBlock();
211 240

  
241
            cancellationToken.ThrowIfCancellationRequested();
242
            await UnpauseEvent.WaitAsync();
243

  
212 244
            var localPath = FileInfoExtensions.GetProperFilePathCapitalization(filePath);
213 245
            StatusNotification.SetPithosStatus(PithosStatus.LocalSyncing, String.Format("Downloading {0}", Path.GetFileName(localPath)));
214 246
            StatusNotification.Notify(new CloudNotification { Data = cloudFile });
......
223 255
            if (!Directory.Exists(tempFolder))
224 256
                Directory.CreateDirectory(tempFolder);
225 257

  
258
            //TODO: Should pass the token here
259

  
226 260
            //Download the object to the temporary location
227
            await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath);
261
            await client.GetObject(cloudFile.Account, cloudFile.Container, relativeUrl.ToString(), tempPath,cancellationToken);
228 262

  
229 263
            //Create the local folder if it doesn't exist (necessary for shared objects)
230 264
            var localFolder = Path.GetDirectoryName(localPath);
......
293 327
            return AgentLocator<FileAgent>.Get(accountInfo.AccountPath);
294 328
        }
295 329

  
330
        [Import]
331
        public Selectives Selectives { get; set; }
296 332

  
333
        public AsyncManualResetEvent UnpauseEvent { get; set; }
297 334
    }
298 335
}

Also available in: Unified diff