Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Uploader.cs @ dc18b138

History | View | Annotate | Download (21 kB)

1
using System;
2
using System.ComponentModel.Composition;
3
using System.Diagnostics;
4
using System.Diagnostics.Contracts;
5
using System.IO;
6
using System.Net;
7
using System.Reflection;
8
using System.Threading;
9
using System.Threading.Tasks;
10
using Pithos.Interfaces;
11
using Pithos.Network;
12
using log4net;
13

    
14
namespace Pithos.Core.Agents
15
{
16
    [Export(typeof(Uploader))]
17
    public class Uploader
18
    {
19
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
20

    
21
        [Import]
22
        private IStatusKeeper StatusKeeper { get; set; }
23

    
24
        [Import]
25
        private IPithosSettings Settings { get; set; }
26

    
27
        public IStatusNotification StatusNotification { get; set; }
28

    
29
        
30
        //CancellationTokenSource _cts = new CancellationTokenSource();
31
        /*public void SignalStop()
32
        {
33
            _cts.Cancel();
34
        }*/
35

    
36
        public async Task UploadCloudFile(CloudUploadAction action,CancellationToken cancellationToken)
37
        {
38
            if (action == null)
39
                throw new ArgumentNullException("action");
40
            Contract.EndContractBlock();
41

    
42
            using (ThreadContext.Stacks["Operation"].Push("UploadCloudFile"))
43
            {
44
                try
45
                {
46
                    await UnpauseEvent.WaitAsync().ConfigureAwait(false);
47

    
48
                    var fileInfo = action.LocalFile;
49

    
50
                    if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
51
                        return;
52

    
53
                    if (!Selectives.IsSelected(action.AccountInfo, fileInfo) && !action.IsCreation)
54
                        return;
55

    
56

    
57
                    //Try to load the action's local state, if it is empty
58
                    if (action.FileState == null)
59
                        action.FileState = StatusKeeper.GetStateByFilePath(fileInfo.FullName);
60

    
61
                    TreeHash localTreeHash;
62
                    using (StatusNotification.GetNotifier("Merkle Hashing for Upload {0}", "Merkle Hashed for Upload {0}", fileInfo.Name))
63
                    {
64
                        //TODO: Load the stored treehash if appropriate
65
                        //TODO: WHO updates LastMD5?
66

    
67
                        var progress = new Progress<HashProgress>(d => StatusNotification.Notify(
68
                            new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}", d.Percentage, fileInfo.Name))));
69

    
70
                        //If the action's Treehash is already calculated, use it instead of reprocessing
71
                        localTreeHash = action.TreeHash.IsValueCreated
72
                            ? action.TreeHash.Value 
73
                            : StatusAgent.CalculateTreeHash(fileInfo, action.AccountInfo, action.FileState, Settings.HashingParallelism, cancellationToken, progress);
74
                    }
75

    
76

    
77
                    if (action.FileState != null)
78
                    {
79
                        /*
80
                                                Log.WarnFormat("File [{0}] has no local state. It was probably created by a download action", fileInfo.FullName);
81
                                                return;
82
                        */
83

    
84

    
85
                        var latestState = action.FileState;
86

    
87
                        //Do not upload files in conflict
88
                        if (latestState.FileStatus == FileStatus.Conflict)
89
                        {
90
                            Log.InfoFormat("Skipping file in conflict [{0}]", fileInfo.FullName);
91
                            return;
92
                        }
93
                        //Do not upload files when we have no permission
94
                        if (latestState.FileStatus == FileStatus.Forbidden)
95
                        {
96
                            Log.InfoFormat("Skipping forbidden file [{0}]", fileInfo.FullName);
97
                            return;
98
                        }
99
                    }
100
                    //Are we targeting our own account or a sharer account?
101
                    var relativePath = fileInfo.AsRelativeTo(action.AccountInfo.AccountPath);
102
                    var accountInfo = relativePath.StartsWith(FolderConstants.OthersFolder) 
103
                                                  ? GetSharerAccount(relativePath, action.AccountInfo) 
104
                                                  : action.AccountInfo;
105

    
106

    
107

    
108
                    var fullFileName = fileInfo.GetProperCapitalization();
109
                    using (var gate = NetworkGate.Acquire(fullFileName, NetworkOperation.Uploading))
110
                    {
111
                        //Abort if the file is already being uploaded or downloaded
112
                        if (gate.Failed)
113
                            return;
114

    
115
                        var cloudFile = action.CloudFile;
116
                        var account = cloudFile.Account ?? accountInfo.UserName;
117
                        try
118
                        {
119

    
120
                            var client = new CloudFilesClient(accountInfo);
121

    
122
                            //Even if GetObjectInfo times out, we can proceed with the upload            
123
                            var cloudInfo = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name);
124

    
125
                            //If this a shared file
126
                            if (!cloudFile.Account.Equals(action.AccountInfo.UserName,StringComparison.InvariantCultureIgnoreCase))
127
                            {
128
                                
129
/*
130
                                if (!cloudInfo.IsWritable(action.AccountInfo.UserName))
131
                                {
132
                                    MakeFileReadOnly(fullFileName);
133
                                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
134
                                    return;
135
                                }
136
*/
137

    
138
                                //If this is a read-only file, do not upload changes
139
                                if ( !cloudInfo.IsWritable(action.AccountInfo.UserName) ||
140
                                    //If the file is new, but we can't upload it
141
                                    (!cloudInfo.Exists && !client.CanUpload(account, cloudFile)) )
142
                                {
143
                                    MakeFileReadOnly(fullFileName);
144
                                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
145
                                    return;
146
                                }
147

    
148
                            }
149

    
150
                            await UnpauseEvent.WaitAsync().ConfigureAwait(false);
151

    
152
                            fileInfo.Refresh();
153
                            //Does the file still exist or was it deleted/renamed?
154
                            if (fileInfo.Exists)
155
                            {
156
                                if (fileInfo is DirectoryInfo)
157
                                {
158
                                    //If the directory doesn't exist the Hash property will be empty
159
                                    if (String.IsNullOrWhiteSpace(cloudInfo.X_Object_Hash))
160
                                        //Go on and create the directory
161
                                        await
162
                                            client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName,
163
                                                             Signature.MERKLE_EMPTY, ObjectInfo.CONTENT_TYPE_DIRECTORY);
164
                                    //If the upload is in response to a Folder create with Selective Sync enabled
165
                                    if (action.IsCreation)
166
                                    {
167
                                        //Add the folder to the Selected URls
168
                                        var selectiveUri = client.RootAddressUri.Combine(cloudFile.Uri);
169
                                        Selectives.AddUri(accountInfo, selectiveUri);
170
                                        Selectives.Save(accountInfo);
171
                                    }
172
                                }
173
                                else
174
                                {
175

    
176
                                    var cloudHash = cloudInfo.X_Object_Hash.ToLower();
177

    
178
                                    string topHash;
179
                                    TreeHash treeHash;
180
                                    using (
181
                                        StatusNotification.GetNotifier("Hashing {0} for Upload", "Finished hashing {0}",
182
                                                                       fileInfo.Name))
183
                                    {
184
                                        treeHash = localTreeHash ?? action.TreeHash.Value;
185
                                        topHash = treeHash.TopHash.ToHashString();
186
                                    }
187

    
188

    
189

    
190
                                    //If the file hashes match, abort the upload
191
                                    if (cloudInfo != ObjectInfo.Empty && (topHash == cloudHash))
192
                                    {
193
                                        //but store any metadata changes 
194
                                        StatusKeeper.StoreInfo(fullFileName, cloudInfo,treeHash);
195
                                        Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
196
                                        return;
197
                                    }
198

    
199

    
200
                                    //Mark the file as modified while we upload it
201
                                    StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
202
                                    //And then upload it
203

    
204
                                    //Upload even small files using the Hashmap. The server may already contain
205
                                    //the relevant block                                
206

    
207

    
208

    
209
                                    await
210
                                        UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name,
211
                                                          treeHash, cancellationToken).ConfigureAwait(false);
212
                                }
213

    
214
                                var currentInfo = client.GetObjectInfo(cloudFile.Account, cloudFile.Container,
215
                                                                       cloudFile.Name);
216

    
217
                                StatusKeeper.StoreInfo(fullFileName, currentInfo, localTreeHash);
218
                                //Ensure the status is cleared
219
                                StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged,
220
                                                              FileOverlayStatus.Normal, "");
221
/*
222
                                //If there is no stored ObjectID in the file state, add it
223
                                //TODO: Why not just update everything, then change the state?
224
                                if (action.FileState == null || action.FileState.ObjectID == null)
225
                                {
226
                                    
227
                                }
228
                                else
229
                                    //If everything succeeds, change the file and overlay status to normal
230
                                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged,
231
                                                              FileOverlayStatus.Normal, "");
232
*/
233
                            }
234
                            else
235
                            {
236
                                StatusKeeper.ClearFileStatus(fullFileName);
237
                            }
238
                        }
239
                        catch (WebException exc)
240
                        {
241
                            var response = (exc.Response as HttpWebResponse);
242
                            if (response == null)
243
                                throw;
244
                            if (response.StatusCode == HttpStatusCode.Forbidden)
245
                            {
246
                                StatusKeeper.SetFileState(fileInfo.FullName, FileStatus.Forbidden, FileOverlayStatus.Conflict, "Forbidden");
247
                                MakeFileReadOnly(fullFileName);
248
                            }
249
                            else
250
                                //In any other case, propagate the error
251
                                throw;
252
                        }
253
                    }
254
                    //Notify the Shell to update the overlays
255
                    NativeMethods.RaiseChangeNotification(fullFileName);
256
                    StatusNotification.NotifyChangedFile(fullFileName);
257
                }
258
                catch (AggregateException ex)
259
                {
260
                    var exc = ex.InnerException as WebException;
261
                    if (exc == null)
262
                        throw ex.InnerException;
263
                    if (HandleUploadWebException(action, exc))
264
                        return;
265
                    throw;
266
                }
267
                catch (WebException ex)
268
                {
269
                    if (HandleUploadWebException(action, ex))
270
                        return;
271
                    throw;
272
                }
273
                catch (Exception ex)
274
                {
275
                    Log.Error("Unexpected error while uploading file", ex);
276
                    throw;
277
                }
278
            }
279
        }
280

    
281

    
282
        private static void MakeFileReadOnly(string fullFileName)
283
        {
284
            var attributes = File.GetAttributes(fullFileName);
285
            //Do not make any modifications if not necessary
286
            if (attributes.HasFlag(FileAttributes.ReadOnly))
287
                return;
288
            File.SetAttributes(fullFileName, attributes | FileAttributes.ReadOnly);            
289
        }
290

    
291
        private static AccountInfo GetSharerAccount(string relativePath, AccountInfo accountInfo)
292
        {
293
            var parts = relativePath.Split('\\');
294
            var accountName = parts[1];
295
            var oldName = accountInfo.UserName;
296
            var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
297
            var nameIndex = absoluteUri.IndexOf(oldName, StringComparison.Ordinal);
298
            var root = absoluteUri.Substring(0, nameIndex);
299

    
300
            accountInfo = new AccountInfo
301
                              {
302
                                  UserName = accountName,
303
                                  AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]),
304
                                  StorageUri = new Uri(root + accountName),
305
                                  BlockHash = accountInfo.BlockHash,
306
                                  BlockSize = accountInfo.BlockSize,
307
                                  Token = accountInfo.Token
308
                              };
309
            return accountInfo;
310
        }
311

    
312

    
313
        public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, Uri uri, TreeHash treeHash, CancellationToken token)
314
        {
315
            if (accountInfo == null)
316
                throw new ArgumentNullException("accountInfo");
317
            if (cloudFile == null)
318
                throw new ArgumentNullException("cloudFile");
319
            if (fileInfo == null)
320
                throw new ArgumentNullException("fileInfo");
321
            if (uri==null)
322
                throw new ArgumentNullException("uri");
323
            if (treeHash == null)
324
                throw new ArgumentNullException("treeHash");
325
            if (cloudFile.Container==null)
326
                throw new ArgumentException("Invalid container", "cloudFile");
327
            if (cloudFile.Container.IsAbsoluteUri)
328
                throw new ArgumentException("Container URI must be relative", "cloudFile");
329
            Contract.EndContractBlock();
330

    
331

    
332
            if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false))
333
                return;
334

    
335
            var fullFileName = fileInfo.GetProperCapitalization();
336

    
337
            var account = cloudFile.Account ?? accountInfo.UserName;
338
            var container = cloudFile.Container;
339

    
340
            int block = 0;
341

    
342
            var client = new CloudFilesClient(accountInfo);
343
            //Send the hashmap to the server            
344
            var missingHashes = await client.PutHashMap(account, container, uri, treeHash).ConfigureAwait(false);
345
            ReportUploadProgress(fileInfo.Name, block, 0, missingHashes.Count, fileInfo.Length);
346
            //If the server returns no missing hashes, we are done
347

    
348
            client.UploadProgressChanged += (sender, args) =>
349
                                            ReportUploadProgress(fileInfo.Name, block, args.ProgressPercentage,
350
                                                                 missingHashes.Count, fileInfo.Length);
351

    
352

    
353
            while (missingHashes.Count > 0)
354
            {
355
                block = 0;
356

    
357
                if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false))
358
                    return;
359

    
360

    
361
                var buffer = new byte[accountInfo.BlockSize];
362
                foreach (var missingHash in missingHashes)
363
                {
364
                    if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false))
365
                        return;
366

    
367

    
368
                    //Find the proper block
369
                    long blockIndex = treeHash.HashDictionary[missingHash];
370
                    long offset = blockIndex*accountInfo.BlockSize;
371
                    Debug.Assert(offset >= 0,
372
                                 String.Format("Negative Offset! BlockIndex {0} BlockSize {1}", blockIndex,
373
                                               accountInfo.BlockSize));
374

    
375
                    var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize);
376

    
377
                    try
378
                    {
379
                        //And upload the block                
380
                        await client.PostBlock(account, container, buffer, 0, read,missingHash, token).ConfigureAwait(false);
381
                        token.ThrowIfCancellationRequested();
382
                        Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName);
383
                    }
384
                    catch (TaskCanceledException)
385
                    {
386
                        throw new OperationCanceledException(token);
387
                    }
388
                    catch (Exception exc)
389
                    {
390
                        Log.Error(String.Format("Uploading block {0} of {1}", blockIndex, fullFileName), exc);
391
                    }
392
                    ReportUploadProgress(fileInfo.Name, block++, 100, missingHashes.Count, fileInfo.Length);
393
                }
394

    
395
                token.ThrowIfCancellationRequested();
396
                //Repeat until there are no more missing hashes                
397
                missingHashes = await client.PutHashMap(account, container, uri, treeHash).ConfigureAwait(false);
398
            }
399

    
400
            ReportUploadProgress(fileInfo.Name, missingHashes.Count, 0, missingHashes.Count, fileInfo.Length);
401

    
402
        }
403

    
404
        private async Task<bool> WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token)
405
        {
406
            token.ThrowIfCancellationRequested();
407
            await UnpauseEvent.WaitAsync().ConfigureAwait(false);
408
            var shouldAbort = !Selectives.IsSelected(account,cloudFile);
409
            if (shouldAbort)
410
                Log.InfoFormat("Aborting [{0}]",cloudFile.Uri);
411
            return shouldAbort;
412
        }
413

    
414
        private void ReportUploadProgress(string fileName, int block, int blockPercentage, int totalBlocks, long fileSize)
415
        {
416
            StatusNotification.Notify(totalBlocks == 0
417
                                          ? new ProgressNotification(fileName, "Uploading", 1,blockPercentage, 1, fileSize)
418
                                          : new ProgressNotification(fileName, "Uploading", block, blockPercentage, totalBlocks, fileSize));
419
        }
420

    
421

    
422
        private bool HandleUploadWebException(CloudAction action, WebException exc)
423
        {
424
            var response = exc.Response as HttpWebResponse;
425
            if (response == null)
426
                throw exc;
427
            if (response.StatusCode == HttpStatusCode.Unauthorized)
428
            {
429
                Log.Error("Not allowed to upload file", exc);
430
                var message = String.Format("Not allowed to uplad file {0}", action.LocalFile.FullName);
431
                StatusKeeper.SetFileState(action.LocalFile.FullName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
432
                StatusNotification.NotifyChange(message, TraceLevel.Warning);
433
                return true;
434
            }
435
            return false;
436
        }
437

    
438
        [Import]
439
        public Selectives Selectives { get; set; }
440

    
441
        public AsyncManualResetEvent UnpauseEvent { get; set; }
442
    }
443
}