Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / Uploader.cs @ 97edb52f

History | View | Annotate | Download (16.5 kB)

1
using System;
2
using System.Collections.Generic;
3
using System.ComponentModel.Composition;
4
using System.Diagnostics;
5
using System.Diagnostics.Contracts;
6
using System.IO;
7
using System.Linq;
8
using System.Net;
9
using System.Reflection;
10
using System.Security.Cryptography;
11
using System.Threading;
12
using System.Threading.Tasks;
13
using Pithos.Interfaces;
14
using Pithos.Network;
15
using log4net;
16

    
17
namespace Pithos.Core.Agents
18
{
19
    [Export(typeof(Uploader))]
20
    public class Uploader
21
    {
22
        private static readonly ILog Log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
23

    
24
        [Import]
25
        private IStatusKeeper StatusKeeper { get; set; }
26

    
27
        
28
        public IStatusNotification StatusNotification { get; set; }
29

    
30
        
31
        //CancellationTokenSource _cts = new CancellationTokenSource();
32
        /*public void SignalStop()
33
        {
34
            _cts.Cancel();
35
        }*/
36

    
37
        public async Task UploadCloudFile(CloudAction action,CancellationToken cancellationToken)
38
        {
39
            if (action == null)
40
                throw new ArgumentNullException("action");
41
            Contract.EndContractBlock();
42

    
43
            using (ThreadContext.Stacks["Operation"].Push("UploadCloudFile"))
44
            {
45
                try
46
                {
47
                    await UnpauseEvent.WaitAsync();
48

    
49
                    var fileInfo = action.LocalFile;
50

    
51
                    if (fileInfo.Extension.Equals("ignore", StringComparison.InvariantCultureIgnoreCase))
52
                        return;
53

    
54
                    if (!Selectives.IsSelected(action.AccountInfo, fileInfo))
55
                        return;
56

    
57
                    //Try to load the action's local state, if it is empty
58
                    if (action.FileState == null)
59
                        action.FileState = StatusKeeper.GetStateByFilePath(fileInfo.FullName);
60
                    if (action.FileState == null)
61
                    {
62
                        Log.WarnFormat("File [{0}] has no local state. It was probably created by a download action", fileInfo.FullName);
63
                        return;
64
                    }
65

    
66
                    var latestState = action.FileState;
67

    
68
                    //Do not upload files in conflict
69
                    if (latestState.FileStatus == FileStatus.Conflict)
70
                    {
71
                        Log.InfoFormat("Skipping file in conflict [{0}]", fileInfo.FullName);
72
                        return;
73
                    }
74
                    //Do not upload files when we have no permission
75
                    if (latestState.FileStatus == FileStatus.Forbidden)
76
                    {
77
                        Log.InfoFormat("Skipping forbidden file [{0}]", fileInfo.FullName);
78
                        return;
79
                    }
80

    
81
                    //Are we targeting our own account or a sharer account?
82
                    var relativePath = fileInfo.AsRelativeTo(action.AccountInfo.AccountPath);
83
                    var accountInfo = relativePath.StartsWith(FolderConstants.OthersFolder) 
84
                                                  ? GetSharerAccount(relativePath, action.AccountInfo) 
85
                                                  : action.AccountInfo;
86

    
87

    
88

    
89
                    var fullFileName = fileInfo.GetProperCapitalization();
90
                    using (var gate = NetworkGate.Acquire(fullFileName, NetworkOperation.Uploading))
91
                    {
92
                        //Abort if the file is already being uploaded or downloaded
93
                        if (gate.Failed)
94
                            return;
95

    
96
                        var cloudFile = action.CloudFile;
97
                        var account = cloudFile.Account ?? accountInfo.UserName;
98
                        try
99
                        {
100

    
101
                            var client = new CloudFilesClient(accountInfo);
102

    
103
                            //Even if GetObjectInfo times out, we can proceed with the upload            
104
                            var cloudInfo = client.GetObjectInfo(account, cloudFile.Container, cloudFile.Name);
105

    
106
                            //If this a shared file
107
                            if (!cloudFile.Account.Equals(action.AccountInfo.UserName,StringComparison.InvariantCultureIgnoreCase))
108
                            {
109
                                
110
/*
111
                                if (!cloudInfo.IsWritable(action.AccountInfo.UserName))
112
                                {
113
                                    MakeFileReadOnly(fullFileName);
114
                                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
115
                                    return;
116
                                }
117
*/
118

    
119
                                //If this is a read-only file, do not upload changes
120
                                if ( !cloudInfo.IsWritable(action.AccountInfo.UserName) ||
121
                                    //If the file is new, but we can't upload it
122
                                    (!cloudInfo.Exists && !client.CanUpload(account, cloudFile)) )
123
                                {
124
                                    MakeFileReadOnly(fullFileName);
125
                                    StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
126
                                    return;
127
                                }
128

    
129
                            }
130

    
131
                            await UnpauseEvent.WaitAsync();
132

    
133
                            if (fileInfo is DirectoryInfo)
134
                            {
135
                                //If the directory doesn't exist the Hash property will be empty
136
                                if (String.IsNullOrWhiteSpace(cloudInfo.Hash))
137
                                    //Go on and create the directory
138
                                    await client.PutObject(account, cloudFile.Container, cloudFile.Name, fullFileName,
139
                                                         String.Empty, "application/directory");
140
                            }
141
                            else
142
                            {
143

    
144
                                var cloudHash = cloudInfo.Hash.ToLower();
145

    
146
                                string topHash;
147
                                TreeHash treeHash;
148
                                using(StatusNotification.GetNotifier("Hashing {0} for Upload", "Finished hashing {0}",fileInfo.Name))
149
                                {
150
                                    treeHash = action.TreeHash.Value;
151
                                    topHash = treeHash.TopHash.ToHashString();
152
                                }
153

    
154

    
155

    
156
                                //If the file hashes match, abort the upload
157
                                if (cloudInfo != ObjectInfo.Empty && (topHash == cloudHash ))
158
                                {
159
                                    //but store any metadata changes 
160
                                    StatusKeeper.StoreInfo(fullFileName, cloudInfo);
161
                                    Log.InfoFormat("Skip upload of {0}, hashes match", fullFileName);
162
                                    return;
163
                                }
164

    
165

    
166
                                //Mark the file as modified while we upload it
167
                                StatusKeeper.SetFileOverlayStatus(fullFileName, FileOverlayStatus.Modified);
168
                                //And then upload it
169

    
170
                                //Upload even small files using the Hashmap. The server may already contain
171
                                //the relevant block                                
172

    
173
                                
174

    
175
                                await UploadWithHashMap(accountInfo, cloudFile, fileInfo as FileInfo, cloudFile.Name, treeHash,cancellationToken);
176
                            }
177
                            //If everything succeeds, change the file and overlay status to normal
178
                            StatusKeeper.SetFileState(fullFileName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
179
                        }
180
                        catch (WebException exc)
181
                        {
182
                            var response = (exc.Response as HttpWebResponse);
183
                            if (response == null)
184
                                throw;
185
                            if (response.StatusCode == HttpStatusCode.Forbidden)
186
                            {
187
                                StatusKeeper.SetFileState(fileInfo.FullName, FileStatus.Forbidden, FileOverlayStatus.Conflict, "Forbidden");
188
                                MakeFileReadOnly(fullFileName);
189
                            }
190
                            else
191
                                //In any other case, propagate the error
192
                                throw;
193
                        }
194
                    }
195
                    //Notify the Shell to update the overlays
196
                    NativeMethods.RaiseChangeNotification(fullFileName);
197
                    StatusNotification.NotifyChangedFile(fullFileName);
198
                }
199
                catch (AggregateException ex)
200
                {
201
                    var exc = ex.InnerException as WebException;
202
                    if (exc == null)
203
                        throw ex.InnerException;
204
                    if (HandleUploadWebException(action, exc))
205
                        return;
206
                    throw;
207
                }
208
                catch (WebException ex)
209
                {
210
                    if (HandleUploadWebException(action, ex))
211
                        return;
212
                    throw;
213
                }
214
                catch (Exception ex)
215
                {
216
                    Log.Error("Unexpected error while uploading file", ex);
217
                    throw;
218
                }
219
            }
220
        }
221

    
222
        private static void MakeFileReadOnly(string fullFileName)
223
        {
224
            var attributes = File.GetAttributes(fullFileName);
225
            //Do not make any modifications if not necessary
226
            if (attributes.HasFlag(FileAttributes.ReadOnly))
227
                return;
228
            File.SetAttributes(fullFileName, attributes | FileAttributes.ReadOnly);            
229
        }
230

    
231
        private static AccountInfo GetSharerAccount(string relativePath, AccountInfo accountInfo)
232
        {
233
            var parts = relativePath.Split('\\');
234
            var accountName = parts[1];
235
            var oldName = accountInfo.UserName;
236
            var absoluteUri = accountInfo.StorageUri.AbsoluteUri;
237
            var nameIndex = absoluteUri.IndexOf(oldName, StringComparison.Ordinal);
238
            var root = absoluteUri.Substring(0, nameIndex);
239

    
240
            accountInfo = new AccountInfo
241
                              {
242
                                  UserName = accountName,
243
                                  AccountPath = Path.Combine(accountInfo.AccountPath, parts[0], parts[1]),
244
                                  StorageUri = new Uri(root + accountName),
245
                                  BlockHash = accountInfo.BlockHash,
246
                                  BlockSize = accountInfo.BlockSize,
247
                                  Token = accountInfo.Token
248
                              };
249
            return accountInfo;
250
        }
251

    
252

    
253
        public async Task UploadWithHashMap(AccountInfo accountInfo, ObjectInfo cloudFile, FileInfo fileInfo, string url, TreeHash treeHash, CancellationToken token)
254
        {
255
            if (accountInfo == null)
256
                throw new ArgumentNullException("accountInfo");
257
            if (cloudFile == null)
258
                throw new ArgumentNullException("cloudFile");
259
            if (fileInfo == null)
260
                throw new ArgumentNullException("fileInfo");
261
            if (String.IsNullOrWhiteSpace(url))
262
                throw new ArgumentNullException(url);
263
            if (treeHash == null)
264
                throw new ArgumentNullException("treeHash");
265
            if (String.IsNullOrWhiteSpace(cloudFile.Container))
266
                throw new ArgumentException("Invalid container", "cloudFile");
267
            Contract.EndContractBlock();
268

    
269
           
270
            using (StatusNotification.GetNotifier("Uploading {0}", "Finished Uploading {0}", fileInfo.Name))
271
            {
272
                if (await WaitOrAbort(accountInfo,cloudFile, token)) 
273
                    return;
274

    
275
                var fullFileName = fileInfo.GetProperCapitalization();
276

    
277
                var account = cloudFile.Account ?? accountInfo.UserName;
278
                var container = cloudFile.Container;
279

    
280

    
281
                var client = new CloudFilesClient(accountInfo);
282
                //Send the hashmap to the server            
283
                var missingHashes = await client.PutHashMap(account, container, url, treeHash);
284
                int block = 0;
285
                ReportUploadProgress(fileInfo.Name, block++, missingHashes.Count, fileInfo.Length);
286
                //If the server returns no missing hashes, we are done
287
                while (missingHashes.Count > 0)
288
                {
289

    
290
                    if (await WaitOrAbort(accountInfo,cloudFile, token))
291
                        return;
292

    
293

    
294
                    var buffer = new byte[accountInfo.BlockSize];
295
                    foreach (var missingHash in missingHashes)
296
                    {
297
                        if (await WaitOrAbort(accountInfo,cloudFile, token))
298
                            return;
299

    
300

    
301
                        //Find the proper block
302
                        var blockIndex = treeHash.HashDictionary[missingHash];
303
                        long offset = blockIndex*accountInfo.BlockSize;
304

    
305
                        var read = fileInfo.Read(buffer, offset, accountInfo.BlockSize);
306

    
307
                        try
308
                        {
309
                            //And upload the block                
310
                            await client.PostBlock(account, container, buffer, 0, read);
311
                            Log.InfoFormat("[BLOCK] Block {0} of {1} uploaded", blockIndex, fullFileName);
312
                        }
313
                        catch (Exception exc)
314
                        {
315
                            Log.Error(String.Format("Uploading block {0} of {1}", blockIndex, fullFileName), exc);
316
                        }
317
                        ReportUploadProgress(fileInfo.Name, block++, missingHashes.Count, fileInfo.Length);
318
                    }
319

    
320
                    token.ThrowIfCancellationRequested();
321
                    //Repeat until there are no more missing hashes                
322
                    missingHashes = await client.PutHashMap(account, container, url, treeHash);
323
                }
324

    
325
                ReportUploadProgress(fileInfo.Name, missingHashes.Count, missingHashes.Count, fileInfo.Length);
326
            }
327
        }
328

    
329
        private async Task<bool> WaitOrAbort(AccountInfo account,ObjectInfo cloudFile, CancellationToken token)
330
        {
331
            token.ThrowIfCancellationRequested();
332
            await UnpauseEvent.WaitAsync();
333
            var shouldAbort = !Selectives.IsSelected(account,cloudFile);
334
            if (shouldAbort)
335
                Log.InfoFormat("Aborting [{0}]",cloudFile.Uri);
336
            return shouldAbort;
337
        }
338

    
339
        private void ReportUploadProgress(string fileName, int block, int totalBlocks, long fileSize)
340
        {
341
            StatusNotification.Notify(totalBlocks == 0
342
                                          ? new ProgressNotification(fileName, "Uploading", 1, 1, fileSize)
343
                                          : new ProgressNotification(fileName, "Uploading", block, totalBlocks, fileSize));
344
        }
345

    
346

    
347
        private bool HandleUploadWebException(CloudAction action, WebException exc)
348
        {
349
            var response = exc.Response as HttpWebResponse;
350
            if (response == null)
351
                throw exc;
352
            if (response.StatusCode == HttpStatusCode.Unauthorized)
353
            {
354
                Log.Error("Not allowed to upload file", exc);
355
                var message = String.Format("Not allowed to uplad file {0}", action.LocalFile.FullName);
356
                StatusKeeper.SetFileState(action.LocalFile.FullName, FileStatus.Unchanged, FileOverlayStatus.Normal, "");
357
                StatusNotification.NotifyChange(message, TraceLevel.Warning);
358
                return true;
359
            }
360
            return false;
361
        }
362

    
363
        [Import]
364
        public Selectives Selectives { get; set; }
365

    
366
        public AsyncManualResetEvent UnpauseEvent { get; set; }
367
    }
368
}