Modify upload methods to return created obj info
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """Synnefo Pithos+ API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
76         """
77         cnt_back_up = self.container
78         try:
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
81         finally:
82             self.container = cnt_back_up
83
84     def upload_object_unchunked(
85             self, obj, f,
86             withHashFile=False,
87             size=None,
88             etag=None,
89             content_encoding=None,
90             content_disposition=None,
91             content_type=None,
92             sharing=None,
93             public=None):
94         """
95         :param obj: (str) remote object path
96
97         :param f: open file descriptor
98
99         :param withHashFile: (bool)
100
101         :param size: (int) size of data to upload
102
103         :param etag: (str)
104
105         :param content_encoding: (str)
106
107         :param content_disposition: (str)
108
109         :param content_type: (str)
110
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
113
114         :param public: (bool)
115
116         :returns: (dict) created object metadata
117         """
118         self._assert_container()
119
120         if withHashFile:
121             data = f.read()
122             try:
123                 import json
124                 data = json.dumps(json.loads(data))
125             except ValueError:
126                 raise ClientError('"%s" is not json-formated' % f.name, 1)
127             except SyntaxError:
128                 msg = '"%s" is not a valid hashmap file' % f.name
129                 raise ClientError(msg, 1)
130             f = StringIO(data)
131         else:
132             data = f.read(size) if size else f.read()
133         r = self.object_put(
134             obj,
135             data=data,
136             etag=etag,
137             content_encoding=content_encoding,
138             content_disposition=content_disposition,
139             content_type=content_type,
140             permissions=sharing,
141             public=public,
142             success=201)
143         return r.headers
144
145     def create_object_by_manifestation(
146             self, obj,
147             etag=None,
148             content_encoding=None,
149             content_disposition=None,
150             content_type=None,
151             sharing=None,
152             public=None):
153         """
154         :param obj: (str) remote object path
155
156         :param etag: (str)
157
158         :param content_encoding: (str)
159
160         :param content_disposition: (str)
161
162         :param content_type: (str)
163
164         :param sharing: {'read':[user and/or grp names],
165             'write':[usr and/or grp names]}
166
167         :param public: (bool)
168
169         :returns: (dict) created object metadata
170         """
171         self._assert_container()
172         r = self.object_put(
173             obj,
174             content_length=0,
175             etag=etag,
176             content_encoding=content_encoding,
177             content_disposition=content_disposition,
178             content_type=content_type,
179             permissions=sharing,
180             public=public,
181             manifest='%s/%s' % (self.container, obj))
182         return r.headers
183
184     # upload_* auxiliary methods
185     def _put_block_async(self, data, hash, upload_gen=None):
186         event = SilentEvent(method=self._put_block, data=data, hash=hash)
187         event.start()
188         return event
189
190     def _put_block(self, data, hash):
191         r = self.container_post(
192             update=True,
193             content_type='application/octet-stream',
194             content_length=len(data),
195             data=data,
196             format='json')
197         assert r.json[0] == hash, 'Local hash does not match server'
198
199     def _get_file_block_info(self, fileobj, size=None):
200         meta = self.get_container_info()
201         blocksize = int(meta['x-container-block-size'])
202         blockhash = meta['x-container-block-hash']
203         size = size if size is not None else fstat(fileobj.fileno()).st_size
204         nblocks = 1 + (size - 1) // blocksize
205         return (blocksize, blockhash, size, nblocks)
206
207     def _create_or_get_missing_hashes(
208             self, obj, json,
209             size=None,
210             format='json',
211             hashmap=True,
212             content_type=None,
213             if_etag_match=None,
214             if_etag_not_match=None,
215             content_encoding=None,
216             content_disposition=None,
217             permissions=None,
218             public=None,
219             success=(201, 409)):
220         r = self.object_put(
221             obj,
222             format='json',
223             hashmap=True,
224             content_type=content_type,
225             json=json,
226             if_etag_match=if_etag_match,
227             if_etag_not_match=if_etag_not_match,
228             content_encoding=content_encoding,
229             content_disposition=content_disposition,
230             permissions=permissions,
231             public=public,
232             success=success)
233         return (None if r.status_code == 201 else r.json), r.headers
234
235     def _calculate_blocks_for_upload(
236             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
237             hash_cb=None):
238         offset = 0
239         if hash_cb:
240             hash_gen = hash_cb(nblocks)
241             hash_gen.next()
242
243         for i in range(nblocks):
244             block = fileobj.read(min(blocksize, size - offset))
245             bytes = len(block)
246             hash = _pithos_hash(block, blockhash)
247             hashes.append(hash)
248             hmap[hash] = (offset, bytes)
249             offset += bytes
250             if hash_cb:
251                 hash_gen.next()
252         msg = 'Failed to calculate uploaded blocks:'
253         ' Offset and object size do not match'
254         assert offset == size, msg
255
256     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
257         """upload missing blocks asynchronously"""
258
259         self._init_thread_limit()
260
261         flying = []
262         failures = []
263         for hash in missing:
264             offset, bytes = hmap[hash]
265             fileobj.seek(offset)
266             data = fileobj.read(bytes)
267             r = self._put_block_async(data, hash, upload_gen)
268             flying.append(r)
269             unfinished = self._watch_thread_limit(flying)
270             for thread in set(flying).difference(unfinished):
271                 if thread.exception:
272                     failures.append(thread)
273                     if isinstance(
274                             thread.exception,
275                             ClientError) and thread.exception.status == 502:
276                         self.POOLSIZE = self._thread_limit
277                 elif thread.isAlive():
278                     flying.append(thread)
279                 elif upload_gen:
280                     try:
281                         upload_gen.next()
282                     except:
283                         pass
284             flying = unfinished
285
286         for thread in flying:
287             thread.join()
288             if thread.exception:
289                 failures.append(thread)
290             elif upload_gen:
291                 try:
292                     upload_gen.next()
293                 except:
294                     pass
295
296         return [failure.kwargs['hash'] for failure in failures]
297
298     def upload_object(
299             self, obj, f,
300             size=None,
301             hash_cb=None,
302             upload_cb=None,
303             etag=None,
304             if_etag_match=None,
305             if_not_exist=None,
306             content_encoding=None,
307             content_disposition=None,
308             content_type=None,
309             sharing=None,
310             public=None):
311         """Upload an object using multiple connections (threads)
312
313         :param obj: (str) remote object path
314
315         :param f: open file descriptor (rb)
316
317         :param hash_cb: optional progress.bar object for calculating hashes
318
319         :param upload_cb: optional progress.bar object for uploading
320
321         :param etag: (str)
322
323         :param if_etag_match: (str) Push that value to if-match header at file
324             creation
325
326         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
327             it does not exist remotely, otherwise the operation will fail.
328             Involves the case of an object with the same path is created while
329             the object is being uploaded.
330
331         :param content_encoding: (str)
332
333         :param content_disposition: (str)
334
335         :param content_type: (str)
336
337         :param sharing: {'read':[user and/or grp names],
338             'write':[usr and/or grp names]}
339
340         :param public: (bool)
341         """
342         self._assert_container()
343
344         #init
345         block_info = (blocksize, blockhash, size, nblocks) =\
346             self._get_file_block_info(f, size)
347         (hashes, hmap, offset) = ([], {}, 0)
348         if not content_type:
349             content_type = 'application/octet-stream'
350
351         self._calculate_blocks_for_upload(
352             *block_info,
353             hashes=hashes,
354             hmap=hmap,
355             fileobj=f,
356             hash_cb=hash_cb)
357
358         hashmap = dict(bytes=size, hashes=hashes)
359         missing, obj_headers = self._create_or_get_missing_hashes(
360             obj, hashmap,
361             content_type=content_type,
362             size=size,
363             if_etag_match=if_etag_match,
364             if_etag_not_match='*' if if_not_exist else None,
365             content_encoding=content_encoding,
366             content_disposition=content_disposition,
367             permissions=sharing,
368             public=public)
369
370         if missing is None:
371             return obj_headers
372
373         if upload_cb:
374             upload_gen = upload_cb(len(missing))
375             for i in range(len(missing), len(hashmap['hashes']) + 1):
376                 try:
377                     upload_gen.next()
378                 except:
379                     upload_gen = None
380         else:
381             upload_gen = None
382
383         retries = 7
384         try:
385             while retries:
386                 sendlog.info('%s blocks missing' % len(missing))
387                 num_of_blocks = len(missing)
388                 missing = self._upload_missing_blocks(
389                     missing,
390                     hmap,
391                     f,
392                     upload_gen)
393                 if missing:
394                     if num_of_blocks == len(missing):
395                         retries -= 1
396                     else:
397                         num_of_blocks = len(missing)
398                 else:
399                     break
400             if missing:
401                 raise ClientError(
402                     '%s blocks failed to upload' % len(missing),
403                     status=800)
404         except KeyboardInterrupt:
405             sendlog.info('- - - wait for threads to finish')
406             for thread in activethreads():
407                 thread.join()
408             raise
409
410         r = self.object_put(
411             obj,
412             format='json',
413             hashmap=True,
414             content_type=content_type,
415             if_etag_match=if_etag_match,
416             if_etag_not_match='*' if if_not_exist else None,
417             etag=etag,
418             json=hashmap,
419             permissions=sharing,
420             public=public,
421             success=201)
422         return r.headers
423
424     # download_* auxiliary methods
425     def _get_remote_blocks_info(self, obj, **restargs):
426         #retrieve object hashmap
427         myrange = restargs.pop('data_range', None)
428         hashmap = self.get_object_hashmap(obj, **restargs)
429         restargs['data_range'] = myrange
430         blocksize = int(hashmap['block_size'])
431         blockhash = hashmap['block_hash']
432         total_size = hashmap['bytes']
433         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
434         map_dict = {}
435         for i, h in enumerate(hashmap['hashes']):
436             #  map_dict[h] = i   CHAGE
437             if h in map_dict:
438                 map_dict[h].append(i)
439             else:
440                 map_dict[h] = [i]
441         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
442
443     def _dump_blocks_sync(
444             self, obj, remote_hashes, blocksize, total_size, dst, range,
445             **args):
446         for blockid, blockhash in enumerate(remote_hashes):
447             if blockhash:
448                 start = blocksize * blockid
449                 is_last = start + blocksize > total_size
450                 end = (total_size - 1) if is_last else (start + blocksize - 1)
451                 (start, end) = _range_up(start, end, range)
452                 args['data_range'] = 'bytes=%s-%s' % (start, end)
453                 r = self.object_get(obj, success=(200, 206), **args)
454                 self._cb_next()
455                 dst.write(r.content)
456                 dst.flush()
457
458     def _get_block_async(self, obj, **args):
459         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
460         event.start()
461         return event
462
463     def _hash_from_file(self, fp, start, size, blockhash):
464         fp.seek(start)
465         block = fp.read(size)
466         h = newhashlib(blockhash)
467         h.update(block.strip('\x00'))
468         return hexlify(h.digest())
469
470     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
471         """write the results of a greenleted rest call to a file
472
473         :param offset: the offset of the file up to blocksize
474         - e.g. if the range is 10-100, all blocks will be written to
475         normal_position - 10
476         """
477         for i, (key, g) in enumerate(flying.items()):
478             if g.isAlive():
479                 continue
480             if g.exception:
481                 raise g.exception
482             block = g.value.content
483             for block_start in blockids[key]:
484                 local_file.seek(block_start + offset)
485                 local_file.write(block)
486                 self._cb_next()
487             flying.pop(key)
488             blockids.pop(key)
489         local_file.flush()
490
491     def _dump_blocks_async(
492             self, obj, remote_hashes, blocksize, total_size, local_file,
493             blockhash=None, resume=False, filerange=None, **restargs):
494         file_size = fstat(local_file.fileno()).st_size if resume else 0
495         flying = dict()
496         blockid_dict = dict()
497         offset = 0
498         if filerange is not None:
499             rstart = int(filerange.split('-')[0])
500             offset = rstart if blocksize > rstart else rstart % blocksize
501
502         self._init_thread_limit()
503         for block_hash, blockids in remote_hashes.items():
504             blockids = [blk * blocksize for blk in blockids]
505             unsaved = [blk for blk in blockids if not (
506                 blk < file_size and block_hash == self._hash_from_file(
507                         local_file, blk, blocksize, blockhash))]
508             self._cb_next(len(blockids) - len(unsaved))
509             if unsaved:
510                 key = unsaved[0]
511                 self._watch_thread_limit(flying.values())
512                 self._thread2file(
513                     flying, blockid_dict, local_file, offset,
514                     **restargs)
515                 end = total_size - 1 if key + blocksize > total_size\
516                     else key + blocksize - 1
517                 start, end = _range_up(key, end, filerange)
518                 if start == end:
519                     self._cb_next()
520                     continue
521                 restargs['async_headers'] = {
522                     'Range': 'bytes=%s-%s' % (start, end)}
523                 flying[key] = self._get_block_async(obj, **restargs)
524                 blockid_dict[key] = unsaved
525
526         for thread in flying.values():
527             thread.join()
528         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
529
530     def download_object(
531             self, obj, dst,
532             download_cb=None,
533             version=None,
534             resume=False,
535             range_str=None,
536             if_match=None,
537             if_none_match=None,
538             if_modified_since=None,
539             if_unmodified_since=None):
540         """Download an object (multiple connections, random blocks)
541
542         :param obj: (str) remote object path
543
544         :param dst: open file descriptor (wb+)
545
546         :param download_cb: optional progress.bar object for downloading
547
548         :param version: (str) file version
549
550         :param resume: (bool) if set, preserve already downloaded file parts
551
552         :param range_str: (str) from, to are file positions (int) in bytes
553
554         :param if_match: (str)
555
556         :param if_none_match: (str)
557
558         :param if_modified_since: (str) formated date
559
560         :param if_unmodified_since: (str) formated date"""
561         restargs = dict(
562             version=version,
563             data_range=None if range_str is None else 'bytes=%s' % range_str,
564             if_match=if_match,
565             if_none_match=if_none_match,
566             if_modified_since=if_modified_since,
567             if_unmodified_since=if_unmodified_since)
568
569         (
570             blocksize,
571             blockhash,
572             total_size,
573             hash_list,
574             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
575         assert total_size >= 0
576
577         if download_cb:
578             self.progress_bar_gen = download_cb(len(hash_list))
579             self._cb_next()
580
581         if dst.isatty():
582             self._dump_blocks_sync(
583                 obj,
584                 hash_list,
585                 blocksize,
586                 total_size,
587                 dst,
588                 range_str,
589                 **restargs)
590         else:
591             self._dump_blocks_async(
592                 obj,
593                 remote_hashes,
594                 blocksize,
595                 total_size,
596                 dst,
597                 blockhash,
598                 resume,
599                 range_str,
600                 **restargs)
601             if not range_str:
602                 dst.truncate(total_size)
603
604         self._complete_cb()
605
606     #Command Progress Bar method
607     def _cb_next(self, step=1):
608         if hasattr(self, 'progress_bar_gen'):
609             try:
610                 for i in xrange(step):
611                     self.progress_bar_gen.next()
612             except:
613                 pass
614
615     def _complete_cb(self):
616         while True:
617             try:
618                 self.progress_bar_gen.next()
619             except:
620                 break
621
622     def get_object_hashmap(
623             self, obj,
624             version=None,
625             if_match=None,
626             if_none_match=None,
627             if_modified_since=None,
628             if_unmodified_since=None,
629             data_range=None):
630         """
631         :param obj: (str) remote object path
632
633         :param if_match: (str)
634
635         :param if_none_match: (str)
636
637         :param if_modified_since: (str) formated date
638
639         :param if_unmodified_since: (str) formated date
640
641         :param data_range: (str) from-to where from and to are integers
642             denoting file positions in bytes
643
644         :returns: (list)
645         """
646         try:
647             r = self.object_get(
648                 obj,
649                 hashmap=True,
650                 version=version,
651                 if_etag_match=if_match,
652                 if_etag_not_match=if_none_match,
653                 if_modified_since=if_modified_since,
654                 if_unmodified_since=if_unmodified_since,
655                 data_range=data_range)
656         except ClientError as err:
657             if err.status == 304 or err.status == 412:
658                 return {}
659             raise
660         return r.json
661
662     def set_account_group(self, group, usernames):
663         """
664         :param group: (str)
665
666         :param usernames: (list)
667         """
668         self.account_post(update=True, groups={group: usernames})
669
670     def del_account_group(self, group):
671         """
672         :param group: (str)
673         """
674         self.account_post(update=True, groups={group: []})
675
676     def get_account_info(self, until=None):
677         """
678         :param until: (str) formated date
679
680         :returns: (dict)
681         """
682         r = self.account_head(until=until)
683         if r.status_code == 401:
684             raise ClientError("No authorization", status=401)
685         return r.headers
686
687     def get_account_quota(self):
688         """
689         :returns: (dict)
690         """
691         return filter_in(
692             self.get_account_info(),
693             'X-Account-Policy-Quota',
694             exactMatch=True)
695
696     def get_account_versioning(self):
697         """
698         :returns: (dict)
699         """
700         return filter_in(
701             self.get_account_info(),
702             'X-Account-Policy-Versioning',
703             exactMatch=True)
704
705     def get_account_meta(self, until=None):
706         """
707         :meta until: (str) formated date
708
709         :returns: (dict)
710         """
711         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
712
713     def get_account_group(self):
714         """
715         :returns: (dict)
716         """
717         return filter_in(self.get_account_info(), 'X-Account-Group-')
718
719     def set_account_meta(self, metapairs):
720         """
721         :param metapairs: (dict) {key1:val1, key2:val2, ...}
722         """
723         assert(type(metapairs) is dict)
724         self.account_post(update=True, metadata=metapairs)
725
726     def del_account_meta(self, metakey):
727         """
728         :param metakey: (str) metadatum key
729         """
730         self.account_post(update=True, metadata={metakey: ''})
731
732     """
733     def set_account_quota(self, quota):
734         ""
735         :param quota: (int)
736         ""
737         self.account_post(update=True, quota=quota)
738     """
739
740     def set_account_versioning(self, versioning):
741         """
742         "param versioning: (str)
743         """
744         self.account_post(update=True, versioning=versioning)
745
746     def list_containers(self):
747         """
748         :returns: (dict)
749         """
750         r = self.account_get()
751         return r.json
752
753     def del_container(self, until=None, delimiter=None):
754         """
755         :param until: (str) formated date
756
757         :param delimiter: (str) with / empty container
758
759         :raises ClientError: 404 Container does not exist
760
761         :raises ClientError: 409 Container is not empty
762         """
763         self._assert_container()
764         r = self.container_delete(
765             until=until,
766             delimiter=delimiter,
767             success=(204, 404, 409))
768         if r.status_code == 404:
769             raise ClientError(
770                 'Container "%s" does not exist' % self.container,
771                 r.status_code)
772         elif r.status_code == 409:
773             raise ClientError(
774                 'Container "%s" is not empty' % self.container,
775                 r.status_code)
776
777     def get_container_versioning(self, container=None):
778         """
779         :param container: (str)
780
781         :returns: (dict)
782         """
783         cnt_back_up = self.container
784         try:
785             self.container = container or cnt_back_up
786             return filter_in(
787                 self.get_container_info(),
788                 'X-Container-Policy-Versioning')
789         finally:
790             self.container = cnt_back_up
791
792     def get_container_limit(self, container=None):
793         """
794         :param container: (str)
795
796         :returns: (dict)
797         """
798         cnt_back_up = self.container
799         try:
800             self.container = container or cnt_back_up
801             return filter_in(
802                 self.get_container_info(),
803                 'X-Container-Policy-Quota')
804         finally:
805             self.container = cnt_back_up
806
807     def get_container_info(self, until=None):
808         """
809         :param until: (str) formated date
810
811         :returns: (dict)
812
813         :raises ClientError: 404 Container not found
814         """
815         try:
816             r = self.container_head(until=until)
817         except ClientError as err:
818             err.details.append('for container %s' % self.container)
819             raise err
820         return r.headers
821
822     def get_container_meta(self, until=None):
823         """
824         :param until: (str) formated date
825
826         :returns: (dict)
827         """
828         return filter_in(
829             self.get_container_info(until=until),
830             'X-Container-Meta')
831
832     def get_container_object_meta(self, until=None):
833         """
834         :param until: (str) formated date
835
836         :returns: (dict)
837         """
838         return filter_in(
839             self.get_container_info(until=until),
840             'X-Container-Object-Meta')
841
842     def set_container_meta(self, metapairs):
843         """
844         :param metapairs: (dict) {key1:val1, key2:val2, ...}
845         """
846         assert(type(metapairs) is dict)
847         self.container_post(update=True, metadata=metapairs)
848
849     def del_container_meta(self, metakey):
850         """
851         :param metakey: (str) metadatum key
852         """
853         self.container_post(update=True, metadata={metakey: ''})
854
855     def set_container_limit(self, limit):
856         """
857         :param limit: (int)
858         """
859         self.container_post(update=True, quota=limit)
860
861     def set_container_versioning(self, versioning):
862         """
863         :param versioning: (str)
864         """
865         self.container_post(update=True, versioning=versioning)
866
867     def del_object(self, obj, until=None, delimiter=None):
868         """
869         :param obj: (str) remote object path
870
871         :param until: (str) formated date
872
873         :param delimiter: (str)
874         """
875         self._assert_container()
876         self.object_delete(obj, until=until, delimiter=delimiter)
877
878     def set_object_meta(self, obj, metapairs):
879         """
880         :param obj: (str) remote object path
881
882         :param metapairs: (dict) {key1:val1, key2:val2, ...}
883         """
884         assert(type(metapairs) is dict)
885         self.object_post(obj, update=True, metadata=metapairs)
886
887     def del_object_meta(self, obj, metakey):
888         """
889         :param obj: (str) remote object path
890
891         :param metakey: (str) metadatum key
892         """
893         self.object_post(obj, update=True, metadata={metakey: ''})
894
895     def publish_object(self, obj):
896         """
897         :param obj: (str) remote object path
898
899         :returns: (str) access url
900         """
901         self.object_post(obj, update=True, public=True)
902         info = self.get_object_info(obj)
903         pref, sep, rest = self.base_url.partition('//')
904         base = rest.split('/')[0]
905         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
906
907     def unpublish_object(self, obj):
908         """
909         :param obj: (str) remote object path
910         """
911         self.object_post(obj, update=True, public=False)
912
913     def get_object_info(self, obj, version=None):
914         """
915         :param obj: (str) remote object path
916
917         :param version: (str)
918
919         :returns: (dict)
920         """
921         try:
922             r = self.object_head(obj, version=version)
923             return r.headers
924         except ClientError as ce:
925             if ce.status == 404:
926                 raise ClientError('Object %s not found' % obj, status=404)
927             raise
928
929     def get_object_meta(self, obj, version=None):
930         """
931         :param obj: (str) remote object path
932
933         :param version: (str)
934
935         :returns: (dict)
936         """
937         return filter_in(
938             self.get_object_info(obj, version=version),
939             'X-Object-Meta')
940
941     def get_object_sharing(self, obj):
942         """
943         :param obj: (str) remote object path
944
945         :returns: (dict)
946         """
947         r = filter_in(
948             self.get_object_info(obj),
949             'X-Object-Sharing',
950             exactMatch=True)
951         reply = {}
952         if len(r) > 0:
953             perms = r['x-object-sharing'].split(';')
954             for perm in perms:
955                 try:
956                     perm.index('=')
957                 except ValueError:
958                     raise ClientError('Incorrect reply format')
959                 (key, val) = perm.strip().split('=')
960                 reply[key] = val
961         return reply
962
963     def set_object_sharing(
964             self, obj,
965             read_permition=False, write_permition=False):
966         """Give read/write permisions to an object.
967
968         :param obj: (str) remote object path
969
970         :param read_permition: (list - bool) users and user groups that get
971             read permition for this object - False means all previous read
972             permissions will be removed
973
974         :param write_perimition: (list - bool) of users and user groups to get
975            write permition for this object - False means all previous write
976            permissions will be removed
977         """
978
979         perms = dict(read=read_permition or '', write=write_permition or '')
980         self.object_post(obj, update=True, permissions=perms)
981
982     def del_object_sharing(self, obj):
983         """
984         :param obj: (str) remote object path
985         """
986         self.set_object_sharing(obj)
987
988     def append_object(self, obj, source_file, upload_cb=None):
989         """
990         :param obj: (str) remote object path
991
992         :param source_file: open file descriptor
993
994         :param upload_db: progress.bar for uploading
995         """
996
997         self._assert_container()
998         meta = self.get_container_info()
999         blocksize = int(meta['x-container-block-size'])
1000         filesize = fstat(source_file.fileno()).st_size
1001         nblocks = 1 + (filesize - 1) // blocksize
1002         offset = 0
1003         if upload_cb:
1004             upload_gen = upload_cb(nblocks)
1005             upload_gen.next()
1006         for i in range(nblocks):
1007             block = source_file.read(min(blocksize, filesize - offset))
1008             offset += len(block)
1009             self.object_post(
1010                 obj,
1011                 update=True,
1012                 content_range='bytes */*',
1013                 content_type='application/octet-stream',
1014                 content_length=len(block),
1015                 data=block)
1016
1017             if upload_cb:
1018                 upload_gen.next()
1019
1020     def truncate_object(self, obj, upto_bytes):
1021         """
1022         :param obj: (str) remote object path
1023
1024         :param upto_bytes: max number of bytes to leave on file
1025         """
1026         self.object_post(
1027             obj,
1028             update=True,
1029             content_range='bytes 0-%s/*' % upto_bytes,
1030             content_type='application/octet-stream',
1031             object_bytes=upto_bytes,
1032             source_object=path4url(self.container, obj))
1033
1034     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1035         """Overwrite a part of an object from local source file
1036
1037         :param obj: (str) remote object path
1038
1039         :param start: (int) position in bytes to start overwriting from
1040
1041         :param end: (int) position in bytes to stop overwriting at
1042
1043         :param source_file: open file descriptor
1044
1045         :param upload_db: progress.bar for uploading
1046         """
1047
1048         r = self.get_object_info(obj)
1049         rf_size = int(r['content-length'])
1050         if rf_size < int(start):
1051             raise ClientError(
1052                 'Range start exceeds file size',
1053                 status=416)
1054         elif rf_size < int(end):
1055             raise ClientError(
1056                 'Range end exceeds file size',
1057                 status=416)
1058         self._assert_container()
1059         meta = self.get_container_info()
1060         blocksize = int(meta['x-container-block-size'])
1061         filesize = fstat(source_file.fileno()).st_size
1062         datasize = int(end) - int(start) + 1
1063         nblocks = 1 + (datasize - 1) // blocksize
1064         offset = 0
1065         if upload_cb:
1066             upload_gen = upload_cb(nblocks)
1067             upload_gen.next()
1068         for i in range(nblocks):
1069             read_size = min(blocksize, filesize - offset, datasize - offset)
1070             block = source_file.read(read_size)
1071             self.object_post(
1072                 obj,
1073                 update=True,
1074                 content_type='application/octet-stream',
1075                 content_length=len(block),
1076                 content_range='bytes %s-%s/*' % (
1077                     start + offset,
1078                     start + offset + len(block) - 1),
1079                 data=block)
1080             offset += len(block)
1081
1082             if upload_cb:
1083                 upload_gen.next()
1084
1085     def copy_object(
1086             self, src_container, src_object, dst_container,
1087             dst_object=None,
1088             source_version=None,
1089             source_account=None,
1090             public=False,
1091             content_type=None,
1092             delimiter=None):
1093         """
1094         :param src_container: (str) source container
1095
1096         :param src_object: (str) source object path
1097
1098         :param dst_container: (str) destination container
1099
1100         :param dst_object: (str) destination object path
1101
1102         :param source_version: (str) source object version
1103
1104         :param source_account: (str) account to copy from
1105
1106         :param public: (bool)
1107
1108         :param content_type: (str)
1109
1110         :param delimiter: (str)
1111         """
1112         self._assert_account()
1113         self.container = dst_container
1114         src_path = path4url(src_container, src_object)
1115         self.object_put(
1116             dst_object or src_object,
1117             success=201,
1118             copy_from=src_path,
1119             content_length=0,
1120             source_version=source_version,
1121             source_account=source_account,
1122             public=public,
1123             content_type=content_type,
1124             delimiter=delimiter)
1125
1126     def move_object(
1127             self, src_container, src_object, dst_container,
1128             dst_object=False,
1129             source_account=None,
1130             source_version=None,
1131             public=False,
1132             content_type=None,
1133             delimiter=None):
1134         """
1135         :param src_container: (str) source container
1136
1137         :param src_object: (str) source object path
1138
1139         :param dst_container: (str) destination container
1140
1141         :param dst_object: (str) destination object path
1142
1143         :param source_account: (str) account to move from
1144
1145         :param source_version: (str) source object version
1146
1147         :param public: (bool)
1148
1149         :param content_type: (str)
1150
1151         :param delimiter: (str)
1152         """
1153         self._assert_account()
1154         self.container = dst_container
1155         dst_object = dst_object or src_object
1156         src_path = path4url(src_container, src_object)
1157         self.object_put(
1158             dst_object,
1159             success=201,
1160             move_from=src_path,
1161             content_length=0,
1162             source_account=source_account,
1163             source_version=source_version,
1164             public=public,
1165             content_type=content_type,
1166             delimiter=delimiter)
1167
1168     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1169         """Get accounts that share with self.account
1170
1171         :param limit: (str)
1172
1173         :param marker: (str)
1174
1175         :returns: (dict)
1176         """
1177         self._assert_account()
1178
1179         self.set_param('format', 'json')
1180         self.set_param('limit', limit, iff=limit is not None)
1181         self.set_param('marker', marker, iff=marker is not None)
1182
1183         path = ''
1184         success = kwargs.pop('success', (200, 204))
1185         r = self.get(path, *args, success=success, **kwargs)
1186         return r.json
1187
1188     def get_object_versionlist(self, obj):
1189         """
1190         :param obj: (str) remote object path
1191
1192         :returns: (list)
1193         """
1194         self._assert_container()
1195         r = self.object_get(obj, format='json', version='list')
1196         return r.json['versions']