Implement download_to_string in pithos client
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39 from StringIO import StringIO
40
41 from binascii import hexlify
42
43 from kamaki.clients import SilentEvent, sendlog
44 from kamaki.clients.pithos.rest_api import PithosRestClient
45 from kamaki.clients.storage import ClientError
46 from kamaki.clients.utils import path4url, filter_in
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """Synnefo Pithos+ API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
76         """
77         cnt_back_up = self.container
78         try:
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
81         finally:
82             self.container = cnt_back_up
83
84     def upload_object_unchunked(
85             self, obj, f,
86             withHashFile=False,
87             size=None,
88             etag=None,
89             content_encoding=None,
90             content_disposition=None,
91             content_type=None,
92             sharing=None,
93             public=None):
94         """
95         :param obj: (str) remote object path
96
97         :param f: open file descriptor
98
99         :param withHashFile: (bool)
100
101         :param size: (int) size of data to upload
102
103         :param etag: (str)
104
105         :param content_encoding: (str)
106
107         :param content_disposition: (str)
108
109         :param content_type: (str)
110
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
113
114         :param public: (bool)
115
116         :returns: (dict) created object metadata
117         """
118         self._assert_container()
119
120         if withHashFile:
121             data = f.read()
122             try:
123                 import json
124                 data = json.dumps(json.loads(data))
125             except ValueError:
126                 raise ClientError('"%s" is not json-formated' % f.name, 1)
127             except SyntaxError:
128                 msg = '"%s" is not a valid hashmap file' % f.name
129                 raise ClientError(msg, 1)
130             f = StringIO(data)
131         else:
132             data = f.read(size) if size else f.read()
133         r = self.object_put(
134             obj,
135             data=data,
136             etag=etag,
137             content_encoding=content_encoding,
138             content_disposition=content_disposition,
139             content_type=content_type,
140             permissions=sharing,
141             public=public,
142             success=201)
143         return r.headers
144
145     def create_object_by_manifestation(
146             self, obj,
147             etag=None,
148             content_encoding=None,
149             content_disposition=None,
150             content_type=None,
151             sharing=None,
152             public=None):
153         """
154         :param obj: (str) remote object path
155
156         :param etag: (str)
157
158         :param content_encoding: (str)
159
160         :param content_disposition: (str)
161
162         :param content_type: (str)
163
164         :param sharing: {'read':[user and/or grp names],
165             'write':[usr and/or grp names]}
166
167         :param public: (bool)
168
169         :returns: (dict) created object metadata
170         """
171         self._assert_container()
172         r = self.object_put(
173             obj,
174             content_length=0,
175             etag=etag,
176             content_encoding=content_encoding,
177             content_disposition=content_disposition,
178             content_type=content_type,
179             permissions=sharing,
180             public=public,
181             manifest='%s/%s' % (self.container, obj))
182         return r.headers
183
184     # upload_* auxiliary methods
185     def _put_block_async(self, data, hash, upload_gen=None):
186         event = SilentEvent(method=self._put_block, data=data, hash=hash)
187         event.start()
188         return event
189
190     def _put_block(self, data, hash):
191         r = self.container_post(
192             update=True,
193             content_type='application/octet-stream',
194             content_length=len(data),
195             data=data,
196             format='json')
197         assert r.json[0] == hash, 'Local hash does not match server'
198
199     def _get_file_block_info(self, fileobj, size=None, cache=None):
200         """
201         :param fileobj: (file descriptor) source
202
203         :param size: (int) size of data to upload from source
204
205         :param cache: (dict) if provided, cache container info response to
206         avoid redundant calls
207         """
208         if isinstance(cache, dict):
209             try:
210                 meta = cache[self.container]
211             except KeyError:
212                 meta = self.get_container_info()
213                 cache[self.container] = meta
214         else:
215             meta = self.get_container_info()
216         blocksize = int(meta['x-container-block-size'])
217         blockhash = meta['x-container-block-hash']
218         size = size if size is not None else fstat(fileobj.fileno()).st_size
219         nblocks = 1 + (size - 1) // blocksize
220         return (blocksize, blockhash, size, nblocks)
221
222     def _create_or_get_missing_hashes(
223             self, obj, json,
224             size=None,
225             format='json',
226             hashmap=True,
227             content_type=None,
228             if_etag_match=None,
229             if_etag_not_match=None,
230             content_encoding=None,
231             content_disposition=None,
232             permissions=None,
233             public=None,
234             success=(201, 409)):
235         r = self.object_put(
236             obj,
237             format='json',
238             hashmap=True,
239             content_type=content_type,
240             json=json,
241             if_etag_match=if_etag_match,
242             if_etag_not_match=if_etag_not_match,
243             content_encoding=content_encoding,
244             content_disposition=content_disposition,
245             permissions=permissions,
246             public=public,
247             success=success)
248         return (None if r.status_code == 201 else r.json), r.headers
249
250     def _calculate_blocks_for_upload(
251             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
252             hash_cb=None):
253         offset = 0
254         if hash_cb:
255             hash_gen = hash_cb(nblocks)
256             hash_gen.next()
257
258         for i in range(nblocks):
259             block = fileobj.read(min(blocksize, size - offset))
260             bytes = len(block)
261             hash = _pithos_hash(block, blockhash)
262             hashes.append(hash)
263             hmap[hash] = (offset, bytes)
264             offset += bytes
265             if hash_cb:
266                 hash_gen.next()
267         msg = 'Failed to calculate uploaded blocks:'
268         ' Offset and object size do not match'
269         assert offset == size, msg
270
271     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
272         """upload missing blocks asynchronously"""
273
274         self._init_thread_limit()
275
276         flying = []
277         failures = []
278         for hash in missing:
279             offset, bytes = hmap[hash]
280             fileobj.seek(offset)
281             data = fileobj.read(bytes)
282             r = self._put_block_async(data, hash, upload_gen)
283             flying.append(r)
284             unfinished = self._watch_thread_limit(flying)
285             for thread in set(flying).difference(unfinished):
286                 if thread.exception:
287                     failures.append(thread)
288                     if isinstance(
289                             thread.exception,
290                             ClientError) and thread.exception.status == 502:
291                         self.POOLSIZE = self._thread_limit
292                 elif thread.isAlive():
293                     flying.append(thread)
294                 elif upload_gen:
295                     try:
296                         upload_gen.next()
297                     except:
298                         pass
299             flying = unfinished
300
301         for thread in flying:
302             thread.join()
303             if thread.exception:
304                 failures.append(thread)
305             elif upload_gen:
306                 try:
307                     upload_gen.next()
308                 except:
309                     pass
310
311         return [failure.kwargs['hash'] for failure in failures]
312
313     def upload_object(
314             self, obj, f,
315             size=None,
316             hash_cb=None,
317             upload_cb=None,
318             etag=None,
319             if_etag_match=None,
320             if_not_exist=None,
321             content_encoding=None,
322             content_disposition=None,
323             content_type=None,
324             sharing=None,
325             public=None,
326             container_info_cache=None):
327         """Upload an object using multiple connections (threads)
328
329         :param obj: (str) remote object path
330
331         :param f: open file descriptor (rb)
332
333         :param hash_cb: optional progress.bar object for calculating hashes
334
335         :param upload_cb: optional progress.bar object for uploading
336
337         :param etag: (str)
338
339         :param if_etag_match: (str) Push that value to if-match header at file
340             creation
341
342         :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
343             it does not exist remotely, otherwise the operation will fail.
344             Involves the case of an object with the same path is created while
345             the object is being uploaded.
346
347         :param content_encoding: (str)
348
349         :param content_disposition: (str)
350
351         :param content_type: (str)
352
353         :param sharing: {'read':[user and/or grp names],
354             'write':[usr and/or grp names]}
355
356         :param public: (bool)
357
358         :param container_info_cache: (dict) if given, avoid redundant calls to
359         server for container info (block size and hash information)
360         """
361         self._assert_container()
362
363         #init
364         block_info = (
365             blocksize, blockhash, size, nblocks) = self._get_file_block_info(
366                 f, size, container_info_cache)
367         (hashes, hmap, offset) = ([], {}, 0)
368         if not content_type:
369             content_type = 'application/octet-stream'
370
371         self._calculate_blocks_for_upload(
372             *block_info,
373             hashes=hashes,
374             hmap=hmap,
375             fileobj=f,
376             hash_cb=hash_cb)
377
378         hashmap = dict(bytes=size, hashes=hashes)
379         missing, obj_headers = self._create_or_get_missing_hashes(
380             obj, hashmap,
381             content_type=content_type,
382             size=size,
383             if_etag_match=if_etag_match,
384             if_etag_not_match='*' if if_not_exist else None,
385             content_encoding=content_encoding,
386             content_disposition=content_disposition,
387             permissions=sharing,
388             public=public)
389
390         if missing is None:
391             return obj_headers
392
393         if upload_cb:
394             upload_gen = upload_cb(len(missing))
395             for i in range(len(missing), len(hashmap['hashes']) + 1):
396                 try:
397                     upload_gen.next()
398                 except:
399                     upload_gen = None
400         else:
401             upload_gen = None
402
403         retries = 7
404         try:
405             while retries:
406                 sendlog.info('%s blocks missing' % len(missing))
407                 num_of_blocks = len(missing)
408                 missing = self._upload_missing_blocks(
409                     missing,
410                     hmap,
411                     f,
412                     upload_gen)
413                 if missing:
414                     if num_of_blocks == len(missing):
415                         retries -= 1
416                     else:
417                         num_of_blocks = len(missing)
418                 else:
419                     break
420             if missing:
421                 raise ClientError(
422                     '%s blocks failed to upload' % len(missing),
423                     status=800)
424         except KeyboardInterrupt:
425             sendlog.info('- - - wait for threads to finish')
426             for thread in activethreads():
427                 thread.join()
428             raise
429
430         r = self.object_put(
431             obj,
432             format='json',
433             hashmap=True,
434             content_type=content_type,
435             if_etag_match=if_etag_match,
436             if_etag_not_match='*' if if_not_exist else None,
437             etag=etag,
438             json=hashmap,
439             permissions=sharing,
440             public=public,
441             success=201)
442         return r.headers
443
444     # download_* auxiliary methods
445     def _get_remote_blocks_info(self, obj, **restargs):
446         #retrieve object hashmap
447         myrange = restargs.pop('data_range', None)
448         hashmap = self.get_object_hashmap(obj, **restargs)
449         restargs['data_range'] = myrange
450         blocksize = int(hashmap['block_size'])
451         blockhash = hashmap['block_hash']
452         total_size = hashmap['bytes']
453         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
454         map_dict = {}
455         for i, h in enumerate(hashmap['hashes']):
456             #  map_dict[h] = i   CHAGE
457             if h in map_dict:
458                 map_dict[h].append(i)
459             else:
460                 map_dict[h] = [i]
461         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
462
463     def _dump_blocks_sync(
464             self, obj, remote_hashes, blocksize, total_size, dst, range,
465             **args):
466         for blockid, blockhash in enumerate(remote_hashes):
467             if blockhash:
468                 start = blocksize * blockid
469                 is_last = start + blocksize > total_size
470                 end = (total_size - 1) if is_last else (start + blocksize - 1)
471                 (start, end) = _range_up(start, end, range)
472                 args['data_range'] = 'bytes=%s-%s' % (start, end)
473                 r = self.object_get(obj, success=(200, 206), **args)
474                 self._cb_next()
475                 dst.write(r.content)
476                 dst.flush()
477
478     def _get_block_async(self, obj, **args):
479         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
480         event.start()
481         return event
482
483     def _hash_from_file(self, fp, start, size, blockhash):
484         fp.seek(start)
485         block = fp.read(size)
486         h = newhashlib(blockhash)
487         h.update(block.strip('\x00'))
488         return hexlify(h.digest())
489
490     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
491         """write the results of a greenleted rest call to a file
492
493         :param offset: the offset of the file up to blocksize
494         - e.g. if the range is 10-100, all blocks will be written to
495         normal_position - 10
496         """
497         for i, (key, g) in enumerate(flying.items()):
498             if g.isAlive():
499                 continue
500             if g.exception:
501                 raise g.exception
502             block = g.value.content
503             for block_start in blockids[key]:
504                 local_file.seek(block_start + offset)
505                 local_file.write(block)
506                 self._cb_next()
507             flying.pop(key)
508             blockids.pop(key)
509         local_file.flush()
510
511     def _dump_blocks_async(
512             self, obj, remote_hashes, blocksize, total_size, local_file,
513             blockhash=None, resume=False, filerange=None, **restargs):
514         file_size = fstat(local_file.fileno()).st_size if resume else 0
515         flying = dict()
516         blockid_dict = dict()
517         offset = 0
518         if filerange is not None:
519             rstart = int(filerange.split('-')[0])
520             offset = rstart if blocksize > rstart else rstart % blocksize
521
522         self._init_thread_limit()
523         for block_hash, blockids in remote_hashes.items():
524             blockids = [blk * blocksize for blk in blockids]
525             unsaved = [blk for blk in blockids if not (
526                 blk < file_size and block_hash == self._hash_from_file(
527                         local_file, blk, blocksize, blockhash))]
528             self._cb_next(len(blockids) - len(unsaved))
529             if unsaved:
530                 key = unsaved[0]
531                 self._watch_thread_limit(flying.values())
532                 self._thread2file(
533                     flying, blockid_dict, local_file, offset,
534                     **restargs)
535                 end = total_size - 1 if key + blocksize > total_size\
536                     else key + blocksize - 1
537                 start, end = _range_up(key, end, filerange)
538                 if start == end:
539                     self._cb_next()
540                     continue
541                 restargs['async_headers'] = {
542                     'Range': 'bytes=%s-%s' % (start, end)}
543                 flying[key] = self._get_block_async(obj, **restargs)
544                 blockid_dict[key] = unsaved
545
546         for thread in flying.values():
547             thread.join()
548         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
549
550     def download_object(
551             self, obj, dst,
552             download_cb=None,
553             version=None,
554             resume=False,
555             range_str=None,
556             if_match=None,
557             if_none_match=None,
558             if_modified_since=None,
559             if_unmodified_since=None):
560         """Download an object (multiple connections, random blocks)
561
562         :param obj: (str) remote object path
563
564         :param dst: open file descriptor (wb+)
565
566         :param download_cb: optional progress.bar object for downloading
567
568         :param version: (str) file version
569
570         :param resume: (bool) if set, preserve already downloaded file parts
571
572         :param range_str: (str) from, to are file positions (int) in bytes
573
574         :param if_match: (str)
575
576         :param if_none_match: (str)
577
578         :param if_modified_since: (str) formated date
579
580         :param if_unmodified_since: (str) formated date"""
581         restargs = dict(
582             version=version,
583             data_range=None if range_str is None else 'bytes=%s' % range_str,
584             if_match=if_match,
585             if_none_match=if_none_match,
586             if_modified_since=if_modified_since,
587             if_unmodified_since=if_unmodified_since)
588
589         (
590             blocksize,
591             blockhash,
592             total_size,
593             hash_list,
594             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
595         assert total_size >= 0
596
597         if download_cb:
598             self.progress_bar_gen = download_cb(len(hash_list))
599             self._cb_next()
600
601         if dst.isatty():
602             self._dump_blocks_sync(
603                 obj,
604                 hash_list,
605                 blocksize,
606                 total_size,
607                 dst,
608                 range_str,
609                 **restargs)
610         else:
611             self._dump_blocks_async(
612                 obj,
613                 remote_hashes,
614                 blocksize,
615                 total_size,
616                 dst,
617                 blockhash,
618                 resume,
619                 range_str,
620                 **restargs)
621             if not range_str:
622                 dst.truncate(total_size)
623
624         self._complete_cb()
625
626     def download_to_string(
627             self, obj,
628             download_cb=None,
629             version=None,
630             range_str=None,
631             if_match=None,
632             if_none_match=None,
633             if_modified_since=None,
634             if_unmodified_since=None):
635         """Download an object to a string (multiple connections)
636
637         :param obj: (str) remote object path
638
639         :param download_cb: optional progress.bar object for downloading
640
641         :param version: (str) file version
642
643         :param range_str: (str) from, to are file positions (int) in bytes
644
645         :param if_match: (str)
646
647         :param if_none_match: (str)
648
649         :param if_modified_since: (str) formated date
650
651         :param if_unmodified_since: (str) formated date
652
653         :returns: (str) the whole object contents
654         """
655         restargs = dict(
656             version=version,
657             data_range=None if range_str is None else 'bytes=%s' % range_str,
658             if_match=if_match,
659             if_none_match=if_none_match,
660             if_modified_since=if_modified_since,
661             if_unmodified_since=if_unmodified_since)
662
663         (
664             blocksize,
665             blockhash,
666             total_size,
667             hash_list,
668             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
669         assert total_size >= 0
670
671         if download_cb:
672             self.progress_bar_gen = download_cb(len(hash_list))
673             self._cb_next()
674
675         ret = ''
676         for blockid, blockhash in enumerate(remote_hashes):
677             start = blocksize * blockid
678             is_last = start + blocksize > total_size
679             end = (total_size - 1) if is_last else (start + blocksize - 1)
680             (start, end) = _range_up(start, end, range_str)
681             if start == end:
682                 continue
683             restargs['data_range'] = 'bytes=%s-%s' % (start, end)
684             r = self.object_get(obj, success=(200, 206), **restargs)
685             ret += r.content
686             self._cb_next()
687
688         self._complete_cb()
689         return ret
690
691     #Command Progress Bar method
692     def _cb_next(self, step=1):
693         if hasattr(self, 'progress_bar_gen'):
694             try:
695                 for i in xrange(step):
696                     self.progress_bar_gen.next()
697             except:
698                 pass
699
700     def _complete_cb(self):
701         while True:
702             try:
703                 self.progress_bar_gen.next()
704             except:
705                 break
706
707     def get_object_hashmap(
708             self, obj,
709             version=None,
710             if_match=None,
711             if_none_match=None,
712             if_modified_since=None,
713             if_unmodified_since=None,
714             data_range=None):
715         """
716         :param obj: (str) remote object path
717
718         :param if_match: (str)
719
720         :param if_none_match: (str)
721
722         :param if_modified_since: (str) formated date
723
724         :param if_unmodified_since: (str) formated date
725
726         :param data_range: (str) from-to where from and to are integers
727             denoting file positions in bytes
728
729         :returns: (list)
730         """
731         try:
732             r = self.object_get(
733                 obj,
734                 hashmap=True,
735                 version=version,
736                 if_etag_match=if_match,
737                 if_etag_not_match=if_none_match,
738                 if_modified_since=if_modified_since,
739                 if_unmodified_since=if_unmodified_since,
740                 data_range=data_range)
741         except ClientError as err:
742             if err.status == 304 or err.status == 412:
743                 return {}
744             raise
745         return r.json
746
747     def set_account_group(self, group, usernames):
748         """
749         :param group: (str)
750
751         :param usernames: (list)
752         """
753         self.account_post(update=True, groups={group: usernames})
754
755     def del_account_group(self, group):
756         """
757         :param group: (str)
758         """
759         self.account_post(update=True, groups={group: []})
760
761     def get_account_info(self, until=None):
762         """
763         :param until: (str) formated date
764
765         :returns: (dict)
766         """
767         r = self.account_head(until=until)
768         if r.status_code == 401:
769             raise ClientError("No authorization", status=401)
770         return r.headers
771
772     def get_account_quota(self):
773         """
774         :returns: (dict)
775         """
776         return filter_in(
777             self.get_account_info(),
778             'X-Account-Policy-Quota',
779             exactMatch=True)
780
781     def get_account_versioning(self):
782         """
783         :returns: (dict)
784         """
785         return filter_in(
786             self.get_account_info(),
787             'X-Account-Policy-Versioning',
788             exactMatch=True)
789
790     def get_account_meta(self, until=None):
791         """
792         :meta until: (str) formated date
793
794         :returns: (dict)
795         """
796         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
797
798     def get_account_group(self):
799         """
800         :returns: (dict)
801         """
802         return filter_in(self.get_account_info(), 'X-Account-Group-')
803
804     def set_account_meta(self, metapairs):
805         """
806         :param metapairs: (dict) {key1:val1, key2:val2, ...}
807         """
808         assert(type(metapairs) is dict)
809         self.account_post(update=True, metadata=metapairs)
810
811     def del_account_meta(self, metakey):
812         """
813         :param metakey: (str) metadatum key
814         """
815         self.account_post(update=True, metadata={metakey: ''})
816
817     """
818     def set_account_quota(self, quota):
819         ""
820         :param quota: (int)
821         ""
822         self.account_post(update=True, quota=quota)
823     """
824
825     def set_account_versioning(self, versioning):
826         """
827         "param versioning: (str)
828         """
829         self.account_post(update=True, versioning=versioning)
830
831     def list_containers(self):
832         """
833         :returns: (dict)
834         """
835         r = self.account_get()
836         return r.json
837
838     def del_container(self, until=None, delimiter=None):
839         """
840         :param until: (str) formated date
841
842         :param delimiter: (str) with / empty container
843
844         :raises ClientError: 404 Container does not exist
845
846         :raises ClientError: 409 Container is not empty
847         """
848         self._assert_container()
849         r = self.container_delete(
850             until=until,
851             delimiter=delimiter,
852             success=(204, 404, 409))
853         if r.status_code == 404:
854             raise ClientError(
855                 'Container "%s" does not exist' % self.container,
856                 r.status_code)
857         elif r.status_code == 409:
858             raise ClientError(
859                 'Container "%s" is not empty' % self.container,
860                 r.status_code)
861
862     def get_container_versioning(self, container=None):
863         """
864         :param container: (str)
865
866         :returns: (dict)
867         """
868         cnt_back_up = self.container
869         try:
870             self.container = container or cnt_back_up
871             return filter_in(
872                 self.get_container_info(),
873                 'X-Container-Policy-Versioning')
874         finally:
875             self.container = cnt_back_up
876
877     def get_container_limit(self, container=None):
878         """
879         :param container: (str)
880
881         :returns: (dict)
882         """
883         cnt_back_up = self.container
884         try:
885             self.container = container or cnt_back_up
886             return filter_in(
887                 self.get_container_info(),
888                 'X-Container-Policy-Quota')
889         finally:
890             self.container = cnt_back_up
891
892     def get_container_info(self, until=None):
893         """
894         :param until: (str) formated date
895
896         :returns: (dict)
897
898         :raises ClientError: 404 Container not found
899         """
900         try:
901             r = self.container_head(until=until)
902         except ClientError as err:
903             err.details.append('for container %s' % self.container)
904             raise err
905         return r.headers
906
907     def get_container_meta(self, until=None):
908         """
909         :param until: (str) formated date
910
911         :returns: (dict)
912         """
913         return filter_in(
914             self.get_container_info(until=until),
915             'X-Container-Meta')
916
917     def get_container_object_meta(self, until=None):
918         """
919         :param until: (str) formated date
920
921         :returns: (dict)
922         """
923         return filter_in(
924             self.get_container_info(until=until),
925             'X-Container-Object-Meta')
926
927     def set_container_meta(self, metapairs):
928         """
929         :param metapairs: (dict) {key1:val1, key2:val2, ...}
930         """
931         assert(type(metapairs) is dict)
932         self.container_post(update=True, metadata=metapairs)
933
934     def del_container_meta(self, metakey):
935         """
936         :param metakey: (str) metadatum key
937         """
938         self.container_post(update=True, metadata={metakey: ''})
939
940     def set_container_limit(self, limit):
941         """
942         :param limit: (int)
943         """
944         self.container_post(update=True, quota=limit)
945
946     def set_container_versioning(self, versioning):
947         """
948         :param versioning: (str)
949         """
950         self.container_post(update=True, versioning=versioning)
951
952     def del_object(self, obj, until=None, delimiter=None):
953         """
954         :param obj: (str) remote object path
955
956         :param until: (str) formated date
957
958         :param delimiter: (str)
959         """
960         self._assert_container()
961         self.object_delete(obj, until=until, delimiter=delimiter)
962
963     def set_object_meta(self, obj, metapairs):
964         """
965         :param obj: (str) remote object path
966
967         :param metapairs: (dict) {key1:val1, key2:val2, ...}
968         """
969         assert(type(metapairs) is dict)
970         self.object_post(obj, update=True, metadata=metapairs)
971
972     def del_object_meta(self, obj, metakey):
973         """
974         :param obj: (str) remote object path
975
976         :param metakey: (str) metadatum key
977         """
978         self.object_post(obj, update=True, metadata={metakey: ''})
979
980     def publish_object(self, obj):
981         """
982         :param obj: (str) remote object path
983
984         :returns: (str) access url
985         """
986         self.object_post(obj, update=True, public=True)
987         info = self.get_object_info(obj)
988         pref, sep, rest = self.base_url.partition('//')
989         base = rest.split('/')[0]
990         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
991
992     def unpublish_object(self, obj):
993         """
994         :param obj: (str) remote object path
995         """
996         self.object_post(obj, update=True, public=False)
997
998     def get_object_info(self, obj, version=None):
999         """
1000         :param obj: (str) remote object path
1001
1002         :param version: (str)
1003
1004         :returns: (dict)
1005         """
1006         try:
1007             r = self.object_head(obj, version=version)
1008             return r.headers
1009         except ClientError as ce:
1010             if ce.status == 404:
1011                 raise ClientError('Object %s not found' % obj, status=404)
1012             raise
1013
1014     def get_object_meta(self, obj, version=None):
1015         """
1016         :param obj: (str) remote object path
1017
1018         :param version: (str)
1019
1020         :returns: (dict)
1021         """
1022         return filter_in(
1023             self.get_object_info(obj, version=version),
1024             'X-Object-Meta')
1025
1026     def get_object_sharing(self, obj):
1027         """
1028         :param obj: (str) remote object path
1029
1030         :returns: (dict)
1031         """
1032         r = filter_in(
1033             self.get_object_info(obj),
1034             'X-Object-Sharing',
1035             exactMatch=True)
1036         reply = {}
1037         if len(r) > 0:
1038             perms = r['x-object-sharing'].split(';')
1039             for perm in perms:
1040                 try:
1041                     perm.index('=')
1042                 except ValueError:
1043                     raise ClientError('Incorrect reply format')
1044                 (key, val) = perm.strip().split('=')
1045                 reply[key] = val
1046         return reply
1047
1048     def set_object_sharing(
1049             self, obj,
1050             read_permition=False, write_permition=False):
1051         """Give read/write permisions to an object.
1052
1053         :param obj: (str) remote object path
1054
1055         :param read_permition: (list - bool) users and user groups that get
1056             read permition for this object - False means all previous read
1057             permissions will be removed
1058
1059         :param write_perimition: (list - bool) of users and user groups to get
1060            write permition for this object - False means all previous write
1061            permissions will be removed
1062         """
1063
1064         perms = dict(read=read_permition or '', write=write_permition or '')
1065         self.object_post(obj, update=True, permissions=perms)
1066
1067     def del_object_sharing(self, obj):
1068         """
1069         :param obj: (str) remote object path
1070         """
1071         self.set_object_sharing(obj)
1072
1073     def append_object(self, obj, source_file, upload_cb=None):
1074         """
1075         :param obj: (str) remote object path
1076
1077         :param source_file: open file descriptor
1078
1079         :param upload_db: progress.bar for uploading
1080         """
1081
1082         self._assert_container()
1083         meta = self.get_container_info()
1084         blocksize = int(meta['x-container-block-size'])
1085         filesize = fstat(source_file.fileno()).st_size
1086         nblocks = 1 + (filesize - 1) // blocksize
1087         offset = 0
1088         if upload_cb:
1089             upload_gen = upload_cb(nblocks)
1090             upload_gen.next()
1091         for i in range(nblocks):
1092             block = source_file.read(min(blocksize, filesize - offset))
1093             offset += len(block)
1094             self.object_post(
1095                 obj,
1096                 update=True,
1097                 content_range='bytes */*',
1098                 content_type='application/octet-stream',
1099                 content_length=len(block),
1100                 data=block)
1101
1102             if upload_cb:
1103                 upload_gen.next()
1104
1105     def truncate_object(self, obj, upto_bytes):
1106         """
1107         :param obj: (str) remote object path
1108
1109         :param upto_bytes: max number of bytes to leave on file
1110         """
1111         self.object_post(
1112             obj,
1113             update=True,
1114             content_range='bytes 0-%s/*' % upto_bytes,
1115             content_type='application/octet-stream',
1116             object_bytes=upto_bytes,
1117             source_object=path4url(self.container, obj))
1118
1119     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1120         """Overwrite a part of an object from local source file
1121
1122         :param obj: (str) remote object path
1123
1124         :param start: (int) position in bytes to start overwriting from
1125
1126         :param end: (int) position in bytes to stop overwriting at
1127
1128         :param source_file: open file descriptor
1129
1130         :param upload_db: progress.bar for uploading
1131         """
1132
1133         r = self.get_object_info(obj)
1134         rf_size = int(r['content-length'])
1135         if rf_size < int(start):
1136             raise ClientError(
1137                 'Range start exceeds file size',
1138                 status=416)
1139         elif rf_size < int(end):
1140             raise ClientError(
1141                 'Range end exceeds file size',
1142                 status=416)
1143         self._assert_container()
1144         meta = self.get_container_info()
1145         blocksize = int(meta['x-container-block-size'])
1146         filesize = fstat(source_file.fileno()).st_size
1147         datasize = int(end) - int(start) + 1
1148         nblocks = 1 + (datasize - 1) // blocksize
1149         offset = 0
1150         if upload_cb:
1151             upload_gen = upload_cb(nblocks)
1152             upload_gen.next()
1153         for i in range(nblocks):
1154             read_size = min(blocksize, filesize - offset, datasize - offset)
1155             block = source_file.read(read_size)
1156             self.object_post(
1157                 obj,
1158                 update=True,
1159                 content_type='application/octet-stream',
1160                 content_length=len(block),
1161                 content_range='bytes %s-%s/*' % (
1162                     start + offset,
1163                     start + offset + len(block) - 1),
1164                 data=block)
1165             offset += len(block)
1166
1167             if upload_cb:
1168                 upload_gen.next()
1169
1170     def copy_object(
1171             self, src_container, src_object, dst_container,
1172             dst_object=None,
1173             source_version=None,
1174             source_account=None,
1175             public=False,
1176             content_type=None,
1177             delimiter=None):
1178         """
1179         :param src_container: (str) source container
1180
1181         :param src_object: (str) source object path
1182
1183         :param dst_container: (str) destination container
1184
1185         :param dst_object: (str) destination object path
1186
1187         :param source_version: (str) source object version
1188
1189         :param source_account: (str) account to copy from
1190
1191         :param public: (bool)
1192
1193         :param content_type: (str)
1194
1195         :param delimiter: (str)
1196         """
1197         self._assert_account()
1198         self.container = dst_container
1199         src_path = path4url(src_container, src_object)
1200         self.object_put(
1201             dst_object or src_object,
1202             success=201,
1203             copy_from=src_path,
1204             content_length=0,
1205             source_version=source_version,
1206             source_account=source_account,
1207             public=public,
1208             content_type=content_type,
1209             delimiter=delimiter)
1210
1211     def move_object(
1212             self, src_container, src_object, dst_container,
1213             dst_object=False,
1214             source_account=None,
1215             source_version=None,
1216             public=False,
1217             content_type=None,
1218             delimiter=None):
1219         """
1220         :param src_container: (str) source container
1221
1222         :param src_object: (str) source object path
1223
1224         :param dst_container: (str) destination container
1225
1226         :param dst_object: (str) destination object path
1227
1228         :param source_account: (str) account to move from
1229
1230         :param source_version: (str) source object version
1231
1232         :param public: (bool)
1233
1234         :param content_type: (str)
1235
1236         :param delimiter: (str)
1237         """
1238         self._assert_account()
1239         self.container = dst_container
1240         dst_object = dst_object or src_object
1241         src_path = path4url(src_container, src_object)
1242         self.object_put(
1243             dst_object,
1244             success=201,
1245             move_from=src_path,
1246             content_length=0,
1247             source_account=source_account,
1248             source_version=source_version,
1249             public=public,
1250             content_type=content_type,
1251             delimiter=delimiter)
1252
1253     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1254         """Get accounts that share with self.account
1255
1256         :param limit: (str)
1257
1258         :param marker: (str)
1259
1260         :returns: (dict)
1261         """
1262         self._assert_account()
1263
1264         self.set_param('format', 'json')
1265         self.set_param('limit', limit, iff=limit is not None)
1266         self.set_param('marker', marker, iff=marker is not None)
1267
1268         path = ''
1269         success = kwargs.pop('success', (200, 204))
1270         r = self.get(path, *args, success=success, **kwargs)
1271         return r.json
1272
1273     def get_object_versionlist(self, obj):
1274         """
1275         :param obj: (str) remote object path
1276
1277         :returns: (list)
1278         """
1279         self._assert_container()
1280         r = self.object_get(obj, format='json', version='list')
1281         return r.json['versions']