root / trunk / Pithos.Core / Agents / BlockUpdater.cs @ 796a7c29
History | View | Annotate | Download (13.4 kB)
1 |
#region |
---|---|
2 |
/* ----------------------------------------------------------------------- |
3 |
* <copyright file="BlockUpdater.cs" company="GRNet"> |
4 |
* |
5 |
* Copyright 2011-2012 GRNET S.A. All rights reserved. |
6 |
* |
7 |
* Redistribution and use in source and binary forms, with or |
8 |
* without modification, are permitted provided that the following |
9 |
* conditions are met: |
10 |
* |
11 |
* 1. Redistributions of source code must retain the above |
12 |
* copyright notice, this list of conditions and the following |
13 |
* disclaimer. |
14 |
* |
15 |
* 2. Redistributions in binary form must reproduce the above |
16 |
* copyright notice, this list of conditions and the following |
17 |
* disclaimer in the documentation and/or other materials |
18 |
* provided with the distribution. |
19 |
* |
20 |
* |
21 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
22 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
23 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
24 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
25 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
26 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
27 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
28 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
29 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
30 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
31 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
32 |
* POSSIBILITY OF SUCH DAMAGE. |
33 |
* |
34 |
* The views and conclusions contained in the software and |
35 |
* documentation are those of the authors and should not be |
36 |
* interpreted as representing official policies, either expressed |
37 |
* or implied, of GRNET S.A. |
38 |
* </copyright> |
39 |
* ----------------------------------------------------------------------- |
40 |
*/ |
41 |
#endregion |
42 |
using System; |
43 |
using System.Collections.Concurrent; |
44 |
using System.Diagnostics.Contracts; |
45 |
using System.IO; |
46 |
using System.Linq; |
47 |
using System.Reflection; |
48 |
using System.Threading.Tasks; |
49 |
using OpenSSL.Crypto; |
50 |
using Pithos.Network; |
51 |
|
52 |
namespace Pithos.Core.Agents |
53 |
{ |
54 |
class BlockUpdater |
55 |
{ |
56 |
//TODO: Must clean orphaned blocks from the Cache folder. |
57 |
// |
58 |
//The Cache folder may have orphaned blocks. Blocks may be left in the Cache folder because: |
59 |
//1. A download was in progress when the application terminated. These blocks are needed to proceed |
60 |
// with partial download |
61 |
//2. The application terminated abnormally before the blocks were cleared after a download |
62 |
//3. The server file was deleted before the download completed. |
63 |
// |
64 |
//In #1, we need to keep the blocks. We need to detect the other cases and delete orphans |
65 |
// |
66 |
//Mitigations: |
67 |
// - Delete blocks with no corresponding state |
68 |
// - Check and delete possible orphans when a Deletion is detected |
69 |
// - Add Advanced command "Clear Cache" |
70 |
// |
71 |
//Need a better way to differentiate between cases #2, #3 and #1 |
72 |
|
73 |
private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); |
74 |
|
75 |
public string FilePath { get; private set; } |
76 |
public string RelativePath { get; private set; } |
77 |
|
78 |
public string CachePath { get; private set; } |
79 |
|
80 |
public TreeHash ServerHash { get; private set; } |
81 |
|
82 |
public string TempPath { get; private set; } |
83 |
|
84 |
public bool HasBlocks |
85 |
{ |
86 |
get { return _blocks.Count>0; } |
87 |
} |
88 |
|
89 |
readonly ConcurrentDictionary<long, string> _blocks = new ConcurrentDictionary<long, string>(); |
90 |
readonly ConcurrentDictionary<string, string> _orphanBlocks = new ConcurrentDictionary<string, string>(); |
91 |
|
92 |
[ContractInvariantMethod] |
93 |
private void Invariants() |
94 |
{ |
95 |
Contract.Invariant(Path.IsPathRooted(CachePath)); |
96 |
Contract.Invariant(Path.IsPathRooted(FilePath)); |
97 |
Contract.Invariant(Path.IsPathRooted(TempPath)); |
98 |
Contract.Invariant(!Path.IsPathRooted(RelativePath)); |
99 |
Contract.Invariant(_blocks!=null); |
100 |
Contract.Invariant(_orphanBlocks!=null); |
101 |
Contract.Invariant(ServerHash!=null); |
102 |
} |
103 |
|
104 |
public BlockUpdater(string cachePath, string filePath, string relativePath,TreeHash serverHash) |
105 |
{ |
106 |
if (String.IsNullOrWhiteSpace(cachePath)) |
107 |
throw new ArgumentNullException("cachePath"); |
108 |
if (!Path.IsPathRooted(cachePath)) |
109 |
throw new ArgumentException("The cachePath must be rooted", "cachePath"); |
110 |
|
111 |
if (string.IsNullOrWhiteSpace(filePath)) |
112 |
throw new ArgumentNullException("filePath"); |
113 |
if (!Path.IsPathRooted(filePath)) |
114 |
throw new ArgumentException("The filePath must be rooted", "filePath"); |
115 |
|
116 |
if (string.IsNullOrWhiteSpace(relativePath)) |
117 |
throw new ArgumentNullException("relativePath"); |
118 |
if (Path.IsPathRooted(relativePath)) |
119 |
throw new ArgumentException("The relativePath must NOT be rooted", "relativePath"); |
120 |
|
121 |
if (serverHash == null) |
122 |
throw new ArgumentNullException("serverHash"); |
123 |
Contract.EndContractBlock(); |
124 |
|
125 |
CachePath=cachePath; |
126 |
FilePath = filePath; |
127 |
RelativePath=relativePath; |
128 |
ServerHash = serverHash; |
129 |
//The file will be stored in a temporary location while downloading with an extension .download |
130 |
TempPath = Path.Combine(CachePath, RelativePath + ".download"); |
131 |
|
132 |
//Need to calculate the directory path because RelativePath may include folders |
133 |
var directoryPath = Path.GetDirectoryName(TempPath); |
134 |
//directoryPath CAN be null if TempPath is a root path |
135 |
if (String.IsNullOrWhiteSpace(directoryPath)) |
136 |
throw new ArgumentException("TempPath"); |
137 |
//CachePath was absolute so directoryPath is absolute too |
138 |
Contract.Assume(Path.IsPathRooted(directoryPath)); |
139 |
|
140 |
if (!Directory.Exists(directoryPath)) |
141 |
Directory.CreateDirectory(directoryPath); |
142 |
|
143 |
LoadOrphans(directoryPath); |
144 |
} |
145 |
|
146 |
private void LoadOrphans(string directoryPath) |
147 |
{ |
148 |
if (string.IsNullOrWhiteSpace(directoryPath)) |
149 |
throw new ArgumentNullException("directoryPath"); |
150 |
if (!Path.IsPathRooted(directoryPath)) |
151 |
throw new ArgumentException("The directoryPath must be rooted", "directoryPath"); |
152 |
if (ServerHash==null) |
153 |
throw new InvalidOperationException("ServerHash wasn't initialized"); |
154 |
Contract.EndContractBlock(); |
155 |
|
156 |
var fileNamename = Path.GetFileName(FilePath); |
157 |
var orphans = Directory.GetFiles(directoryPath, fileNamename + ".*"); |
158 |
foreach (var orphan in orphans) |
159 |
{ |
160 |
using (var hasher = new MessageDigestContext(MessageDigest.CreateByName(ServerHash.BlockHash))) |
161 |
{ |
162 |
hasher.Init(); |
163 |
var buffer=File.ReadAllBytes(orphan); |
164 |
//The server truncates nulls before calculating hashes, have to do the same |
165 |
//Find the last non-null byte, starting from the end |
166 |
var lastByteIndex = Array.FindLastIndex(buffer, buffer.Length-1, aByte => aByte != 0); |
167 |
//lastByteIndex may be -1 if the file was empty. We don't want to use that block file |
168 |
if (lastByteIndex >= 0) |
169 |
{ |
170 |
byte[] block; |
171 |
if (lastByteIndex == buffer.Length - 1) |
172 |
block = buffer; |
173 |
else |
174 |
{ |
175 |
block=new byte[lastByteIndex]; |
176 |
Buffer.BlockCopy(buffer,0,block,0,lastByteIndex); |
177 |
} |
178 |
var binHash = hasher.Digest(block); |
179 |
var hash = binHash.ToHashString(); |
180 |
_orphanBlocks[hash] = orphan; |
181 |
} |
182 |
} |
183 |
} |
184 |
} |
185 |
|
186 |
|
187 |
public void Commit() |
188 |
{ |
189 |
if (String.IsNullOrWhiteSpace(FilePath)) |
190 |
throw new InvalidOperationException("FilePath is empty"); |
191 |
if (String.IsNullOrWhiteSpace(TempPath)) |
192 |
throw new InvalidOperationException("TempPath is empty"); |
193 |
Contract.EndContractBlock(); |
194 |
|
195 |
//Copy the file to a temporary location. Changes will be made to the |
196 |
//temporary file, then it will replace the original file |
197 |
if (File.Exists(FilePath)) |
198 |
File.Copy(FilePath, TempPath, true); |
199 |
|
200 |
//Set the size of the file to the size specified in the treehash |
201 |
//This will also create an empty file if the file doesn't exist |
202 |
|
203 |
|
204 |
SetFileSize(TempPath, ServerHash.Bytes); |
205 |
|
206 |
//Update the temporary file with the data from the blocks |
207 |
using (var stream = File.OpenWrite(TempPath)) |
208 |
{ |
209 |
foreach (var block in _blocks) |
210 |
{ |
211 |
var blockPath = block.Value; |
212 |
var blockIndex = block.Key; |
213 |
using (var blockStream = File.OpenRead(blockPath)) |
214 |
{ |
215 |
long offset = blockIndex*ServerHash.BlockSize; |
216 |
stream.Seek(offset, SeekOrigin.Begin); |
217 |
blockStream.CopyTo(stream); |
218 |
} |
219 |
} |
220 |
} |
221 |
SwapFiles(); |
222 |
|
223 |
ClearBlocks(); |
224 |
} |
225 |
|
226 |
private void SwapFiles() |
227 |
{ |
228 |
if (String.IsNullOrWhiteSpace(FilePath)) |
229 |
throw new InvalidOperationException("FilePath is empty"); |
230 |
if (String.IsNullOrWhiteSpace(TempPath)) |
231 |
throw new InvalidOperationException("TempPath is empty"); |
232 |
Contract.EndContractBlock(); |
233 |
|
234 |
if (File.Exists(FilePath)) |
235 |
File.Replace(TempPath, FilePath, null, true); |
236 |
else |
237 |
{ |
238 |
var targetDirectory = Path.GetDirectoryName(FilePath); |
239 |
if (!Directory.Exists(targetDirectory)) |
240 |
Directory.CreateDirectory(targetDirectory); |
241 |
File.Move(TempPath, FilePath); |
242 |
} |
243 |
} |
244 |
|
245 |
private void ClearBlocks() |
246 |
{ |
247 |
if (Log.IsDebugEnabled) |
248 |
Log.DebugFormat("Clearing blocks for {0}",this.FilePath); |
249 |
//Get all the the block paths, orphan or not |
250 |
var paths= _blocks.Select(pair => pair.Value) |
251 |
.Union(_orphanBlocks.Select(pair => pair.Value)); |
252 |
foreach (var filePath in paths) |
253 |
{ |
254 |
File.Delete(filePath); |
255 |
} |
256 |
|
257 |
File.Delete(TempPath); |
258 |
_blocks.Clear(); |
259 |
_orphanBlocks.Clear(); |
260 |
} |
261 |
|
262 |
//Change the file's size, possibly truncating or adding to it |
263 |
private void SetFileSize(string filePath, long fileSize) |
264 |
{ |
265 |
if (String.IsNullOrWhiteSpace(filePath)) |
266 |
throw new ArgumentNullException("filePath"); |
267 |
if (!Path.IsPathRooted(filePath)) |
268 |
throw new ArgumentException("The filePath must be rooted", "filePath"); |
269 |
if (fileSize < 0) |
270 |
throw new ArgumentOutOfRangeException("fileSize"); |
271 |
Contract.EndContractBlock(); |
272 |
|
273 |
using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write)) |
274 |
{ |
275 |
stream.SetLength(fileSize); |
276 |
} |
277 |
} |
278 |
|
279 |
/* //Check whether we should copy the local file to a temp path |
280 |
private bool ShouldCopy(string localPath, string tempPath) |
281 |
{ |
282 |
//No need to copy if there is no file |
283 |
if (!File.Exists(localPath)) |
284 |
return false; |
285 |
|
286 |
//If there is no temp file, go ahead and copy |
287 |
if (!File.Exists(tempPath)) |
288 |
return true; |
289 |
|
290 |
//If there is a temp file and is newer than the actual file, don't copy |
291 |
var localLastWrite = File.GetLastWriteTime(localPath); |
292 |
var tempLastWrite = File.GetLastWriteTime(tempPath); |
293 |
|
294 |
//This could mean there is an interrupted download in progress |
295 |
return (tempLastWrite < localLastWrite); |
296 |
}*/ |
297 |
|
298 |
|
299 |
public bool UseOrphan(long blockIndex, string blockHash) |
300 |
{ |
301 |
string blockPath=null; |
302 |
if (_orphanBlocks.TryGetValue(blockHash,out blockPath)) |
303 |
{ |
304 |
_blocks[blockIndex] = blockPath; |
305 |
return true; |
306 |
} |
307 |
return false; |
308 |
} |
309 |
|
310 |
public Task StoreBlock(long blockIndex,byte[] buffer) |
311 |
{ |
312 |
var blockPath = String.Format("{0}.{1:000000}", TempPath, blockIndex); |
313 |
_blocks[blockIndex] = blockPath; |
314 |
//Remove any orphan files |
315 |
if (File.Exists(blockPath)) |
316 |
File.Delete(blockPath); |
317 |
|
318 |
return FileAsync.WriteAllBytes(blockPath, buffer); |
319 |
} |
320 |
|
321 |
|
322 |
|
323 |
} |
324 |
} |