root / trunk / Pithos.Core / Agents / BlockUpdater.cs @ cfed7823
History | View | Annotate | Download (10 kB)
1 |
using System; |
---|---|
2 |
using System.Collections.Concurrent; |
3 |
using System.Collections.Generic; |
4 |
using System.Diagnostics.Contracts; |
5 |
using System.IO; |
6 |
using System.Linq; |
7 |
using System.Security.Cryptography; |
8 |
using System.Text; |
9 |
using System.Threading.Tasks; |
10 |
using Pithos.Network; |
11 |
|
12 |
namespace Pithos.Core.Agents |
13 |
{ |
14 |
class BlockUpdater |
15 |
{ |
16 |
public string FilePath { get; private set; } |
17 |
public string RelativePath { get; private set; } |
18 |
|
19 |
public string FragmentsPath { get; private set; } |
20 |
|
21 |
public TreeHash ServerHash { get; private set; } |
22 |
|
23 |
public string TempPath { get; private set; } |
24 |
|
25 |
readonly ConcurrentDictionary<int, string> _blocks = new ConcurrentDictionary<int, string>(); |
26 |
readonly ConcurrentDictionary<string, string> _orphanBlocks = new ConcurrentDictionary<string, string>(); |
27 |
|
28 |
[ContractInvariantMethod] |
29 |
private void Invariants() |
30 |
{ |
31 |
Contract.Invariant(Path.IsPathRooted(FragmentsPath)); |
32 |
Contract.Invariant(Path.IsPathRooted(FilePath)); |
33 |
Contract.Invariant(Path.IsPathRooted(TempPath)); |
34 |
Contract.Invariant(!Path.IsPathRooted(RelativePath)); |
35 |
Contract.Invariant(_blocks!=null); |
36 |
Contract.Invariant(_orphanBlocks!=null); |
37 |
Contract.Invariant(ServerHash!=null); |
38 |
} |
39 |
|
40 |
public BlockUpdater(string fragmentsPath, string filePath, string relativePath,TreeHash serverHash) |
41 |
{ |
42 |
if (String.IsNullOrWhiteSpace(fragmentsPath)) |
43 |
throw new ArgumentNullException("fragmentsPath"); |
44 |
if (!Path.IsPathRooted(fragmentsPath)) |
45 |
throw new ArgumentException("The fragmentsPath must be rooted", "fragmentsPath"); |
46 |
|
47 |
if (string.IsNullOrWhiteSpace(filePath)) |
48 |
throw new ArgumentNullException("filePath"); |
49 |
if (!Path.IsPathRooted(filePath)) |
50 |
throw new ArgumentException("The filePath must be rooted", "filePath"); |
51 |
|
52 |
if (string.IsNullOrWhiteSpace(relativePath)) |
53 |
throw new ArgumentNullException("relativePath"); |
54 |
if (Path.IsPathRooted(relativePath)) |
55 |
throw new ArgumentException("The relativePath must NOT be rooted", "relativePath"); |
56 |
|
57 |
if (serverHash == null) |
58 |
throw new ArgumentNullException("serverHash"); |
59 |
Contract.EndContractBlock(); |
60 |
|
61 |
FragmentsPath=fragmentsPath; |
62 |
FilePath = filePath; |
63 |
RelativePath=relativePath; |
64 |
ServerHash = serverHash; |
65 |
//The file will be stored in a temporary location while downloading with an extension .download |
66 |
TempPath = Path.Combine(FragmentsPath, RelativePath + ".download"); |
67 |
|
68 |
//Need to calculate the directory path because RelativePath may include folders |
69 |
var directoryPath = Path.GetDirectoryName(TempPath); |
70 |
//directoryPath CAN be null if TempPath is a root path |
71 |
if (String.IsNullOrWhiteSpace(directoryPath)) |
72 |
throw new ArgumentException("TempPath"); |
73 |
//FragmentsPath was absolute so directoryPath is absolute too |
74 |
Contract.Assume(Path.IsPathRooted(directoryPath)); |
75 |
|
76 |
if (!Directory.Exists(directoryPath)) |
77 |
Directory.CreateDirectory(directoryPath); |
78 |
|
79 |
LoadOrphans(directoryPath); |
80 |
} |
81 |
|
82 |
private void LoadOrphans(string directoryPath) |
83 |
{ |
84 |
if (string.IsNullOrWhiteSpace(directoryPath)) |
85 |
throw new ArgumentNullException("directoryPath"); |
86 |
if (!Path.IsPathRooted(directoryPath)) |
87 |
throw new ArgumentException("The directoryPath must be rooted", "directoryPath"); |
88 |
if (ServerHash==null) |
89 |
throw new InvalidOperationException("ServerHash wasn't initialized"); |
90 |
Contract.EndContractBlock(); |
91 |
|
92 |
var fileNamename = Path.GetFileName(FilePath); |
93 |
var orphans = Directory.GetFiles(directoryPath, fileNamename + ".*"); |
94 |
foreach (var orphan in orphans) |
95 |
{ |
96 |
using (HashAlgorithm hasher = HashAlgorithm.Create(ServerHash.BlockHash)) |
97 |
{ |
98 |
var buffer=File.ReadAllBytes(orphan); |
99 |
//The server truncates nulls before calculating hashes, have to do the same |
100 |
//Find the last non-null byte, starting from the end |
101 |
var lastByteIndex = Array.FindLastIndex(buffer, buffer.Length-1, aByte => aByte != 0); |
102 |
//lastByteIndex may be -1 if the file was empty. We don't want to use that block file |
103 |
if (lastByteIndex >= 0) |
104 |
{ |
105 |
var binHash = hasher.ComputeHash(buffer, 0, lastByteIndex); |
106 |
var hash = binHash.ToHashString(); |
107 |
_orphanBlocks[hash] = orphan; |
108 |
} |
109 |
} |
110 |
} |
111 |
} |
112 |
|
113 |
|
114 |
public void Commit() |
115 |
{ |
116 |
if (String.IsNullOrWhiteSpace(FilePath)) |
117 |
throw new InvalidOperationException("FilePath is empty"); |
118 |
if (String.IsNullOrWhiteSpace(TempPath)) |
119 |
throw new InvalidOperationException("TempPath is empty"); |
120 |
Contract.EndContractBlock(); |
121 |
|
122 |
//Copy the file to a temporary location. Changes will be made to the |
123 |
//temporary file, then it will replace the original file |
124 |
if (File.Exists(FilePath)) |
125 |
File.Copy(FilePath, TempPath, true); |
126 |
|
127 |
//Set the size of the file to the size specified in the treehash |
128 |
//This will also create an empty file if the file doesn't exist |
129 |
|
130 |
|
131 |
SetFileSize(TempPath, ServerHash.Bytes); |
132 |
|
133 |
//Update the temporary file with the data from the blocks |
134 |
using (var stream = File.OpenWrite(TempPath)) |
135 |
{ |
136 |
foreach (var block in _blocks) |
137 |
{ |
138 |
var blockPath = block.Value; |
139 |
var blockIndex = block.Key; |
140 |
using (var blockStream = File.OpenRead(blockPath)) |
141 |
{ |
142 |
var offset = blockIndex*ServerHash.BlockSize; |
143 |
stream.Seek(offset, SeekOrigin.Begin); |
144 |
blockStream.CopyTo(stream); |
145 |
} |
146 |
} |
147 |
} |
148 |
SwapFiles(); |
149 |
|
150 |
ClearBlocks(); |
151 |
} |
152 |
|
153 |
private void SwapFiles() |
154 |
{ |
155 |
if (String.IsNullOrWhiteSpace(FilePath)) |
156 |
throw new InvalidOperationException("FilePath is empty"); |
157 |
if (String.IsNullOrWhiteSpace(TempPath)) |
158 |
throw new InvalidOperationException("TempPath is empty"); |
159 |
Contract.EndContractBlock(); |
160 |
|
161 |
if (File.Exists(FilePath)) |
162 |
File.Replace(TempPath, FilePath, null, true); |
163 |
else |
164 |
File.Move(TempPath, FilePath); |
165 |
} |
166 |
|
167 |
private void ClearBlocks() |
168 |
{ |
169 |
//Get all the the block paths, orphan or not |
170 |
var paths= _blocks.Select(pair => pair.Value) |
171 |
.Union(_orphanBlocks.Select(pair => pair.Value)); |
172 |
foreach (var filePath in paths) |
173 |
{ |
174 |
File.Delete(filePath); |
175 |
} |
176 |
|
177 |
File.Delete(TempPath); |
178 |
_blocks.Clear(); |
179 |
_orphanBlocks.Clear(); |
180 |
} |
181 |
|
182 |
//Change the file's size, possibly truncating or adding to it |
183 |
private void SetFileSize(string filePath, long fileSize) |
184 |
{ |
185 |
if (String.IsNullOrWhiteSpace(filePath)) |
186 |
throw new ArgumentNullException("filePath"); |
187 |
if (!Path.IsPathRooted(filePath)) |
188 |
throw new ArgumentException("The filePath must be rooted", "filePath"); |
189 |
if (fileSize < 0) |
190 |
throw new ArgumentOutOfRangeException("fileSize"); |
191 |
Contract.EndContractBlock(); |
192 |
|
193 |
using (var stream = File.Open(filePath, FileMode.OpenOrCreate, FileAccess.Write)) |
194 |
{ |
195 |
stream.SetLength(fileSize); |
196 |
} |
197 |
} |
198 |
|
199 |
/* //Check whether we should copy the local file to a temp path |
200 |
private bool ShouldCopy(string localPath, string tempPath) |
201 |
{ |
202 |
//No need to copy if there is no file |
203 |
if (!File.Exists(localPath)) |
204 |
return false; |
205 |
|
206 |
//If there is no temp file, go ahead and copy |
207 |
if (!File.Exists(tempPath)) |
208 |
return true; |
209 |
|
210 |
//If there is a temp file and is newer than the actual file, don't copy |
211 |
var localLastWrite = File.GetLastWriteTime(localPath); |
212 |
var tempLastWrite = File.GetLastWriteTime(tempPath); |
213 |
|
214 |
//This could mean there is an interrupted download in progress |
215 |
return (tempLastWrite < localLastWrite); |
216 |
}*/ |
217 |
|
218 |
|
219 |
public bool UseOrphan(int blockIndex, string blockHash) |
220 |
{ |
221 |
string blockPath=null; |
222 |
if (_orphanBlocks.TryGetValue(blockHash,out blockPath)) |
223 |
{ |
224 |
_blocks[blockIndex] = blockPath; |
225 |
return true; |
226 |
} |
227 |
return false; |
228 |
} |
229 |
|
230 |
public Task StoreBlock(int blockIndex,byte[] buffer) |
231 |
{ |
232 |
var blockPath = String.Format("{0}.{1:000000}", TempPath, blockIndex); |
233 |
_blocks[blockIndex] = blockPath; |
234 |
//Remove any orphan files |
235 |
if (File.Exists(blockPath)) |
236 |
File.Delete(blockPath); |
237 |
|
238 |
return FileAsync.WriteAllBytes(blockPath, buffer); |
239 |
} |
240 |
|
241 |
/*private Task WriteAsync(string filePath, byte[] buffer, int offset, int count) |
242 |
{ |
243 |
var stream = FileAsync.OpenWrite(filePath); |
244 |
try |
245 |
{ |
246 |
stream.Seek(offset, SeekOrigin.Begin); |
247 |
var write = stream.WriteAsync(buffer, 0, count); |
248 |
return write.ContinueWith(s => stream.Close()); |
249 |
} |
250 |
catch (Exception ex) |
251 |
{ |
252 |
stream.Close(); |
253 |
return Task.Factory.FromException(ex); |
254 |
} |
255 |
}*/ |
256 |
|
257 |
} |
258 |
} |