Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Uploader.cs @ e4067290

History | View | Annotate | Download (18 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Net;
9
using System.Reflection;
10
using System.Security.Cryptography;
11
using System.Threading;
12
using System.Threading.Tasks;
13
using Pithos.Interfaces;
14
using Pithos.Network;
15
using log4net;
16

    
17
namespace Pithos.Core.Agents
18
{
19
    [Export(typeof(Uploader))]
20
    public class Uploader
21
    {
22
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
23

    
24
        [Import]
25
        private IStatusKeeper StatusKeeper { get; set; }
26

    
27
        public IStatusNotification StatusNotification { get; set; }
28

    
29
        
30
        //CancellationTokenSource _cts = new CancellationTokenSource();
31
        /*public void SignalStop()
32
        {
33
            _cts.Cancel();
34
        }*/
35

    
36
        public async Task UploadCloudFile(CloudUploadAction action,CancellationToken cancellationToken)
37
        {
38
            if (action == null)
39
                throw new ArgumentNullException("action");
40
            Contract.EndContractBlock();
41

    
42
            using (ThreadContext.Stacks["Operation"].Push("UploadCloudFile"))
43
            {
44
                try
45
                {
46
                    await UnpauseEvent.WaitAsync();
47

    
48
                    var fileInfo = action.LocalFile;
49

    
50
                    if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
51
                        return;
52

    
53
                    if (!Selectives.IsSelected(action.AccountInfo, fileInfo) && !action.IsCreation)
54
                        return;
55

    
56
                    //Try to load the action's local state, if it is empty
57
                    if (action.FileState == null)
58
                        action.FileState = StatusKeeper.GetStateByFilePath(fileInfo.FullName);
59
                    if (action.FileState != null)
60
                    {
61
                        /*
62
                                                Log.WarnFormat("File [{0}] has no local state. It was probably created by a download action", fileInfo.FullName);
63
                                                return;
64
                        */
65

    
66

    
67
                        var latestState = action.FileState;
68

    
69
                        //Do not upload files in conflict
70
                        if (latestState.FileStatus == FileStatus.Conflict)
71
                        {
72
                            Log.InfoFormat("Skipping file in conflict [{0}]", fileInfo.FullName);
73
                            return;
74
                        }
75
                        //Do not upload files when we have no permission
76
                        if (latestState.FileStatus == FileStatus.Forbidden)
77
                        {
78
                            Log.InfoFormat("Skipping forbidden file [{0}]", fileInfo.FullName);
79
                            return;
80
                        }
81
                    }
82
                    //Are we targeting our own account or a sharer account?
83
                    var relativePath = fileInfo.AsRelativeTo(action.AccountInfo.AccountPath);
84
                    var accountInfo = relativePath.StartsWith(FolderConstants.OthersFolder) 
85
                                                  ? GetSharerAccount(relativePath, action.AccountInfo) 
86
                                                  : action.AccountInfo;
87

    
88

    
89

    
90
                    var fullFileName = fileInfo.GetProperCapitalization();
91
                    using (var gate = NetworkGate.Acquire(fullFileName, NetworkOperation.Uploading))
92
                    {
93
                        //Abort if the file is already being uploaded or downloaded
94
                        if (gate.Failed)
95
                            return;
96

    
97
                        var cloudFile = action.CloudFile;
98
                        var account = cloudFile.Account ?? accountInfo.UserName;
99
                        try
100
                        {
101

    
102
                            var client = new CloudFilesClient(accountInfo);
103

    
104
                            //Even if GetObjectInfo times out, we can proceed with the upload            
105
                            var cloudInfo = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name);
106

    
107
                            //If this a shared file
108
                            if (!cloudFile.Account.Equals(action.AccountInfo.UserName,StringComparison.InvariantCultureIgnoreCase))
109
                            {
110
                                
111
/*
112
                                if (!cloudInfo.IsWritable(action.AccountInfo.UserName))
113
                                {
114
                                    MakeFileReadOnly(fullFileName);
115
                                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
116
                                    return;
117
                                }
118
*/
119

    
120
                                //If this is a read-only file, do not upload changes
121
                                if ( !cloudInfo.IsWritable(action.AccountInfo.UserName) ||
122
                                    //If the file is new, but we can't upload it
123
                                    (!cloudInfo.Exists && !client.CanUpload(account, cloudFile)) )
124
                                {
125
                                    MakeFileReadOnly(fullFileName);
126
                                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
127
                                    return;
128
                                }
129

    
130
                            }
131

    
132
                            await UnpauseEvent.WaitAsync();
133

    
134
                            if (fileInfo is DirectoryInfo)
135
                            {
136
                                //If the directory doesn't exist the Hash property will be empty
137
                                if (String.IsNullOrWhiteSpace(cloudInfo.X_Object_Hash))
138
                                    //Go on and create the directory
139
                                    await client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName,
140
                                                         String.Empty, "application/directory");
141
                                //If the upload is in response to a Folder create with Selective Sync enabled
142
                                if (action.IsCreation)
143
                                {
144
                                    //Add the folder to the Selected URls
145
                                    var selectiveUri = new Uri(client.RootAddressUri, cloudFile.Uri);
146
                                    Selectives.AddUri(accountInfo, selectiveUri);                                    
147
                                    Selectives.Save(accountInfo);
148
                                }
149
                            }
150
                            else
151
                            {
152

    
153
                                var cloudHash = cloudInfo.X_Object_Hash.ToLower();
154

    
155
                                string topHash;
156
                                TreeHash treeHash;
157
                                using(StatusNotification.GetNotifier("Hashing {0} for Upload", "Finished hashing {0}",fileInfo.Name))
158
                                {
159
                                    treeHash = action.TreeHash.Value;
160
                                    topHash = treeHash.TopHash.ToHashString();
161
                                }
162

    
163

    
164

    
165
                                //If the file hashes match, abort the upload
166
                                if (cloudInfo != ObjectInfo.Empty && (topHash == cloudHash ))
167
                                {
168
                                    //but store any metadata changes 
169
                                    StatusKeeper.StoreInfo(fullFileName, cloudInfo);
170
                                    Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
171
                                    return;
172
                                }
173

    
174

    
175
                                //Mark the file as modified while we upload it
176
                                await StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
177
                                //And then upload it
178

    
179
                                //Upload even small files using the Hashmap. The server may already contain
180
                                //the relevant block                                
181

    
182
                                
183

    
184
                                await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash,cancellationToken);
185
                            }
186
                            var currentInfo=client.GetObjectInfo(cloudFile.Account, cloudFile.Container, cloudFile.Name);
187
                            //If there is no stored ObjectID in the file state, add it
188
                            if (action.FileState == null || action.FileState.ObjectID == null)
189
                            {
190
                                StatusKeeper.StoreInfo(fullFileName,currentInfo);
191
                            }
192
                            else                             
193
                                //If everything succeeds, change the file and overlay status to normal
194
                                StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
195
                        }
196
                        catch (WebException exc)
197
                        {
198
                            var response = (exc.Response as HttpWebResponse);
199
                            if (response == null)
200
                                throw;
201
                            if (response.StatusCode == HttpStatusCode.Forbidden)
202
                            {
203
                                StatusKeeper.SetFileState(fileInfo.FullName, FileStatus.Forbidden, FileOverlayStatus.Conflict, "Forbidden");
204
                                MakeFileReadOnly(fullFileName);
205
                            }
206
                            else
207
                                //In any other case, propagate the error
208
                                throw;
209
                        }
210
                    }
211
                    //Notify the Shell to update the overlays
212
                    NativeMethods.RaiseChangeNotification(fullFileName);
213
                    StatusNotification.NotifyChangedFile(fullFileName);
214
                }
215
                catch (AggregateException ex)
216
                {
217
                    var exc = ex.InnerException as WebException;
218
                    if (exc == null)
219
                        throw ex.InnerException;
220
                    if (HandleUploadWebException(action, exc))
221
                        return;
222
                    throw;
223
                }
224
                catch (WebException ex)
225
                {
226
                    if (HandleUploadWebException(action, ex))
227
                        return;
228
                    throw;
229
                }
230
                catch (Exception ex)
231
                {
232
                    Log.Error("Unexpected error while uploading file", ex);
233
                    throw;
234
                }
235
            }
236
        }
237

    
238
        private static void MakeFileReadOnly(string fullFileName)
239
        {
240
            var attributes = File.GetAttributes(fullFileName);
241
            //Do not make any modifications if not necessary
242
            if (attributes.HasFlag(FileAttributes.ReadOnly))
243
                return;
244
            File.SetAttributes(fullFileName, attributes | FileAttributes.ReadOnly);            
245
        }
246

    
247
        private static AccountInfo GetSharerAccount(string relativePath, AccountInfo accountInfo)
248
        {
249
            var parts = relativePath.Split('\\');
250
            var accountName = parts[1];
251
            var oldName = accountInfo.UserName;
252
            var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
253
            var nameIndex = absoluteUri.IndexOf(oldName, StringComparison.Ordinal);
254
            var root = absoluteUri.Substring(0, nameIndex);
255

    
256
            accountInfo = new AccountInfo
257
                              {
258
                                  UserName = accountName,
259
                                  AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]),
260
                                  StorageUri = new Uri(root + accountName),
261
                                  BlockHash = accountInfo.BlockHash,
262
                                  BlockSize = accountInfo.BlockSize,
263
                                  Token = accountInfo.Token
264
                              };
265
            return accountInfo;
266
        }
267

    
268

    
269
        public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, string url, TreeHash treeHash, CancellationToken token)
270
        {
271
            if (accountInfo == null)
272
                throw new ArgumentNullException("accountInfo");
273
            if (cloudFile == null)
274
                throw new ArgumentNullException("cloudFile");
275
            if (fileInfo == null)
276
                throw new ArgumentNullException("fileInfo");
277
            if (String.IsNullOrWhiteSpace(url))
278
                throw new ArgumentNullException(url);
279
            if (treeHash == null)
280
                throw new ArgumentNullException("treeHash");
281
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
282
                throw new ArgumentException("Invalid container", "cloudFile");
283
            Contract.EndContractBlock();
284

    
285
           
286
            using (StatusNotification.GetNotifier("Uploading {0}", "Finished Uploading {0}", fileInfo.Name))
287
            {
288
                if (await WaitOrAbort(accountInfo,cloudFile, token)) 
289
                    return;
290

    
291
                var fullFileName = fileInfo.GetProperCapitalization();
292

    
293
                var account = cloudFile.Account ?? accountInfo.UserName;
294
                var container = cloudFile.Container;
295

    
296

    
297
                var client = new CloudFilesClient(accountInfo);
298
                //Send the hashmap to the server            
299
                var missingHashes = await client.PutHashMap(account, container, url, treeHash);
300
                int block = 0;
301
                ReportUploadProgress(fileInfo.Name, block++, missingHashes.Count, fileInfo.Length);
302
                //If the server returns no missing hashes, we are done
303
                while (missingHashes.Count > 0)
304
                {
305

    
306
                    if (await WaitOrAbort(accountInfo,cloudFile, token))
307
                        return;
308

    
309

    
310
                    var buffer = new byte[accountInfo.BlockSize];
311
                    foreach (var missingHash in missingHashes)
312
                    {
313
                        if (await WaitOrAbort(accountInfo,cloudFile, token))
314
                            return;
315

    
316

    
317
                        //Find the proper block
318
                        var blockIndex = treeHash.HashDictionary[missingHash];
319
                        long offset = blockIndex*accountInfo.BlockSize;
320

    
321
                        var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize);
322

    
323
                        try
324
                        {
325
                            //And upload the block                
326
                            await client.PostBlock(account, container, buffer, 0, read, token);
327
                            Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName);
328
                        }
329
                        catch (TaskCanceledException exc)
330
                        {
331
                            throw new OperationCanceledException(token);
332
                        }
333
                        catch (Exception exc)
334
                        {
335
                            Log.Error(String.Format("Uploading block {0} of {1}", blockIndex, fullFileName), exc);
336
                        }
337
                        ReportUploadProgress(fileInfo.Name, block++, missingHashes.Count, fileInfo.Length);
338
                    }
339

    
340
                    token.ThrowIfCancellationRequested();
341
                    //Repeat until there are no more missing hashes                
342
                    missingHashes = await client.PutHashMap(account, container, url, treeHash);
343
                }
344

    
345
                ReportUploadProgress(fileInfo.Name, missingHashes.Count, missingHashes.Count, fileInfo.Length);
346
            }
347
        }
348

    
349
        private async Task<bool> WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token)
350
        {
351
            token.ThrowIfCancellationRequested();
352
            await UnpauseEvent.WaitAsync();
353
            var shouldAbort = !Selectives.IsSelected(account,cloudFile);
354
            if (shouldAbort)
355
                Log.InfoFormat("Aborting [{0}]",cloudFile.Uri);
356
            return shouldAbort;
357
        }
358

    
359
        private void ReportUploadProgress(string fileName, int block, int totalBlocks, long fileSize)
360
        {
361
            StatusNotification.Notify(totalBlocks == 0
362
                                          ? new ProgressNotification(fileName, "Uploading", 1, 1, fileSize)
363
                                          : new ProgressNotification(fileName, "Uploading", block, totalBlocks, fileSize));
364
        }
365

    
366

    
367
        private bool HandleUploadWebException(CloudAction action, WebException exc)
368
        {
369
            var response = exc.Response as HttpWebResponse;
370
            if (response == null)
371
                throw exc;
372
            if (response.StatusCode == HttpStatusCode.Unauthorized)
373
            {
374
                Log.Error("Not allowed to upload file", exc);
375
                var message = String.Format("Not allowed to uplad file {0}", action.LocalFile.FullName);
376
                StatusKeeper.SetFileState(action.LocalFile.FullName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
377
                StatusNotification.NotifyChange(message, TraceLevel.Warning);
378
                return true;
379
            }
380
            return false;
381
        }
382

    
383
        [Import]
384
        public Selectives Selectives { get; set; }
385

    
386
        public AsyncManualResetEvent UnpauseEvent { get; set; }
387
    }
388
}