Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Uploader.cs @ bf4471a3

History | View | Annotate | Download (19.2 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Net;
9
using System.Reflection;
10
using System.Security.Cryptography;
11
using System.Threading;
12
using System.Threading.Tasks;
13
using Pithos.Interfaces;
14
using Pithos.Network;
15
using log4net;
16

    
17
namespace Pithos.Core.Agents
18
{
19
    [Export(typeof(Uploader))]
20
    public class Uploader
21
    {
22
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
23

    
24
        [Import]
25
        private IStatusKeeper StatusKeeper { get; set; }
26

    
27
        public IStatusNotification StatusNotification { get; set; }
28

    
29
        
30
        //CancellationTokenSource _cts = new CancellationTokenSource();
31
        /*public void SignalStop()
32
        {
33
            _cts.Cancel();
34
        }*/
35

    
36
        public async Task UploadCloudFile(CloudUploadAction action,CancellationToken cancellationToken)
37
        {
38
            if (action == null)
39
                throw new ArgumentNullException("action");
40
            Contract.EndContractBlock();
41

    
42
            using (ThreadContext.Stacks["Operation"].Push("UploadCloudFile"))
43
            {
44
                try
45
                {
46
                    await UnpauseEvent.WaitAsync().ConfigureAwait(false);
47

    
48
                    var fileInfo = action.LocalFile;
49

    
50
                    var progress=new Progress<double>(d=>
51
                        StatusNotification.Notify(new StatusNotification(String.Format("Merkle Hashing for Upload {0:p} of {1}",d,fileInfo.Name))));
52

    
53
                    TreeHash localTreeHash;
54
                    using (StatusNotification.GetNotifier("Merkle Hashing for Upload {0}", "Merkle Hashed for Upload {0}", fileInfo.Name))
55
                    {
56
                        localTreeHash = Signature.CalculateTreeHashAsync(fileInfo.FullName,
57
                                                                         action.AccountInfo.BlockSize,
58
                                                                         action.AccountInfo.BlockHash, 1,progress);
59
                    }
60

    
61
                    if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
62
                        return;
63

    
64
                    if (!Selectives.IsSelected(action.AccountInfo, fileInfo) && !action.IsCreation)
65
                        return;
66

    
67
                    //Try to load the action's local state, if it is empty
68
                    if (action.FileState == null)
69
                        action.FileState = StatusKeeper.GetStateByFilePath(fileInfo.FullName);
70
                    if (action.FileState != null)
71
                    {
72
                        /*
73
                                                Log.WarnFormat("File [{0}] has no local state. It was probably created by a download action", fileInfo.FullName);
74
                                                return;
75
                        */
76

    
77

    
78
                        var latestState = action.FileState;
79

    
80
                        //Do not upload files in conflict
81
                        if (latestState.FileStatus == FileStatus.Conflict)
82
                        {
83
                            Log.InfoFormat("Skipping file in conflict [{0}]", fileInfo.FullName);
84
                            return;
85
                        }
86
                        //Do not upload files when we have no permission
87
                        if (latestState.FileStatus == FileStatus.Forbidden)
88
                        {
89
                            Log.InfoFormat("Skipping forbidden file [{0}]", fileInfo.FullName);
90
                            return;
91
                        }
92
                    }
93
                    //Are we targeting our own account or a sharer account?
94
                    var relativePath = fileInfo.AsRelativeTo(action.AccountInfo.AccountPath);
95
                    var accountInfo = relativePath.StartsWith(FolderConstants.OthersFolder) 
96
                                                  ? GetSharerAccount(relativePath, action.AccountInfo) 
97
                                                  : action.AccountInfo;
98

    
99

    
100

    
101
                    var fullFileName = fileInfo.GetProperCapitalization();
102
                    using (var gate = NetworkGate.Acquire(fullFileName, NetworkOperation.Uploading))
103
                    {
104
                        //Abort if the file is already being uploaded or downloaded
105
                        if (gate.Failed)
106
                            return;
107

    
108
                        var cloudFile = action.CloudFile;
109
                        var account = cloudFile.Account ?? accountInfo.UserName;
110
                        try
111
                        {
112

    
113
                            var client = new CloudFilesClient(accountInfo);
114

    
115
                            //Even if GetObjectInfo times out, we can proceed with the upload            
116
                            var cloudInfo = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name);
117

    
118
                            //If this a shared file
119
                            if (!cloudFile.Account.Equals(action.AccountInfo.UserName,StringComparison.InvariantCultureIgnoreCase))
120
                            {
121
                                
122
/*
123
                                if (!cloudInfo.IsWritable(action.AccountInfo.UserName))
124
                                {
125
                                    MakeFileReadOnly(fullFileName);
126
                                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
127
                                    return;
128
                                }
129
*/
130

    
131
                                //If this is a read-only file, do not upload changes
132
                                if ( !cloudInfo.IsWritable(action.AccountInfo.UserName) ||
133
                                    //If the file is new, but we can't upload it
134
                                    (!cloudInfo.Exists && !client.CanUpload(account, cloudFile)) )
135
                                {
136
                                    MakeFileReadOnly(fullFileName);
137
                                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
138
                                    return;
139
                                }
140

    
141
                            }
142

    
143
                            await UnpauseEvent.WaitAsync().ConfigureAwait(false);
144

    
145
                            if (fileInfo is DirectoryInfo)
146
                            {
147
                                //If the directory doesn't exist the Hash property will be empty
148
                                if (String.IsNullOrWhiteSpace(cloudInfo.X_Object_Hash))
149
                                    //Go on and create the directory
150
                                    await client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName,
151
                                                         String.Empty, "application/directory");
152
                                //If the upload is in response to a Folder create with Selective Sync enabled
153
                                if (action.IsCreation)
154
                                {
155
                                    //Add the folder to the Selected URls
156
                                    var selectiveUri = new Uri(client.RootAddressUri, cloudFile.Uri);
157
                                    Selectives.AddUri(accountInfo, selectiveUri);                                    
158
                                    Selectives.Save(accountInfo);
159
                                }
160
                            }
161
                            else
162
                            {
163

    
164
                                var cloudHash = cloudInfo.X_Object_Hash.ToLower();
165

    
166
                                string topHash;
167
                                TreeHash treeHash;
168
                                using(StatusNotification.GetNotifier("Hashing {0} for Upload", "Finished hashing {0}",fileInfo.Name))
169
                                {
170
                                    treeHash = localTreeHash??action.TreeHash.Value;
171
                                    topHash = treeHash.TopHash.ToHashString();
172
                                }
173

    
174

    
175

    
176
                                //If the file hashes match, abort the upload
177
                                if (cloudInfo != ObjectInfo.Empty && (topHash == cloudHash ))
178
                                {
179
                                    //but store any metadata changes 
180
                                    StatusKeeper.StoreInfo(fullFileName, cloudInfo);
181
                                    Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
182
                                    return;
183
                                }
184

    
185

    
186
                                //Mark the file as modified while we upload it
187
                                await StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified).ConfigureAwait(false);
188
                                //And then upload it
189

    
190
                                //Upload even small files using the Hashmap. The server may already contain
191
                                //the relevant block                                
192

    
193

    
194

    
195
                                await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash, cancellationToken).ConfigureAwait(false);
196
                            }
197
                            var currentInfo=client.GetObjectInfo(cloudFile.Account, cloudFile.Container, cloudFile.Name);
198
                            //If there is no stored ObjectID in the file state, add it
199
                            if (action.FileState == null || action.FileState.ObjectID == null)
200
                            {
201
                                StatusKeeper.StoreInfo(fullFileName,currentInfo);
202
                            }
203
                            else                             
204
                                //If everything succeeds, change the file and overlay status to normal
205
                                StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
206
                        }
207
                        catch (WebException exc)
208
                        {
209
                            var response = (exc.Response as HttpWebResponse);
210
                            if (response == null)
211
                                throw;
212
                            if (response.StatusCode == HttpStatusCode.Forbidden)
213
                            {
214
                                StatusKeeper.SetFileState(fileInfo.FullName, FileStatus.Forbidden, FileOverlayStatus.Conflict, "Forbidden");
215
                                MakeFileReadOnly(fullFileName);
216
                            }
217
                            else
218
                                //In any other case, propagate the error
219
                                throw;
220
                        }
221
                    }
222
                    //Notify the Shell to update the overlays
223
                    NativeMethods.RaiseChangeNotification(fullFileName);
224
                    StatusNotification.NotifyChangedFile(fullFileName);
225
                }
226
                catch (AggregateException ex)
227
                {
228
                    var exc = ex.InnerException as WebException;
229
                    if (exc == null)
230
                        throw ex.InnerException;
231
                    if (HandleUploadWebException(action, exc))
232
                        return;
233
                    throw;
234
                }
235
                catch (WebException ex)
236
                {
237
                    if (HandleUploadWebException(action, ex))
238
                        return;
239
                    throw;
240
                }
241
                catch (Exception ex)
242
                {
243
                    Log.Error("Unexpected error while uploading file", ex);
244
                    throw;
245
                }
246
            }
247
        }
248

    
249
        private static void MakeFileReadOnly(string fullFileName)
250
        {
251
            var attributes = File.GetAttributes(fullFileName);
252
            //Do not make any modifications if not necessary
253
            if (attributes.HasFlag(FileAttributes.ReadOnly))
254
                return;
255
            File.SetAttributes(fullFileName, attributes | FileAttributes.ReadOnly);            
256
        }
257

    
258
        private static AccountInfo GetSharerAccount(string relativePath, AccountInfo accountInfo)
259
        {
260
            var parts = relativePath.Split('\\');
261
            var accountName = parts[1];
262
            var oldName = accountInfo.UserName;
263
            var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
264
            var nameIndex = absoluteUri.IndexOf(oldName, StringComparison.Ordinal);
265
            var root = absoluteUri.Substring(0, nameIndex);
266

    
267
            accountInfo = new AccountInfo
268
                              {
269
                                  UserName = accountName,
270
                                  AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]),
271
                                  StorageUri = new Uri(root + accountName),
272
                                  BlockHash = accountInfo.BlockHash,
273
                                  BlockSize = accountInfo.BlockSize,
274
                                  Token = accountInfo.Token
275
                              };
276
            return accountInfo;
277
        }
278

    
279

    
280
        public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, string url, TreeHash treeHash, CancellationToken token)
281
        {
282
            if (accountInfo == null)
283
                throw new ArgumentNullException("accountInfo");
284
            if (cloudFile == null)
285
                throw new ArgumentNullException("cloudFile");
286
            if (fileInfo == null)
287
                throw new ArgumentNullException("fileInfo");
288
            if (String.IsNullOrWhiteSpace(url))
289
                throw new ArgumentNullException(url);
290
            if (treeHash == null)
291
                throw new ArgumentNullException("treeHash");
292
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
293
                throw new ArgumentException("Invalid container", "cloudFile");
294
            Contract.EndContractBlock();
295

    
296

    
297
            if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false))
298
                return;
299

    
300
            var fullFileName = fileInfo.GetProperCapitalization();
301

    
302
            var account = cloudFile.Account ?? accountInfo.UserName;
303
            var container = cloudFile.Container;
304

    
305
            int block = 0;
306

    
307
            var client = new CloudFilesClient(accountInfo);
308
            //Send the hashmap to the server            
309
            var missingHashes = await client.PutHashMap(account, container, url, treeHash).ConfigureAwait(false);
310
            ReportUploadProgress(fileInfo.Name, block, 0, missingHashes.Count, fileInfo.Length);
311
            //If the server returns no missing hashes, we are done
312

    
313
            client.UploadProgressChanged += (sender, args) =>
314
                                            ReportUploadProgress(fileInfo.Name, block, args.ProgressPercentage,
315
                                                                 missingHashes.Count, fileInfo.Length);
316

    
317

    
318
            while (missingHashes.Count > 0)
319
            {
320
                block = 0;
321

    
322
                if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false))
323
                    return;
324

    
325

    
326
                var buffer = new byte[accountInfo.BlockSize];
327
                foreach (var missingHash in missingHashes)
328
                {
329
                    if (await WaitOrAbort(accountInfo, cloudFile, token).ConfigureAwait(false))
330
                        return;
331

    
332

    
333
                    //Find the proper block
334
                    long blockIndex = treeHash.HashDictionary[missingHash];
335
                    long offset = blockIndex*accountInfo.BlockSize;
336
                    Debug.Assert(offset >= 0,
337
                                 String.Format("Negative Offset! BlockIndex {0} BlockSize {1}", blockIndex,
338
                                               accountInfo.BlockSize));
339

    
340
                    var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize);
341

    
342
                    try
343
                    {
344
                        //And upload the block                
345
                        await client.PostBlock(account, container, buffer, 0, read, token).ConfigureAwait(false);
346
                        Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName);
347
                    }
348
                    catch (TaskCanceledException exc)
349
                    {
350
                        throw new OperationCanceledException(token);
351
                    }
352
                    catch (Exception exc)
353
                    {
354
                        Log.Error(String.Format("Uploading block {0} of {1}", blockIndex, fullFileName), exc);
355
                    }
356
                    ReportUploadProgress(fileInfo.Name, block++, 100, missingHashes.Count, fileInfo.Length);
357
                }
358

    
359
                token.ThrowIfCancellationRequested();
360
                //Repeat until there are no more missing hashes                
361
                missingHashes = await client.PutHashMap(account, container, url, treeHash).ConfigureAwait(false);
362
            }
363

    
364
            ReportUploadProgress(fileInfo.Name, missingHashes.Count, 0, missingHashes.Count, fileInfo.Length);
365

    
366
        }
367

    
368
        private async Task<bool> WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token)
369
        {
370
            token.ThrowIfCancellationRequested();
371
            await UnpauseEvent.WaitAsync().ConfigureAwait(false);
372
            var shouldAbort = !Selectives.IsSelected(account,cloudFile);
373
            if (shouldAbort)
374
                Log.InfoFormat("Aborting [{0}]",cloudFile.Uri);
375
            return shouldAbort;
376
        }
377

    
378
        private void ReportUploadProgress(string fileName, int block, int blockPercentage, int totalBlocks, long fileSize)
379
        {
380
            StatusNotification.Notify(totalBlocks == 0
381
                                          ? new ProgressNotification(fileName, "Uploading", 1,blockPercentage, 1, fileSize)
382
                                          : new ProgressNotification(fileName, "Uploading", block, blockPercentage, totalBlocks, fileSize));
383
        }
384

    
385

    
386
        private bool HandleUploadWebException(CloudAction action, WebException exc)
387
        {
388
            var response = exc.Response as HttpWebResponse;
389
            if (response == null)
390
                throw exc;
391
            if (response.StatusCode == HttpStatusCode.Unauthorized)
392
            {
393
                Log.Error("Not allowed to upload file", exc);
394
                var message = String.Format("Not allowed to uplad file {0}", action.LocalFile.FullName);
395
                StatusKeeper.SetFileState(action.LocalFile.FullName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
396
                StatusNotification.NotifyChange(message, TraceLevel.Warning);
397
                return true;
398
            }
399
            return false;
400
        }
401

    
402
        [Import]
403
        public Selectives Selectives { get; set; }
404

    
405
        public AsyncManualResetEvent UnpauseEvent { get; set; }
406
    }
407
}