Revision 6a0b1658 kamaki/clients/pithos.py
b/kamaki/clients/pithos.py | ||
---|---|---|
32 | 32 |
# or implied, of GRNET S.A. |
33 | 33 |
|
34 | 34 |
import hashlib |
35 |
import json
|
|
35 |
import os
|
|
36 | 36 |
|
37 |
from . import ClientError |
|
38 |
from .storage import StorageClient |
|
39 | 37 |
from ..utils import OrderedDict |
40 | 38 |
|
39 |
from .storage import StorageClient |
|
40 |
|
|
41 |
|
|
42 |
def pithos_hash(block, blockhash): |
|
43 |
h = hashlib.new(blockhash) |
|
44 |
h.update(block.rstrip('\x00')) |
|
45 |
return h.hexdigest() |
|
46 |
|
|
41 | 47 |
|
42 | 48 |
class PithosClient(StorageClient): |
43 | 49 |
"""GRNet Pithos API client""" |
44 | 50 |
|
45 | 51 |
def put_block(self, data, hash): |
46 |
path = '/%s/%s?update' % (self.account, self.container) |
|
52 |
path = '/%s/%s' % (self.account, self.container) |
|
53 |
params = {'update': ''} |
|
47 | 54 |
headers = {'Content-Type': 'application/octet-stream', |
48 |
'Content-Length': len(data)}
|
|
49 |
resp, reply = self.raw_http_cmd('POST', path, data, headers,
|
|
50 |
success=202)
|
|
51 |
assert reply.strip() == hash, 'Local hash does not match server'
|
|
55 |
'Content-Length': str(len(data))}
|
|
56 |
r = self.post(path, params=params, data=data, headers=headers,
|
|
57 |
success=202) |
|
58 |
assert r.text.strip() == hash, 'Local hash does not match server'
|
|
52 | 59 |
|
53 |
def create_object(self, object, f): |
|
54 |
meta = self.get_container_meta() |
|
60 |
def create_object(self, object, f, hash_cb=None, upload_cb=None): |
|
61 |
"""Create an object by uploading only the missing blocks |
|
62 |
|
|
63 |
hash_cb is a generator function taking the total number of blocks to |
|
64 |
be hashed as an argument. Its next() will be called every time a block |
|
65 |
is hashed. |
|
66 |
|
|
67 |
upload_cb is a generator function with the same properties that is |
|
68 |
called every time a block is uploaded. |
|
69 |
""" |
|
70 |
self.assert_container() |
|
71 |
|
|
72 |
meta = self.get_container_meta(self.container) |
|
55 | 73 |
blocksize = int(meta['block-size']) |
56 | 74 |
blockhash = meta['block-hash'] |
57 | 75 |
|
58 |
size = 0 |
|
76 |
file_size = os.fstat(f.fileno()).st_size |
|
77 |
nblocks = 1 + (file_size - 1) // blocksize |
|
59 | 78 |
hashes = OrderedDict() |
60 |
data = f.read(blocksize) |
|
61 |
while data: |
|
62 |
bytes = len(data) |
|
63 |
h = hashlib.new(blockhash) |
|
64 |
h.update(data.rstrip('\x00')) |
|
65 |
hash = h.hexdigest() |
|
79 |
|
|
80 |
size = 0 |
|
81 |
|
|
82 |
if hash_cb: |
|
83 |
hash_gen = hash_cb(nblocks) |
|
84 |
hash_gen.next() |
|
85 |
for i in range(nblocks): |
|
86 |
block = f.read(blocksize) |
|
87 |
bytes = len(block) |
|
88 |
hash = pithos_hash(block, blockhash) |
|
66 | 89 |
hashes[hash] = (size, bytes) |
67 | 90 |
size += bytes |
68 |
data = f.read(blocksize) |
|
91 |
if hash_cb: |
|
92 |
hash_gen.next() |
|
93 |
|
|
94 |
assert size == file_size |
|
69 | 95 |
|
70 |
path = '/%s/%s/%s?hashmap&format=json' % (self.account, self.container,
|
|
71 |
object)
|
|
96 |
path = '/%s/%s/%s' % (self.account, self.container, object)
|
|
97 |
params = {'hashmap': '', 'format': 'json'}
|
|
72 | 98 |
hashmap = dict(bytes=size, hashes=hashes.keys()) |
73 |
req = json.dumps(hashmap) |
|
74 |
resp, reply = self.raw_http_cmd('PUT', path, req, success=None) |
|
75 |
|
|
76 |
if resp.status not in (201, 409): |
|
77 |
raise ClientError('Invalid response from the server') |
|
99 |
r = self.put(path, params=params, json=hashmap, success=(201, 409)) |
|
78 | 100 |
|
79 |
if resp.status == 201:
|
|
101 |
if r.status_code == 201:
|
|
80 | 102 |
return |
81 | 103 |
|
82 |
missing = json.loads(reply)
|
|
104 |
missing = r.json
|
|
83 | 105 |
|
106 |
if upload_cb: |
|
107 |
upload_gen = upload_cb(len(missing)) |
|
108 |
upload_gen.next() |
|
84 | 109 |
for hash in missing: |
85 | 110 |
offset, bytes = hashes[hash] |
86 | 111 |
f.seek(offset) |
87 | 112 |
data = f.read(bytes) |
88 | 113 |
self.put_block(data, hash) |
114 |
if upload_cb: |
|
115 |
upload_gen.next() |
|
89 | 116 |
|
90 |
self.http_put(path, req, success=201) |
|
117 |
self.put(path, params=params, json=hashmap, success=201) |
Also available in: Unified diff