Statistics
| Branch: | Revision:

root / trunk / Pithos.Core / Agents / BlockUpdater.cs @ cfed7823

History | View | Annotate | Download (10 kB)

1
using System;
2
using System.Collections.Concurrent;
3
using System.Collections.Generic;
4
using System.Diagnostics.Contracts;
5
using System.IO;
6
using System.Linq;
7
using System.Security.Cryptography;
8
using System.Text;
9
using System.Threading.Tasks;
10
using Pithos.Network;
11

    
12
namespace Pithos.Core.Agents
13
{
14
    class BlockUpdater
15
    {
16
        public string FilePath { get; private set; }
17
        public string RelativePath { get; private set; }
18

    
19
        public string FragmentsPath { get; private set; }
20

    
21
        public TreeHash ServerHash { get; private set; }
22

    
23
        public string TempPath { get; private set; }
24

    
25
        readonly ConcurrentDictionary<int, string> _blocks = new ConcurrentDictionary<int, string>();
26
        readonly ConcurrentDictionary<string, string> _orphanBlocks = new ConcurrentDictionary<string, string>();
27

    
28
        [ContractInvariantMethod]
29
        private void Invariants()
30
        {
31
            Contract.Invariant(Path.IsPathRooted(FragmentsPath));
32
            Contract.Invariant(Path.IsPathRooted(FilePath));
33
            Contract.Invariant(Path.IsPathRooted(TempPath));
34
            Contract.Invariant(!Path.IsPathRooted(RelativePath));
35
            Contract.Invariant(_blocks!=null);
36
            Contract.Invariant(_orphanBlocks!=null);
37
            Contract.Invariant(ServerHash!=null);
38
        }
39

    
40
        public BlockUpdater(string fragmentsPath, string filePath, string relativePath,TreeHash serverHash)
41
        {   
42
            if (String.IsNullOrWhiteSpace(fragmentsPath))
43
                throw new ArgumentNullException("fragmentsPath");
44
            if (!Path.IsPathRooted(fragmentsPath))
45
                throw new ArgumentException("The fragmentsPath must be rooted", "fragmentsPath");
46
            
47
            if (string.IsNullOrWhiteSpace(filePath))
48
                throw new ArgumentNullException("filePath");
49
            if (!Path.IsPathRooted(filePath))
50
                throw new ArgumentException("The filePath must be rooted", "filePath");
51
            
52
            if (string.IsNullOrWhiteSpace(relativePath))
53
                throw new ArgumentNullException("relativePath");
54
            if (Path.IsPathRooted(relativePath))
55
                throw new ArgumentException("The relativePath must NOT be rooted", "relativePath");
56

    
57
            if (serverHash == null)
58
                throw new ArgumentNullException("serverHash");
59
            Contract.EndContractBlock();
60

    
61
            FragmentsPath=fragmentsPath;
62
            FilePath = filePath;
63
            RelativePath=relativePath;
64
            ServerHash = serverHash;
65
            //The file will be stored in a temporary location while downloading with an extension .download
66
            TempPath = Path.Combine(FragmentsPath, RelativePath + ".download");
67
            
68
            //Need to calculate the directory path because RelativePath may include folders
69
            var directoryPath = Path.GetDirectoryName(TempPath);            
70
            //directoryPath CAN be null if TempPath is a root path
71
            if (String.IsNullOrWhiteSpace(directoryPath))
72
                throw new ArgumentException("TempPath");
73
            //FragmentsPath was absolute so directoryPath is absolute too
74
            Contract.Assume(Path.IsPathRooted(directoryPath));
75
            
76
            if (!Directory.Exists(directoryPath))
77
                Directory.CreateDirectory(directoryPath);
78

    
79
            LoadOrphans(directoryPath);
80
        }
81

    
82
        private void LoadOrphans(string directoryPath)
83
        {
84
            if (string.IsNullOrWhiteSpace(directoryPath))
85
                throw new ArgumentNullException("directoryPath");
86
            if (!Path.IsPathRooted(directoryPath))
87
                throw new ArgumentException("The directoryPath must be rooted", "directoryPath");
88
            if (ServerHash==null)
89
                throw new InvalidOperationException("ServerHash wasn't initialized");
90
            Contract.EndContractBlock();
91

    
92
            var fileNamename = Path.GetFileName(FilePath);
93
            var orphans = Directory.GetFiles(directoryPath, fileNamename + ".*");
94
            foreach (var orphan in orphans)
95
            {
96
                using (HashAlgorithm hasher = HashAlgorithm.Create(ServerHash.BlockHash))
97
                {
98
                    var buffer=File.ReadAllBytes(orphan);
99
                    //The server truncates nulls before calculating hashes, have to do the same
100
                    //Find the last non-null byte, starting from the end
101
                    var lastByteIndex = Array.FindLastIndex(buffer, buffer.Length-1, aByte => aByte != 0);
102
                    //lastByteIndex may be -1 if the file was empty. We don't want to use that block file
103
                    if (lastByteIndex >= 0)
104
                    {
105
                        var binHash = hasher.ComputeHash(buffer, 0, lastByteIndex);
106
                        var hash = binHash.ToHashString();
107
                        _orphanBlocks[hash] = orphan;
108
                    }
109
                }
110
            }
111
        }
112

    
113

    
114
        public void Commit()
115
        {
116
            if (String.IsNullOrWhiteSpace(FilePath))
117
                throw new InvalidOperationException("FilePath is empty");
118
            if (String.IsNullOrWhiteSpace(TempPath))
119
                throw new InvalidOperationException("TempPath is empty");
120
            Contract.EndContractBlock();
121

    
122
            //Copy the file to a temporary location. Changes will be made to the
123
            //temporary file, then it will replace the original file
124
            if (File.Exists(FilePath))
125
                File.Copy(FilePath, TempPath, true);
126

    
127
            //Set the size of the file to the size specified in the treehash
128
            //This will also create an empty file if the file doesn't exist                        
129
            
130
            
131
            SetFileSize(TempPath, ServerHash.Bytes);
132

    
133
            //Update the temporary file with the data from the blocks
134
            using (var stream = File.OpenWrite(TempPath))
135
            {
136
                foreach (var block in _blocks)
137
                {
138
                    var blockPath = block.Value;
139
                    var blockIndex = block.Key;
140
                    using (var blockStream = File.OpenRead(blockPath))
141
                    {                        
142
                        var offset = blockIndex*ServerHash.BlockSize;
143
                        stream.Seek(offset, SeekOrigin.Begin);
144
                        blockStream.CopyTo(stream);
145
                    }
146
                }
147
            }
148
            SwapFiles();
149

    
150
            ClearBlocks();
151
        }
152

    
153
        private void SwapFiles()
154
        {
155
            if (String.IsNullOrWhiteSpace(FilePath))
156
                throw new InvalidOperationException("FilePath is empty");
157
            if (String.IsNullOrWhiteSpace(TempPath))
158
                throw new InvalidOperationException("TempPath is empty");            
159
            Contract.EndContractBlock();
160

    
161
            if (File.Exists(FilePath))
162
                File.Replace(TempPath, FilePath, null, true);
163
            else
164
                File.Move(TempPath, FilePath);
165
        }
166

    
167
        private void ClearBlocks()
168
        {
169
            //Get all the the block paths, orphan or not
170
            var paths= _blocks.Select(pair => pair.Value)
171
                          .Union(_orphanBlocks.Select(pair => pair.Value));
172
            foreach (var filePath in paths)
173
            {
174
                File.Delete(filePath);
175
            }
176

    
177
            File.Delete(TempPath);
178
            _blocks.Clear();
179
            _orphanBlocks.Clear();
180
        }
181

    
182
        //Change the file's size, possibly truncating or adding to it
183
        private  void SetFileSize(string filePath, long fileSize)
184
        {
185
            if (String.IsNullOrWhiteSpace(filePath))
186
                throw new ArgumentNullException("filePath");
187
            if (!Path.IsPathRooted(filePath))
188
                throw new ArgumentException("The filePath must be rooted", "filePath");
189
            if (fileSize < 0)
190
                throw new ArgumentOutOfRangeException("fileSize");
191
            Contract.EndContractBlock();
192

    
193
            using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write))
194
            {
195
                stream.SetLength(fileSize);
196
            }
197
        }
198

    
199
       /* //Check whether we should copy the local file to a temp path        
200
        private  bool ShouldCopy(string localPath, string tempPath)
201
        {
202
            //No need to copy if there is no file
203
            if (!File.Exists(localPath))
204
                return false;
205

    
206
            //If there is no temp file, go ahead and copy
207
            if (!File.Exists(tempPath))
208
                return true;
209

    
210
            //If there is a temp file and is newer than the actual file, don't copy
211
            var localLastWrite = File.GetLastWriteTime(localPath);
212
            var tempLastWrite = File.GetLastWriteTime(tempPath);
213

    
214
            //This could mean there is an interrupted download in progress
215
            return (tempLastWrite < localLastWrite);
216
        }*/
217

    
218

    
219
        public bool UseOrphan(int blockIndex, string blockHash)
220
        {
221
            string blockPath=null;
222
            if (_orphanBlocks.TryGetValue(blockHash,out blockPath))
223
            {
224
                _blocks[blockIndex] = blockPath;
225
                return true;
226
            }
227
            return false;
228
        }
229

    
230
        public Task StoreBlock(int blockIndex,byte[] buffer)
231
        {
232
            var blockPath = String.Format("{0}.{1:000000}", TempPath, blockIndex);
233
            _blocks[blockIndex] = blockPath;
234
            //Remove any orphan files
235
            if (File.Exists(blockPath))
236
                File.Delete(blockPath);
237

    
238
            return FileAsync.WriteAllBytes(blockPath, buffer);
239
        }
240

    
241
        /*private Task WriteAsync(string filePath, byte[] buffer, int offset, int count)
242
        {
243
            var stream = FileAsync.OpenWrite(filePath);
244
            try
245
            {
246
                stream.Seek(offset, SeekOrigin.Begin);
247
                var write = stream.WriteAsync(buffer, 0, count);
248
                return write.ContinueWith(s => stream.Close());
249
            }
250
            catch (Exception ex)
251
            {
252
                stream.Close();
253
                return Task.Factory.FromException(ex);
254
            }
255
        }*/
256

    
257
    }
258
}