# Copyright 2011-2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
36 # Monkey-patch everything for gevent early on
37 gevent.monkey.patch_all()
39 import hashlib, os, gevent.pool
43 from .storage import StorageClient, ClientError
44 from .utils import path4url, prefix_keys, filter_in, filter_out, list2str
def pithos_hash(block, blockhash):
    """Return the hex digest of a data block, ignoring trailing NUL padding.

    Trailing NUL bytes are stripped before hashing, so a zero-padded block
    and its unpadded form produce the same digest.

    @param block (bytes): raw block data
    @param blockhash (string): algorithm name accepted by hashlib.new
    @return (string): hexadecimal digest of the stripped block
    """
    h = hashlib.new(blockhash)
    # b'\x00' is a plain str on Python 2 and bytes on Python 3
    h.update(block.rstrip(b'\x00'))
    return h.hexdigest()
52 class PithosClient(StorageClient):
53 """GRNet Pithos API client"""
def __init__(self, base_url, token, account=None, container=None):
    """Initialise the underlying storage client; no greenlet pool exists yet.

    All four arguments are handed unchanged to StorageClient.__init__.
    """
    parent = super(PithosClient, self)
    parent.__init__(base_url, token, account=account, container=container)
    # Filled in by put_block_async the first time a block is sent
    self.async_pool = None
def account_head(self, until=None,
        if_modified_since=None, if_unmodified_since=None, *args, **kwargs):
    """ Full Pithos+ HEAD at account level
    --- request parameters ---
    @param until (string): optional timestamp
    --- optional request headers ---
    @param if_modified_since (string): Retrieve if account has changed since provided timestamp
    @param if_unmodified_since (string): Retrieve if account has not changed since provided timestamp
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.head.
    A 'success' keyword overrides the expected status (default 204).
    @return the result of self.head
    """
    # Same precondition account_post enforces before building an account path
    self.assert_account()

    self.set_param('until', until, iff=until is not None)

    self.set_header('If-Modified-Since', if_modified_since)
    self.set_header('If-Unmodified-Since', if_unmodified_since)

    path = path4url(self.account)
    success = kwargs.pop('success', 204)
    return self.head(path, *args, success=success, **kwargs)
def account_get(self, limit=None, marker=None, format='json', show_only_shared=False, until=None,
        if_modified_since=None, if_unmodified_since=None, *args, **kwargs):
    """ Full Pithos+ GET at account level
    --- request parameters ---
    @param limit (integer): The amount of results requested (server will use default value if None)
    @param marker (string): Return containers with name lexicographically after marker
    @param format (string): reply format can be json or xml (default: json)
    @param show_only_shared (bool): If true, the 'shared' request parameter is set, so only
        shared containers will be included in results
    @param until (string): optional timestamp
    --- optional request headers ---
    @param if_modified_since (string): Retrieve if account has changed since provided timestamp
    @param if_unmodified_since (string): Retrieve if account has not changed since provided timestamp
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.get.
    A 'success' keyword overrides the expected statuses (default (200, 204)).
    @return the result of self.get
    """
    # Same precondition account_post enforces before building an account path
    self.assert_account()

    self.set_param('format', format, iff=format is not None)
    self.set_param('limit', limit, iff=limit is not None)
    self.set_param('marker', marker, iff=marker is not None)
    self.set_param('shared', iff=show_only_shared)
    self.set_param('until', until, iff=until is not None)

    self.set_header('If-Modified-Since', if_modified_since)
    self.set_header('If-Unmodified-Since', if_unmodified_since)

    path = path4url(self.account)
    success = kwargs.pop('success', (200, 204))
    return self.get(path, *args, success=success, **kwargs)
def account_post(self, update=True,
        groups=None, metadata=None, quota=None, versioning=None, *args, **kwargs):
    """ Full Pithos+ POST at account level
    --- request parameters ---
    @param update (bool): if True, Do not replace metadata/groups
    --- request headers ---
    @param groups (dict): Optional user defined groups in the form
        {   'group1':['user1', 'user2', ...],
            'group2':['userA', 'userB', ...], ...
        }
        Each group is sent as an X-Account-Group-<group> header whose value
        is the comma separated user list.
    @param metadata (dict): Optional user defined metadata in the form
        {   'name1': 'value1',
            'name2': 'value2', ...
        }
    @param quota (integer): If supported, sets the Account quota
    @param versioning (string): If supported, sets the Account versioning
        to 'auto' or some other supported versioning string
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.post.
    A 'success' keyword overrides the expected status (default 202).
    @return the result of self.post
    """
    self.assert_account()

    self.set_param('update', iff=update)

    # None (the default) means "no groups"; avoids a shared mutable default
    for group, usernames in (groups or {}).items():
        self.set_header('X-Account-Group-' + group, ','.join(usernames))
    if metadata is not None:
        for metaname, metaval in metadata.items():
            self.set_header('X-Account-Meta-' + metaname, metaval)
    self.set_header('X-Account-Policy-Quota', quota)
    self.set_header('X-Account-Policy-Versioning', versioning)

    path = path4url(self.account)
    success = kwargs.pop('success', 202)
    return self.post(path, *args, success=success, **kwargs)
def container_head(self, until=None,
        if_modified_since=None, if_unmodified_since=None, *args, **kwargs):
    """ Full Pithos+ HEAD at container level
    --- request params ---
    @param until (string): optional timestamp
    --- optional request headers ---
    @param if_modified_since (string): Retrieve if container has changed since provided timestamp
    @param if_unmodified_since (string): Retrieve if container has not changed since provided timestamp
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.head.
    A 'success' keyword overrides the expected status (default 204).
    @return the result of self.head
    """
    self.assert_container()

    self.set_param('until', until, iff=until is not None)

    self.set_header('If-Modified-Since', if_modified_since)
    self.set_header('If-Unmodified-Since', if_unmodified_since)

    path = path4url(self.account, self.container)
    success = kwargs.pop('success', 204)
    return self.head(path, *args, success=success, **kwargs)
def container_get(self, limit=None, marker=None, prefix=None, delimiter=None, path=None,
        format='json', meta=None, show_only_shared=False, until=None,
        if_modified_since=None, if_unmodified_since=None, *args, **kwargs):
    """ Full Pithos+ GET at container level
    --- request parameters ---
    @param limit (integer): The amount of results requested (server will use default value if None)
    @param marker (string): Return containers with name lexicographically after marker
    @param prefix (string): Return objects starting with prefix
    @param delimiter (string): Return objects up to the delimiter
    @param path (string): assume prefix = path and delimiter = / (overwrites prefix
        and delimiter)
    @param format (string): reply format can be json or xml (default: json)
    @param meta (list): Return objects that satisfy the key queries in the specified
        comma separated list (use <key>, !<key> for existence queries, <key><op><value>
        for value queries, where <op> can be one of =, !=, <=, >=, <, >)
    @param show_only_shared (bool): If true, the 'shared' request parameter is set, so only
        shared objects will be included in results
    @param until (string): optional timestamp
    --- optional request headers ---
    @param if_modified_since (string): Retrieve if container has changed since provided timestamp
    @param if_unmodified_since (string): Retrieve if container has not changed since provided timestamp
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.get.
    A 'success' keyword overrides the expected status (default 200).
    @return the result of self.get
    """
    self.assert_container()

    self.set_param('format', format, iff=format is not None)
    self.set_param('limit', limit, iff=limit is not None)
    self.set_param('marker', marker, iff=marker is not None)
    if path is None:
        self.set_param('prefix', prefix, iff=prefix is not None)
        self.set_param('delimiter', delimiter, iff=delimiter is not None)
    else:
        # 'path' replaces prefix/delimiter, as the parameter description states
        self.set_param('path', path)
    self.set_param('shared', iff=show_only_shared)
    if meta:
        # Only serialise the query list when there is one (meta may be None)
        self.set_param('meta', list2str(meta))
    self.set_param('until', until, iff=until is not None)

    self.set_header('If-Modified-Since', if_modified_since)
    self.set_header('If-Unmodified-Since', if_unmodified_since)

    path = path4url(self.account, self.container)
    success = kwargs.pop('success', 200)
    return self.get(path, *args, success=success, **kwargs)
def container_put(self, quota=None, versioning=None, metadata=None, *args, **kwargs):
    """ Full Pithos+ PUT at container level
    --- request headers ---
    @param quota (integer): Size limit in KB
    @param versioning (string): 'auto' or other string supported by server
    @param metadata (dict): Optional user defined metadata in the form
        {   'name1': 'value1',
            'name2': 'value2', ...
        }
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.put.
    A 'success' keyword overrides the expected statuses (default (201, 202)).
    @return the result of self.put
    """
    self.assert_container()

    if metadata is not None:
        for metaname, metaval in metadata.items():
            self.set_header('X-Container-Meta-' + metaname, metaval)
    self.set_header('X-Container-Policy-Quota', quota)
    self.set_header('X-Container-Policy-Versioning', versioning)

    path = path4url(self.account, self.container)
    success = kwargs.pop('success', (201, 202))
    return self.put(path, *args, success=success, **kwargs)
def container_post(self, update=True, format='json',
        quota=None, versioning=None, metadata=None, content_type=None, content_length=None,
        transfer_encoding=None, *args, **kwargs):
    """ Full Pithos+ POST at container level
    --- request params ---
    @param update (bool): if True, Do not replace metadata/groups
    @param format (string): json (default) or xml
    --- request headers ---
    @param quota (integer): Size limit in KB
    @param versioning (string): 'auto' or other string supported by server
    @param metadata (dict): Optional user defined metadata in the form
        {   'name1': 'value1',
            'name2': 'value2', ...
        }
    @param content_type (string): set a custom content type
    @param content_length (string): set a custom content length
    @param transfer_encoding (string): set a custom transfer encoding
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.post.
    A 'success' keyword overrides the expected status (default 202).
    @return the result of self.post
    """
    self.assert_container()

    self.set_param('format', format, iff=format is not None)
    self.set_param('update', iff=update)

    if metadata is not None:
        for metaname, metaval in metadata.items():
            self.set_header('X-Container-Meta-' + metaname, metaval)
    self.set_header('X-Container-Policy-Quota', quota)
    self.set_header('X-Container-Policy-Versioning', versioning)
    self.set_header('Content-Type', content_type)
    self.set_header('Content-Length', content_length)
    self.set_header('Transfer-Encoding', transfer_encoding)

    path = path4url(self.account, self.container)
    success = kwargs.pop('success', 202)
    return self.post(path, *args, success=success, **kwargs)
def container_delete(self, until=None, delimiter=None, *args, **kwargs):
    """ Full Pithos+ DELETE at container level
    --- request parameters ---
    @param until (timestamp string): if defined, container is purged up to that time
    @param delimiter (string): sent as the 'delimiter' request parameter when not None
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.delete.
    A 'success' keyword overrides the expected status (default 204).
    @return the result of self.delete
    """
    self.assert_container()

    self.set_param('until', until, iff=until is not None)
    self.set_param('delimiter', delimiter, iff=delimiter is not None)

    path = path4url(self.account, self.container)
    success = kwargs.pop('success', 204)
    # Forward extra arguments like every other request wrapper does
    return self.delete(path, *args, success=success, **kwargs)
def object_head(self, object, version=None, if_etag_match=None, if_etag_not_match=None,
        if_modified_since=None, if_unmodified_since=None, *args, **kwargs):
    """ Full Pithos+ HEAD at object level
    @param object (string): name of the object inside the current container
    --- request parameters ---
    @param version (string): optional version identifier
    --- request headers ---
    @param if_etag_match (string): if provided, return only results
        with etag matching with this
    @param if_etag_not_match (string): if provided, return only results
        with etag not matching with this
    @param if_modified_since (string): Retrieve if object has changed since provided timestamp
    @param if_unmodified_since (string): Retrieve if object has not changed since provided timestamp
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.head.
    A 'success' keyword overrides the expected status (default 200).
    @return the result of self.head
    """
    self.assert_container()

    self.set_param('version', version, iff=version is not None)

    self.set_header('If-Match', if_etag_match)
    self.set_header('If-None-Match', if_etag_not_match)
    self.set_header('If-Modified-Since', if_modified_since)
    self.set_header('If-Unmodified-Since', if_unmodified_since)

    path = path4url(self.account, self.container, object)
    success = kwargs.pop('success', 200)
    return self.head(path, *args, success=success, **kwargs)
def object_get(self, object, format='json', hashmap=False, version=None,
        data_range=None, if_range=False, if_etag_match=None, if_etag_not_match=None,
        if_modified_since=None, if_unmodified_since=None, *args, **kwargs):
    """ Full Pithos+ GET at object level
    @param object (string): name of the object inside the current container
    --- request parameters ---
    @param format (string): json (default) or xml
    @param hashmap (bool): Optional request for hashmap
    @param version (string): optional version identifier
    --- request headers ---
    @param data_range (string): Optional range of data to retrieve
    @param if_range (bool): when True and data_range is given, an empty
        If-Range header is sent along with the Range header
    @param if_etag_match (string): if provided, return only results
        with etag matching with this
    @param if_etag_not_match (string): if provided, return only results
        with etag not matching with this
    @param if_modified_since (string): Retrieve if object has changed since provided timestamp
    @param if_unmodified_since (string): Retrieve if object has not changed since provided timestamp
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.get.
    A 'success' keyword overrides the expected status (default 200).
    @return the result of self.get
    """
    self.assert_container()

    self.set_param('format', format, iff=format is not None)
    self.set_param('version', version, iff=version is not None)
    self.set_param('hashmap', hashmap, iff=hashmap)

    self.set_header('Range', data_range)
    self.set_header('If-Range', '', iff=if_range is True and data_range is not None)
    self.set_header('If-Match', if_etag_match)
    self.set_header('If-None-Match', if_etag_not_match)
    self.set_header('If-Modified-Since', if_modified_since)
    self.set_header('If-Unmodified-Since', if_unmodified_since)

    path = path4url(self.account, self.container, object)
    success = kwargs.pop('success', 200)
    return self.get(path, *args, success=success, **kwargs)
def object_put(self, object, format='json', hashmap=False, delimiter=None, if_etag_match=None,
        if_etag_not_match=None, etag=None, content_length=None, content_type=None,
        transfer_encoding=None, copy_from=None, move_from=None, source_account=None,
        source_version=None, content_encoding=None, content_disposition=None, manifest=None,
        permitions=None, public=None, metadata=None, *args, **kwargs):
    """ Full Pithos+ PUT at object level
    @param object (string): name of the object inside the current container
    --- request parameters ---
    @param format (string): json (default) or xml
    @param hashmap (bool): Optional hashmap provided instead of data
    @param delimiter (string): sent as the 'delimiter' request parameter when not None
    --- request headers ---
    @param if_etag_match (string): if provided, return only results
        with etag matching with this
    @param if_etag_not_match (string): if provided, return only results
        with etag not matching with this
    @param etag (string): The MD5 hash of the object (optional to check written data)
    @param content_length (integer): The size of the data written
    @param content_type (string): The MIME content type of the object
    @param transfer_encoding (string): Set to chunked to specify incremental uploading
        (if used, Content-Length is ignored)
    @param copy_from (string): The source path in the form /<container>/<object>
    @param move_from (string): The source path in the form /<container>/<object>
    @param source_account (string): The source account to copy/move from
    @param source_version (string): The source version to copy from
    @param content_encoding (string): The encoding of the object
    @param content_disposition (string): The presentation style of the object
    @param manifest (string): Object parts prefix in /<container>/<object> form
    @param permitions (dict): Object permissions in the form (all fields are optional)
        {'read':[user1, group1, user2, ...], 'write':['user3, group2, group3, ...]}
    @param public (bool): If true, Object is publicly accessible, if false, not
    @param metadata (dict): Optional user defined metadata in the form
        {'meta-key-1':'meta-value-1', 'meta-key-2':'meta-value-2', ...}
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.put.
    A 'success' keyword overrides the expected status (default 201).
    @return the result of self.put
    """
    self.assert_container()

    self.set_param('format', format, iff=format is not None)
    self.set_param('hashmap', hashmap, iff=hashmap)
    self.set_param('delimiter', delimiter, iff=delimiter is not None)

    self.set_header('If-Match', if_etag_match)
    self.set_header('If-None-Match', if_etag_not_match)
    self.set_header('ETag', etag)
    self.set_header('Content-Length', content_length)
    self.set_header('Content-Type', content_type)
    self.set_header('Transfer-Encoding', transfer_encoding)
    self.set_header('X-Copy-From', copy_from)
    self.set_header('X-Move-From', move_from)
    self.set_header('X-Source-Account', source_account)
    self.set_header('X-Source-Version', source_version)
    self.set_header('Content-Encoding', content_encoding)
    self.set_header('Content-Disposition', content_disposition)
    self.set_header('X-Object-Manifest', manifest)

    # Build "type1=a,b;type2=c"; types with an empty list contribute nothing
    perms = None
    if permitions is not None:
        for permition_type, permition_list in permitions.items():
            if perms is None:
                perms = ''  # Remove permitions
            if len(permition_list) == 0:
                continue
            perms += ';' + permition_type if len(perms) > 0 else permition_type
            perms += '=' + list2str(permition_list, seperator=',')
    self.set_header('X-Object-Sharing', perms)
    self.set_header('X-Object-Public', public)
    if metadata is not None:
        for key, val in metadata.items():
            self.set_header('X-Object-Meta-' + key, val)

    path = path4url(self.account, self.container, object)
    success = kwargs.pop('success', 201)
    return self.put(path, *args, success=success, **kwargs)
def object_copy(self, object, destination, format='json', ignore_content_type=False,
        if_etag_match=None, if_etag_not_match=None, destination_account=None,
        content_type=None, content_encoding=None, content_disposition=None, source_version=None,
        permitions=None, public=False, metadata=None, *args, **kwargs):
    """ Full Pithos+ COPY at object level
    @param object (string): name of the source object inside the current container
    --- request parameters ---
    @param format (string): json (default) or xml
    @param ignore_content_type (bool): Ignore the supplied Content-Type
    --- request headers ---
    @param if_etag_match (string): if provided, copy only results
        with etag matching with this
    @param if_etag_not_match (string): if provided, copy only results
        with etag not matching with this
    @param destination (string): The destination path in the form /<container>/<object>
    @param destination_account (string): The destination account to copy to
    @param content_type (string): The MIME content type of the object
    @param content_encoding (string): The encoding of the object
    @param content_disposition (string): The presentation style of the object
    @param source_version (string): The source version to copy from
    @param permitions (dict): Object permissions in the form (all fields are optional)
        {'read':[user1, group1, user2, ...], 'write':['user3, group2, group3, ...]}
        permitions override source permitions, removing any old permitions
    @param public (bool): If true, Object is publicly accessible, if else, not
    @param metadata (dict): Optional user defined metadata in the form
        {'meta-key-1':'meta-value-1', 'meta-key-2':'meta-value-2', ...}
        Metadata are appended to the source metadata. In case of same keys, they
        replace the old metadata
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.copy.
    A 'success' keyword overrides the expected status (default 201).
    @return the result of self.copy
    """
    self.assert_container()

    self.set_param('format', format, iff=format is not None)
    self.set_param('ignore_content_type', iff=ignore_content_type)

    self.set_header('If-Match', if_etag_match)
    self.set_header('If-None-Match', if_etag_not_match)
    self.set_header('Destination', destination)
    self.set_header('Destination-Account', destination_account)
    self.set_header('Content-Type', content_type)
    self.set_header('Content-Encoding', content_encoding)
    self.set_header('Content-Disposition', content_disposition)
    self.set_header('X-Source-Version', source_version)

    # Build "type1=a,b;type2=c"; types with an empty list contribute nothing
    perms = None
    if permitions is not None:
        for permition_type, permition_list in permitions.items():
            if perms is None:
                perms = ''  # Remove permitions
            if len(permition_list) == 0:
                continue
            perms += ';' + permition_type if len(perms) > 0 else permition_type
            perms += '=' + list2str(permition_list, seperator=',')
    self.set_header('X-Object-Sharing', perms)
    self.set_header('X-Object-Public', public)
    if metadata is not None:
        for key, val in metadata.items():
            self.set_header('X-Object-Meta-' + key, val)

    path = path4url(self.account, self.container, object)
    success = kwargs.pop('success', 201)
    return self.copy(path, *args, success=success, **kwargs)
def object_move(self, object, format='json', ignore_content_type=False,
        if_etag_match=None, if_etag_not_match=None, destination=None, destination_account=None,
        content_type=None, content_encoding=None, content_disposition=None, permitions=None,
        public=False, metadata=None, *args, **kwargs):
    """ Full Pithos+ MOVE at object level
    @param object (string): name of the source object inside the current container
    --- request parameters ---
    @param format (string): json (default) or xml
    @param ignore_content_type (bool): Ignore the supplied Content-Type
    --- request headers ---
    @param if_etag_match (string): if provided, return only results
        with etag matching with this
    @param if_etag_not_match (string): if provided, return only results
        with etag not matching with this
    @param destination (string): The destination path in the form /<container>/<object>
    @param destination_account (string): The destination account to move to
    @param content_type (string): The MIME content type of the object
    @param content_encoding (string): The encoding of the object
    @param content_disposition (string): The presentation style of the object
    @param permitions (dict): Object permissions in the form (all fields are optional)
        {'read':[user1, group1, user2, ...], 'write':['user3, group2, group3, ...]}
    @param public (bool): If true, Object is publicly accessible, if false, not
    @param metadata (dict): Optional user defined metadata in the form
        {'meta-key-1':'meta-value-1', 'meta-key-2':'meta-value-2', ...}
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.move.
    A 'success' keyword overrides the expected status (default 201).
    @return the result of self.move
    """
    self.assert_container()

    self.set_param('format', format, iff=format is not None)
    self.set_param('ignore_content_type', iff=ignore_content_type)

    self.set_header('If-Match', if_etag_match)
    self.set_header('If-None-Match', if_etag_not_match)
    self.set_header('Destination', destination)
    self.set_header('Destination-Account', destination_account)
    self.set_header('Content-Type', content_type)
    self.set_header('Content-Encoding', content_encoding)
    self.set_header('Content-Disposition', content_disposition)

    # None (the default) behaves like an empty dict; no shared mutable default.
    # Build "type1=a,b;type2=c"; types with an empty list contribute nothing
    perms = None
    for permition_type, permition_list in (permitions or {}).items():
        if perms is None:
            perms = ''  # Remove permitions
        if len(permition_list) == 0:
            continue
        perms += ';' + permition_type if len(perms) > 0 else permition_type
        perms += '=' + list2str(permition_list, seperator=',')
    self.set_header('X-Object-Sharing', perms)
    self.set_header('X-Object-Public', public)
    for key, val in (metadata or {}).items():
        self.set_header('X-Object-Meta-' + key, val)

    path = path4url(self.account, self.container, object)
    success = kwargs.pop('success', 201)
    return self.move(path, *args, success=success, **kwargs)
def object_post(self, object, format='json', update=True,
        if_etag_match=None, if_etag_not_match=None, content_length=None, content_type=None,
        content_range=None, transfer_encoding=None, content_encoding=None, content_disposition=None,
        source_object=None, source_account=None, source_version=None, object_bytes=None,
        manifest=None, permitions=None, public=False, metadata=None, *args, **kwargs):
    """ Full Pithos+ POST at object level
    @param object (string): name of the object inside the current container
    --- request parameters ---
    @param format (string): json (default) or xml
    @param update (bool): Do not replace metadata
    --- request headers ---
    @param if_etag_match (string): if provided, return only results
        with etag matching with this
    @param if_etag_not_match (string): if provided, return only results
        with etag not matching with this
    @param content_length (string): The size of the data written
        (only sent when transfer_encoding is None)
    @param content_type (string): The MIME content type of the object
    @param content_range (string): The range of data supplied
    @param transfer_encoding (string): Set to chunked to specify incremental uploading
        (if used, Content-Length is ignored)
    @param content_encoding (string): The encoding of the object
    @param content_disposition (string): The presentation style of the object
    @param source_object (string): Update with data from the object at path /<container>/<object>
    @param source_account (string): The source account to update from
    @param source_version (string): The source version to copy from
    @param object_bytes (integer): The updated objects final size
    @param manifest (string): Object parts prefix in /<container>/<object> form
    @param permitions (dict): Object permissions in the form (all fields are optional)
        {'read':[user1, group1, user2, ...], 'write':['user3, group2, group3, ...]}
    @param public (bool): If true, Object is publicly accessible, if false, not
    @param metadata (dict): Optional user defined metadata in the form
        {'meta-key-1':'meta-value-1', 'meta-key-2':'meta-value-2', ...}
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.post.
    A 'success' keyword overrides the expected statuses (default (202, 204)).
    @return the result of self.post
    """
    self.assert_container()

    self.set_param('format', format, iff=format is not None)
    self.set_param('update', iff=update)

    self.set_header('If-Match', if_etag_match)
    self.set_header('If-None-Match', if_etag_not_match)
    self.set_header('Content-Length', content_length, iff=transfer_encoding is None)
    self.set_header('Content-Type', content_type)
    self.set_header('Content-Range', content_range)
    self.set_header('Transfer-Encoding', transfer_encoding)
    self.set_header('Content-Encoding', content_encoding)
    self.set_header('Content-Disposition', content_disposition)
    self.set_header('X-Source-Object', source_object)
    self.set_header('X-Source-Account', source_account)
    self.set_header('X-Source-Version', source_version)
    self.set_header('X-Object-Bytes', object_bytes)
    self.set_header('X-Object-Manifest', manifest)

    # None (the default) behaves like an empty dict; no shared mutable default.
    # Build "type1=a,b;type2=c"; types with an empty list contribute nothing
    perms = None
    for permition_type, permition_list in (permitions or {}).items():
        if perms is None:
            perms = ''  # Remove permitions
        if len(permition_list) == 0:
            continue
        perms += ';' + permition_type if len(perms) > 0 else permition_type
        perms += '=' + list2str(permition_list, seperator=',')
    self.set_header('X-Object-Sharing', perms)
    self.set_header('X-Object-Public', public)
    for key, val in (metadata or {}).items():
        self.set_header('X-Object-Meta-' + key, val)

    path = path4url(self.account, self.container, object)
    success = kwargs.pop('success', (202, 204))
    return self.post(path, *args, success=success, **kwargs)
def object_delete(self, object, until=None, delimiter=None, *args, **kwargs):
    """ Full Pithos+ DELETE at object level
    @param object (string): name of the object inside the current container
    --- request parameters ---
    @param until (string): Optional timestamp
    @param delimiter (string): sent as the 'delimiter' request parameter when not None
    --- other ---
    Remaining positional/keyword arguments are forwarded to self.delete.
    A 'success' keyword overrides the expected status (default 204).
    @return the result of self.delete
    """
    self.assert_container()

    self.set_param('until', until, iff=until is not None)
    self.set_param('delimiter', delimiter, iff=delimiter is not None)

    path = path4url(self.account, self.container, object)
    success = kwargs.pop('success', 204)
    return self.delete(path, *args, success=success, **kwargs)
def purge_container(self):
    """Call container_delete with 'until' set to the current time as text."""
    now = unicode(time())
    self.container_delete(until=now)
def upload_object_unchunked(self, obj, f, withHashFile=False, size=None, etag=None,
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
        public=None):
    """Upload a file-like object with a single object_put request.

    This is a naive implementation: the whole payload is loaded in memory.

    @param obj (string): name of the object to create
    @param f (file): readable file-like object; f.name is used in error messages
    @param withHashFile (bool): if True, f must contain JSON; it is parsed and
        re-serialised before being sent
    @param size (integer): number of bytes/characters to read (everything if None)
    @param etag, content_encoding, content_disposition, content_type, public:
        passed through to object_put
    @param sharing (dict): passed to object_put as 'permitions'
    @raise ClientError: (status 1) if withHashFile is set and f is not valid JSON
    """
    self.assert_container()

    if withHashFile:
        import json
        try:
            from StringIO import StringIO
        except ImportError:  # Python 3
            from io import StringIO
        raw = f.read()
        # NOTE(review): the except clauses were reconstructed (ValueError is
        # what json.loads raises); confirm the exception/message pairing.
        try:
            normalized = json.dumps(json.loads(raw))
        except ValueError:
            raise ClientError(message='"%s" is not json-formated'%f.name, status=1)
        except SyntaxError:
            raise ClientError(message='"%s" is not a valid hashmap file'%f.name, status=1)
        # Re-wrap so that 'size' is applied to the normalized text below
        f = StringIO(normalized)
    data = f.read(size) if size is not None else f.read()
    self.object_put(obj, data=data, etag=etag, content_encoding=content_encoding,
        content_disposition=content_disposition, content_type=content_type, permitions=sharing,
        public=public, success=201)
def put_block_async(self, data, hash):
    """Start put_block(data, hash) in a greenlet taken from a shared pool.

    The pool is created on first use and kept in self.async_pool.

    @param data: block payload, passed to put_block
    @param hash (string): expected block hash, passed to put_block
    @return (gevent.Greenlet): the started greenlet (callers poll it with ready())
    """
    import sys
    try:
        from StringIO import StringIO
    except ImportError:  # Python 3
        from io import StringIO

    class SilentGreenlet(gevent.Greenlet):
        """Greenlet whose error report is written to a throw-away buffer."""

        def _report_error(self, exc_info):
            # sys.stderr is the real stream to save; 'sys._stderr' does not exist
            _stderr = sys.stderr
            try:
                sys.stderr = StringIO()
                gevent.Greenlet._report_error(self, exc_info)
            finally:
                sys.stderr = _stderr

    if self.async_pool is None:
        # NOTE(review): POOL_SIZE is expected at module level; the fallback
        # value of 5 is an assumption used only if that name is missing.
        pool_size = globals().get('POOL_SIZE', 5)
        self.async_pool = gevent.pool.Pool(size=pool_size)
    g = SilentGreenlet(self.put_block, data, hash)
    self.async_pool.start(g)
    return g
def put_block(self, data, hash):
    """Upload one block with container_post and verify the hash the server reports.

    @param data: block payload; len(data) is sent as the content length
    @param hash (string): locally computed hash of the block
    @raise AssertionError: if the first element of the JSON reply differs from hash
    """
    r = self.container_post(update=True, content_type='application/octet-stream',
        content_length=len(data), data=data, format='json')
    # Explicit raise (same exception type as before) so the check survives python -O
    if r.json[0] != hash:
        raise AssertionError('Local hash does not match server')
def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
        content_disposition=None, content_type=None, sharing=None, public=None):
    """Create a zero-length object whose manifest is '<container>/<obj>'.

    @param obj (string): name of the object to create
    @param etag, content_encoding, content_disposition, content_type, public:
        passed through to object_put
    @param sharing (dict): passed to object_put as 'permitions'
    """
    self.assert_container()
    # content_type is forwarded exactly as given; the previously computed
    # default was never used and has been dropped.
    self.object_put(obj, content_length=0, etag=etag, content_encoding=content_encoding,
        content_disposition=content_disposition, content_type=content_type, permitions=sharing,
        public=public, manifest='%s/%s'%(self.container, obj))
def upload_object(self, object, f, size=None, hash_cb=None, upload_cb=None, etag=None,
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
        public=None):
    """upload_object chunk by chunk. Different chunks are uploaded asynchronously
    in a pseudo-parallel fashion (using greenlets)

    The file is hashed block by block, the hashmap is PUT, and only the blocks
    the server reports as missing are uploaded before the hashmap is PUT again.

    @param object (string): name of the object to create
    @param f (file): seekable file object opened for reading
    @param size (integer): bytes to upload (size of f, via fstat, if None)
    @param hash_cb (callable): called with the block count; must return a
        generator, advanced once per hashed block
    @param upload_cb (callable): called with the number of missing blocks; must
        return a generator, advanced once per uploaded block
    @param etag, content_encoding, content_disposition, content_type, public:
        passed through to the first object_put
    @param sharing (dict): passed to object_put as 'permitions'
    @raise AssertionError: if fewer/more bytes than 'size' could be read
    """
    self.assert_container()

    meta = self.get_container_info()
    blocksize = int(meta['x-container-block-size'])
    blockhash = meta['x-container-block-hash']

    size = size if size is not None else os.fstat(f.fileno()).st_size
    nblocks = 1 + (size - 1) // blocksize
    hashes = []
    block_index = {}  # block hash -> (offset, length) inside f
    offset = 0

    # NOTE(review): the generator priming/advancing calls were reconstructed;
    # confirm the progress-callback protocol.
    hash_gen = None
    if hash_cb:
        hash_gen = hash_cb(nblocks)
        next(hash_gen)

    for _ in range(nblocks):
        block = f.read(min(blocksize, size - offset))
        length = len(block)
        digest = pithos_hash(block, blockhash)
        hashes.append(digest)
        block_index[digest] = (offset, length)
        offset += length
        if hash_gen is not None:
            next(hash_gen)

    # Explicit raise (same exception type as before) so the check survives python -O
    if offset != size:
        raise AssertionError('Read %s bytes, expected %s' % (offset, size))

    obj_content_type = 'application/octet-stream' if content_type is None else content_type

    hashmap = dict(bytes=size, hashes=hashes)
    # NOTE(review): the accepted statuses were reconstructed; 201 means the
    # object was created, any other accepted reply carries the missing hashes.
    r = self.object_put(object, format='json', hashmap=True, content_type=obj_content_type,
        json=hashmap, etag=etag, content_encoding=content_encoding,
        content_disposition=content_disposition, permitions=sharing, public=public,
        success=(201, 409))

    if r.status_code == 201:
        return

    missing = r.json

    upload_gen = None
    if upload_cb:
        upload_gen = upload_cb(len(missing))
        next(upload_gen)

    flying = []
    for digest in missing:
        offset, length = block_index[digest]
        f.seek(offset)
        data = f.read(length)
        flying.append(self.put_block_async(data, digest))
        # Reap finished greenlets, surfacing the first failure
        pending = []
        for g in flying:
            if not g.ready():
                pending.append(g)
                continue
            if g.exception:
                raise g.exception
            if upload_gen is not None:
                next(upload_gen)
        flying = pending

    gevent.joinall(flying)
    # Greenlets that finished during joinall must be checked too, otherwise
    # a failure of one of the last blocks would go unnoticed here.
    for g in flying:
        if g.exception:
            raise g.exception
        if upload_gen is not None:
            next(upload_gen)

    self.object_put(object, format='json', hashmap=True, content_type=obj_content_type,
        json=hashmap, success=201)
def download_object(self, obj, f, download_cb=None, version=None, overide=False, range=None,
        if_match=None, if_none_match=None, if_modified_since=None, if_unmodified_since=None):
    """Download an object block by block into the file object f.

    overide is forcing the local file to become exactly as the remote, even if it is
    substantialy different

    @param obj (string): name of the remote object
    @param f (file): destination; when it is not a tty and f.name exists, local
        blocks whose hash appears in the remote hashmap are not fetched again
    @param download_cb (callable): called with a block count; must return a
        generator that is advanced as blocks are handled
    @param version (string): optional object version
    @param overide (bool): tolerate differing local blocks and truncate f to the
        remote size at the end
    @param range (string): optional 'start-end' byte range
    @param if_match, if_none_match, if_modified_since, if_unmodified_since:
        passed to get_object_hashmap and object_get
    @raise ClientError: status 601 for an invalid range; also raised when the
        local file differs and overide is False
    """
    self.assert_container()

    # retrieve object hashmap
    hashmap = self.get_object_hashmap(obj, version=version, if_match=if_match,
        if_none_match=if_none_match, if_modified_since=if_modified_since,
        if_unmodified_since=if_unmodified_since)
    blocksize = int(hashmap['block_size'])
    blockhash = hashmap['block_hash']
    total_size = hashmap['bytes']
    remote_hashes = hashmap['hashes']
    remote_set = set(remote_hashes)

    custom_start = custom_end = None
    if range is not None:
        try:
            (custom_start, custom_end) = range.split('-')
            (custom_start, custom_end) = (int(custom_start), int(custom_end))
        except ValueError:
            raise ClientError(message='Invalid range string', status=601)
        if custom_start > custom_end or custom_start < 0:
            raise ClientError(message='Negative range', status=601)
        elif custom_start == custom_end:
            return
        elif custom_end > total_size:
            raise ClientError(message='Range exceeds file size', status=601)
    # NOTE(review): a few short lines are missing from the original at this
    # point; confirm nothing else is required before the progress set-up.

    # load progress bar
    download_gen = None
    if download_cb is not None:
        # Floor division keeps the block count an integer on Python 3 as well
        download_gen = download_cb(total_size // blocksize + 1)
        next(download_gen)

    # load local file existing hashmap
    to_tty = f.isatty()
    hash_dict = {}
    if not to_tty and os.path.exists(f.name):
        from binascii import hexlify
        from .pithos_sh_lib.hashmap import HashMap
        local_map = HashMap(blocksize, blockhash)
        with_progress_bar = download_cb is not None
        local_map.load(f, with_progress_bar)
        for i, x in enumerate(local_map):
            existing_hash = hexlify(x)
            if existing_hash in remote_set:
                # NOTE(review): presence is keyed by hash only, not by block
                # position; confirm this is intended.
                hash_dict[existing_hash] = i
                if download_gen is not None:
                    next(download_gen)
            elif not overide:
                # NOTE(review): status code 600 was reconstructed; confirm.
                raise ClientError(message='Local file is substantialy different',
                    status=600)

    # download and save/print
    for i, h in enumerate(remote_hashes):
        if not to_tty and h in hash_dict:
            continue
        if download_gen is not None:
            next(download_gen)
        start = i * blocksize
        if range is not None:
            if start < custom_start:
                start = custom_start
            elif start > custom_end:
                continue
        end = start + blocksize - 1 if start + blocksize < total_size else total_size - 1
        if range is not None and end > custom_end:
            end = custom_end
        data_range = 'bytes=%s-%s'%(start, end)
        data = self.object_get(obj, data_range=data_range, success=(200, 206), version=version,
            if_etag_match=if_match, if_etag_not_match=if_none_match,
            if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
        if not to_tty:
            f.seek(start)
        f.write(data.content)

    if overide and not to_tty:
        f.truncate(total_size)
# get_object_hashmap: request obj with hashmap=True, forwarding the version and
# the conditional-request parameters to object_get.
# NOTE(review): the listing omits the 'try:' line and the bodies of the error
# handling (embedded numbers 823, 829-830, 832-833); what is returned for the
# normal case and for 304/412 is not visible here. download_object in this file
# indexes the result with 'block_size', 'block_hash', 'bytes' and 'hashes'.
821 def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
822 if_modified_since=None, if_unmodified_since=None):
824 r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
825 if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
826 if_unmodified_since=if_unmodified_since)
827 except ClientError as err:
# 304 and 412 errors are singled out for special handling.
828 if err.status == 304 or err.status == 412:
831 from json import loads
def set_account_group(self, group, usernames):
    """Post a group definition to the account.

    Issues an account POST in update mode with groups={group: usernames}.
    @param group the group name, used as the key of the groups mapping
    @param usernames the value stored under that key (presumably a list
        of user names - confirm against account_post)
    """
    membership = {group: usernames}
    self.account_post(update=True, groups=membership)
def del_account_group(self, group):
    """Post the given group with an empty member list.

    @param group the group name
    @return whatever account_post returns
    """
    no_members = {group: []}
    return self.account_post(update=True, groups=no_members)
# get_account_info: issue an account HEAD request (optionally with until) and
# raise ClientError("No authorization") when the reply status is 401.
# NOTE(review): the listing skips the line(s) after the raise (embedded numbers
# 845-846), so the return value is not visible here; other methods in this file
# pass the result straight to filter_in.
# NOTE(review): datetime is imported but not used in the visible lines.
840 def get_account_info(self, until=None):
841 from datetime import datetime
842 r = self.account_head(until=until)
843 if r.status_code == 401:
844 raise ClientError("No authorization")
def get_account_quota(self):
    """Return the account info entries that filter_in selects for the
    'X-Account-Policy-Quota' key, with exactMatch enabled.
    """
    account_info = self.get_account_info()
    return filter_in(account_info, 'X-Account-Policy-Quota', exactMatch=True)
def get_account_versioning(self):
    """Return the account info entries that filter_in selects for the
    'X-Account-Policy-Versioning' key, with exactMatch enabled.
    """
    account_info = self.get_account_info()
    return filter_in(account_info, 'X-Account-Policy-Versioning', exactMatch=True)
def get_account_meta(self, until=None):
    """Return the account info entries that filter_in selects for the
    'X-Account-Meta-' key.

    @param until forwarded unchanged to get_account_info
    """
    account_info = self.get_account_info(until=until)
    return filter_in(account_info, 'X-Account-Meta-')
def get_account_group(self):
    """Return the account info entries that filter_in selects for the
    'X-Account-Group-' key.
    """
    account_info = self.get_account_info()
    return filter_in(account_info, 'X-Account-Group-')
def set_account_meta(self, metapairs):
    """Post metadata to the account in update mode.

    @param metapairs a dict (or dict subclass) passed as the metadata
        argument of account_post
    Raises AssertionError when metapairs is not a dict.
    """
    # isinstance instead of an exact type comparison, so dict subclasses
    # (OrderedDict, defaultdict, ...) are accepted as well.
    assert isinstance(metapairs, dict), 'metapairs must be a dict'
    self.account_post(update=True, metadata=metapairs)
def del_account_meta(self, metakey):
    """Post an empty string as the value of one account metadata key.

    @param metakey the metadata key to blank out
    """
    blanked = {metakey: ''}
    self.account_post(update=True, metadata=blanked)
def set_account_quota(self, quota):
    """Post a quota value to the account in update mode.

    @param quota passed unchanged as the quota argument of account_post
    """
    post_args = dict(update=True, quota=quota)
    self.account_post(**post_args)
def set_account_versioning(self, versioning):
    """Post a versioning value to the account in update mode.

    @param versioning passed unchanged as the versioning argument of
        account_post
    """
    post_args = dict(update=True, versioning=versioning)
    self.account_post(**post_args)
# list_containers: issue an account GET request.
# NOTE(review): the listing skips the line(s) after the request (embedded
# numbers 874-875), so what is returned from the reply is not visible here.
872 def list_containers(self):
873 r = self.account_get()
def del_container(self, until=None, delimiter=None):
    """Delete the current container.

    The DELETE request accepts 204, 404 and 409 as reply codes; the last
    two are turned into ClientError here.
    @param until forwarded unchanged to container_delete
    @param delimiter forwarded unchanged to container_delete
    Raises ClientError carrying the reply status when the container does
    not exist (404) or is not empty (409).
    """
    self.assert_container()
    reply = self.container_delete(until=until, delimiter=delimiter, success=(204, 404, 409))
    failure_messages = {
        404: 'Container "%s" does not exist',
        409: 'Container "%s" is not empty',
    }
    status = reply.status_code
    if status in failure_messages:
        raise ClientError(failure_messages[status] % self.container, status)
def get_container_versioning(self, container):
    """Return the container info entries that filter_in selects for the
    'X-Container-Policy-Versioning' key.

    Side effect: the client's current container is switched to the given
    one before the info is fetched, and stays switched afterwards.
    @param container the container to inspect
    """
    self.container = container
    container_info = self.get_container_info()
    return filter_in(container_info, 'X-Container-Policy-Versioning')
def get_container_quota(self, container):
    """Return the container info entries that filter_in selects for the
    'X-Container-Policy-Quota' key.

    Side effect: the client's current container is switched to the given
    one before the info is fetched, and stays switched afterwards.
    @param container the container to inspect
    """
    self.container = container
    container_info = self.get_container_info()
    return filter_in(container_info, 'X-Container-Policy-Quota')
# get_container_info: issue a container HEAD request, forwarding until.
# NOTE(review): the listing skips the line(s) after the request (embedded
# numbers 894-895), so the return value is not visible here; other methods in
# this file index the result with header names such as
# 'x-container-block-size' and pass it to filter_in.
892 def get_container_info(self, until = None):
893 r = self.container_head(until=until)
def get_container_meta(self, until = None):
    """Return the container info entries that filter_in selects for the
    'X-Container-Meta' key.

    @param until forwarded unchanged to get_container_info
    """
    container_info = self.get_container_info(until=until)
    return filter_in(container_info, 'X-Container-Meta')
def get_container_object_meta(self, until = None):
    """Return the container info entries that filter_in selects for the
    'X-Container-Object-Meta' key.

    @param until forwarded unchanged to get_container_info
    """
    container_info = self.get_container_info(until=until)
    return filter_in(container_info, 'X-Container-Object-Meta')
def set_container_meta(self, metapairs):
    """Post metadata to the current container in update mode.

    @param metapairs a dict (or dict subclass) passed as the metadata
        argument of container_post
    Raises AssertionError when metapairs is not a dict.
    """
    # isinstance instead of an exact type comparison, so dict subclasses
    # (OrderedDict, defaultdict, ...) are accepted as well.
    assert isinstance(metapairs, dict), 'metapairs must be a dict'
    self.container_post(update=True, metadata=metapairs)
def del_container_meta(self, metakey):
    """Post an empty string as the value of one container metadata key.

    @param metakey the metadata key to blank out
    """
    blanked = {metakey: ''}
    self.container_post(update=True, metadata=blanked)
def set_container_quota(self, quota):
    """Post a quota value to the current container in update mode.

    @param quota passed unchanged as the quota argument of container_post
    """
    post_args = dict(update=True, quota=quota)
    self.container_post(**post_args)
def set_container_versioning(self, versioning):
    """Post a versioning value to the current container in update mode.

    @param versioning passed unchanged as the versioning argument of
        container_post
    """
    post_args = dict(update=True, versioning=versioning)
    self.container_post(**post_args)
def del_object(self, obj, until=None, delimiter=None):
    """Delete an object from the current container.

    assert_container is called first, then the delete request is issued.
    @param obj the object to delete
    @param until forwarded unchanged to object_delete
    @param delimiter forwarded unchanged to object_delete
    """
    self.assert_container()
    delete_args = dict(until=until, delimiter=delimiter)
    self.object_delete(obj, **delete_args)
def set_object_meta(self, object, metapairs):
    """Post metadata to an object in update mode.

    @param object the object whose metadata is posted
    @param metapairs a dict (or dict subclass) passed as the metadata
        argument of object_post
    Raises AssertionError when metapairs is not a dict.
    """
    # isinstance instead of an exact type comparison, so dict subclasses
    # (OrderedDict, defaultdict, ...) are accepted as well.
    assert isinstance(metapairs, dict), 'metapairs must be a dict'
    self.object_post(object, update=True, metadata=metapairs)
def del_object_meta(self, metakey, object):
    """Post an empty string as the value of one object metadata key.

    Note the argument order: the key comes first, the object second.
    @param metakey the metadata key to blank out
    @param object the object whose metadata is posted
    """
    blanked = {metakey: ''}
    self.object_post(object, update=True, metadata=blanked)
def publish_object(self, object):
    """Post public=True for an object, in update mode.

    @param object the object to publish
    """
    post_args = dict(update=True, public=True)
    self.object_post(object, **post_args)
def unpublish_object(self, object):
    """Post public=False for an object, in update mode.

    @param object the object to unpublish
    """
    post_args = dict(update=True, public=False)
    self.object_post(object, **post_args)
# get_object_info: issue an object HEAD request for obj, forwarding version.
# NOTE(review): the listing skips the line(s) after the request (embedded
# numbers 934-935), so the return value is not visible here; other methods in
# this file pass the result to filter_in.
932 def get_object_info(self, obj, version=None):
933 r = self.object_head(obj, version=version)
def get_object_meta(self, obj, version=None):
    """Return the object info entries that filter_in selects for the
    'X-Object-Meta' key.

    @param obj the object to inspect
    @param version forwarded unchanged to get_object_info
    """
    object_info = self.get_object_info(obj, version=version)
    return filter_in(object_info, 'X-Object-Meta')
# get_object_sharing: read the 'X-Object-Sharing' entry of the object info and
# parse it. The visible lines split the value on ';' and split each stripped
# piece on '=' into a (key, val) pair.
# NOTE(review): most of the body is missing from this listing (embedded numbers
# 941-942, 944-947, 950-952): the loop header, the condition guarding the
# 'Incorrect reply format' error and the return value are not visible here.
939 def get_object_sharing(self, object):
940 r = filter_in(self.get_object_info(object), 'X-Object-Sharing', exactMatch = True)
# The filtered result is indexed with the lower-case header name.
943 perms = r['x-object-sharing'].split(';')
948 raise ClientError('Incorrect reply format')
# Unpacking fails with ValueError if a piece does not contain exactly one '='.
949 (key, val) = perm.strip().split('=')
# set_object_sharing: post read/write permissions for an object in update mode.
# Each permission argument is used only when it is a list; any other value
# (including the default False) is sent as an empty string.
# NOTE(review): the listing omits the docstring's closing line and the
# initialisation of perms (embedded numbers 960-961).
# NOTE(review): in the docstring, the sentence following @param
# write_perimition speaks of "read permitions"; it presumably means the write
# permissions. The keyword is spelled 'permitions' in the object_post call, so
# that spelling is part of the interface.
953 def set_object_sharing(self, object, read_permition = False, write_permition = False):
954 """Give read/write permisions to an object.
955 @param object is the object to change sharing permitions onto
956 @param read_permition is a list of users and user groups that get read permition for this object
957 False means all previous read permitions will be removed
958 @param write_perimition is a list of users and user groups to get write permition for this object
959 False means all previous read permitions will be removed
962 perms['read'] = read_permition if isinstance(read_permition, list) else ''
963 perms['write'] = write_permition if isinstance(write_permition, list) else ''
964 self.object_post(object, update=True, permitions=perms)
def del_object_sharing(self, object):
    """Reset the sharing of an object.

    Delegates to set_object_sharing without read or write arguments, so
    that method's defaults apply.
    @param object the object whose sharing is reset
    """
    shared_object = object
    self.set_object_sharing(shared_object)
# append_object: read source_file block by block and send each block with an
# object POST in update mode using content_range 'bytes */*'.
# NOTE(review): the listing omits several short lines (embedded numbers 973,
# 979, 984, 988-989): the docstring close, the initialisation and update of
# offset, and the body of the final 'if upload_cb' are not visible here.
# NOTE(review): the docstring names the parameter upload_db, but the signature
# calls it upload_cb.
969 def append_object(self, object, source_file, upload_cb = None):
970 """@param upload_db is a generator for showing progress of upload
971 to caller application, e.g. a progress bar. Its next is called
972 whenever a block is uploaded
974 self.assert_container()
# Block size comes from the container info headers.
975 meta = self.get_container_info()
976 blocksize = int(meta['x-container-block-size'])
977 filesize = os.fstat(source_file.fileno()).st_size
# Ceiling division: number of blocks needed for filesize bytes.
978 nblocks = 1 + (filesize - 1)//blocksize
980 if upload_cb is not None:
981 upload_gen = upload_cb(nblocks)
982 for i in range(nblocks):
# The last block may be shorter than blocksize.
983 block = source_file.read(min(blocksize, filesize - offset))
985 self.object_post(object, update=True, content_range='bytes */*',
986 content_type='application/octet-stream', content_length=len(block), data=block)
987 if upload_cb is not None:
def truncate_object(self, object, upto_bytes):
    """Post an update that uses the object itself as source, with the
    content range 'bytes 0-<upto_bytes>/*' and object_bytes=upto_bytes.

    @param object the object to truncate, in the current container
    @param upto_bytes the byte count used in the range and as object_bytes
    """
    post_args = dict(
        update=True,
        content_range='bytes 0-%s/*'%upto_bytes,
        content_type='application/octet-stream',
        object_bytes=upto_bytes,
        source_object=path4url(self.container, object))
    self.object_post(object, **post_args)
# overwrite_object: read up to (end - start + 1) bytes from source_file block
# by block and send each block with an object POST in update mode.
# NOTE(review): the listing omits several short lines (embedded numbers 999,
# 1006, 1015-1016): the docstring close, the initialisation of offset and the
# body of the final 'if upload_cb' are not visible here.
# NOTE(review): every POST in the loop carries the same content_range
# 'bytes start-end/*' although each request holds only one block - confirm
# whether a per-block range was intended.
995 def overwrite_object(self, object, start, end, source_file, upload_cb=None):
996 """Overwrite a part of an object with given source file
997 @start the part of the remote object to start overwriting from, in bytes
998 @end the part of the remote object to stop overwriting to, in bytes
1000 self.assert_container()
# Block size comes from the container info headers.
1001 meta = self.get_container_info()
1002 blocksize = int(meta['x-container-block-size'])
1003 filesize = os.fstat(source_file.fileno()).st_size
# start and end are inclusive, hence the + 1.
1004 datasize = int(end) - int(start) + 1
# Ceiling division: number of blocks needed for datasize bytes.
1005 nblocks = 1 + (datasize - 1)//blocksize
1007 if upload_cb is not None:
1008 upload_gen = upload_cb(nblocks)
1009 for i in range(nblocks):
# Never read past the end of the file or past the requested data size.
1010 block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
1011 offset += len(block)
1012 self.object_post(object, update=True, content_type='application/octet-stream',
1013 content_length=len(block), content_range='bytes %s-%s/*'%(start,end), data=block)
1014 if upload_cb is not None:
def copy_object(self, src_container, src_object, dst_container, dst_object=False,
        source_version=None, public=False, content_type=None, delimiter=None):
    """Copy an object by PUTting the destination with a copy_from path.

    Side effect: the client's current container is switched to
    dst_container and stays switched afterwards.
    @param src_container container holding the source object
    @param src_object name of the source object
    @param dst_container container to copy into
    @param dst_object destination name; any false value means "use the
        source object's name"
    @param source_version, public, content_type, delimiter are forwarded
        unchanged to object_put
    The PUT is issued with content_length=0 and must succeed with 201.
    """
    self.assert_account()
    self.container = dst_container
    target = dst_object if dst_object else src_object
    put_args = dict(
        success=201,
        copy_from=path4url(src_container, src_object),
        content_length=0,
        source_version=source_version,
        public=public,
        content_type=content_type,
        delimiter=delimiter)
    self.object_put(target, **put_args)
def move_object(self, src_container, src_object, dst_container, dst_object=False,
        source_version=None, public=False, content_type=None, delimiter=None):
    """Move an object by PUTting the destination with a move_from path.

    Side effect: the client's current container is switched to
    dst_container and stays switched afterwards.
    @param src_container container holding the source object
    @param src_object name of the source object
    @param dst_container container to move into
    @param dst_object destination name; any false value means "use the
        source object's name"
    @param source_version, public, content_type, delimiter are forwarded
        unchanged to object_put
    The PUT is issued with content_length=0 and must succeed with 201.
    """
    self.assert_account()
    self.container = dst_container
    target = dst_object if dst_object else src_object
    put_args = dict(
        success=201,
        move_from=path4url(src_container, src_object),
        content_length=0,
        source_version=source_version,
        public=public,
        content_type=content_type,
        delimiter=delimiter)
    self.object_put(target, **put_args)