Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / BlockUpdater.cs @ 796a7c29

History | View | Annotate | Download (13.4 kB)

1
#region
2
/* -----------------------------------------------------------------------
3
 * <copyright file="BlockUpdater.cs" company="GRNet">
4
 * 
5
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or
8
 * without modification, are permitted provided that the following
9
 * conditions are met:
10
 *
11
 *   1. Redistributions of source code must retain the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer.
14
 *
15
 *   2. Redistributions in binary form must reproduce the above
16
 *      copyright notice, this list of conditions and the following
17
 *      disclaimer in the documentation and/or other materials
18
 *      provided with the distribution.
19
 *
20
 *
21
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
22
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
24
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
25
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
28
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32
 * POSSIBILITY OF SUCH DAMAGE.
33
 *
34
 * The views and conclusions contained in the software and
35
 * documentation are those of the authors and should not be
36
 * interpreted as representing official policies, either expressed
37
 * or implied, of GRNET S.A.
38
 * </copyright>
39
 * -----------------------------------------------------------------------
40
 */
41
#endregion
42
using System;
43
using System.Collections.Concurrent;
44
using System.Diagnostics.Contracts;
45
using System.IO;
46
using System.Linq;
47
using System.Reflection;
48
using System.Threading.Tasks;
49
using OpenSSL.Crypto;
50
using Pithos.Network;
51

    
52
namespace Pithos.Core.Agents
53
{
54
    class BlockUpdater
55
    {
56
        //TODO: Must clean orphaned blocks from the Cache folder.
57
        //
58
        //The Cache folder may have orphaned blocks. Blocks may be left in the Cache folder because:
59
        //1. A download was in progress when the application terminated. These blocks are needed to proceed 
60
        //  with partial download
61
        //2. The application terminated abnormally before the blocks were cleared after a download
62
        //3. The server file was deleted before the download completed.
63
        //
64
        //In #1, we need to keep the blocks. We need to detect the other cases and delete orphans
65
        //
66
        //Mitigations:
67
        // - Delete blocks with no corresponding state
68
        // - Check and delete possible orphans when a Deletion is detected
69
        // - Add Advanced command "Clear Cache"
70
        //
71
        //Need a better way to differentiate between cases #2, #3 and #1
72

    
73
        private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
74

    
75
        public string FilePath { get; private set; }
76
        public string RelativePath { get; private set; }
77

    
78
        public string CachePath { get; private set; }
79

    
80
        public TreeHash ServerHash { get; private set; }
81

    
82
        public string TempPath { get; private set; }
83

    
84
        public bool HasBlocks
85
        {
86
            get { return _blocks.Count>0; }            
87
        }
88

    
89
        readonly ConcurrentDictionary<long, string> _blocks = new ConcurrentDictionary<long, string>();
90
        readonly ConcurrentDictionary<string, string> _orphanBlocks = new ConcurrentDictionary<string, string>();
91

    
92
        [ContractInvariantMethod]
93
        private void Invariants()
94
        {
95
            Contract.Invariant(Path.IsPathRooted(CachePath));
96
            Contract.Invariant(Path.IsPathRooted(FilePath));
97
            Contract.Invariant(Path.IsPathRooted(TempPath));
98
            Contract.Invariant(!Path.IsPathRooted(RelativePath));
99
            Contract.Invariant(_blocks!=null);
100
            Contract.Invariant(_orphanBlocks!=null);
101
            Contract.Invariant(ServerHash!=null);
102
        }
103

    
104
        public BlockUpdater(string cachePath, string filePath, string relativePath,TreeHash serverHash)
105
        {   
106
            if (String.IsNullOrWhiteSpace(cachePath))
107
                throw new ArgumentNullException("cachePath");
108
            if (!Path.IsPathRooted(cachePath))
109
                throw new ArgumentException("The cachePath must be rooted", "cachePath");
110
            
111
            if (string.IsNullOrWhiteSpace(filePath))
112
                throw new ArgumentNullException("filePath");
113
            if (!Path.IsPathRooted(filePath))
114
                throw new ArgumentException("The filePath must be rooted", "filePath");
115
            
116
            if (string.IsNullOrWhiteSpace(relativePath))
117
                throw new ArgumentNullException("relativePath");
118
            if (Path.IsPathRooted(relativePath))
119
                throw new ArgumentException("The relativePath must NOT be rooted", "relativePath");
120

    
121
            if (serverHash == null)
122
                throw new ArgumentNullException("serverHash");
123
            Contract.EndContractBlock();
124

    
125
            CachePath=cachePath;
126
            FilePath = filePath;
127
            RelativePath=relativePath;
128
            ServerHash = serverHash;
129
            //The file will be stored in a temporary location while downloading with an extension .download
130
            TempPath = Path.Combine(CachePath, RelativePath + ".download");
131
            
132
            //Need to calculate the directory path because RelativePath may include folders
133
            var directoryPath = Path.GetDirectoryName(TempPath);            
134
            //directoryPath CAN be null if TempPath is a root path
135
            if (String.IsNullOrWhiteSpace(directoryPath))
136
                throw new ArgumentException("TempPath");
137
            //CachePath was absolute so directoryPath is absolute too
138
            Contract.Assume(Path.IsPathRooted(directoryPath));
139
            
140
            if (!Directory.Exists(directoryPath))
141
                Directory.CreateDirectory(directoryPath);
142

    
143
            LoadOrphans(directoryPath);
144
        }
145

    
146
        private void LoadOrphans(string directoryPath)
147
        {
148
            if (string.IsNullOrWhiteSpace(directoryPath))
149
                throw new ArgumentNullException("directoryPath");
150
            if (!Path.IsPathRooted(directoryPath))
151
                throw new ArgumentException("The directoryPath must be rooted", "directoryPath");
152
            if (ServerHash==null)
153
                throw new InvalidOperationException("ServerHash wasn't initialized");
154
            Contract.EndContractBlock();
155

    
156
            var fileNamename = Path.GetFileName(FilePath);
157
            var orphans = Directory.GetFiles(directoryPath, fileNamename + ".*");
158
            foreach (var orphan in orphans)
159
            {
160
                using (var hasher = new MessageDigestContext(MessageDigest.CreateByName(ServerHash.BlockHash)))                
161
                {
162
                    hasher.Init();
163
                    var buffer=File.ReadAllBytes(orphan);
164
                    //The server truncates nulls before calculating hashes, have to do the same
165
                    //Find the last non-null byte, starting from the end
166
                    var lastByteIndex = Array.FindLastIndex(buffer, buffer.Length-1, aByte => aByte != 0);
167
                    //lastByteIndex may be -1 if the file was empty. We don't want to use that block file
168
                    if (lastByteIndex >= 0)
169
                    {
170
                        byte[] block;
171
                        if (lastByteIndex == buffer.Length - 1)
172
                            block = buffer;
173
                        else
174
                        {
175
                            block=new byte[lastByteIndex];
176
                            Buffer.BlockCopy(buffer,0,block,0,lastByteIndex);
177
                        }
178
                        var binHash = hasher.Digest(block);
179
                        var hash = binHash.ToHashString();
180
                        _orphanBlocks[hash] = orphan;
181
                    }
182
                }
183
            }
184
        }
185

    
186

    
187
        public void Commit()
188
        {
189
            if (String.IsNullOrWhiteSpace(FilePath))
190
                throw new InvalidOperationException("FilePath is empty");
191
            if (String.IsNullOrWhiteSpace(TempPath))
192
                throw new InvalidOperationException("TempPath is empty");
193
            Contract.EndContractBlock();
194

    
195
            //Copy the file to a temporary location. Changes will be made to the
196
            //temporary file, then it will replace the original file
197
            if (File.Exists(FilePath))
198
                File.Copy(FilePath, TempPath, true);
199

    
200
            //Set the size of the file to the size specified in the treehash
201
            //This will also create an empty file if the file doesn't exist                        
202
            
203
            
204
            SetFileSize(TempPath, ServerHash.Bytes);
205

    
206
            //Update the temporary file with the data from the blocks
207
            using (var stream = File.OpenWrite(TempPath))
208
            {
209
                foreach (var block in _blocks)
210
                {
211
                    var blockPath = block.Value;
212
                    var blockIndex = block.Key;
213
                    using (var blockStream = File.OpenRead(blockPath))
214
                    {                        
215
                        long offset = blockIndex*ServerHash.BlockSize;
216
                        stream.Seek(offset, SeekOrigin.Begin);
217
                        blockStream.CopyTo(stream);
218
                    }
219
                }
220
            }
221
            SwapFiles();
222

    
223
            ClearBlocks();
224
        }
225

    
226
        private void SwapFiles()
227
        {
228
            if (String.IsNullOrWhiteSpace(FilePath))
229
                throw new InvalidOperationException("FilePath is empty");
230
            if (String.IsNullOrWhiteSpace(TempPath))
231
                throw new InvalidOperationException("TempPath is empty");            
232
            Contract.EndContractBlock();
233

    
234
            if (File.Exists(FilePath))
235
                File.Replace(TempPath, FilePath, null, true);
236
            else
237
            {
238
                var targetDirectory = Path.GetDirectoryName(FilePath);
239
                if (!Directory.Exists(targetDirectory))
240
                    Directory.CreateDirectory(targetDirectory);
241
                File.Move(TempPath, FilePath);
242
            }
243
        }
244

    
245
        private void ClearBlocks()
246
        {
247
            if (Log.IsDebugEnabled)
248
                Log.DebugFormat("Clearing blocks for {0}",this.FilePath);
249
            //Get all the the block paths, orphan or not
250
            var paths= _blocks.Select(pair => pair.Value)
251
                          .Union(_orphanBlocks.Select(pair => pair.Value));
252
            foreach (var filePath in paths)
253
            {
254
                File.Delete(filePath);
255
            }
256

    
257
            File.Delete(TempPath);
258
            _blocks.Clear();
259
            _orphanBlocks.Clear();
260
        }
261

    
262
        //Change the file's size, possibly truncating or adding to it
263
        private  void SetFileSize(string filePath, long fileSize)
264
        {
265
            if (String.IsNullOrWhiteSpace(filePath))
266
                throw new ArgumentNullException("filePath");
267
            if (!Path.IsPathRooted(filePath))
268
                throw new ArgumentException("The filePath must be rooted", "filePath");
269
            if (fileSize < 0)
270
                throw new ArgumentOutOfRangeException("fileSize");
271
            Contract.EndContractBlock();
272

    
273
            using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write))
274
            {
275
                stream.SetLength(fileSize);
276
            }
277
        }
278

    
279
       /* //Check whether we should copy the local file to a temp path        
280
        private  bool ShouldCopy(string localPath, string tempPath)
281
        {
282
            //No need to copy if there is no file
283
            if (!File.Exists(localPath))
284
                return false;
285

    
286
            //If there is no temp file, go ahead and copy
287
            if (!File.Exists(tempPath))
288
                return true;
289

    
290
            //If there is a temp file and is newer than the actual file, don't copy
291
            var localLastWrite = File.GetLastWriteTime(localPath);
292
            var tempLastWrite = File.GetLastWriteTime(tempPath);
293

    
294
            //This could mean there is an interrupted download in progress
295
            return (tempLastWrite < localLastWrite);
296
        }*/
297

    
298

    
299
        public bool UseOrphan(long blockIndex, string blockHash)
300
        {
301
            string blockPath=null;
302
            if (_orphanBlocks.TryGetValue(blockHash,out blockPath))
303
            {
304
                _blocks[blockIndex] = blockPath;
305
                return true;
306
            }
307
            return false;
308
        }
309

    
310
        public Task StoreBlock(long blockIndex,byte[] buffer)
311
        {
312
            var blockPath = String.Format("{0}.{1:000000}", TempPath, blockIndex);
313
            _blocks[blockIndex] = blockPath;
314
            //Remove any orphan files
315
            if (File.Exists(blockPath))
316
                File.Delete(blockPath);
317

    
318
            return FileAsync.WriteAllBytes(blockPath, buffer);
319
        }
320

    
321
       
322

    
323
    }
324
}