Split file-quota semantics for account and contner
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """GRNet Pithos API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
76         """
77         cnt_back_up = self.container
78         try:
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
81         finally:
82             self.container = cnt_back_up
83
84     def upload_object_unchunked(
85             self, obj, f,
86             withHashFile=False,
87             size=None,
88             etag=None,
89             content_encoding=None,
90             content_disposition=None,
91             content_type=None,
92             sharing=None,
93             public=None):
94         """
95         :param obj: (str) remote object path
96
97         :param f: open file descriptor
98
99         :param withHashFile: (bool)
100
101         :param size: (int) size of data to upload
102
103         :param etag: (str)
104
105         :param content_encoding: (str)
106
107         :param content_disposition: (str)
108
109         :param content_type: (str)
110
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
113
114         :param public: (bool)
115         """
116         self._assert_container()
117
118         if withHashFile:
119             data = f.read()
120             try:
121                 import json
122                 data = json.dumps(json.loads(data))
123             except ValueError:
124                 raise ClientError('"%s" is not json-formated' % f.name, 1)
125             except SyntaxError:
126                 msg = '"%s" is not a valid hashmap file' % f.name
127                 raise ClientError(msg, 1)
128             f = StringIO(data)
129         else:
130             data = f.read(size) if size else f.read()
131         self.object_put(
132             obj,
133             data=data,
134             etag=etag,
135             content_encoding=content_encoding,
136             content_disposition=content_disposition,
137             content_type=content_type,
138             permissions=sharing,
139             public=public,
140             success=201)
141
142     def create_object_by_manifestation(
143             self, obj,
144             etag=None,
145             content_encoding=None,
146             content_disposition=None,
147             content_type=None,
148             sharing=None,
149             public=None):
150         """
151         :param obj: (str) remote object path
152
153         :param etag: (str)
154
155         :param content_encoding: (str)
156
157         :param content_disposition: (str)
158
159         :param content_type: (str)
160
161         :param sharing: {'read':[user and/or grp names],
162             'write':[usr and/or grp names]}
163
164         :param public: (bool)
165         """
166         self._assert_container()
167         self.object_put(
168             obj,
169             content_length=0,
170             etag=etag,
171             content_encoding=content_encoding,
172             content_disposition=content_disposition,
173             content_type=content_type,
174             permissions=sharing,
175             public=public,
176             manifest='%s/%s' % (self.container, obj))
177
178     # upload_* auxiliary methods
179     def _put_block_async(self, data, hash, upload_gen=None):
180         event = SilentEvent(method=self._put_block, data=data, hash=hash)
181         event.start()
182         return event
183
184     def _put_block(self, data, hash):
185         r = self.container_post(
186             update=True,
187             content_type='application/octet-stream',
188             content_length=len(data),
189             data=data,
190             format='json')
191         assert r.json[0] == hash, 'Local hash does not match server'
192
193     def _get_file_block_info(self, fileobj, size=None):
194         meta = self.get_container_info()
195         blocksize = int(meta['x-container-block-size'])
196         blockhash = meta['x-container-block-hash']
197         size = size if size is not None else fstat(fileobj.fileno()).st_size
198         nblocks = 1 + (size - 1) // blocksize
199         return (blocksize, blockhash, size, nblocks)
200
201     def _get_missing_hashes(
202             self, obj, json,
203             size=None,
204             format='json',
205             hashmap=True,
206             content_type=None,
207             content_encoding=None,
208             content_disposition=None,
209             permissions=None,
210             public=None,
211             success=(201, 409)):
212         r = self.object_put(
213             obj,
214             format='json',
215             hashmap=True,
216             content_type=content_type,
217             json=json,
218             content_encoding=content_encoding,
219             content_disposition=content_disposition,
220             permissions=permissions,
221             public=public,
222             success=success)
223         return None if r.status_code == 201 else r.json
224
225     def _culculate_blocks_for_upload(
226             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
227             hash_cb=None):
228         offset = 0
229         if hash_cb:
230             hash_gen = hash_cb(nblocks)
231             hash_gen.next()
232
233         for i in range(nblocks):
234             block = fileobj.read(min(blocksize, size - offset))
235             bytes = len(block)
236             hash = _pithos_hash(block, blockhash)
237             hashes.append(hash)
238             hmap[hash] = (offset, bytes)
239             offset += bytes
240             if hash_cb:
241                 hash_gen.next()
242         msg = 'Failed to calculate uploaded blocks:'
243         ' Offset and object size do not match'
244         assert offset == size, msg
245
246     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
247         """upload missing blocks asynchronously"""
248
249         self._init_thread_limit()
250
251         flying = []
252         failures = []
253         for hash in missing:
254             offset, bytes = hmap[hash]
255             fileobj.seek(offset)
256             data = fileobj.read(bytes)
257             r = self._put_block_async(data, hash, upload_gen)
258             flying.append(r)
259             unfinished = self._watch_thread_limit(flying)
260             for thread in set(flying).difference(unfinished):
261                 if thread.exception:
262                     failures.append(thread)
263                     if isinstance(
264                             thread.exception,
265                             ClientError) and thread.exception.status == 502:
266                         self.POOLSIZE = self._thread_limit
267                 elif thread.isAlive():
268                     flying.append(thread)
269                 elif upload_gen:
270                     try:
271                         upload_gen.next()
272                     except:
273                         pass
274             flying = unfinished
275
276         for thread in flying:
277             thread.join()
278             if thread.exception:
279                 failures.append(thread)
280             elif upload_gen:
281                 try:
282                     upload_gen.next()
283                 except:
284                     pass
285
286         return [failure.kwargs['hash'] for failure in failures]
287
288     def upload_object(
289             self, obj, f,
290             size=None,
291             hash_cb=None,
292             upload_cb=None,
293             etag=None,
294             if_etag_match=None,
295             if_not_exist=None,
296             content_encoding=None,
297             content_disposition=None,
298             content_type=None,
299             sharing=None,
300             public=None):
301         """Upload an object using multiple connections (threads)
302
303         :param obj: (str) remote object path
304
305         :param f: open file descriptor (rb)
306
307         :param hash_cb: optional progress.bar object for calculating hashes
308
309         :param upload_cb: optional progress.bar object for uploading
310
311         :param etag: (str)
312
313         :param if_etag_match: (str) Push that value to if-match header at file
314             creation
315
316         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
317             it does not exist remotely, otherwise the operation will fail.
318             Involves the case of an object with the same path is created while
319             the object is being uploaded.
320
321         :param content_encoding: (str)
322
323         :param content_disposition: (str)
324
325         :param content_type: (str)
326
327         :param sharing: {'read':[user and/or grp names],
328             'write':[usr and/or grp names]}
329
330         :param public: (bool)
331         """
332         self._assert_container()
333
334         #init
335         block_info = (blocksize, blockhash, size, nblocks) =\
336             self._get_file_block_info(f, size)
337         (hashes, hmap, offset) = ([], {}, 0)
338         if not content_type:
339             content_type = 'application/octet-stream'
340
341         self._culculate_blocks_for_upload(
342             *block_info,
343             hashes=hashes,
344             hmap=hmap,
345             fileobj=f,
346             hash_cb=hash_cb)
347
348         hashmap = dict(bytes=size, hashes=hashes)
349         missing = self._get_missing_hashes(
350             obj, hashmap,
351             content_type=content_type,
352             size=size,
353             content_encoding=content_encoding,
354             content_disposition=content_disposition,
355             permissions=sharing,
356             public=public)
357
358         if missing is None:
359             return
360
361         if upload_cb:
362             upload_gen = upload_cb(len(missing))
363             for i in range(len(missing), len(hashmap['hashes']) + 1):
364                 try:
365                     upload_gen.next()
366                 except:
367                     upload_gen = None
368         else:
369             upload_gen = None
370
371         retries = 7
372         try:
373             while retries:
374                 sendlog.info('%s blocks missing' % len(missing))
375                 num_of_blocks = len(missing)
376                 missing = self._upload_missing_blocks(
377                     missing,
378                     hmap,
379                     f,
380                     upload_gen)
381                 if missing:
382                     if num_of_blocks == len(missing):
383                         retries -= 1
384                     else:
385                         num_of_blocks = len(missing)
386                 else:
387                     break
388             if missing:
389                 raise ClientError(
390                     '%s blocks failed to upload' % len(missing),
391                     status=800)
392         except KeyboardInterrupt:
393             sendlog.info('- - - wait for threads to finish')
394             for thread in activethreads():
395                 thread.join()
396             raise
397
398         self.object_put(
399             obj,
400             format='json',
401             hashmap=True,
402             content_type=content_type,
403             if_etag_match=if_etag_match,
404             if_etag_not_match='*' if if_not_exist else None,
405             etag=etag,
406             json=hashmap,
407             permissions=sharing,
408             public=public,
409             success=201)
410
411     # download_* auxiliary methods
412     def _get_remote_blocks_info(self, obj, **restargs):
413         #retrieve object hashmap
414         myrange = restargs.pop('data_range', None)
415         hashmap = self.get_object_hashmap(obj, **restargs)
416         restargs['data_range'] = myrange
417         blocksize = int(hashmap['block_size'])
418         blockhash = hashmap['block_hash']
419         total_size = hashmap['bytes']
420         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
421         map_dict = {}
422         for i, h in enumerate(hashmap['hashes']):
423             #  map_dict[h] = i   CHAGE
424             if h in map_dict:
425                 map_dict[h].append(i)
426             else:
427                 map_dict[h] = [i]
428         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
429
430     def _dump_blocks_sync(
431             self, obj, remote_hashes, blocksize, total_size, dst, range,
432             **args):
433         for blockid, blockhash in enumerate(remote_hashes):
434             if blockhash:
435                 start = blocksize * blockid
436                 is_last = start + blocksize > total_size
437                 end = (total_size - 1) if is_last else (start + blocksize - 1)
438                 (start, end) = _range_up(start, end, range)
439                 args['data_range'] = 'bytes=%s-%s' % (start, end)
440                 r = self.object_get(obj, success=(200, 206), **args)
441                 self._cb_next()
442                 dst.write(r.content)
443                 dst.flush()
444
445     def _get_block_async(self, obj, **args):
446         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
447         event.start()
448         return event
449
450     def _hash_from_file(self, fp, start, size, blockhash):
451         fp.seek(start)
452         block = fp.read(size)
453         h = newhashlib(blockhash)
454         h.update(block.strip('\x00'))
455         return hexlify(h.digest())
456
457     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
458         """write the results of a greenleted rest call to a file
459
460         :param offset: the offset of the file up to blocksize
461         - e.g. if the range is 10-100, all blocks will be written to
462         normal_position - 10
463         """
464         for i, (key, g) in enumerate(flying.items()):
465             if g.isAlive():
466                 continue
467             if g.exception:
468                 raise g.exception
469             block = g.value.content
470             for block_start in blockids[key]:
471                 local_file.seek(block_start + offset)
472                 local_file.write(block)
473                 self._cb_next()
474             flying.pop(key)
475             blockids.pop(key)
476         local_file.flush()
477
478     def _dump_blocks_async(
479             self, obj, remote_hashes, blocksize, total_size, local_file,
480             blockhash=None, resume=False, filerange=None, **restargs):
481         file_size = fstat(local_file.fileno()).st_size if resume else 0
482         flying = dict()
483         blockid_dict = dict()
484         offset = 0
485         if filerange is not None:
486             rstart = int(filerange.split('-')[0])
487             offset = rstart if blocksize > rstart else rstart % blocksize
488
489         self._init_thread_limit()
490         for block_hash, blockids in remote_hashes.items():
491             blockids = [blk * blocksize for blk in blockids]
492             unsaved = [blk for blk in blockids if not (
493                 blk < file_size and block_hash == self._hash_from_file(
494                         local_file, blk, blocksize, blockhash))]
495             self._cb_next(len(blockids) - len(unsaved))
496             if unsaved:
497                 key = unsaved[0]
498                 self._watch_thread_limit(flying.values())
499                 self._thread2file(
500                     flying, blockid_dict, local_file, offset,
501                     **restargs)
502                 end = total_size - 1 if key + blocksize > total_size\
503                     else key + blocksize - 1
504                 start, end = _range_up(key, end, filerange)
505                 if start == end:
506                     self._cb_next()
507                     continue
508                 restargs['async_headers'] = {
509                     'Range': 'bytes=%s-%s' % (start, end)}
510                 flying[key] = self._get_block_async(obj, **restargs)
511                 blockid_dict[key] = unsaved
512
513         for thread in flying.values():
514             thread.join()
515         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
516
517     def download_object(
518             self, obj, dst,
519             download_cb=None,
520             version=None,
521             resume=False,
522             range_str=None,
523             if_match=None,
524             if_none_match=None,
525             if_modified_since=None,
526             if_unmodified_since=None):
527         """Download an object (multiple connections, random blocks)
528
529         :param obj: (str) remote object path
530
531         :param dst: open file descriptor (wb+)
532
533         :param download_cb: optional progress.bar object for downloading
534
535         :param version: (str) file version
536
537         :param resume: (bool) if set, preserve already downloaded file parts
538
539         :param range_str: (str) from, to are file positions (int) in bytes
540
541         :param if_match: (str)
542
543         :param if_none_match: (str)
544
545         :param if_modified_since: (str) formated date
546
547         :param if_unmodified_since: (str) formated date"""
548         restargs = dict(
549             version=version,
550             data_range=None if range_str is None else 'bytes=%s' % range_str,
551             if_match=if_match,
552             if_none_match=if_none_match,
553             if_modified_since=if_modified_since,
554             if_unmodified_since=if_unmodified_since)
555
556         (
557             blocksize,
558             blockhash,
559             total_size,
560             hash_list,
561             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
562         assert total_size >= 0
563
564         if download_cb:
565             self.progress_bar_gen = download_cb(len(hash_list))
566             self._cb_next()
567
568         if dst.isatty():
569             self._dump_blocks_sync(
570                 obj,
571                 hash_list,
572                 blocksize,
573                 total_size,
574                 dst,
575                 range_str,
576                 **restargs)
577         else:
578             self._dump_blocks_async(
579                 obj,
580                 remote_hashes,
581                 blocksize,
582                 total_size,
583                 dst,
584                 blockhash,
585                 resume,
586                 range_str,
587                 **restargs)
588             if not range_str:
589                 dst.truncate(total_size)
590
591         self._complete_cb()
592
593     #Command Progress Bar method
594     def _cb_next(self, step=1):
595         if hasattr(self, 'progress_bar_gen'):
596             try:
597                 for i in xrange(step):
598                     self.progress_bar_gen.next()
599             except:
600                 pass
601
602     def _complete_cb(self):
603         while True:
604             try:
605                 self.progress_bar_gen.next()
606             except:
607                 break
608
609     def get_object_hashmap(
610             self, obj,
611             version=None,
612             if_match=None,
613             if_none_match=None,
614             if_modified_since=None,
615             if_unmodified_since=None,
616             data_range=None):
617         """
618         :param obj: (str) remote object path
619
620         :param if_match: (str)
621
622         :param if_none_match: (str)
623
624         :param if_modified_since: (str) formated date
625
626         :param if_unmodified_since: (str) formated date
627
628         :param data_range: (str) from-to where from and to are integers
629             denoting file positions in bytes
630
631         :returns: (list)
632         """
633         try:
634             r = self.object_get(
635                 obj,
636                 hashmap=True,
637                 version=version,
638                 if_etag_match=if_match,
639                 if_etag_not_match=if_none_match,
640                 if_modified_since=if_modified_since,
641                 if_unmodified_since=if_unmodified_since,
642                 data_range=data_range)
643         except ClientError as err:
644             if err.status == 304 or err.status == 412:
645                 return {}
646             raise
647         return r.json
648
649     def set_account_group(self, group, usernames):
650         """
651         :param group: (str)
652
653         :param usernames: (list)
654         """
655         self.account_post(update=True, groups={group: usernames})
656
657     def del_account_group(self, group):
658         """
659         :param group: (str)
660         """
661         self.account_post(update=True, groups={group: []})
662
663     def get_account_info(self, until=None):
664         """
665         :param until: (str) formated date
666
667         :returns: (dict)
668         """
669         r = self.account_head(until=until)
670         if r.status_code == 401:
671             raise ClientError("No authorization", status=401)
672         return r.headers
673
674     def get_account_quota(self):
675         """
676         :returns: (dict)
677         """
678         return filter_in(
679             self.get_account_info(),
680             'X-Account-Policy-Quota',
681             exactMatch=True)
682
683     def get_account_versioning(self):
684         """
685         :returns: (dict)
686         """
687         return filter_in(
688             self.get_account_info(),
689             'X-Account-Policy-Versioning',
690             exactMatch=True)
691
692     def get_account_meta(self, until=None):
693         """
694         :meta until: (str) formated date
695
696         :returns: (dict)
697         """
698         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
699
700     def get_account_group(self):
701         """
702         :returns: (dict)
703         """
704         return filter_in(self.get_account_info(), 'X-Account-Group-')
705
706     def set_account_meta(self, metapairs):
707         """
708         :param metapairs: (dict) {key1:val1, key2:val2, ...}
709         """
710         assert(type(metapairs) is dict)
711         self.account_post(update=True, metadata=metapairs)
712
713     def del_account_meta(self, metakey):
714         """
715         :param metakey: (str) metadatum key
716         """
717         self.account_post(update=True, metadata={metakey: ''})
718
719     """
720     def set_account_quota(self, quota):
721         ""
722         :param quota: (int)
723         ""
724         self.account_post(update=True, quota=quota)
725     """
726
727     def set_account_versioning(self, versioning):
728         """
729         "param versioning: (str)
730         """
731         self.account_post(update=True, versioning=versioning)
732
733     def list_containers(self):
734         """
735         :returns: (dict)
736         """
737         r = self.account_get()
738         return r.json
739
740     def del_container(self, until=None, delimiter=None):
741         """
742         :param until: (str) formated date
743
744         :param delimiter: (str) with / empty container
745
746         :raises ClientError: 404 Container does not exist
747
748         :raises ClientError: 409 Container is not empty
749         """
750         self._assert_container()
751         r = self.container_delete(
752             until=until,
753             delimiter=delimiter,
754             success=(204, 404, 409))
755         if r.status_code == 404:
756             raise ClientError(
757                 'Container "%s" does not exist' % self.container,
758                 r.status_code)
759         elif r.status_code == 409:
760             raise ClientError(
761                 'Container "%s" is not empty' % self.container,
762                 r.status_code)
763
764     def get_container_versioning(self, container=None):
765         """
766         :param container: (str)
767
768         :returns: (dict)
769         """
770         cnt_back_up = self.container
771         try:
772             self.container = container or cnt_back_up
773             return filter_in(
774                 self.get_container_info(),
775                 'X-Container-Policy-Versioning')
776         finally:
777             self.container = cnt_back_up
778
779     def get_container_limit(self, container=None):
780         """
781         :param container: (str)
782
783         :returns: (dict)
784         """
785         cnt_back_up = self.container
786         try:
787             self.container = container or cnt_back_up
788             return filter_in(
789                 self.get_container_info(),
790                 'X-Container-Policy-Quota')
791         finally:
792             self.container = cnt_back_up
793
794     def get_container_info(self, until=None):
795         """
796         :param until: (str) formated date
797
798         :returns: (dict)
799
800         :raises ClientError: 404 Container not found
801         """
802         try:
803             r = self.container_head(until=until)
804         except ClientError as err:
805             err.details.append('for container %s' % self.container)
806             raise err
807         return r.headers
808
809     def get_container_meta(self, until=None):
810         """
811         :param until: (str) formated date
812
813         :returns: (dict)
814         """
815         return filter_in(
816             self.get_container_info(until=until),
817             'X-Container-Meta')
818
819     def get_container_object_meta(self, until=None):
820         """
821         :param until: (str) formated date
822
823         :returns: (dict)
824         """
825         return filter_in(
826             self.get_container_info(until=until),
827             'X-Container-Object-Meta')
828
829     def set_container_meta(self, metapairs):
830         """
831         :param metapairs: (dict) {key1:val1, key2:val2, ...}
832         """
833         assert(type(metapairs) is dict)
834         self.container_post(update=True, metadata=metapairs)
835
836     def del_container_meta(self, metakey):
837         """
838         :param metakey: (str) metadatum key
839         """
840         self.container_post(update=True, metadata={metakey: ''})
841
842     def set_container_limit(self, limit):
843         """
844         :param limit: (int)
845         """
846         self.container_post(update=True, quota=limit)
847
848     def set_container_versioning(self, versioning):
849         """
850         :param versioning: (str)
851         """
852         self.container_post(update=True, versioning=versioning)
853
854     def del_object(self, obj, until=None, delimiter=None):
855         """
856         :param obj: (str) remote object path
857
858         :param until: (str) formated date
859
860         :param delimiter: (str)
861         """
862         self._assert_container()
863         self.object_delete(obj, until=until, delimiter=delimiter)
864
865     def set_object_meta(self, obj, metapairs):
866         """
867         :param obj: (str) remote object path
868
869         :param metapairs: (dict) {key1:val1, key2:val2, ...}
870         """
871         assert(type(metapairs) is dict)
872         self.object_post(obj, update=True, metadata=metapairs)
873
874     def del_object_meta(self, obj, metakey):
875         """
876         :param obj: (str) remote object path
877
878         :param metakey: (str) metadatum key
879         """
880         self.object_post(obj, update=True, metadata={metakey: ''})
881
882     def publish_object(self, obj):
883         """
884         :param obj: (str) remote object path
885
886         :returns: (str) access url
887         """
888         self.object_post(obj, update=True, public=True)
889         info = self.get_object_info(obj)
890         pref, sep, rest = self.base_url.partition('//')
891         base = rest.split('/')[0]
892         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
893
894     def unpublish_object(self, obj):
895         """
896         :param obj: (str) remote object path
897         """
898         self.object_post(obj, update=True, public=False)
899
900     def get_object_info(self, obj, version=None):
901         """
902         :param obj: (str) remote object path
903
904         :param version: (str)
905
906         :returns: (dict)
907         """
908         try:
909             r = self.object_head(obj, version=version)
910             return r.headers
911         except ClientError as ce:
912             if ce.status == 404:
913                 raise ClientError('Object %s not found' % obj, status=404)
914             raise
915
916     def get_object_meta(self, obj, version=None):
917         """
918         :param obj: (str) remote object path
919
920         :param version: (str)
921
922         :returns: (dict)
923         """
924         return filter_in(
925             self.get_object_info(obj, version=version),
926             'X-Object-Meta')
927
928     def get_object_sharing(self, obj):
929         """
930         :param obj: (str) remote object path
931
932         :returns: (dict)
933         """
934         r = filter_in(
935             self.get_object_info(obj),
936             'X-Object-Sharing',
937             exactMatch=True)
938         reply = {}
939         if len(r) > 0:
940             perms = r['x-object-sharing'].split(';')
941             for perm in perms:
942                 try:
943                     perm.index('=')
944                 except ValueError:
945                     raise ClientError('Incorrect reply format')
946                 (key, val) = perm.strip().split('=')
947                 reply[key] = val
948         return reply
949
950     def set_object_sharing(
951             self, obj,
952             read_permition=False, write_permition=False):
953         """Give read/write permisions to an object.
954
955         :param obj: (str) remote object path
956
957         :param read_permition: (list - bool) users and user groups that get
958             read permition for this object - False means all previous read
959             permissions will be removed
960
961         :param write_perimition: (list - bool) of users and user groups to get
962            write permition for this object - False means all previous write
963            permissions will be removed
964         """
965
966         perms = dict(read=read_permition or '', write=write_permition or '')
967         self.object_post(obj, update=True, permissions=perms)
968
969     def del_object_sharing(self, obj):
970         """
971         :param obj: (str) remote object path
972         """
973         self.set_object_sharing(obj)
974
975     def append_object(self, obj, source_file, upload_cb=None):
976         """
977         :param obj: (str) remote object path
978
979         :param source_file: open file descriptor
980
981         :param upload_db: progress.bar for uploading
982         """
983
984         self._assert_container()
985         meta = self.get_container_info()
986         blocksize = int(meta['x-container-block-size'])
987         filesize = fstat(source_file.fileno()).st_size
988         nblocks = 1 + (filesize - 1) // blocksize
989         offset = 0
990         if upload_cb:
991             upload_gen = upload_cb(nblocks)
992             upload_gen.next()
993         for i in range(nblocks):
994             block = source_file.read(min(blocksize, filesize - offset))
995             offset += len(block)
996             self.object_post(
997                 obj,
998                 update=True,
999                 content_range='bytes */*',
1000                 content_type='application/octet-stream',
1001                 content_length=len(block),
1002                 data=block)
1003
1004             if upload_cb:
1005                 upload_gen.next()
1006
1007     def truncate_object(self, obj, upto_bytes):
1008         """
1009         :param obj: (str) remote object path
1010
1011         :param upto_bytes: max number of bytes to leave on file
1012         """
1013         self.object_post(
1014             obj,
1015             update=True,
1016             content_range='bytes 0-%s/*' % upto_bytes,
1017             content_type='application/octet-stream',
1018             object_bytes=upto_bytes,
1019             source_object=path4url(self.container, obj))
1020
1021     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1022         """Overwrite a part of an object from local source file
1023
1024         :param obj: (str) remote object path
1025
1026         :param start: (int) position in bytes to start overwriting from
1027
1028         :param end: (int) position in bytes to stop overwriting at
1029
1030         :param source_file: open file descriptor
1031
1032         :param upload_db: progress.bar for uploading
1033         """
1034
1035         r = self.get_object_info(obj)
1036         rf_size = int(r['content-length'])
1037         if rf_size < int(start):
1038             raise ClientError(
1039                 'Range start exceeds file size',
1040                 status=416)
1041         elif rf_size < int(end):
1042             raise ClientError(
1043                 'Range end exceeds file size',
1044                 status=416)
1045         self._assert_container()
1046         meta = self.get_container_info()
1047         blocksize = int(meta['x-container-block-size'])
1048         filesize = fstat(source_file.fileno()).st_size
1049         datasize = int(end) - int(start) + 1
1050         nblocks = 1 + (datasize - 1) // blocksize
1051         offset = 0
1052         if upload_cb:
1053             upload_gen = upload_cb(nblocks)
1054             upload_gen.next()
1055         for i in range(nblocks):
1056             read_size = min(blocksize, filesize - offset, datasize - offset)
1057             block = source_file.read(read_size)
1058             self.object_post(
1059                 obj,
1060                 update=True,
1061                 content_type='application/octet-stream',
1062                 content_length=len(block),
1063                 content_range='bytes %s-%s/*' % (
1064                     start + offset,
1065                     start + offset + len(block) - 1),
1066                 data=block)
1067             offset += len(block)
1068
1069             if upload_cb:
1070                 upload_gen.next()
1071
1072     def copy_object(
1073             self, src_container, src_object, dst_container,
1074             dst_object=None,
1075             source_version=None,
1076             source_account=None,
1077             public=False,
1078             content_type=None,
1079             delimiter=None):
1080         """
1081         :param src_container: (str) source container
1082
1083         :param src_object: (str) source object path
1084
1085         :param dst_container: (str) destination container
1086
1087         :param dst_object: (str) destination object path
1088
1089         :param source_version: (str) source object version
1090
1091         :param source_account: (str) account to copy from
1092
1093         :param public: (bool)
1094
1095         :param content_type: (str)
1096
1097         :param delimiter: (str)
1098         """
1099         self._assert_account()
1100         self.container = dst_container
1101         src_path = path4url(src_container, src_object)
1102         self.object_put(
1103             dst_object or src_object,
1104             success=201,
1105             copy_from=src_path,
1106             content_length=0,
1107             source_version=source_version,
1108             source_account=source_account,
1109             public=public,
1110             content_type=content_type,
1111             delimiter=delimiter)
1112
1113     def move_object(
1114             self, src_container, src_object, dst_container,
1115             dst_object=False,
1116             source_account=None,
1117             source_version=None,
1118             public=False,
1119             content_type=None,
1120             delimiter=None):
1121         """
1122         :param src_container: (str) source container
1123
1124         :param src_object: (str) source object path
1125
1126         :param dst_container: (str) destination container
1127
1128         :param dst_object: (str) destination object path
1129
1130         :param source_account: (str) account to move from
1131
1132         :param source_version: (str) source object version
1133
1134         :param public: (bool)
1135
1136         :param content_type: (str)
1137
1138         :param delimiter: (str)
1139         """
1140         self._assert_account()
1141         self.container = dst_container
1142         dst_object = dst_object or src_object
1143         src_path = path4url(src_container, src_object)
1144         self.object_put(
1145             dst_object,
1146             success=201,
1147             move_from=src_path,
1148             content_length=0,
1149             source_account=source_account,
1150             source_version=source_version,
1151             public=public,
1152             content_type=content_type,
1153             delimiter=delimiter)
1154
1155     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1156         """Get accounts that share with self.account
1157
1158         :param limit: (str)
1159
1160         :param marker: (str)
1161
1162         :returns: (dict)
1163         """
1164         self._assert_account()
1165
1166         self.set_param('format', 'json')
1167         self.set_param('limit', limit, iff=limit is not None)
1168         self.set_param('marker', marker, iff=marker is not None)
1169
1170         path = ''
1171         success = kwargs.pop('success', (200, 204))
1172         r = self.get(path, *args, success=success, **kwargs)
1173         return r.json
1174
1175     def get_object_versionlist(self, obj):
1176         """
1177         :param obj: (str) remote object path
1178
1179         :returns: (list)
1180         """
1181         self._assert_container()
1182         r = self.object_get(obj, format='json', version='list')
1183         return r.json['versions']