Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Uploader.cs @ 43dd02a8

History | View | Annotate | Download (16.1 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Net;
9
using System.Reflection;
10
using System.Security.Cryptography;
11
using System.Threading;
12
using System.Threading.Tasks;
13
using Pithos.Interfaces;
14
using Pithos.Network;
15
using log4net;
16

    
17
namespace Pithos.Core.Agents
18
{
19
    [Export(typeof(Uploader))]
20
    public class Uploader
21
    {
22
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
23

    
24
        [Import]
25
        private IStatusKeeper StatusKeeper { get; set; }
26

    
27
        
28
        public IStatusNotification StatusNotification { get; set; }
29

    
30
        
31
        //CancellationTokenSource _cts = new CancellationTokenSource();
32
        /*public void SignalStop()
33
        {
34
            _cts.Cancel();
35
        }*/
36

    
37
        public async Task UploadCloudFile(CloudAction action,CancellationToken cancellationToken)
38
        {
39
            if (action == null)
40
                throw new ArgumentNullException("action");
41
            Contract.EndContractBlock();
42

    
43
            using (ThreadContext.Stacks["Operation"].Push("UploadCloudFile"))
44
            {
45
                try
46
                {
47
                    await UnpauseEvent.WaitAsync();
48

    
49
                    var fileInfo = action.LocalFile;
50

    
51
                    if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
52
                        return;
53

    
54
                    if (!Selectives.IsSelected(action.AccountInfo, fileInfo))
55
                        return;
56

    
57
                    //Try to load the action's local state, if it is empty
58
                    if (action.FileState == null)
59
                        action.FileState = StatusKeeper.GetStateByFilePath(fileInfo.FullName);
60
                    if (action.FileState == null)
61
                    {
62
                        Log.WarnFormat("File [{0}] has no local state. It was probably created by a download action", fileInfo.FullName);
63
                        return;
64
                    }
65

    
66
                    var latestState = action.FileState;
67

    
68
                    //Do not upload files in conflict
69
                    if (latestState.FileStatus == FileStatus.Conflict)
70
                    {
71
                        Log.InfoFormat("Skipping file in conflict [{0}]", fileInfo.FullName);
72
                        return;
73
                    }
74
                    //Do not upload files when we have no permission
75
                    if (latestState.FileStatus == FileStatus.Forbidden)
76
                    {
77
                        Log.InfoFormat("Skipping forbidden file [{0}]", fileInfo.FullName);
78
                        return;
79
                    }
80

    
81
                    //Are we targeting our own account or a sharer account?
82
                    var relativePath = fileInfo.AsRelativeTo(action.AccountInfo.AccountPath);
83
                    var accountInfo = relativePath.StartsWith(FolderConstants.OthersFolder) 
84
                                                  ? GetSharerAccount(relativePath, action.AccountInfo) 
85
                                                  : action.AccountInfo;
86

    
87

    
88

    
89
                    var fullFileName = fileInfo.GetProperCapitalization();
90
                    using (var gate = NetworkGate.Acquire(fullFileName, NetworkOperation.Uploading))
91
                    {
92
                        //Abort if the file is already being uploaded or downloaded
93
                        if (gate.Failed)
94
                            return;
95

    
96
                        var cloudFile = action.CloudFile;
97
                        var account = cloudFile.Account ?? accountInfo.UserName;
98
                        try
99
                        {
100

    
101
                            var client = new CloudFilesClient(accountInfo);
102

    
103
                            //Even if GetObjectInfo times out, we can proceed with the upload            
104
                            var cloudInfo = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name);
105

    
106
                            //If this a shared file
107
                            if (!cloudFile.Account.Equals(action.AccountInfo.UserName,StringComparison.InvariantCultureIgnoreCase))
108
                            {
109
                                //If this is a read-only file, do not upload changes
110
                                if (!cloudInfo.IsWritable(action.AccountInfo.UserName))
111
                                {
112
                                    MakeFileReadOnly(fullFileName);
113
                                    return;
114
                                }
115

    
116
                                //If the file is new, can we upload it?
117
                                if ( !cloudInfo.Exists && !client.CanUpload(account, cloudFile))
118
                                {
119
                                    MakeFileReadOnly(fullFileName);
120
                                    return;
121
                                }
122

    
123
                            }
124

    
125
                            await UnpauseEvent.WaitAsync();
126

    
127
                            if (fileInfo is DirectoryInfo)
128
                            {
129
                                //If the directory doesn't exist the Hash property will be empty
130
                                if (String.IsNullOrWhiteSpace(cloudInfo.Hash))
131
                                    //Go on and create the directory
132
                                    await client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName,
133
                                                         String.Empty, "application/directory");
134
                            }
135
                            else
136
                            {
137

    
138
                                var cloudHash = cloudInfo.Hash.ToLower();
139

    
140
                                string topHash;
141
                                TreeHash treeHash;
142
                                using(StatusNotification.GetNotifier("Hashing {0} for Upload", "Finished hashing {0}",fileInfo.Name))
143
                                {
144
                                    treeHash = action.TreeHash.Value;
145
                                    topHash = treeHash.TopHash.ToHashString();
146
                                }
147

    
148

    
149

    
150
                                //If the file hashes match, abort the upload
151
                                if (cloudInfo != ObjectInfo.Empty && (topHash == cloudHash ))
152
                                {
153
                                    //but store any metadata changes 
154
                                    StatusKeeper.StoreInfo(fullFileName, cloudInfo);
155
                                    Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
156
                                    return;
157
                                }
158

    
159

    
160
                                //Mark the file as modified while we upload it
161
                                StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
162
                                //And then upload it
163

    
164
                                //Upload even small files using the Hashmap. The server may already contain
165
                                //the relevant block                                
166

    
167
                                
168

    
169
                                await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash,cancellationToken);
170
                            }
171
                            //If everything succeeds, change the file and overlay status to normal
172
                            StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
173
                        }
174
                        catch (WebException exc)
175
                        {
176
                            var response = (exc.Response as HttpWebResponse);
177
                            if (response == null)
178
                                throw;
179
                            if (response.StatusCode == HttpStatusCode.Forbidden)
180
                            {
181
                                StatusKeeper.SetFileState(fileInfo.FullName, FileStatus.Forbidden, FileOverlayStatus.Conflict, "Forbidden");
182
                                MakeFileReadOnly(fullFileName);
183
                            }
184
                            else
185
                                //In any other case, propagate the error
186
                                throw;
187
                        }
188
                    }
189
                    //Notify the Shell to update the overlays
190
                    NativeMethods.RaiseChangeNotification(fullFileName);
191
                    StatusNotification.NotifyChangedFile(fullFileName);
192
                }
193
                catch (AggregateException ex)
194
                {
195
                    var exc = ex.InnerException as WebException;
196
                    if (exc == null)
197
                        throw ex.InnerException;
198
                    if (HandleUploadWebException(action, exc))
199
                        return;
200
                    throw;
201
                }
202
                catch (WebException ex)
203
                {
204
                    if (HandleUploadWebException(action, ex))
205
                        return;
206
                    throw;
207
                }
208
                catch (Exception ex)
209
                {
210
                    Log.Error("Unexpected error while uploading file", ex);
211
                    throw;
212
                }
213
            }
214
        }
215

    
216
        private static void MakeFileReadOnly(string fullFileName)
217
        {
218
            var attributes = File.GetAttributes(fullFileName);
219
            //Do not make any modifications if not necessary
220
            if (attributes.HasFlag(FileAttributes.ReadOnly))
221
                return;
222
            File.SetAttributes(fullFileName, attributes | FileAttributes.ReadOnly);
223
        }
224

    
225
        private static AccountInfo GetSharerAccount(string relativePath, AccountInfo accountInfo)
226
        {
227
            var parts = relativePath.Split('\\');
228
            var accountName = parts[1];
229
            var oldName = accountInfo.UserName;
230
            var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
231
            var nameIndex = absoluteUri.IndexOf(oldName, StringComparison.Ordinal);
232
            var root = absoluteUri.Substring(0, nameIndex);
233

    
234
            accountInfo = new AccountInfo
235
                              {
236
                                  UserName = accountName,
237
                                  AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]),
238
                                  StorageUri = new Uri(root + accountName),
239
                                  BlockHash = accountInfo.BlockHash,
240
                                  BlockSize = accountInfo.BlockSize,
241
                                  Token = accountInfo.Token
242
                              };
243
            return accountInfo;
244
        }
245

    
246

    
247
        public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, string url, TreeHash treeHash, CancellationToken token)
248
        {
249
            if (accountInfo == null)
250
                throw new ArgumentNullException("accountInfo");
251
            if (cloudFile == null)
252
                throw new ArgumentNullException("cloudFile");
253
            if (fileInfo == null)
254
                throw new ArgumentNullException("fileInfo");
255
            if (String.IsNullOrWhiteSpace(url))
256
                throw new ArgumentNullException(url);
257
            if (treeHash == null)
258
                throw new ArgumentNullException("treeHash");
259
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
260
                throw new ArgumentException("Invalid container", "cloudFile");
261
            Contract.EndContractBlock();
262

    
263
           
264
            using (StatusNotification.GetNotifier("Uploading {0}", "Finished Uploading {0}", fileInfo.Name))
265
            {
266
                if (await WaitOrAbort(accountInfo,cloudFile, token)) 
267
                    return;
268

    
269
                var fullFileName = fileInfo.GetProperCapitalization();
270

    
271
                var account = cloudFile.Account ?? accountInfo.UserName;
272
                var container = cloudFile.Container;
273

    
274

    
275
                var client = new CloudFilesClient(accountInfo);
276
                //Send the hashmap to the server            
277
                var missingHashes = await client.PutHashMap(account, container, url, treeHash);
278
                int block = 0;
279
                ReportUploadProgress(fileInfo.Name, block++, missingHashes.Count, fileInfo.Length);
280
                //If the server returns no missing hashes, we are done
281
                while (missingHashes.Count > 0)
282
                {
283

    
284
                    if (await WaitOrAbort(accountInfo,cloudFile, token))
285
                        return;
286

    
287

    
288
                    var buffer = new byte[accountInfo.BlockSize];
289
                    foreach (var missingHash in missingHashes)
290
                    {
291
                        if (await WaitOrAbort(accountInfo,cloudFile, token))
292
                            return;
293

    
294

    
295
                        //Find the proper block
296
                        var blockIndex = treeHash.HashDictionary[missingHash];
297
                        long offset = blockIndex*accountInfo.BlockSize;
298

    
299
                        var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize);
300

    
301
                        try
302
                        {
303
                            //And upload the block                
304
                            await client.PostBlock(account, container, buffer, 0, read);
305
                            Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName);
306
                        }
307
                        catch (Exception exc)
308
                        {
309
                            Log.Error(String.Format("Uploading block {0} of {1}", blockIndex, fullFileName), exc);
310
                        }
311
                        ReportUploadProgress(fileInfo.Name, block++, missingHashes.Count, fileInfo.Length);
312
                    }
313

    
314
                    token.ThrowIfCancellationRequested();
315
                    //Repeat until there are no more missing hashes                
316
                    missingHashes = await client.PutHashMap(account, container, url, treeHash);
317
                }
318

    
319
                ReportUploadProgress(fileInfo.Name, missingHashes.Count, missingHashes.Count, fileInfo.Length);
320
            }
321
        }
322

    
323
        private async Task<bool> WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token)
324
        {
325
            token.ThrowIfCancellationRequested();
326
            await UnpauseEvent.WaitAsync();
327
            var shouldAbort = !Selectives.IsSelected(account,cloudFile);
328
            if (shouldAbort)
329
                Log.InfoFormat("Aborting [{0}]",cloudFile.Uri);
330
            return shouldAbort;
331
        }
332

    
333
        private void ReportUploadProgress(string fileName, int block, int totalBlocks, long fileSize)
334
        {
335
            StatusNotification.Notify(totalBlocks == 0
336
                                          ? new ProgressNotification(fileName, "Uploading", 1, 1, fileSize)
337
                                          : new ProgressNotification(fileName, "Uploading", block, totalBlocks, fileSize));
338
        }
339

    
340

    
341
        private bool HandleUploadWebException(CloudAction action, WebException exc)
342
        {
343
            var response = exc.Response as HttpWebResponse;
344
            if (response == null)
345
                throw exc;
346
            if (response.StatusCode == HttpStatusCode.Unauthorized)
347
            {
348
                Log.Error("Not allowed to upload file", exc);
349
                var message = String.Format("Not allowed to uplad file {0}", action.LocalFile.FullName);
350
                StatusKeeper.SetFileState(action.LocalFile.FullName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
351
                StatusNotification.NotifyChange(message, TraceLevel.Warning);
352
                return true;
353
            }
354
            return false;
355
        }
356

    
357
        [Import]
358
        public Selectives Selectives { get; set; }
359

    
360
        public AsyncManualResetEvent UnpauseEvent { get; set; }
361
    }
362
}