1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
39 from .storage import StorageClient, ClientError
40 from .utils import path4url, params4url, prefix_keys, filter_in, filter_out
def pithos_hash(block, blockhash):
    """Return the hex digest of a block, ignoring trailing NUL bytes.

    block -- the block contents as a byte string
    blockhash -- name of the hash algorithm, passed to hashlib.new()
    """
    h = hashlib.new(blockhash)
    # Trailing zero bytes are stripped before hashing; the bytes literal
    # keeps the strip valid for byte strings on Python 2 and Python 3 alike.
    h.update(block.rstrip(b'\x00'))
    return h.hexdigest()
49 class PithosClient(StorageClient):
50 """GRNet Pithos API client"""
def purge_container(self, container):
    """Issue a DELETE on a container with an 'until' parameter set to now.

    container -- name of the container to purge; it is used as given and
        is independent of self.container
    """
    # Fail early when no account is set, as the other account-level
    # calls of this client do before building a path from self.account.
    self.assert_account()
    until = unicode(time())
    path = path4url(self.account, container) + params4url({'until': until})
    self.delete(path, success=204)
def put_block(self, data, hash):
    """POST one block to the current container and verify its hash.

    data -- the block contents to upload
    hash -- the locally computed hash of data; it is compared with the
        stripped text of the server response

    Raises AssertionError if the two hashes differ.
    """
    path = path4url(self.account, self.container) + params4url({'update': None})
    self.set_header('Content-Type', 'application/octet-stream')
    self.set_header('Content-Length', len(data))
    r = self.post(path, data=data, success=202)
    # Raised explicitly rather than with an assert statement, so that the
    # integrity check is not stripped when Python runs with -O.
    if r.text.strip() != hash:
        raise AssertionError('Local hash does not match server')
def create_object(self, object, f, size=None, hash_cb=None,
                  upload_cb=None):
    """Create an object by uploading only the missing blocks.

    object -- name of the object to create in the current container
    f -- open file object the data is read from
    size -- number of bytes to upload; when None, the size reported by
        os.fstat() for f is used
    hash_cb -- a generator function taking the total number of blocks to
        be hashed as an argument. Its next() will be called every time a
        block is hashed.
    upload_cb -- a generator function with the same properties that is
        called every time a block is uploaded.
    """
    self.assert_container()

    meta = self.get_container_info(self.container)
    blocksize = int(meta['x-container-block-size'])
    blockhash = meta['x-container-block-hash']

    size = size if size is not None else os.fstat(f.fileno()).st_size
    nblocks = 1 + (size - 1) // blocksize

    hash_gen = hash_cb(nblocks) if hash_cb else None

    # Hash the file block by block, remembering where each block lives so
    # that it can be re-read later if it has to be uploaded.
    hashes = []
    block_index = {}
    offset = 0
    for _ in range(nblocks):
        block = f.read(min(blocksize, size - offset))
        nbytes = len(block)
        block_hash = pithos_hash(block, blockhash)
        hashes.append(block_hash)
        block_index[block_hash] = (offset, nbytes)
        offset += nbytes
        if hash_gen is not None:
            next(hash_gen)

    # The file must have provided exactly as many bytes as announced
    assert offset == size

    path = path4url(self.account, self.container, object) + params4url(
        dict(format='json', hashmap=''))
    hashmap = dict(bytes=size, hashes=hashes)
    self.set_header('Content-Type', 'application/octet-stream')
    r = self.put(path, json=hashmap, success=(201, 409))

    if r.status_code == 201:
        # The hashmap alone was accepted; there is nothing to upload
        return

    # NOTE(review): assumes the 409 response body is a JSON list of the
    # hashes that still have to be uploaded -- confirm against the API.
    # The response may expose json as an attribute or as a method.
    missing = r.json() if callable(r.json) else r.json

    upload_gen = upload_cb(len(missing)) if upload_cb else None
    for block_hash in missing:
        offset, nbytes = block_index[block_hash]
        f.seek(offset)
        data = f.read(nbytes)
        self.put_block(data, block_hash)
        if upload_gen is not None:
            next(upload_gen)

    # Retry the hashmap now that the missing blocks have been uploaded
    self.put(path, json=hashmap, success=201)
def set_account_group(self, group, usernames):
    """Set an account group through an X-Account-Group-<group> header.

    group -- group name; it is appended to the header name
    usernames -- iterable of user name strings that form the group
    """
    self.assert_account()
    path = path4url(self.account) + params4url({'update': None})
    # NOTE(review): a comma is assumed as the delimiter between user
    # names -- confirm against the Pithos API description of this header.
    userstr = ','.join(usernames)
    self.set_header('X-Account-Group-' + group, userstr)
    self.post(path, success=202)
def del_account_group(self, group):
    """POST an empty X-Account-Group-<group> header for the account.

    group -- group name; it is appended to the header name
    """
    self.assert_account()
    base = path4url(self.account)
    url = base + params4url({'update': None})
    header_name = 'X-Account-Group-' + group
    self.set_header(header_name, '')
    self.post(url, success=202)
def get_account_quota(self):
    """Return the account info filtered by 'X-Account-Policy-Quota'.

    The selection is delegated to filter_in() with exactMatch enabled.
    """
    account_info = self.get_account_info()
    return filter_in(account_info, 'X-Account-Policy-Quota', exactMatch=True)
def get_account_versioning(self):
    """Return the account info filtered by 'X-Account-Policy-Versioning'.

    The selection is delegated to filter_in() with exactMatch enabled.
    """
    account_info = self.get_account_info()
    return filter_in(
        account_info, 'X-Account-Policy-Versioning', exactMatch=True)
def get_account_meta(self):
    """Return the account info filtered by the 'X-Account-Meta-' key.

    The selection is delegated to filter_in().
    """
    account_info = self.get_account_info()
    return filter_in(account_info, 'X-Account-Meta-')
def get_account_group(self):
    """Return the account info filtered by the 'X-Account-Group-' key.

    The selection is delegated to filter_in().
    """
    account_info = self.get_account_info()
    return filter_in(account_info, 'X-Account-Group-')
def set_account_meta(self, metapairs):
    """Set one X-Account-Meta-<key> header per pair and POST an update.

    metapairs -- a dict (exactly a dict, not a subclass) mapping
        metadata keys to values
    """
    assert type(metapairs) is dict
    self.assert_account()
    url = path4url(self.account) + params4url({'update': None})
    for name in metapairs:
        self.set_header('X-Account-Meta-' + name, metapairs[name])
    self.post(url, success=202)
def set_account_quota(self, quota):
    """Set the X-Account-Policy-Quota header and POST an account update.

    quota -- the value sent in the header
    """
    self.assert_account()
    base = path4url(self.account)
    url = base + params4url({'update': None})
    self.set_header('X-Account-Policy-Quota', quota)
    self.post(url, success=202)
def set_account_versioning(self, versioning):
    """Set the X-Account-Policy-Versioning header and POST an update.

    versioning -- the value sent in the header
    """
    self.assert_account()
    base = path4url(self.account)
    url = base + params4url({'update': None})
    self.set_header('X-Account-Policy-Versioning', versioning)
    self.post(url, success=202)
def get_container_versioning(self, container):
    """Return the container info filtered by 'X-Container-Policy-Versioning'.

    container -- name of the container whose info is fetched
    """
    container_info = self.get_container_info(container)
    return filter_in(container_info, 'X-Container-Policy-Versioning')
def get_container_quota(self, container):
    """Return the container info filtered by 'X-Container-Policy-Quota'.

    container -- name of the container whose info is fetched
    """
    container_info = self.get_container_info(container)
    return filter_in(container_info, 'X-Container-Policy-Quota')
def get_container_meta(self, container):
    """Return the container info filtered by the 'X-Container-Meta-' key.

    container -- name of the container whose info is fetched
    """
    container_info = self.get_container_info(container)
    return filter_in(container_info, 'X-Container-Meta-')
def get_container_object_meta(self, container):
    """Return the container info filtered by 'X-Container-Object-Meta'.

    container -- name of the container whose info is fetched
    """
    container_info = self.get_container_info(container)
    return filter_in(container_info, 'X-Container-Object-Meta')
def set_container_meta(self, metapairs):
    """Set one X-Container-Meta-<key> header per pair and POST an update.

    metapairs -- a dict (exactly a dict, not a subclass) mapping
        metadata keys to values
    """
    assert type(metapairs) is dict
    self.assert_container()
    url = path4url(self.account, self.container) + params4url({'update': None})
    for name in metapairs:
        self.set_header('X-Container-Meta-' + name, metapairs[name])
    self.post(url, success=202)
def delete_container_meta(self, metakey):
    """Drop one metadata key from the current container.

    The container info is fetched, the 'x-container-meta-<metakey>' entry
    is filtered out of it, and the result replaces self.headers before a
    POST without the 'update' parameter is issued.

    metakey -- the metadata key, without the 'x-container-meta-' prefix

    Raises ClientError (404) when filtering removed nothing.
    """
    current = self.get_container_info(self.container)
    # self.headers is overwritten before the check, so it stays replaced
    # even when the key turns out to be absent.
    self.headers = filter_out(
        current, 'x-container-meta-' + metakey, exactMatch=True)
    nothing_removed = len(self.headers) == len(current)
    if nothing_removed:
        raise ClientError('X-Container-Meta-%s not found' % metakey, 404)
    self.post(path4url(self.account, self.container), success=202)
def replace_container_meta(self, metapairs):
    """Set one X-Container-Meta-<key> header per pair and POST them.

    The POST is made without the 'update' query parameter.

    metapairs -- mapping of metadata keys to values
    """
    self.assert_container()
    url = path4url(self.account, self.container)
    for name, value in metapairs.items():
        header_name = 'X-Container-Meta-' + name
        self.set_header(header_name, value)
    self.post(url, success=202)
def set_container_quota(self, quota):
    """Set the X-Container-Policy-Quota header and POST a container update.

    quota -- the value sent in the header
    """
    self.assert_container()
    base = path4url(self.account, self.container)
    url = base + params4url({'update': None})
    self.set_header('X-Container-Policy-Quota', quota)
    self.post(url, success=202)
def set_container_versioning(self, versioning):
    """Set the X-Container-Policy-Versioning header and POST an update.

    versioning -- the value sent in the header
    """
    self.assert_container()
    base = path4url(self.account, self.container)
    url = base + params4url({'update': None})
    self.set_header('X-Container-Policy-Versioning', versioning)
    self.post(url, success=202)
def set_object_meta(self, object, metapairs):
    """Set one X-Object-Meta-<key> header per pair and POST an update.

    object -- name of the object in the current container
    metapairs -- a dict (exactly a dict, not a subclass) mapping
        metadata keys to values
    """
    assert type(metapairs) is dict
    self.assert_container()
    base = path4url(self.account, self.container, object)
    url = base + params4url({'update': None})
    for name in metapairs:
        self.set_header('X-Object-Meta-' + name, metapairs[name])
    self.post(url, success=202)
def delete_object_meta(self, metakey, object):
    """POST an empty X-Object-Meta-<metakey> header for an object.

    metakey -- the metadata key, without the 'X-Object-Meta-' prefix
    object -- name of the object in the current container
    """
    self.assert_container()
    header_name = 'X-Object-Meta-' + metakey
    self.set_header(header_name, '')
    base = path4url(self.account, self.container, object)
    self.post(base + params4url({'update': None}), success=202)
def publish_object(self, object):
    """POST an update that sets X-Object-Public to True for an object.

    object -- name of the object in the current container
    """
    self.assert_container()
    base = path4url(self.account, self.container, object)
    url = base + params4url({'update': None})
    self.set_header('X-Object-Public', True)
    self.post(url, success=202)
def unpublish_object(self, object):
    """POST an update that sets X-Object-Public to False for an object.

    object -- name of the object in the current container
    """
    self.assert_container()
    base = path4url(self.account, self.container, object)
    url = base + params4url({'update': None})
    self.set_header('X-Object-Public', False)
    self.post(url, success=202)
def get_object_sharing(self, object):
    """Return the object info filtered by 'X-Object-Sharing'.

    The selection is delegated to filter_in() with exactMatch enabled.

    object -- name of the object whose info is fetched
    """
    object_info = self.get_object_info(object)
    return filter_in(object_info, 'X-Object-Sharing', exactMatch=True)
def set_object_sharing(self, object, read_permition=False, write_permition=False):
    """Give read/write permissions to an object.

    @param object is the object to change sharing permissions onto
    @param read_permition is a list of users and user groups that get
        read permission for this object.
        False means all previous read permissions will be removed
    @param write_permition is a list of users and user groups that get
        write permission for this object.
        False means all previous write permissions will be removed
    """
    self.assert_container()

    # Build "read=u1,u2;write=u3,u4", leaving out any empty clause.
    # NOTE(review): the 'read=' prefix and the comma between names are
    # inferred from the ';write=' handling -- confirm against the API.
    clauses = []
    if read_permition:
        clauses.append('read=' + ','.join(read_permition))
    if write_permition:
        clauses.append('write=' + ','.join(write_permition))
    perms = ';'.join(clauses)

    path = path4url(self.account, self.container, object) + params4url({'update': None})
    self.set_header('X-Object-Sharing', perms)
    self.post(path, success=(202, 204))
def del_object_sharing(self, object):
    """Call set_object_sharing() for the object with no permission lists.

    object -- name of the object in the current container
    """
    # Only the object name is passed, so both permission arguments keep
    # their defaults.
    target = object
    self.set_object_sharing(target)
def append_object(self, object, source_file, upload_cb=None):
    """Append the contents of a file to an object, one block at a time.

    @param object is the name of the object in the current container
    @param source_file is an open file object; its size is taken from
        os.fstat()
    @param upload_cb is a generator function for showing progress of the
        upload to the caller application, e.g. a progress bar. It is
        called with the number of blocks, and its next is called whenever
        a block is uploaded
    """
    self.assert_container()
    meta = self.get_container_info(self.container)
    blocksize = int(meta['x-container-block-size'])
    filesize = os.fstat(source_file.fileno()).st_size
    nblocks = 1 + (filesize - 1) // blocksize
    # Number of bytes read so far; it bounds the size of the last read
    offset = 0
    self.set_header('Content-Range', 'bytes */*')
    self.set_header('Content-Type', 'application/octet-stream')
    path = path4url(self.account, self.container, object) + params4url({'update': None})
    upload_gen = upload_cb(nblocks) if upload_cb is not None else None
    for _ in range(nblocks):
        block = source_file.read(min(blocksize, filesize - offset))
        offset += len(block)
        self.set_header('Content-Length', len(block))
        self.post(path, data=block, success=(202, 204))
        if upload_gen is not None:
            next(upload_gen)
def truncate_object(self, object, upto_bytes):
    """POST an update that truncates an object to a given size.

    The object is named as its own source (X-Source-Object), with a
    Content-Range of 'bytes 0-<upto_bytes>/*' and X-Object-Bytes set to
    upto_bytes.

    object -- name of the object in the current container
    upto_bytes -- the value used in both headers
    """
    self.assert_container()
    content_range = 'bytes 0-%s/*' % upto_bytes
    self.set_header('Content-Range', content_range)
    self.set_header('Content-Type', 'application/octet-stream')
    self.set_header('X-Object-Bytes', upto_bytes)
    source = path4url(self.container, object)
    self.set_header('X-Source-Object', source)
    base = path4url(self.account, self.container, object)
    self.post(base + params4url({'update': None}), success=(202, 204))
314 def overwrite_object(self, object, start, end, source_file, upload_cb=None):
315 """Overwrite a part of an object with given source file
316 @start the part of the remote object to start overwriting from, in bytes
317 @end the part of the remote object to stop overwriting to, in bytes
319 self.assert_container()
320 meta = self.get_container_info(self.container)
321 blocksize = int(meta['x-container-block-size'])
322 filesize = os.fstat(source_file.fileno()).st_size
323 datasize = int(end) - int(start) + 1
324 nblocks = 1 + (datasize - 1)//blocksize
326 self.set_header('Content-Range', 'bytes %s-%s/*' % (start, end) )
327 self.set_header('Content-Type', 'application/octet-stream')
328 path=path4url(self.account, self.container, object)+params4url({'update':None})
329 if upload_cb is not None:
330 upload_gen = upload_cb(nblocks)
331 for i in range(nblocks):
332 block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
334 self.set_header('Content-Length', len(block))
335 self.post(path, data=block, success=(202, 204))
336 if upload_cb is not None: