Revision 2341c603 trunk/Pithos.Core/Agents/Uploader.cs

b/trunk/Pithos.Core/Agents/Uploader.cs
1 1
using System;
2
using System.Collections.Generic;
2 3
using System.ComponentModel.Composition;
3 4
using System.Diagnostics;
4 5
using System.Diagnostics.Contracts;
5 6
using System.IO;
7
using System.Linq;
6 8
using System.Net;
7 9
using System.Reflection;
10
using System.Threading;
8 11
using System.Threading.Tasks;
9 12
using Pithos.Interfaces;
10 13
using Pithos.Network;
......
23 26
        
24 27
        public IStatusNotification StatusNotification { get; set; }
25 28

  
26
        public async Task UploadCloudFile(CloudAction action)
29
        
30
        //CancellationTokenSource _cts = new CancellationTokenSource();
31
        /*public void SignalStop()
32
        {
33
            _cts.Cancel();
34
        }*/
35

  
36
        public async Task UploadCloudFile(CloudAction action,CancellationToken cancellationToken)
27 37
        {
28 38
            if (action == null)
29 39
                throw new ArgumentNullException("action");
......
33 43
            {
34 44
                try
35 45
                {
46
                    await UnpauseEvent.WaitAsync();
47

  
36 48
                    var fileInfo = action.LocalFile;
37 49

  
38 50
                    if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
39 51
                        return;
40 52

  
53
                    if (Selectives.IsSelected(action.AccountInfo, fileInfo))
54
                        return;
55

  
41 56
                    //Try to load the action's local state, if it is empty
42 57
                    if (action.FileState == null)
43 58
                        action.FileState = StatusKeeper.GetStateByFilePath(fileInfo.FullName);
......
106 121

  
107 122
                            }
108 123

  
124
                            await UnpauseEvent.WaitAsync();
125

  
109 126
                            if (fileInfo is DirectoryInfo)
110 127
                            {
111 128
                                //If the directory doesn't exist the Hash property will be empty
......
144 161
                                //Upload even small files using the Hashmap. The server may already contain
145 162
                                //the relevant block                                
146 163

  
147
                                await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash);
164
                                
165

  
166
                                await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash,cancellationToken);
148 167
                            }
149 168
                            //If everything succeeds, change the file and overlay status to normal
150 169
                            StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
......
222 241
        }
223 242

  
224 243

  
225
        public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, string url, TreeHash treeHash)
244
        public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, string url, TreeHash treeHash, CancellationToken token)
226 245
        {
227 246
            if (accountInfo == null)
228 247
                throw new ArgumentNullException("accountInfo");
......
238 257
                throw new ArgumentException("Invalid container", "cloudFile");
239 258
            Contract.EndContractBlock();
240 259

  
260
           
241 261
            using (StatusNotification.GetNotifier("Uploading {0}", "Finished Uploading {0}", fileInfo.Name))
242 262
            {
243

  
263
                token.ThrowIfCancellationRequested();
264
                await UnpauseEvent.WaitAsync();
265
           
244 266
                var fullFileName = fileInfo.GetProperCapitalization();
245 267

  
246 268
                var account = cloudFile.Account ?? accountInfo.UserName;
247 269
                var container = cloudFile.Container;
248 270

  
271

  
249 272
                var client = new CloudFilesClient(accountInfo);
250 273
                //Send the hashmap to the server            
251 274
                var missingHashes = await client.PutHashMap(account, container, url, treeHash);
......
255 278
                while (missingHashes.Count > 0)
256 279
                {
257 280

  
281
                    token.ThrowIfCancellationRequested();
282
                    await UnpauseEvent.WaitAsync();
283

  
258 284
                    var buffer = new byte[accountInfo.BlockSize];
259 285
                    foreach (var missingHash in missingHashes)
260 286
                    {
287
                        token.ThrowIfCancellationRequested();
288
                        await UnpauseEvent.WaitAsync();
289

  
261 290
                        //Find the proper block
262 291
                        var blockIndex = treeHash.HashDictionary[missingHash];
263 292
                        long offset = blockIndex*accountInfo.BlockSize;
......
277 306
                        ReportUploadProgress(fileInfo.Name, block++, missingHashes.Count, fileInfo.Length);
278 307
                    }
279 308

  
309
                    token.ThrowIfCancellationRequested();
280 310
                    //Repeat until there are no more missing hashes                
281 311
                    missingHashes = await client.PutHashMap(account, container, url, treeHash);
282 312
                }
......
308 338
            }
309 339
            return false;
310 340
        }
341

  
342
        [Import]
343
        public Selectives Selectives { get; set; }
344

  
345
        public AsyncManualResetEvent UnpauseEvent { get; set; }
311 346
    }
312 347
}

Also available in: Unified diff