Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Uploader.cs @ 12c87c0e

History | View | Annotate | Download (20.4 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Net;
9
using System.Reflection;
10
using System.Security.Cryptography;
11
using System.Threading;
12
using System.Threading.Tasks;
13
using Pithos.Interfaces;
14
using Pithos.Network;
15
using log4net;
16

    
17
namespace Pithos.Core.Agents
18
{
19
    [Export(typeof(Uploader))]
20
    public class Uploader
21
    {
22
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
23

    
24
        [Import]
25
        private IStatusKeeper StatusKeeper { get; set; }
26

    
27
        [Import]
28
        private IPithosSettings Settings { get; set; }
29

    
30
        public IStatusNotification StatusNotification { get; set; }
31

    
32
        
33
        //CancellationTokenSource _cts = new CancellationTokenSource();
34
        /*public void SignalStop()
35
        {
36
            _cts.Cancel();
37
        }*/
38

    
39
        public async Task UploadCloudFile(CloudUploadAction action,CancellationToken cancellationToken)
40
        {
41
            if (action == null)
42
                throw new ArgumentNullException("action");
43
            Contract.EndContractBlock();
44

    
45
            using (ThreadContext.Stacks["Operation"].Push("UploadCloudFile"))
46
            {
47
                try
48
                {
49
                    await UnpauseEvent.WaitAsync().ConfigureAwait(false);
50

    
51
                    var fileInfo = action.LocalFile;
52

    
53
                    var progress=new Progress<double>(d=>
54
                        StatusNotification.Notify(new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}",d,fileInfo.Name))));
55

    
56
                    TreeHash localTreeHash;
57
                    using (StatusNotification.GetNotifier("Merkle Hashing for Upload {0}", "Merkle Hashed for Upload {0}", fileInfo.Name))
58
                    {
59
                        localTreeHash = Signature.CalculateTreeHashAsync(fileInfo,
60
                                                                         action.AccountInfo.BlockSize,
61
                                                                         action.AccountInfo.BlockHash, Settings.HashingParallelism,cancellationToken,progress);
62
                    }
63

    
64
                    if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
65
                        return;
66

    
67
                    if (!Selectives.IsSelected(action.AccountInfo, fileInfo) && !action.IsCreation)
68
                        return;
69

    
70
                    //Try to load the action's local state, if it is empty
71
                    if (action.FileState == null)
72
                        action.FileState = StatusKeeper.GetStateByFilePath(fileInfo.FullName);
73
                    if (action.FileState != null)
74
                    {
75
                        /*
76
                                                Log.WarnFormat("File [{0}] has no local state. It was probably created by a download action", fileInfo.FullName);
77
                                                return;
78
                        */
79

    
80

    
81
                        var latestState = action.FileState;
82

    
83
                        //Do not upload files in conflict
84
                        if (latestState.FileStatus == FileStatus.Conflict)
85
                        {
86
                            Log.InfoFormat("Skipping file in conflict [{0}]", fileInfo.FullName);
87
                            return;
88
                        }
89
                        //Do not upload files when we have no permission
90
                        if (latestState.FileStatus == FileStatus.Forbidden)
91
                        {
92
                            Log.InfoFormat("Skipping forbidden file [{0}]", fileInfo.FullName);
93
                            return;
94
                        }
95
                    }
96
                    //Are we targeting our own account or a sharer account?
97
                    var relativePath = fileInfo.AsRelativeTo(action.AccountInfo.AccountPath);
98
                    var accountInfo = relativePath.StartsWith(FolderConstants.OthersFolder) 
99
                                                  ? GetSharerAccount(relativePath, action.AccountInfo) 
100
                                                  : action.AccountInfo;
101

    
102

    
103

    
104
                    var fullFileName = fileInfo.GetProperCapitalization();
105
                    using (var gate = NetworkGate.Acquire(fullFileName, NetworkOperation.Uploading))
106
                    {
107
                        //Abort if the file is already being uploaded or downloaded
108
                        if (gate.Failed)
109
                            return;
110

    
111
                        var cloudFile = action.CloudFile;
112
                        var account = cloudFile.Account ?? accountInfo.UserName;
113
                        try
114
                        {
115

    
116
                            var client = new CloudFilesClient(accountInfo);
117

    
118
                            //Even if GetObjectInfo times out, we can proceed with the upload            
119
                            var cloudInfo = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name);
120

    
121
                            //If this a shared file
122
                            if (!cloudFile.Account.Equals(action.AccountInfo.UserName,StringComparison.InvariantCultureIgnoreCase))
123
                            {
124
                                
125
/*
126
                                if (!cloudInfo.IsWritable(action.AccountInfo.UserName))
127
                                {
128
                                    MakeFileReadOnly(fullFileName);
129
                                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
130
                                    return;
131
                                }
132
*/
133

    
134
                                //If this is a read-only file, do not upload changes
135
                                if ( !cloudInfo.IsWritable(action.AccountInfo.UserName) ||
136
                                    //If the file is new, but we can't upload it
137
                                    (!cloudInfo.Exists && !client.CanUpload(account, cloudFile)) )
138
                                {
139
                                    MakeFileReadOnly(fullFileName);
140
                                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
141
                                    return;
142
                                }
143

    
144
                            }
145

    
146
                            await UnpauseEvent.WaitAsync().ConfigureAwait(false);
147

    
148
                            fileInfo.Refresh();
149
                            //Does the file still exist or was it deleted/renamed?
150
                            if (fileInfo.Exists)
151
                            {
152
                                if (fileInfo is DirectoryInfo)
153
                                {
154
                                    //If the directory doesn't exist the Hash property will be empty
155
                                    if (String.IsNullOrWhiteSpace(cloudInfo.X_Object_Hash))
156
                                        //Go on and create the directory
157
                                        await
158
                                            client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName,
159
                                                             String.Empty, "application/directory");
160
                                    //If the upload is in response to a Folder create with Selective Sync enabled
161
                                    if (action.IsCreation)
162
                                    {
163
                                        //Add the folder to the Selected URls
164
                                        var selectiveUri = new Uri(client.RootAddressUri, cloudFile.Uri);
165
                                        Selectives.AddUri(accountInfo, selectiveUri);
166
                                        Selectives.Save(accountInfo);
167
                                    }
168
                                }
169
                                else
170
                                {
171

    
172
                                    var cloudHash = cloudInfo.X_Object_Hash.ToLower();
173

    
174
                                    string topHash;
175
                                    TreeHash treeHash;
176
                                    using (
177
                                        StatusNotification.GetNotifier("Hashing {0} for Upload", "Finished hashing {0}",
178
                                                                       fileInfo.Name))
179
                                    {
180
                                        treeHash = localTreeHash ?? action.TreeHash.Value;
181
                                        topHash = treeHash.TopHash.ToHashString();
182
                                    }
183

    
184

    
185

    
186
                                    //If the file hashes match, abort the upload
187
                                    if (cloudInfo != ObjectInfo.Empty && (topHash == cloudHash))
188
                                    {
189
                                        //but store any metadata changes 
190
                                        StatusKeeper.StoreInfo(fullFileName, cloudInfo);
191
                                        Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
192
                                        return;
193
                                    }
194

    
195

    
196
                                    //Mark the file as modified while we upload it
197
                                    await
198
                                        StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified).
199
                                            ConfigureAwait(false);
200
                                    //And then upload it
201

    
202
                                    //Upload even small files using the Hashmap. The server may already contain
203
                                    //the relevant block                                
204

    
205

    
206

    
207
                                    await
208
                                        UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name,
209
                                                          treeHash, cancellationToken).ConfigureAwait(false);
210
                                }
211

    
212
                                var currentInfo = client.GetObjectInfo(cloudFile.Account, cloudFile.Container,
213
                                                                       cloudFile.Name);
214
                                //If there is no stored ObjectID in the file state, add it
215
                                if (action.FileState == null || action.FileState.ObjectID == null)
216
                                {
217
                                    StatusKeeper.StoreInfo(fullFileName, currentInfo);
218
                                }
219
                                else
220
                                    //If everything succeeds, change the file and overlay status to normal
221
                                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged,
222
                                                              FileOverlayStatus.Normal, "");
223
                            }
224
                            else
225
                            {
226
                                StatusKeeper.ClearFileStatus(fullFileName);
227
                            }
228
                        }
229
                        catch (WebException exc)
230
                        {
231
                            var response = (exc.Response as HttpWebResponse);
232
                            if (response == null)
233
                                throw;
234
                            if (response.StatusCode == HttpStatusCode.Forbidden)
235
                            {
236
                                StatusKeeper.SetFileState(fileInfo.FullName, FileStatus.Forbidden, FileOverlayStatus.Conflict, "Forbidden");
237
                                MakeFileReadOnly(fullFileName);
238
                            }
239
                            else
240
                                //In any other case, propagate the error
241
                                throw;
242
                        }
243
                    }
244
                    //Notify the Shell to update the overlays
245
                    NativeMethods.RaiseChangeNotification(fullFileName);
246
                    StatusNotification.NotifyChangedFile(fullFileName);
247
                }
248
                catch (AggregateException ex)
249
                {
250
                    var exc = ex.InnerException as WebException;
251
                    if (exc == null)
252
                        throw ex.InnerException;
253
                    if (HandleUploadWebException(action, exc))
254
                        return;
255
                    throw;
256
                }
257
                catch (WebException ex)
258
                {
259
                    if (HandleUploadWebException(action, ex))
260
                        return;
261
                    throw;
262
                }
263
                catch (Exception ex)
264
                {
265
                    Log.Error("Unexpected error while uploading file", ex);
266
                    throw;
267
                }
268
            }
269
        }
270

    
271
        private static void MakeFileReadOnly(string fullFileName)
272
        {
273
            var attributes = File.GetAttributes(fullFileName);
274
            //Do not make any modifications if not necessary
275
            if (attributes.HasFlag(FileAttributes.ReadOnly))
276
                return;
277
            File.SetAttributes(fullFileName, attributes | FileAttributes.ReadOnly);            
278
        }
279

    
280
        private static AccountInfo GetSharerAccount(string relativePath, AccountInfo accountInfo)
281
        {
282
            var parts = relativePath.Split('\\');
283
            var accountName = parts[1];
284
            var oldName = accountInfo.UserName;
285
            var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
286
            var nameIndex = absoluteUri.IndexOf(oldName, StringComparison.Ordinal);
287
            var root = absoluteUri.Substring(0, nameIndex);
288

    
289
            accountInfo = new AccountInfo
290
                              {
291
                                  UserName = accountName,
292
                                  AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]),
293
                                  StorageUri = new Uri(root + accountName),
294
                                  BlockHash = accountInfo.BlockHash,
295
                                  BlockSize = accountInfo.BlockSize,
296
                                  Token = accountInfo.Token
297
                              };
298
            return accountInfo;
299
        }
300

    
301

    
302
        public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, string url, TreeHash treeHash, CancellationToken token)
303
        {
304
            if (accountInfo == null)
305
                throw new ArgumentNullException("accountInfo");
306
            if (cloudFile == null)
307
                throw new ArgumentNullException("cloudFile");
308
            if (fileInfo == null)
309
                throw new ArgumentNullException("fileInfo");
310
            if (String.IsNullOrWhiteSpace(url))
311
                throw new ArgumentNullException(url);
312
            if (treeHash == null)
313
                throw new ArgumentNullException("treeHash");
314
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
315
                throw new ArgumentException("Invalid container", "cloudFile");
316
            Contract.EndContractBlock();
317

    
318

    
319
            if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false))
320
                return;
321

    
322
            var fullFileName = fileInfo.GetProperCapitalization();
323

    
324
            var account = cloudFile.Account ?? accountInfo.UserName;
325
            var container = cloudFile.Container;
326

    
327
            int block = 0;
328

    
329
            var client = new CloudFilesClient(accountInfo);
330
            //Send the hashmap to the server            
331
            var missingHashes = await client.PutHashMap(account, container, url, treeHash).ConfigureAwait(false);
332
            ReportUploadProgress(fileInfo.Name, block, 0, missingHashes.Count, fileInfo.Length);
333
            //If the server returns no missing hashes, we are done
334

    
335
            client.UploadProgressChanged += (sender, args) =>
336
                                            ReportUploadProgress(fileInfo.Name, block, args.ProgressPercentage,
337
                                                                 missingHashes.Count, fileInfo.Length);
338

    
339

    
340
            while (missingHashes.Count > 0)
341
            {
342
                block = 0;
343

    
344
                if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false))
345
                    return;
346

    
347

    
348
                var buffer = new byte[accountInfo.BlockSize];
349
                foreach (var missingHash in missingHashes)
350
                {
351
                    if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false))
352
                        return;
353

    
354

    
355
                    //Find the proper block
356
                    long blockIndex = treeHash.HashDictionary[missingHash];
357
                    long offset = blockIndex*accountInfo.BlockSize;
358
                    Debug.Assert(offset >= 0,
359
                                 String.Format("Negative Offset! BlockIndex {0} BlockSize {1}", blockIndex,
360
                                               accountInfo.BlockSize));
361

    
362
                    var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize);
363

    
364
                    try
365
                    {
366
                        //And upload the block                
367
                        await client.PostBlock(account, container, buffer, 0, read, token).ConfigureAwait(false);
368
                        token.ThrowIfCancellationRequested();
369
                        Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName);
370
                    }
371
                    catch (TaskCanceledException exc)
372
                    {
373
                        throw new OperationCanceledException(token);
374
                    }
375
                    catch (Exception exc)
376
                    {
377
                        Log.Error(String.Format("Uploading block {0} of {1}", blockIndex, fullFileName), exc);
378
                    }
379
                    ReportUploadProgress(fileInfo.Name, block++, 100, missingHashes.Count, fileInfo.Length);
380
                }
381

    
382
                token.ThrowIfCancellationRequested();
383
                //Repeat until there are no more missing hashes                
384
                missingHashes = await client.PutHashMap(account, container, url, treeHash).ConfigureAwait(false);
385
            }
386

    
387
            ReportUploadProgress(fileInfo.Name, missingHashes.Count, 0, missingHashes.Count, fileInfo.Length);
388

    
389
        }
390

    
391
        private async Task<bool> WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token)
392
        {
393
            token.ThrowIfCancellationRequested();
394
            await UnpauseEvent.WaitAsync().ConfigureAwait(false);
395
            var shouldAbort = !Selectives.IsSelected(account,cloudFile);
396
            if (shouldAbort)
397
                Log.InfoFormat("Aborting [{0}]",cloudFile.Uri);
398
            return shouldAbort;
399
        }
400

    
401
        private void ReportUploadProgress(string fileName, int block, int blockPercentage, int totalBlocks, long fileSize)
402
        {
403
            StatusNotification.Notify(totalBlocks == 0
404
                                          ? new ProgressNotification(fileName, "Uploading", 1,blockPercentage, 1, fileSize)
405
                                          : new ProgressNotification(fileName, "Uploading", block, blockPercentage, totalBlocks, fileSize));
406
        }
407

    
408

    
409
        private bool HandleUploadWebException(CloudAction action, WebException exc)
410
        {
411
            var response = exc.Response as HttpWebResponse;
412
            if (response == null)
413
                throw exc;
414
            if (response.StatusCode == HttpStatusCode.Unauthorized)
415
            {
416
                Log.Error("Not allowed to upload file", exc);
417
                var message = String.Format("Not allowed to uplad file {0}", action.LocalFile.FullName);
418
                StatusKeeper.SetFileState(action.LocalFile.FullName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
419
                StatusNotification.NotifyChange(message, TraceLevel.Warning);
420
                return true;
421
            }
422
            return false;
423
        }
424

    
425
        [Import]
426
        public Selectives Selectives { get; set; }
427

    
428
        public AsyncManualResetEvent UnpauseEvent { get; set; }
429
    }
430
}