Minimize requeests whn dnlding same block
[kamaki] / kamaki / clients / pithos / __init__.py
1 # Copyright 2011-2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 from threading import enumerate as activethreads
35
36 from os import fstat
37 from hashlib import new as newhashlib
38 from time import time
39
40 from binascii import hexlify
41
42 from kamaki.clients import SilentEvent, sendlog
43 from kamaki.clients.pithos.rest_api import PithosRestClient
44 from kamaki.clients.storage import ClientError
45 from kamaki.clients.utils import path4url, filter_in
46 from StringIO import StringIO
47
48
49 def _pithos_hash(block, blockhash):
50     h = newhashlib(blockhash)
51     h.update(block.rstrip('\x00'))
52     return h.hexdigest()
53
54
55 def _range_up(start, end, a_range):
56     if a_range:
57         (rstart, rend) = a_range.split('-')
58         (rstart, rend) = (int(rstart), int(rend))
59         if rstart > end or rend < start:
60             return (0, 0)
61         if rstart > start:
62             start = rstart
63         if rend < end:
64             end = rend
65     return (start, end)
66
67
68 class PithosClient(PithosRestClient):
69     """GRNet Pithos API client"""
70
71     def __init__(self, base_url, token, account=None, container=None):
72         super(PithosClient, self).__init__(base_url, token, account, container)
73
74     def purge_container(self, container=None):
75         """Delete an empty container and destroy associated blocks
76         """
77         cnt_back_up = self.container
78         try:
79             self.container = container or cnt_back_up
80             self.container_delete(until=unicode(time()))
81         finally:
82             self.container = cnt_back_up
83
84     def upload_object_unchunked(
85             self, obj, f,
86             withHashFile=False,
87             size=None,
88             etag=None,
89             content_encoding=None,
90             content_disposition=None,
91             content_type=None,
92             sharing=None,
93             public=None):
94         """
95         :param obj: (str) remote object path
96
97         :param f: open file descriptor
98
99         :param withHashFile: (bool)
100
101         :param size: (int) size of data to upload
102
103         :param etag: (str)
104
105         :param content_encoding: (str)
106
107         :param content_disposition: (str)
108
109         :param content_type: (str)
110
111         :param sharing: {'read':[user and/or grp names],
112             'write':[usr and/or grp names]}
113
114         :param public: (bool)
115         """
116         self._assert_container()
117
118         if withHashFile:
119             data = f.read()
120             try:
121                 import json
122                 data = json.dumps(json.loads(data))
123             except ValueError:
124                 raise ClientError('"%s" is not json-formated' % f.name, 1)
125             except SyntaxError:
126                 msg = '"%s" is not a valid hashmap file' % f.name
127                 raise ClientError(msg, 1)
128             f = StringIO(data)
129         else:
130             data = f.read(size) if size else f.read()
131         self.object_put(
132             obj,
133             data=data,
134             etag=etag,
135             content_encoding=content_encoding,
136             content_disposition=content_disposition,
137             content_type=content_type,
138             permissions=sharing,
139             public=public,
140             success=201)
141
142     def create_object_by_manifestation(
143             self, obj,
144             etag=None,
145             content_encoding=None,
146             content_disposition=None,
147             content_type=None,
148             sharing=None,
149             public=None):
150         """
151         :param obj: (str) remote object path
152
153         :param etag: (str)
154
155         :param content_encoding: (str)
156
157         :param content_disposition: (str)
158
159         :param content_type: (str)
160
161         :param sharing: {'read':[user and/or grp names],
162             'write':[usr and/or grp names]}
163
164         :param public: (bool)
165         """
166         self._assert_container()
167         self.object_put(
168             obj,
169             content_length=0,
170             etag=etag,
171             content_encoding=content_encoding,
172             content_disposition=content_disposition,
173             content_type=content_type,
174             permissions=sharing,
175             public=public,
176             manifest='%s/%s' % (self.container, obj))
177
178     # upload_* auxiliary methods
179     def _put_block_async(self, data, hash, upload_gen=None):
180         event = SilentEvent(method=self._put_block, data=data, hash=hash)
181         event.start()
182         return event
183
184     def _put_block(self, data, hash):
185         r = self.container_post(
186             update=True,
187             content_type='application/octet-stream',
188             content_length=len(data),
189             data=data,
190             format='json')
191         assert r.json[0] == hash, 'Local hash does not match server'
192
193     def _get_file_block_info(self, fileobj, size=None):
194         meta = self.get_container_info()
195         blocksize = int(meta['x-container-block-size'])
196         blockhash = meta['x-container-block-hash']
197         size = size if size is not None else fstat(fileobj.fileno()).st_size
198         nblocks = 1 + (size - 1) // blocksize
199         return (blocksize, blockhash, size, nblocks)
200
201     def _get_missing_hashes(
202             self, obj, json,
203             size=None,
204             format='json',
205             hashmap=True,
206             content_type=None,
207             etag=None,
208             content_encoding=None,
209             content_disposition=None,
210             permissions=None,
211             public=None,
212             success=(201, 409)):
213         r = self.object_put(
214             obj,
215             format='json',
216             hashmap=True,
217             content_type=content_type,
218             json=json,
219             etag=etag,
220             content_encoding=content_encoding,
221             content_disposition=content_disposition,
222             permissions=permissions,
223             public=public,
224             success=success)
225         return None if r.status_code == 201 else r.json
226
227     def _culculate_blocks_for_upload(
228             self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
229             hash_cb=None):
230         offset = 0
231         if hash_cb:
232             hash_gen = hash_cb(nblocks)
233             hash_gen.next()
234
235         for i in range(nblocks):
236             block = fileobj.read(min(blocksize, size - offset))
237             bytes = len(block)
238             hash = _pithos_hash(block, blockhash)
239             hashes.append(hash)
240             hmap[hash] = (offset, bytes)
241             offset += bytes
242             if hash_cb:
243                 hash_gen.next()
244         msg = 'Failed to calculate uploaded blocks:'
245         ' Offset and object size do not match'
246         assert offset == size, msg
247
248     def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
249         """upload missing blocks asynchronously"""
250
251         self._init_thread_limit()
252
253         flying = []
254         failures = []
255         for hash in missing:
256             offset, bytes = hmap[hash]
257             fileobj.seek(offset)
258             data = fileobj.read(bytes)
259             r = self._put_block_async(data, hash, upload_gen)
260             flying.append(r)
261             unfinished = self._watch_thread_limit(flying)
262             for thread in set(flying).difference(unfinished):
263                 if thread.exception:
264                     failures.append(thread)
265                     if isinstance(
266                             thread.exception,
267                             ClientError) and thread.exception.status == 502:
268                         self.POOLSIZE = self._thread_limit
269                 elif thread.isAlive():
270                     flying.append(thread)
271                 elif upload_gen:
272                     try:
273                         upload_gen.next()
274                     except:
275                         pass
276             flying = unfinished
277
278         for thread in flying:
279             thread.join()
280             if thread.exception:
281                 failures.append(thread)
282             elif upload_gen:
283                 try:
284                     upload_gen.next()
285                 except:
286                     pass
287
288         return [failure.kwargs['hash'] for failure in failures]
289
290     def upload_object(
291             self, obj, f,
292             size=None,
293             hash_cb=None,
294             upload_cb=None,
295             etag=None,
296             content_encoding=None,
297             content_disposition=None,
298             content_type=None,
299             sharing=None,
300             public=None):
301         """Upload an object using multiple connections (threads)
302
303         :param obj: (str) remote object path
304
305         :param f: open file descriptor (rb)
306
307         :param hash_cb: optional progress.bar object for calculating hashes
308
309         :param upload_cb: optional progress.bar object for uploading
310
311         :param etag: (str)
312
313         :param content_encoding: (str)
314
315         :param content_disposition: (str)
316
317         :param content_type: (str)
318
319         :param sharing: {'read':[user and/or grp names],
320             'write':[usr and/or grp names]}
321
322         :param public: (bool)
323         """
324         self._assert_container()
325
326         #init
327         block_info = (blocksize, blockhash, size, nblocks) =\
328             self._get_file_block_info(f, size)
329         (hashes, hmap, offset) = ([], {}, 0)
330         if not content_type:
331             content_type = 'application/octet-stream'
332
333         self._culculate_blocks_for_upload(
334             *block_info,
335             hashes=hashes,
336             hmap=hmap,
337             fileobj=f,
338             hash_cb=hash_cb)
339
340         hashmap = dict(bytes=size, hashes=hashes)
341         missing = self._get_missing_hashes(
342             obj, hashmap,
343             content_type=content_type,
344             size=size,
345             etag=etag,
346             content_encoding=content_encoding,
347             content_disposition=content_disposition,
348             permissions=sharing,
349             public=public)
350
351         if missing is None:
352             return
353
354         if upload_cb:
355             upload_gen = upload_cb(len(missing))
356             for i in range(len(missing), len(hashmap['hashes']) + 1):
357                 try:
358                     upload_gen.next()
359                 except:
360                     upload_gen = None
361         else:
362             upload_gen = None
363
364         retries = 7
365         try:
366             while retries:
367                 sendlog.info('%s blocks missing' % len(missing))
368                 num_of_blocks = len(missing)
369                 missing = self._upload_missing_blocks(
370                     missing,
371                     hmap,
372                     f,
373                     upload_gen)
374                 if missing:
375                     if num_of_blocks == len(missing):
376                         retries -= 1
377                     else:
378                         num_of_blocks = len(missing)
379                 else:
380                     break
381             if missing:
382                 raise ClientError(
383                     '%s blocks failed to upload' % len(missing),
384                     status=800)
385         except KeyboardInterrupt:
386             sendlog.info('- - - wait for threads to finish')
387             for thread in activethreads():
388                 thread.join()
389             raise
390
391         self.object_put(
392             obj,
393             format='json',
394             hashmap=True,
395             content_type=content_type,
396             json=hashmap,
397             success=201)
398
399     # download_* auxiliary methods
400     def _get_remote_blocks_info(self, obj, **restargs):
401         #retrieve object hashmap
402         myrange = restargs.pop('data_range', None)
403         hashmap = self.get_object_hashmap(obj, **restargs)
404         restargs['data_range'] = myrange
405         blocksize = int(hashmap['block_size'])
406         blockhash = hashmap['block_hash']
407         total_size = hashmap['bytes']
408         #assert total_size/blocksize + 1 == len(hashmap['hashes'])
409         map_dict = {}
410         for i, h in enumerate(hashmap['hashes']):
411             #  map_dict[h] = i   CHAGE
412             if h in map_dict:
413                 map_dict[h].append(i)
414             else:
415                 map_dict[h] = [i]
416         return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
417
418     def _dump_blocks_sync(
419             self, obj, remote_hashes, blocksize, total_size, dst, range,
420             **args):
421         for blockid, blockhash in enumerate(remote_hashes):
422             if blockhash:
423                 start = blocksize * blockid
424                 is_last = start + blocksize > total_size
425                 end = (total_size - 1) if is_last else (start + blocksize - 1)
426                 (start, end) = _range_up(start, end, range)
427                 args['data_range'] = 'bytes=%s-%s' % (start, end)
428                 r = self.object_get(obj, success=(200, 206), **args)
429                 self._cb_next()
430                 dst.write(r.content)
431                 dst.flush()
432
433     def _get_block_async(self, obj, **args):
434         event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
435         event.start()
436         return event
437
438     def _hash_from_file(self, fp, start, size, blockhash):
439         fp.seek(start)
440         block = fp.read(size)
441         h = newhashlib(blockhash)
442         h.update(block.strip('\x00'))
443         return hexlify(h.digest())
444
445     def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
446         """write the results of a greenleted rest call to a file
447
448         :param offset: the offset of the file up to blocksize
449         - e.g. if the range is 10-100, all blocks will be written to
450         normal_position - 10
451         """
452         for i, (key, g) in enumerate(flying.items()):
453             if g.isAlive():
454                 continue
455             if g.exception:
456                 raise g.exception
457             block = g.value.content
458             for block_start in blockids[key]:
459                 local_file.seek(block_start + offset)
460                 local_file.write(block)
461                 self._cb_next()
462             flying.pop(key)
463             blockids.pop(key)
464         local_file.flush()
465
466     def _dump_blocks_async(
467             self, obj, remote_hashes, blocksize, total_size, local_file,
468             blockhash=None, resume=False, filerange=None, **restargs):
469         file_size = fstat(local_file.fileno()).st_size if resume else 0
470         flying = dict()
471         blockid_dict = dict()
472         offset = 0
473         if filerange is not None:
474             rstart = int(filerange.split('-')[0])
475             offset = rstart if blocksize > rstart else rstart % blocksize
476
477         self._init_thread_limit()
478         for block_hash, blockids in remote_hashes.items():
479             blockids = [blk * blocksize for blk in blockids]
480             unsaved = [blk for blk in blockids if not (
481                 blk < file_size and block_hash == self._hash_from_file(
482                         local_file, blk, blocksize, blockhash))]
483             self._cb_next(len(blockids) - len(unsaved))
484             if unsaved:
485                 key = unsaved[0]
486                 self._watch_thread_limit(flying.values())
487                 self._thread2file(
488                     flying, blockid_dict, local_file, offset,
489                     **restargs)
490                 end = total_size - 1 if key + blocksize > total_size\
491                     else key + blocksize - 1
492                 start, end = _range_up(key, end, filerange)
493                 if start == end:
494                     self._cb_next()
495                     continue
496                 restargs['async_headers'] = {
497                     'Range': 'bytes=%s-%s' % (start, end)}
498                 flying[key] = self._get_block_async(obj, **restargs)
499                 blockid_dict[key] = unsaved
500
501         for thread in flying.values():
502             thread.join()
503         self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
504
505     def download_object(
506             self, obj, dst,
507             download_cb=None,
508             version=None,
509             resume=False,
510             range_str=None,
511             if_match=None,
512             if_none_match=None,
513             if_modified_since=None,
514             if_unmodified_since=None):
515         """Download an object (multiple connections, random blocks)
516
517         :param obj: (str) remote object path
518
519         :param dst: open file descriptor (wb+)
520
521         :param download_cb: optional progress.bar object for downloading
522
523         :param version: (str) file version
524
525         :param resume: (bool) if set, preserve already downloaded file parts
526
527         :param range_str: (str) from, to are file positions (int) in bytes
528
529         :param if_match: (str)
530
531         :param if_none_match: (str)
532
533         :param if_modified_since: (str) formated date
534
535         :param if_unmodified_since: (str) formated date"""
536         restargs = dict(
537             version=version,
538             data_range=None if range_str is None else 'bytes=%s' % range_str,
539             if_match=if_match,
540             if_none_match=if_none_match,
541             if_modified_since=if_modified_since,
542             if_unmodified_since=if_unmodified_since)
543
544         (
545             blocksize,
546             blockhash,
547             total_size,
548             hash_list,
549             remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
550         assert total_size >= 0
551
552         if download_cb:
553             self.progress_bar_gen = download_cb(len(hash_list))
554             self._cb_next()
555
556         if dst.isatty():
557             self._dump_blocks_sync(
558                 obj,
559                 hash_list,
560                 blocksize,
561                 total_size,
562                 dst,
563                 range_str,
564                 **restargs)
565         else:
566             self._dump_blocks_async(
567                 obj,
568                 remote_hashes,
569                 blocksize,
570                 total_size,
571                 dst,
572                 blockhash,
573                 resume,
574                 range_str,
575                 **restargs)
576             if not range_str:
577                 dst.truncate(total_size)
578
579         self._complete_cb()
580
581     #Command Progress Bar method
582     def _cb_next(self, step=1):
583         if hasattr(self, 'progress_bar_gen'):
584             try:
585                 self.progress_bar_gen.next(step)
586             except:
587                 pass
588
589     def _complete_cb(self):
590         while True:
591             try:
592                 self.progress_bar_gen.next(step)
593             except:
594                 break
595
596     def get_object_hashmap(
597             self, obj,
598             version=None,
599             if_match=None,
600             if_none_match=None,
601             if_modified_since=None,
602             if_unmodified_since=None,
603             data_range=None):
604         """
605         :param obj: (str) remote object path
606
607         :param if_match: (str)
608
609         :param if_none_match: (str)
610
611         :param if_modified_since: (str) formated date
612
613         :param if_unmodified_since: (str) formated date
614
615         :param data_range: (str) from-to where from and to are integers
616             denoting file positions in bytes
617
618         :returns: (list)
619         """
620         try:
621             r = self.object_get(
622                 obj,
623                 hashmap=True,
624                 version=version,
625                 if_etag_match=if_match,
626                 if_etag_not_match=if_none_match,
627                 if_modified_since=if_modified_since,
628                 if_unmodified_since=if_unmodified_since,
629                 data_range=data_range)
630         except ClientError as err:
631             if err.status == 304 or err.status == 412:
632                 return {}
633             raise
634         return r.json
635
636     def set_account_group(self, group, usernames):
637         """
638         :param group: (str)
639
640         :param usernames: (list)
641         """
642         self.account_post(update=True, groups={group: usernames})
643
644     def del_account_group(self, group):
645         """
646         :param group: (str)
647         """
648         self.account_post(update=True, groups={group: []})
649
650     def get_account_info(self, until=None):
651         """
652         :param until: (str) formated date
653
654         :returns: (dict)
655         """
656         r = self.account_head(until=until)
657         if r.status_code == 401:
658             raise ClientError("No authorization", status=401)
659         return r.headers
660
661     def get_account_quota(self):
662         """
663         :returns: (dict)
664         """
665         return filter_in(
666             self.get_account_info(),
667             'X-Account-Policy-Quota',
668             exactMatch=True)
669
670     def get_account_versioning(self):
671         """
672         :returns: (dict)
673         """
674         return filter_in(
675             self.get_account_info(),
676             'X-Account-Policy-Versioning',
677             exactMatch=True)
678
679     def get_account_meta(self, until=None):
680         """
681         :meta until: (str) formated date
682
683         :returns: (dict)
684         """
685         return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')
686
687     def get_account_group(self):
688         """
689         :returns: (dict)
690         """
691         return filter_in(self.get_account_info(), 'X-Account-Group-')
692
693     def set_account_meta(self, metapairs):
694         """
695         :param metapairs: (dict) {key1:val1, key2:val2, ...}
696         """
697         assert(type(metapairs) is dict)
698         self.account_post(update=True, metadata=metapairs)
699
700     def del_account_meta(self, metakey):
701         """
702         :param metakey: (str) metadatum key
703         """
704         self.account_post(update=True, metadata={metakey: ''})
705
706     def set_account_quota(self, quota):
707         """
708         :param quota: (int)
709         """
710         self.account_post(update=True, quota=quota)
711
712     def set_account_versioning(self, versioning):
713         """
714         "param versioning: (str)
715         """
716         self.account_post(update=True, versioning=versioning)
717
718     def list_containers(self):
719         """
720         :returns: (dict)
721         """
722         r = self.account_get()
723         return r.json
724
725     def del_container(self, until=None, delimiter=None):
726         """
727         :param until: (str) formated date
728
729         :param delimiter: (str) with / empty container
730
731         :raises ClientError: 404 Container does not exist
732
733         :raises ClientError: 409 Container is not empty
734         """
735         self._assert_container()
736         r = self.container_delete(
737             until=until,
738             delimiter=delimiter,
739             success=(204, 404, 409))
740         if r.status_code == 404:
741             raise ClientError(
742                 'Container "%s" does not exist' % self.container,
743                 r.status_code)
744         elif r.status_code == 409:
745             raise ClientError(
746                 'Container "%s" is not empty' % self.container,
747                 r.status_code)
748
749     def get_container_versioning(self, container=None):
750         """
751         :param container: (str)
752
753         :returns: (dict)
754         """
755         cnt_back_up = self.container
756         try:
757             self.container = container or cnt_back_up
758             return filter_in(
759                 self.get_container_info(),
760                 'X-Container-Policy-Versioning')
761         finally:
762             self.container = cnt_back_up
763
764     def get_container_quota(self, container=None):
765         """
766         :param container: (str)
767
768         :returns: (dict)
769         """
770         cnt_back_up = self.container
771         try:
772             self.container = container or cnt_back_up
773             return filter_in(
774                 self.get_container_info(),
775                 'X-Container-Policy-Quota')
776         finally:
777             self.container = cnt_back_up
778
779     def get_container_info(self, until=None):
780         """
781         :param until: (str) formated date
782
783         :returns: (dict)
784
785         :raises ClientError: 404 Container not found
786         """
787         try:
788             r = self.container_head(until=until)
789         except ClientError as err:
790             err.details.append('for container %s' % self.container)
791             raise err
792         return r.headers
793
794     def get_container_meta(self, until=None):
795         """
796         :param until: (str) formated date
797
798         :returns: (dict)
799         """
800         return filter_in(
801             self.get_container_info(until=until),
802             'X-Container-Meta')
803
804     def get_container_object_meta(self, until=None):
805         """
806         :param until: (str) formated date
807
808         :returns: (dict)
809         """
810         return filter_in(
811             self.get_container_info(until=until),
812             'X-Container-Object-Meta')
813
814     def set_container_meta(self, metapairs):
815         """
816         :param metapairs: (dict) {key1:val1, key2:val2, ...}
817         """
818         assert(type(metapairs) is dict)
819         self.container_post(update=True, metadata=metapairs)
820
821     def del_container_meta(self, metakey):
822         """
823         :param metakey: (str) metadatum key
824         """
825         self.container_post(update=True, metadata={metakey: ''})
826
827     def set_container_quota(self, quota):
828         """
829         :param quota: (int)
830         """
831         self.container_post(update=True, quota=quota)
832
833     def set_container_versioning(self, versioning):
834         """
835         :param versioning: (str)
836         """
837         self.container_post(update=True, versioning=versioning)
838
839     def del_object(self, obj, until=None, delimiter=None):
840         """
841         :param obj: (str) remote object path
842
843         :param until: (str) formated date
844
845         :param delimiter: (str)
846         """
847         self._assert_container()
848         self.object_delete(obj, until=until, delimiter=delimiter)
849
850     def set_object_meta(self, obj, metapairs):
851         """
852         :param obj: (str) remote object path
853
854         :param metapairs: (dict) {key1:val1, key2:val2, ...}
855         """
856         assert(type(metapairs) is dict)
857         self.object_post(obj, update=True, metadata=metapairs)
858
859     def del_object_meta(self, obj, metakey):
860         """
861         :param obj: (str) remote object path
862
863         :param metakey: (str) metadatum key
864         """
865         self.object_post(obj, update=True, metadata={metakey: ''})
866
867     def publish_object(self, obj):
868         """
869         :param obj: (str) remote object path
870
871         :returns: (str) access url
872         """
873         self.object_post(obj, update=True, public=True)
874         info = self.get_object_info(obj)
875         pref, sep, rest = self.base_url.partition('//')
876         base = rest.split('/')[0]
877         return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
878
879     def unpublish_object(self, obj):
880         """
881         :param obj: (str) remote object path
882         """
883         self.object_post(obj, update=True, public=False)
884
885     def get_object_info(self, obj, version=None):
886         """
887         :param obj: (str) remote object path
888
889         :param version: (str)
890
891         :returns: (dict)
892         """
893         try:
894             r = self.object_head(obj, version=version)
895             return r.headers
896         except ClientError as ce:
897             if ce.status == 404:
898                 raise ClientError('Object %s not found' % obj, status=404)
899             raise
900
901     def get_object_meta(self, obj, version=None):
902         """
903         :param obj: (str) remote object path
904
905         :param version: (str)
906
907         :returns: (dict)
908         """
909         return filter_in(
910             self.get_object_info(obj, version=version),
911             'X-Object-Meta')
912
913     def get_object_sharing(self, obj):
914         """
915         :param obj: (str) remote object path
916
917         :returns: (dict)
918         """
919         r = filter_in(
920             self.get_object_info(obj),
921             'X-Object-Sharing',
922             exactMatch=True)
923         reply = {}
924         if len(r) > 0:
925             perms = r['x-object-sharing'].split(';')
926             for perm in perms:
927                 try:
928                     perm.index('=')
929                 except ValueError:
930                     raise ClientError('Incorrect reply format')
931                 (key, val) = perm.strip().split('=')
932                 reply[key] = val
933         return reply
934
935     def set_object_sharing(
936             self, obj,
937             read_permition=False, write_permition=False):
938         """Give read/write permisions to an object.
939
940         :param obj: (str) remote object path
941
942         :param read_permition: (list - bool) users and user groups that get
943             read permition for this object - False means all previous read
944             permissions will be removed
945
946         :param write_perimition: (list - bool) of users and user groups to get
947            write permition for this object - False means all previous write
948            permissions will be removed
949         """
950
951         perms = dict(read=read_permition or '', write=write_permition or '')
952         self.object_post(obj, update=True, permissions=perms)
953
954     def del_object_sharing(self, obj):
955         """
956         :param obj: (str) remote object path
957         """
958         self.set_object_sharing(obj)
959
960     def append_object(self, obj, source_file, upload_cb=None):
961         """
962         :param obj: (str) remote object path
963
964         :param source_file: open file descriptor
965
966         :param upload_db: progress.bar for uploading
967         """
968
969         self._assert_container()
970         meta = self.get_container_info()
971         blocksize = int(meta['x-container-block-size'])
972         filesize = fstat(source_file.fileno()).st_size
973         nblocks = 1 + (filesize - 1) // blocksize
974         offset = 0
975         if upload_cb:
976             upload_gen = upload_cb(nblocks)
977             upload_gen.next()
978         for i in range(nblocks):
979             block = source_file.read(min(blocksize, filesize - offset))
980             offset += len(block)
981             self.object_post(
982                 obj,
983                 update=True,
984                 content_range='bytes */*',
985                 content_type='application/octet-stream',
986                 content_length=len(block),
987                 data=block)
988
989             if upload_cb:
990                 upload_gen.next()
991
992     def truncate_object(self, obj, upto_bytes):
993         """
994         :param obj: (str) remote object path
995
996         :param upto_bytes: max number of bytes to leave on file
997         """
998         self.object_post(
999             obj,
1000             update=True,
1001             content_range='bytes 0-%s/*' % upto_bytes,
1002             content_type='application/octet-stream',
1003             object_bytes=upto_bytes,
1004             source_object=path4url(self.container, obj))
1005
1006     def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
1007         """Overwrite a part of an object from local source file
1008
1009         :param obj: (str) remote object path
1010
1011         :param start: (int) position in bytes to start overwriting from
1012
1013         :param end: (int) position in bytes to stop overwriting at
1014
1015         :param source_file: open file descriptor
1016
1017         :param upload_db: progress.bar for uploading
1018         """
1019
1020         r = self.get_object_info(obj)
1021         rf_size = int(r['content-length'])
1022         if rf_size < int(start):
1023             raise ClientError(
1024                 'Range start exceeds file size',
1025                 status=416)
1026         elif rf_size < int(end):
1027             raise ClientError(
1028                 'Range end exceeds file size',
1029                 status=416)
1030         self._assert_container()
1031         meta = self.get_container_info()
1032         blocksize = int(meta['x-container-block-size'])
1033         filesize = fstat(source_file.fileno()).st_size
1034         datasize = int(end) - int(start) + 1
1035         nblocks = 1 + (datasize - 1) // blocksize
1036         offset = 0
1037         if upload_cb:
1038             upload_gen = upload_cb(nblocks)
1039             upload_gen.next()
1040         for i in range(nblocks):
1041             read_size = min(blocksize, filesize - offset, datasize - offset)
1042             block = source_file.read(read_size)
1043             self.object_post(
1044                 obj,
1045                 update=True,
1046                 content_type='application/octet-stream',
1047                 content_length=len(block),
1048                 content_range='bytes %s-%s/*' % (
1049                     start + offset,
1050                     start + offset + len(block) - 1),
1051                 data=block)
1052             offset += len(block)
1053
1054             if upload_cb:
1055                 upload_gen.next()
1056
1057     def copy_object(
1058             self, src_container, src_object, dst_container,
1059             dst_object=None,
1060             source_version=None,
1061             source_account=None,
1062             public=False,
1063             content_type=None,
1064             delimiter=None):
1065         """
1066         :param src_container: (str) source container
1067
1068         :param src_object: (str) source object path
1069
1070         :param dst_container: (str) destination container
1071
1072         :param dst_object: (str) destination object path
1073
1074         :param source_version: (str) source object version
1075
1076         :param source_account: (str) account to copy from
1077
1078         :param public: (bool)
1079
1080         :param content_type: (str)
1081
1082         :param delimiter: (str)
1083         """
1084         self._assert_account()
1085         self.container = dst_container
1086         src_path = path4url(src_container, src_object)
1087         self.object_put(
1088             dst_object or src_object,
1089             success=201,
1090             copy_from=src_path,
1091             content_length=0,
1092             source_version=source_version,
1093             source_account=source_account,
1094             public=public,
1095             content_type=content_type,
1096             delimiter=delimiter)
1097
1098     def move_object(
1099             self, src_container, src_object, dst_container,
1100             dst_object=False,
1101             source_account=None,
1102             source_version=None,
1103             public=False,
1104             content_type=None,
1105             delimiter=None):
1106         """
1107         :param src_container: (str) source container
1108
1109         :param src_object: (str) source object path
1110
1111         :param dst_container: (str) destination container
1112
1113         :param dst_object: (str) destination object path
1114
1115         :param source_account: (str) account to move from
1116
1117         :param source_version: (str) source object version
1118
1119         :param public: (bool)
1120
1121         :param content_type: (str)
1122
1123         :param delimiter: (str)
1124         """
1125         self._assert_account()
1126         self.container = dst_container
1127         dst_object = dst_object or src_object
1128         src_path = path4url(src_container, src_object)
1129         self.object_put(
1130             dst_object,
1131             success=201,
1132             move_from=src_path,
1133             content_length=0,
1134             source_account=source_account,
1135             source_version=source_version,
1136             public=public,
1137             content_type=content_type,
1138             delimiter=delimiter)
1139
1140     def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
1141         """Get accounts that share with self.account
1142
1143         :param limit: (str)
1144
1145         :param marker: (str)
1146
1147         :returns: (dict)
1148         """
1149         self._assert_account()
1150
1151         self.set_param('format', 'json')
1152         self.set_param('limit', limit, iff=limit is not None)
1153         self.set_param('marker', marker, iff=marker is not None)
1154
1155         path = ''
1156         success = kwargs.pop('success', (200, 204))
1157         r = self.get(path, *args, success=success, **kwargs)
1158         return r.json
1159
1160     def get_object_versionlist(self, obj):
1161         """
1162         :param obj: (str) remote object path
1163
1164         :returns: (list)
1165         """
1166         self._assert_container()
1167         r = self.object_get(obj, format='json', version='list')
1168         return r.json['versions']