Fix typo in using download progress bar
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """GRNet Pithos API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
76         """
77         cnt_back_up = self.container
78         try:
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
81         finally:
82             self.container = cnt_back_up
83
84     def upload_object_unchunked(
85             self, obj, f,
86             withHashFile=False,
87             size=None,
88             etag=None,
89             content_encoding=None,
90             content_disposition=None,
91             content_type=None,
92             sharing=None,
93             public=None):
94         """
95         :param obj: (str) remote object path
96
97         :param f: open file descriptor
98
99         :param withHashFile: (bool)
100
101         :param size: (int) size of data to upload
102
103         :param etag: (str)
104
105         :param content_encoding: (str)
106
107         :param content_disposition: (str)
108
109         :param content_type: (str)
110
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
113
114         :param public: (bool)
115         """
116         self._assert_container()
117
118         if withHashFile:
119             data = f.read()
120             try:
121                 import json
122                 data = json.dumps(json.loads(data))
123             except ValueError:
124                 raise ClientError('"%s" is not json-formated' % f.name, 1)
125             except SyntaxError:
126                 msg = '"%s" is not a valid hashmap file' % f.name
127                 raise ClientError(msg, 1)
128             f = StringIO(data)
129         else:
130             data = f.read(size) if size else f.read()
131         self.object_put(
132             obj,
133             data=data,
134             etag=etag,
135             content_encoding=content_encoding,
136             content_disposition=content_disposition,
137             content_type=content_type,
138             permissions=sharing,
139             public=public,
140             success=201)
141
142     def create_object_by_manifestation(
143             self, obj,
144             etag=None,
145             content_encoding=None,
146             content_disposition=None,
147             content_type=None,
148             sharing=None,
149             public=None):
150         """
151         :param obj: (str) remote object path
152
153         :param etag: (str)
154
155         :param content_encoding: (str)
156
157         :param content_disposition: (str)
158
159         :param content_type: (str)
160
161         :param sharing: {'read':[user and/or grp names],
162             'write':[usr and/or grp names]}
163
164         :param public: (bool)
165         """
166         self._assert_container()
167         self.object_put(
168             obj,
169             content_length=0,
170             etag=etag,
171             content_encoding=content_encoding,
172             content_disposition=content_disposition,
173             content_type=content_type,
174             permissions=sharing,
175             public=public,
176             manifest='%s/%s' % (self.container, obj))
177
178     # upload_* auxiliary methods
179     def _put_block_async(self, data, hash, upload_gen=None):
180         event = SilentEvent(method=self._put_block, data=data, hash=hash)
181         event.start()
182         return event
183
184     def _put_block(self, data, hash):
185         r = self.container_post(
186             update=True,
187             content_type='application/octet-stream',
188             content_length=len(data),
189             data=data,
190             format='json')
191         assert r.json[0] == hash, 'Local hash does not match server'
192
193     def _get_file_block_info(self, fileobj, size=None):
194         meta = self.get_container_info()
195         blocksize = int(meta['x-container-block-size'])
196         blockhash = meta['x-container-block-hash']
197         size = size if size is not None else fstat(fileobj.fileno()).st_size
198         nblocks = 1 + (size - 1) // blocksize
199         return (blocksize, blockhash, size, nblocks)
200
201     def _get_missing_hashes(
202             self, obj, json,
203             size=None,
204             format='json',
205             hashmap=True,
206             content_type=None,
207             etag=None,
208             content_encoding=None,
209             content_disposition=None,
210             permissions=None,
211             public=None,
212             success=(201, 409)):
213         r = self.object_put(
214             obj,
215             format='json',
216             hashmap=True,
217             content_type=content_type,
218             json=json,
219             etag=etag,
220             content_encoding=content_encoding,
221             content_disposition=content_disposition,
222             permissions=permissions,
223             public=public,
224             success=success)
225         return None if r.status_code == 201 else r.json
226
227     def _culculate_blocks_for_upload(
228             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
229             hash_cb=None):
230         offset = 0
231         if hash_cb:
232             hash_gen = hash_cb(nblocks)
233             hash_gen.next()
234
235         for i in range(nblocks):
236             block = fileobj.read(min(blocksize, size - offset))
237             bytes = len(block)
238             hash = _pithos_hash(block, blockhash)
239             hashes.append(hash)
240             hmap[hash] = (offset, bytes)
241             offset += bytes
242             if hash_cb:
243                 hash_gen.next()
244         msg = 'Failed to calculate uploaded blocks:'
245         ' Offset and object size do not match'
246         assert offset == size, msg
247
248     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
249         """upload missing blocks asynchronously"""
250
251         self._init_thread_limit()
252
253         flying = []
254         failures = []
255         for hash in missing:
256             offset, bytes = hmap[hash]
257             fileobj.seek(offset)
258             data = fileobj.read(bytes)
259             r = self._put_block_async(data, hash, upload_gen)
260             flying.append(r)
261             unfinished = self._watch_thread_limit(flying)
262             for thread in set(flying).difference(unfinished):
263                 if thread.exception:
264                     failures.append(thread)
265                     if isinstance(
266                             thread.exception,
267                             ClientError) and thread.exception.status == 502:
268                         self.POOLSIZE = self._thread_limit
269                 elif thread.isAlive():
270                     flying.append(thread)
271                 elif upload_gen:
272                     try:
273                         upload_gen.next()
274                     except:
275                         pass
276             flying = unfinished
277
278         for thread in flying:
279             thread.join()
280             if thread.exception:
281                 failures.append(thread)
282             elif upload_gen:
283                 try:
284                     upload_gen.next()
285                 except:
286                     pass
287
288         return [failure.kwargs['hash'] for failure in failures]
289
290     def upload_object(
291             self, obj, f,
292             size=None,
293             hash_cb=None,
294             upload_cb=None,
295             etag=None,
296             content_encoding=None,
297             content_disposition=None,
298             content_type=None,
299             sharing=None,
300             public=None):
301         """Upload an object using multiple connections (threads)
302
303         :param obj: (str) remote object path
304
305         :param f: open file descriptor (rb)
306
307         :param hash_cb: optional progress.bar object for calculating hashes
308
309         :param upload_cb: optional progress.bar object for uploading
310
311         :param etag: (str)
312
313         :param content_encoding: (str)
314
315         :param content_disposition: (str)
316
317         :param content_type: (str)
318
319         :param sharing: {'read':[user and/or grp names],
320             'write':[usr and/or grp names]}
321
322         :param public: (bool)
323         """
324         self._assert_container()
325
326         #init
327         block_info = (blocksize, blockhash, size, nblocks) =\
328             self._get_file_block_info(f, size)
329         (hashes, hmap, offset) = ([], {}, 0)
330         if not content_type:
331             content_type = 'application/octet-stream'
332
333         self._culculate_blocks_for_upload(
334             *block_info,
335             hashes=hashes,
336             hmap=hmap,
337             fileobj=f,
338             hash_cb=hash_cb)
339
340         hashmap = dict(bytes=size, hashes=hashes)
341         missing = self._get_missing_hashes(
342             obj, hashmap,
343             content_type=content_type,
344             size=size,
345             etag=etag,
346             content_encoding=content_encoding,
347             content_disposition=content_disposition,
348             permissions=sharing,
349             public=public)
350
351         if missing is None:
352             return
353
354         if upload_cb:
355             upload_gen = upload_cb(len(missing))
356             for i in range(len(missing), len(hashmap['hashes']) + 1):
357                 try:
358                     upload_gen.next()
359                 except:
360                     upload_gen = None
361         else:
362             upload_gen = None
363
364         retries = 7
365         try:
366             while retries:
367                 sendlog.info('%s blocks missing' % len(missing))
368                 num_of_blocks = len(missing)
369                 missing = self._upload_missing_blocks(
370                     missing,
371                     hmap,
372                     f,
373                     upload_gen)
374                 if missing:
375                     if num_of_blocks == len(missing):
376                         retries -= 1
377                     else:
378                         num_of_blocks = len(missing)
379                 else:
380                     break
381             if missing:
382                 raise ClientError(
383                     '%s blocks failed to upload' % len(missing),
384                     status=800)
385         except KeyboardInterrupt:
386             sendlog.info('- - - wait for threads to finish')
387             for thread in activethreads():
388                 thread.join()
389             raise
390
391         self.object_put(
392             obj,
393             format='json',
394             hashmap=True,
395             content_type=content_type,
396             json=hashmap,
397             success=201)
398
399     # download_* auxiliary methods
400     def _get_remote_blocks_info(self, obj, **restargs):
401         #retrieve object hashmap
402         myrange = restargs.pop('data_range', None)
403         hashmap = self.get_object_hashmap(obj, **restargs)
404         restargs['data_range'] = myrange
405         blocksize = int(hashmap['block_size'])
406         blockhash = hashmap['block_hash']
407         total_size = hashmap['bytes']
408         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
409         map_dict = {}
410         for i, h in enumerate(hashmap['hashes']):
411             #  map_dict[h] = i   CHAGE
412             if h in map_dict:
413                 map_dict[h].append(i)
414             else:
415                 map_dict[h] = [i]
416         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
417
418     def _dump_blocks_sync(
419             self, obj, remote_hashes, blocksize, total_size, dst, range,
420             **args):
421         for blockid, blockhash in enumerate(remote_hashes):
422             if blockhash:
423                 start = blocksize * blockid
424                 is_last = start + blocksize > total_size
425                 end = (total_size - 1) if is_last else (start + blocksize - 1)
426                 (start, end) = _range_up(start, end, range)
427                 args['data_range'] = 'bytes=%s-%s' % (start, end)
428                 r = self.object_get(obj, success=(200, 206), **args)
429                 self._cb_next()
430                 dst.write(r.content)
431                 dst.flush()
432
433     def _get_block_async(self, obj, **args):
434         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
435         event.start()
436         return event
437
438     def _hash_from_file(self, fp, start, size, blockhash):
439         fp.seek(start)
440         block = fp.read(size)
441         h = newhashlib(blockhash)
442         h.update(block.strip('\x00'))
443         return hexlify(h.digest())
444
445     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
446         """write the results of a greenleted rest call to a file
447
448         :param offset: the offset of the file up to blocksize
449         - e.g. if the range is 10-100, all blocks will be written to
450         normal_position - 10
451         """
452         for i, (key, g) in enumerate(flying.items()):
453             if g.isAlive():
454                 continue
455             if g.exception:
456                 raise g.exception
457             block = g.value.content
458             for block_start in blockids[key]:
459                 local_file.seek(block_start + offset)
460                 local_file.write(block)
461                 self._cb_next()
462             flying.pop(key)
463             blockids.pop(key)
464         local_file.flush()
465
466     def _dump_blocks_async(
467             self, obj, remote_hashes, blocksize, total_size, local_file,
468             blockhash=None, resume=False, filerange=None, **restargs):
469         file_size = fstat(local_file.fileno()).st_size if resume else 0
470         flying = dict()
471         blockid_dict = dict()
472         offset = 0
473         if filerange is not None:
474             rstart = int(filerange.split('-')[0])
475             offset = rstart if blocksize > rstart else rstart % blocksize
476
477         self._init_thread_limit()
478         for block_hash, blockids in remote_hashes.items():
479             blockids = [blk * blocksize for blk in blockids]
480             unsaved = [blk for blk in blockids if not (
481                 blk < file_size and block_hash == self._hash_from_file(
482                         local_file, blk, blocksize, blockhash))]
483             self._cb_next(len(blockids) - len(unsaved))
484             if unsaved:
485                 key = unsaved[0]
486                 self._watch_thread_limit(flying.values())
487                 self._thread2file(
488                     flying, blockid_dict, local_file, offset,
489                     **restargs)
490                 end = total_size - 1 if key + blocksize > total_size\
491                     else key + blocksize - 1
492                 start, end = _range_up(key, end, filerange)
493                 if start == end:
494                     self._cb_next()
495                     continue
496                 restargs['async_headers'] = {
497                     'Range': 'bytes=%s-%s' % (start, end)}
498                 flying[key] = self._get_block_async(obj, **restargs)
499                 blockid_dict[key] = unsaved
500
501         for thread in flying.values():
502             thread.join()
503         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
504
505     def download_object(
506             self, obj, dst,
507             download_cb=None,
508             version=None,
509             resume=False,
510             range_str=None,
511             if_match=None,
512             if_none_match=None,
513             if_modified_since=None,
514             if_unmodified_since=None):
515         """Download an object (multiple connections, random blocks)
516
517         :param obj: (str) remote object path
518
519         :param dst: open file descriptor (wb+)
520
521         :param download_cb: optional progress.bar object for downloading
522
523         :param version: (str) file version
524
525         :param resume: (bool) if set, preserve already downloaded file parts
526
527         :param range_str: (str) from, to are file positions (int) in bytes
528
529         :param if_match: (str)
530
531         :param if_none_match: (str)
532
533         :param if_modified_since: (str) formated date
534
535         :param if_unmodified_since: (str) formated date"""
536         restargs = dict(
537             version=version,
538             data_range=None if range_str is None else 'bytes=%s' % range_str,
539             if_match=if_match,
540             if_none_match=if_none_match,
541             if_modified_since=if_modified_since,
542             if_unmodified_since=if_unmodified_since)
543
544         (
545             blocksize,
546             blockhash,
547             total_size,
548             hash_list,
549             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
550         assert total_size >= 0
551
552         if download_cb:
553             self.progress_bar_gen = download_cb(len(hash_list))
554             self._cb_next()
555
556         if dst.isatty():
557             self._dump_blocks_sync(
558                 obj,
559                 hash_list,
560                 blocksize,
561                 total_size,
562                 dst,
563                 range_str,
564                 **restargs)
565         else:
566             self._dump_blocks_async(
567                 obj,
568                 remote_hashes,
569                 blocksize,
570                 total_size,
571                 dst,
572                 blockhash,
573                 resume,
574                 range_str,
575                 **restargs)
576             if not range_str:
577                 dst.truncate(total_size)
578
579         self._complete_cb()
580
581     #Command Progress Bar method
582     def _cb_next(self, step=1):
583         if hasattr(self, 'progress_bar_gen'):
584             try:
585                 for i in xrange(step):
586                     self.progress_bar_gen.next()
587             except:
588                 pass
589
590     def _complete_cb(self):
591         while True:
592             try:
593                 self.progress_bar_gen.next()
594             except:
595                 break
596
597     def get_object_hashmap(
598             self, obj,
599             version=None,
600             if_match=None,
601             if_none_match=None,
602             if_modified_since=None,
603             if_unmodified_since=None,
604             data_range=None):
605         """
606         :param obj: (str) remote object path
607
608         :param if_match: (str)
609
610         :param if_none_match: (str)
611
612         :param if_modified_since: (str) formated date
613
614         :param if_unmodified_since: (str) formated date
615
616         :param data_range: (str) from-to where from and to are integers
617             denoting file positions in bytes
618
619         :returns: (list)
620         """
621         try:
622             r = self.object_get(
623                 obj,
624                 hashmap=True,
625                 version=version,
626                 if_etag_match=if_match,
627                 if_etag_not_match=if_none_match,
628                 if_modified_since=if_modified_since,
629                 if_unmodified_since=if_unmodified_since,
630                 data_range=data_range)
631         except ClientError as err:
632             if err.status == 304 or err.status == 412:
633                 return {}
634             raise
635         return r.json
636
637     def set_account_group(self, group, usernames):
638         """
639         :param group: (str)
640
641         :param usernames: (list)
642         """
643         self.account_post(update=True, groups={group: usernames})
644
645     def del_account_group(self, group):
646         """
647         :param group: (str)
648         """
649         self.account_post(update=True, groups={group: []})
650
651     def get_account_info(self, until=None):
652         """
653         :param until: (str) formated date
654
655         :returns: (dict)
656         """
657         r = self.account_head(until=until)
658         if r.status_code == 401:
659             raise ClientError("No authorization", status=401)
660         return r.headers
661
662     def get_account_quota(self):
663         """
664         :returns: (dict)
665         """
666         return filter_in(
667             self.get_account_info(),
668             'X-Account-Policy-Quota',
669             exactMatch=True)
670
671     def get_account_versioning(self):
672         """
673         :returns: (dict)
674         """
675         return filter_in(
676             self.get_account_info(),
677             'X-Account-Policy-Versioning',
678             exactMatch=True)
679
680     def get_account_meta(self, until=None):
681         """
682         :meta until: (str) formated date
683
684         :returns: (dict)
685         """
686         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
687
688     def get_account_group(self):
689         """
690         :returns: (dict)
691         """
692         return filter_in(self.get_account_info(), 'X-Account-Group-')
693
694     def set_account_meta(self, metapairs):
695         """
696         :param metapairs: (dict) {key1:val1, key2:val2, ...}
697         """
698         assert(type(metapairs) is dict)
699         self.account_post(update=True, metadata=metapairs)
700
701     def del_account_meta(self, metakey):
702         """
703         :param metakey: (str) metadatum key
704         """
705         self.account_post(update=True, metadata={metakey: ''})
706
707     def set_account_quota(self, quota):
708         """
709         :param quota: (int)
710         """
711         self.account_post(update=True, quota=quota)
712
713     def set_account_versioning(self, versioning):
714         """
715         "param versioning: (str)
716         """
717         self.account_post(update=True, versioning=versioning)
718
719     def list_containers(self):
720         """
721         :returns: (dict)
722         """
723         r = self.account_get()
724         return r.json
725
726     def del_container(self, until=None, delimiter=None):
727         """
728         :param until: (str) formated date
729
730         :param delimiter: (str) with / empty container
731
732         :raises ClientError: 404 Container does not exist
733
734         :raises ClientError: 409 Container is not empty
735         """
736         self._assert_container()
737         r = self.container_delete(
738             until=until,
739             delimiter=delimiter,
740             success=(204, 404, 409))
741         if r.status_code == 404:
742             raise ClientError(
743                 'Container "%s" does not exist' % self.container,
744                 r.status_code)
745         elif r.status_code == 409:
746             raise ClientError(
747                 'Container "%s" is not empty' % self.container,
748                 r.status_code)
749
750     def get_container_versioning(self, container=None):
751         """
752         :param container: (str)
753
754         :returns: (dict)
755         """
756         cnt_back_up = self.container
757         try:
758             self.container = container or cnt_back_up
759             return filter_in(
760                 self.get_container_info(),
761                 'X-Container-Policy-Versioning')
762         finally:
763             self.container = cnt_back_up
764
765     def get_container_quota(self, container=None):
766         """
767         :param container: (str)
768
769         :returns: (dict)
770         """
771         cnt_back_up = self.container
772         try:
773             self.container = container or cnt_back_up
774             return filter_in(
775                 self.get_container_info(),
776                 'X-Container-Policy-Quota')
777         finally:
778             self.container = cnt_back_up
779
780     def get_container_info(self, until=None):
781         """
782         :param until: (str) formated date
783
784         :returns: (dict)
785
786         :raises ClientError: 404 Container not found
787         """
788         try:
789             r = self.container_head(until=until)
790         except ClientError as err:
791             err.details.append('for container %s' % self.container)
792             raise err
793         return r.headers
794
795     def get_container_meta(self, until=None):
796         """
797         :param until: (str) formated date
798
799         :returns: (dict)
800         """
801         return filter_in(
802             self.get_container_info(until=until),
803             'X-Container-Meta')
804
805     def get_container_object_meta(self, until=None):
806         """
807         :param until: (str) formated date
808
809         :returns: (dict)
810         """
811         return filter_in(
812             self.get_container_info(until=until),
813             'X-Container-Object-Meta')
814
815     def set_container_meta(self, metapairs):
816         """
817         :param metapairs: (dict) {key1:val1, key2:val2, ...}
818         """
819         assert(type(metapairs) is dict)
820         self.container_post(update=True, metadata=metapairs)
821
822     def del_container_meta(self, metakey):
823         """
824         :param metakey: (str) metadatum key
825         """
826         self.container_post(update=True, metadata={metakey: ''})
827
828     def set_container_quota(self, quota):
829         """
830         :param quota: (int)
831         """
832         self.container_post(update=True, quota=quota)
833
834     def set_container_versioning(self, versioning):
835         """
836         :param versioning: (str)
837         """
838         self.container_post(update=True, versioning=versioning)
839
840     def del_object(self, obj, until=None, delimiter=None):
841         """
842         :param obj: (str) remote object path
843
844         :param until: (str) formated date
845
846         :param delimiter: (str)
847         """
848         self._assert_container()
849         self.object_delete(obj, until=until, delimiter=delimiter)
850
851     def set_object_meta(self, obj, metapairs):
852         """
853         :param obj: (str) remote object path
854
855         :param metapairs: (dict) {key1:val1, key2:val2, ...}
856         """
857         assert(type(metapairs) is dict)
858         self.object_post(obj, update=True, metadata=metapairs)
859
860     def del_object_meta(self, obj, metakey):
861         """
862         :param obj: (str) remote object path
863
864         :param metakey: (str) metadatum key
865         """
866         self.object_post(obj, update=True, metadata={metakey: ''})
867
868     def publish_object(self, obj):
869         """
870         :param obj: (str) remote object path
871
872         :returns: (str) access url
873         """
874         self.object_post(obj, update=True, public=True)
875         info = self.get_object_info(obj)
876         pref, sep, rest = self.base_url.partition('//')
877         base = rest.split('/')[0]
878         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
879
880     def unpublish_object(self, obj):
881         """
882         :param obj: (str) remote object path
883         """
884         self.object_post(obj, update=True, public=False)
885
886     def get_object_info(self, obj, version=None):
887         """
888         :param obj: (str) remote object path
889
890         :param version: (str)
891
892         :returns: (dict)
893         """
894         try:
895             r = self.object_head(obj, version=version)
896             return r.headers
897         except ClientError as ce:
898             if ce.status == 404:
899                 raise ClientError('Object %s not found' % obj, status=404)
900             raise
901
902     def get_object_meta(self, obj, version=None):
903         """
904         :param obj: (str) remote object path
905
906         :param version: (str)
907
908         :returns: (dict)
909         """
910         return filter_in(
911             self.get_object_info(obj, version=version),
912             'X-Object-Meta')
913
914     def get_object_sharing(self, obj):
915         """
916         :param obj: (str) remote object path
917
918         :returns: (dict)
919         """
920         r = filter_in(
921             self.get_object_info(obj),
922             'X-Object-Sharing',
923             exactMatch=True)
924         reply = {}
925         if len(r) > 0:
926             perms = r['x-object-sharing'].split(';')
927             for perm in perms:
928                 try:
929                     perm.index('=')
930                 except ValueError:
931                     raise ClientError('Incorrect reply format')
932                 (key, val) = perm.strip().split('=')
933                 reply[key] = val
934         return reply
935
936     def set_object_sharing(
937             self, obj,
938             read_permition=False, write_permition=False):
939         """Give read/write permisions to an object.
940
941         :param obj: (str) remote object path
942
943         :param read_permition: (list - bool) users and user groups that get
944             read permition for this object - False means all previous read
945             permissions will be removed
946
947         :param write_perimition: (list - bool) of users and user groups to get
948            write permition for this object - False means all previous write
949            permissions will be removed
950         """
951
952         perms = dict(read=read_permition or '', write=write_permition or '')
953         self.object_post(obj, update=True, permissions=perms)
954
955     def del_object_sharing(self, obj):
956         """
957         :param obj: (str) remote object path
958         """
959         self.set_object_sharing(obj)
960
961     def append_object(self, obj, source_file, upload_cb=None):
962         """
963         :param obj: (str) remote object path
964
965         :param source_file: open file descriptor
966
967         :param upload_db: progress.bar for uploading
968         """
969
970         self._assert_container()
971         meta = self.get_container_info()
972         blocksize = int(meta['x-container-block-size'])
973         filesize = fstat(source_file.fileno()).st_size
974         nblocks = 1 + (filesize - 1) // blocksize
975         offset = 0
976         if upload_cb:
977             upload_gen = upload_cb(nblocks)
978             upload_gen.next()
979         for i in range(nblocks):
980             block = source_file.read(min(blocksize, filesize - offset))
981             offset += len(block)
982             self.object_post(
983                 obj,
984                 update=True,
985                 content_range='bytes */*',
986                 content_type='application/octet-stream',
987                 content_length=len(block),
988                 data=block)
989
990             if upload_cb:
991                 upload_gen.next()
992
993     def truncate_object(self, obj, upto_bytes):
994         """
995         :param obj: (str) remote object path
996
997         :param upto_bytes: max number of bytes to leave on file
998         """
999         self.object_post(
1000             obj,
1001             update=True,
1002             content_range='bytes 0-%s/*' % upto_bytes,
1003             content_type='application/octet-stream',
1004             object_bytes=upto_bytes,
1005             source_object=path4url(self.container, obj))
1006
1007     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1008         """Overwrite a part of an object from local source file
1009
1010         :param obj: (str) remote object path
1011
1012         :param start: (int) position in bytes to start overwriting from
1013
1014         :param end: (int) position in bytes to stop overwriting at
1015
1016         :param source_file: open file descriptor
1017
1018         :param upload_db: progress.bar for uploading
1019         """
1020
1021         r = self.get_object_info(obj)
1022         rf_size = int(r['content-length'])
1023         if rf_size < int(start):
1024             raise ClientError(
1025                 'Range start exceeds file size',
1026                 status=416)
1027         elif rf_size < int(end):
1028             raise ClientError(
1029                 'Range end exceeds file size',
1030                 status=416)
1031         self._assert_container()
1032         meta = self.get_container_info()
1033         blocksize = int(meta['x-container-block-size'])
1034         filesize = fstat(source_file.fileno()).st_size
1035         datasize = int(end) - int(start) + 1
1036         nblocks = 1 + (datasize - 1) // blocksize
1037         offset = 0
1038         if upload_cb:
1039             upload_gen = upload_cb(nblocks)
1040             upload_gen.next()
1041         for i in range(nblocks):
1042             read_size = min(blocksize, filesize - offset, datasize - offset)
1043             block = source_file.read(read_size)
1044             self.object_post(
1045                 obj,
1046                 update=True,
1047                 content_type='application/octet-stream',
1048                 content_length=len(block),
1049                 content_range='bytes %s-%s/*' % (
1050                     start + offset,
1051                     start + offset + len(block) - 1),
1052                 data=block)
1053             offset += len(block)
1054
1055             if upload_cb:
1056                 upload_gen.next()
1057
1058     def copy_object(
1059             self, src_container, src_object, dst_container,
1060             dst_object=None,
1061             source_version=None,
1062             source_account=None,
1063             public=False,
1064             content_type=None,
1065             delimiter=None):
1066         """
1067         :param src_container: (str) source container
1068
1069         :param src_object: (str) source object path
1070
1071         :param dst_container: (str) destination container
1072
1073         :param dst_object: (str) destination object path
1074
1075         :param source_version: (str) source object version
1076
1077         :param source_account: (str) account to copy from
1078
1079         :param public: (bool)
1080
1081         :param content_type: (str)
1082
1083         :param delimiter: (str)
1084         """
1085         self._assert_account()
1086         self.container = dst_container
1087         src_path = path4url(src_container, src_object)
1088         self.object_put(
1089             dst_object or src_object,
1090             success=201,
1091             copy_from=src_path,
1092             content_length=0,
1093             source_version=source_version,
1094             source_account=source_account,
1095             public=public,
1096             content_type=content_type,
1097             delimiter=delimiter)
1098
1099     def move_object(
1100             self, src_container, src_object, dst_container,
1101             dst_object=False,
1102             source_account=None,
1103             source_version=None,
1104             public=False,
1105             content_type=None,
1106             delimiter=None):
1107         """
1108         :param src_container: (str) source container
1109
1110         :param src_object: (str) source object path
1111
1112         :param dst_container: (str) destination container
1113
1114         :param dst_object: (str) destination object path
1115
1116         :param source_account: (str) account to move from
1117
1118         :param source_version: (str) source object version
1119
1120         :param public: (bool)
1121
1122         :param content_type: (str)
1123
1124         :param delimiter: (str)
1125         """
1126         self._assert_account()
1127         self.container = dst_container
1128         dst_object = dst_object or src_object
1129         src_path = path4url(src_container, src_object)
1130         self.object_put(
1131             dst_object,
1132             success=201,
1133             move_from=src_path,
1134             content_length=0,
1135             source_account=source_account,
1136             source_version=source_version,
1137             public=public,
1138             content_type=content_type,
1139             delimiter=delimiter)
1140
1141     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1142         """Get accounts that share with self.account
1143
1144         :param limit: (str)
1145
1146         :param marker: (str)
1147
1148         :returns: (dict)
1149         """
1150         self._assert_account()
1151
1152         self.set_param('format', 'json')
1153         self.set_param('limit', limit, iff=limit is not None)
1154         self.set_param('marker', marker, iff=marker is not None)
1155
1156         path = ''
1157         success = kwargs.pop('success', (200, 204))
1158         r = self.get(path, *args, success=success, **kwargs)
1159         return r.json
1160
1161     def get_object_versionlist(self, obj):
1162         """
1163         :param obj: (str) remote object path
1164
1165         :returns: (list)
1166         """
1167         self._assert_container()
1168         r = self.object_get(obj, format='json', version='list')
1169         return r.json['versions']