Revision 24ff0a35 kamaki/clients/pithos.py

b/kamaki/clients/pithos.py
79 79
        r = self.container_delete(until=unicode(time()))
80 80
        r.release()
81 81

  
82
    def upload_object_unchunked(self, obj, f,
83
        withHashFile=False,
84
        size=None,
85
        etag=None,
86
        content_encoding=None,
87
        content_disposition=None,
88
        content_type=None,
89
        sharing=None,
90
        public=None):
82
    def upload_object_unchunked(
83
            self, obj, f,
84
            withHashFile=False,
85
            size=None,
86
            etag=None,
87
            content_encoding=None,
88
            content_disposition=None,
89
            content_type=None,
90
            sharing=None,
91
            public=None):
91 92
        """
92 93
        :param obj: (str) remote object path
93 94

  
......
118 119
                import json
119 120
                data = json.dumps(json.loads(data))
120 121
            except ValueError:
121
                raise ClientError(message='"%s" is not json-formated' % f.name,
122
                    status=1)
122
                raise ClientError('"%s" is not json-formated' % f.name, 1)
123 123
            except SyntaxError:
124
                raise ClientError(message='"%s" is not a valid hashmap file'\
125
                % f.name, status=1)
124
                msg = '"%s" is not a valid hashmap file' % f.name
125
                raise ClientError(msg, 1)
126 126
            f = StringIO(data)
127 127
        data = f.read(size) if size is not None else f.read()
128
        r = self.object_put(obj,
128
        r = self.object_put(
129
            obj,
129 130
            data=data,
130 131
            etag=etag,
131 132
            content_encoding=content_encoding,
......
136 137
            success=201)
137 138
        r.release()
138 139

  
139
    def create_object_by_manifestation(self, obj,
140
        etag=None,
141
        content_encoding=None,
142
        content_disposition=None,
143
        content_type=None,
144
        sharing=None,
145
        public=None):
140
    def create_object_by_manifestation(
141
            self, obj,
142
            etag=None,
143
            content_encoding=None,
144
            content_disposition=None,
145
            content_type=None,
146
            sharing=None,
147
            public=None):
146 148
        """
147 149
        :param obj: (str) remote object path
148 150

  
......
160 162
        :param public: (bool)
161 163
        """
162 164
        self._assert_container()
163
        r = self.object_put(obj,
165
        r = self.object_put(
166
            obj,
164 167
            content_length=0,
165 168
            etag=etag,
166 169
            content_encoding=content_encoding,
......
181 184
        from random import randint
182 185
        if not randint(0, 7):
183 186
            raise ClientError('BAD GATEWAY STUFF', 503)
184
        r = self.container_post(update=True,
187
        r = self.container_post(
188
            update=True,
185 189
            content_type='application/octet-stream',
186 190
            content_length=len(data),
187 191
            data=data,
......
196 200
        nblocks = 1 + (size - 1) // blocksize
197 201
        return (blocksize, blockhash, size, nblocks)
198 202

  
199
    def _get_missing_hashes(self, obj, json,
200
        size=None,
201
        format='json',
202
        hashmap=True,
203
        content_type=None,
204
        etag=None,
205
        content_encoding=None,
206
        content_disposition=None,
207
        permissions=None,
208
        public=None,
209
        success=(201, 409)):
210
        r = self.object_put(obj,
203
    def _get_missing_hashes(
204
            self, obj, json,
205
            size=None,
206
            format='json',
207
            hashmap=True,
208
            content_type=None,
209
            etag=None,
210
            content_encoding=None,
211
            content_disposition=None,
212
            permissions=None,
213
            public=None,
214
            success=(201, 409)):
215
        r = self.object_put(
216
            obj,
211 217
            format='json',
212 218
            hashmap=True,
213 219
            content_type=content_type,
......
223 229
            return None
224 230
        return r.json
225 231

  
226
    def _caclulate_uploaded_blocks(self,
227
        blocksize,
228
        blockhash,
229
        size,
230
        nblocks,
231
        hashes,
232
        hmap,
233
        fileobj,
232
    def _caclulate_uploaded_blocks(
233
        self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
234 234
        hash_cb=None):
235 235
        offset = 0
236 236
        if hash_cb:
......
247 247
            if hash_cb:
248 248
                hash_gen.next()
249 249
        if offset != size:
250
            assert offset == size, \
251
                   "Failed to calculate uploaded blocks: " \
252
                    "Offset and object size do not match"
250
            msg = 'Failed to calculate uploaded blocks:'
251
            msg += ' Offset and object size do not match'
252
            assert offset == size, msg
253 253

  
254 254
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
255 255
        """upload missing blocks asynchronously"""
......
268 268
            for thread in set(flying).difference(unfinished):
269 269
                if thread.exception:
270 270
                    failures.append(thread)
271
                    if isinstance(thread.exception, ClientError)\
272
                    and thread.exception.status == 502:
273
                        self.POOLSIZE = self._thread_limit
271
                    if isinstance(
272
                        thread.exception,
273
                        ClientError) and thread.exception.status == 502:
274
                            self.POOLSIZE = self._thread_limit
274 275
                elif thread.isAlive():
275 276
                    flying.append(thread)
276 277
                elif upload_gen:
......
292 293

  
293 294
        return [failure.kwargs['hash'] for failure in failures]
294 295

  
295
    def upload_object(self, obj, f,
296
        size=None,
297
        hash_cb=None,
298
        upload_cb=None,
299
        etag=None,
300
        content_encoding=None,
301
        content_disposition=None,
302
        content_type=None,
303
        sharing=None,
304
        public=None):
296
    def upload_object(
297
            self, obj, f,
298
            size=None,
299
            hash_cb=None,
300
            upload_cb=None,
301
            etag=None,
302
            content_encoding=None,
303
            content_disposition=None,
304
            content_type=None,
305
            sharing=None,
306
            public=None):
305 307
        """Upload an object using multiple connections (threads)
306 308

  
307 309
        :param obj: (str) remote object path
......
334 336
        if content_type is None:
335 337
            content_type = 'application/octet-stream'
336 338

  
337
        self._caclulate_uploaded_blocks(*block_info,
339
        self._caclulate_uploaded_blocks(
340
            *block_info,
338 341
            hashes=hashes,
339 342
            hmap=hmap,
340 343
            fileobj=f,
341 344
            hash_cb=hash_cb)
342 345

  
343 346
        hashmap = dict(bytes=size, hashes=hashes)
344
        missing = self._get_missing_hashes(obj, hashmap,
347
        missing = self._get_missing_hashes(
348
            obj, hashmap,
345 349
            content_type=content_type,
346 350
            size=size,
347 351
            etag=etag,
......
414 418
            map_dict[h] = i
415 419
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
416 420

  
417
    def _dump_blocks_sync(self,
418
        obj,
419
        remote_hashes,
420
        blocksize,
421
        total_size,
422
        dst,
423
        range,
424
        **restargs):
421
    def _dump_blocks_sync(
422
            self, obj, remote_hashes, blocksize, total_size, dst, range,
423
            **args):
425 424
        for blockid, blockhash in enumerate(remote_hashes):
426
            if blockhash == None:
427
                continue
428
            start = blocksize * blockid
429
            end = total_size - 1 if start + blocksize > total_size\
430
                else start + blocksize - 1
431
            (start, end) = _range_up(start, end, range)
432
            restargs['data_range'] = 'bytes=%s-%s' % (start, end)
433
            r = self.object_get(obj, success=(200, 206), **restargs)
434
            self._cb_next()
435
            dst.write(r.content)
436
            dst.flush()
425
            if blockhash:
426
                start = blocksize * blockid
427
                is_last = start + blocksize > total_size
428
                end = (total_size - 1) if is_last else (start + blocksize - 1)
429
                (start, end) = _range_up(start, end, range)
430
                args['data_range'] = 'bytes=%s-%s' % (start, end)
431
                r = self.object_get(obj, success=(200, 206), **args)
432
                self._cb_next()
433
                dst.write(r.content)
434
                dst.flush()
437 435

  
438
    def _get_block_async(self, obj, **restargs):
439
        event = SilentEvent(self.object_get,
440
            obj,
441
            success=(200, 206),
442
            **restargs)
436
    def _get_block_async(self, obj, **args):
437
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
443 438
        event.start()
444 439
        return event
445 440

  
......
450 445
        h.update(block.strip('\x00'))
451 446
        return hexlify(h.digest())
452 447

  
453
    def _thread2file(self,
454
        flying,
455
        local_file,
456
        offset=0,
457
        **restargs):
448
    def _thread2file(self, flying, local_file, offset=0, **restargs):
458 449
        """write the results of a greenleted rest call to a file
459 450
        @offset: the offset of the file up to blocksize
460 451
            - e.g. if the range is 10-100, all
......
472 463
        local_file.flush()
473 464
        return finished
474 465

  
475
    def _dump_blocks_async(self,
476
        obj,
477
        remote_hashes,
478
        blocksize,
479
        total_size,
480
        local_file,
481
        blockhash=None,
482
        resume=False,
483
        filerange=None,
484
        **restargs):
485

  
466
    def _dump_blocks_async(
467
            self, obj, remote_hashes, blocksize, total_size, local_file,
468
            blockhash=None, resume=False, filerange=None, **restargs):
486 469
        file_size = fstat(local_file.fileno()).st_size if resume else 0
487 470
        flying = {}
488 471
        finished = []
......
494 477
        self._init_thread_limit()
495 478
        for block_hash, blockid in remote_hashes.items():
496 479
            start = blocksize * blockid
497
            if start < file_size\
498
            and block_hash == self._hash_from_file(
499
                    local_file,
500
                    start,
501
                    blocksize,
502
                    blockhash):
480
            if start < file_size and block_hash == self._hash_from_file(
481
                    local_file, start, blocksize, blockhash):
503 482
                self._cb_next()
504 483
                continue
505 484
            self._watch_thread_limit(flying.values())
......
521 500
            thread.join()
522 501
        finished += self._thread2file(flying, local_file, offset, **restargs)
523 502

  
524
    def download_object(self,
525
        obj,
526
        dst,
527
        download_cb=None,
528
        version=None,
529
        resume=False,
530
        range=None,
531
        if_match=None,
532
        if_none_match=None,
533
        if_modified_since=None,
534
        if_unmodified_since=None):
503
    def download_object(
504
            self, obj, dst,
505
            download_cb=None,
506
            version=None,
507
            resume=False,
508
            range_str=None,
509
            if_match=None,
510
            if_none_match=None,
511
            if_modified_since=None,
512
            if_unmodified_since=None):
535 513
        """Download an object using multiple connections (threads) and
536 514
            writing to random parts of the file
537 515

  
......
545 523

  
546 524
        :param resume: (bool) if set, preserve already downloaded file parts
547 525

  
548
        :param range: (str) from-to where from and to are integers denoting
549
            file positions in bytes
526
        :param range_str: (str) from-to where from and to are integers
527
        denoting file positions in bytes
550 528

  
551 529
        :param if_match: (str)
552 530

  
......
557 535
        :param if_unmodified_since: (str) formated date
558 536
        """
559 537

  
560
        restargs = dict(version=version,
561
            data_range=None if range is None else 'bytes=%s' % range,
538
        restargs = dict(
539
            version=version,
540
            data_range=None if range_str is None else 'bytes=%s' % range_str,
562 541
            if_match=if_match,
563 542
            if_none_match=if_none_match,
564 543
            if_modified_since=if_modified_since,
565 544
            if_unmodified_since=if_unmodified_since)
566 545

  
567
        (blocksize,
546
        (
547
            blocksize,
568 548
            blockhash,
569 549
            total_size,
570 550
            hash_list,
......
576 556
            self._cb_next()
577 557

  
578 558
        if dst.isatty():
579
            self._dump_blocks_sync(obj,
559
            self._dump_blocks_sync(
560
                obj,
580 561
                hash_list,
581 562
                blocksize,
582 563
                total_size,
583 564
                dst,
584
                range,
565
                range_str,
585 566
                **restargs)
586 567
        else:
587
            self._dump_blocks_async(obj,
568
            self._dump_blocks_async(
569
                obj,
588 570
                remote_hashes,
589 571
                blocksize,
590 572
                total_size,
591 573
                dst,
592 574
                blockhash,
593 575
                resume,
594
                range,
576
                range_str,
595 577
                **restargs)
596
            if range is None:
578
            if not range_str:
597 579
                dst.truncate(total_size)
598 580

  
599 581
        self._complete_cb()
......
613 595
            except:
614 596
                break
615 597

  
616
    def get_object_hashmap(self, obj,
617
        version=None,
618
        if_match=None,
619
        if_none_match=None,
620
        if_modified_since=None,
621
        if_unmodified_since=None,
622
        data_range=None):
598
    def get_object_hashmap(
599
            self, obj,
600
            version=None,
601
            if_match=None,
602
            if_none_match=None,
603
            if_modified_since=None,
604
            if_unmodified_since=None,
605
            data_range=None):
623 606
        """
624 607
        :param obj: (str) remote object path
625 608

  
......
637 620
        :returns: (list)
638 621
        """
639 622
        try:
640
            r = self.object_get(obj,
623
            r = self.object_get(
624
                obj,
641 625
                hashmap=True,
642 626
                version=version,
643 627
                if_etag_match=if_match,
......
682 666
        """
683 667
        :returns: (dict)
684 668
        """
685
        return filter_in(self.get_account_info(),
669
        return filter_in(
670
            self.get_account_info(),
686 671
            'X-Account-Policy-Quota',
687 672
            exactMatch=True)
688 673

  
......
690 675
        """
691 676
        :returns: (dict)
692 677
        """
693
        return filter_in(self.get_account_info(),
678
        return filter_in(
679
            self.get_account_info(),
694 680
            'X-Account-Policy-Versioning',
695 681
            exactMatch=True)
696 682

  
......
760 746
            success=(204, 404, 409))
761 747
        r.release()
762 748
        if r.status_code == 404:
763
            raise ClientError('Container "%s" does not exist' % self.container,
749
            raise ClientError(
750
                'Container "%s" does not exist' % self.container,
764 751
                r.status_code)
765 752
        elif r.status_code == 409:
766
            raise ClientError('Container "%s" is not empty' % self.container,
753
            raise ClientError(
754
                'Container "%s" is not empty' % self.container,
767 755
                r.status_code)
768 756

  
769 757
    def get_container_versioning(self, container):
......
773 761
        :returns: (dict)
774 762
        """
775 763
        self.container = container
776
        return filter_in(self.get_container_info(),
764
        return filter_in(
765
            self.get_container_info(),
777 766
            'X-Container-Policy-Versioning')
778 767

  
779 768
    def get_container_quota(self, container):
......
806 795

  
807 796
        :returns: (dict)
808 797
        """
809
        return filter_in(self.get_container_info(until=until),
798
        return filter_in(
799
            self.get_container_info(until=until),
810 800
            'X-Container-Meta')
811 801

  
812 802
    def get_container_object_meta(self, until=None):
......
815 805

  
816 806
        :returns: (dict)
817 807
        """
818
        return filter_in(self.get_container_info(until=until),
808
        return filter_in(
809
            self.get_container_info(until=until),
819 810
            'X-Container-Object-Meta')
820 811

  
821 812
    def set_container_meta(self, metapairs):
......
889 880
        info = self.get_object_info(obj)
890 881
        pref, sep, rest = self.base_url.partition('//')
891 882
        base = rest.split('/')[0]
892
        newurl = path4url('%s%s%s' % (pref, sep, base),
883
        newurl = path4url(
884
            '%s%s%s' % (pref, sep, base),
893 885
            info['x-object-public'])
894 886
        return newurl[1:]
895 887

  
......
924 916

  
925 917
        :returns: (dict)
926 918
        """
927
        return filter_in(self.get_object_info(obj, version=version),
919
        return filter_in(
920
            self.get_object_info(obj, version=version),
928 921
            'X-Object-Meta')
929 922

  
930 923
    def get_object_sharing(self, obj):
......
933 926

  
934 927
        :returns: (dict)
935 928
        """
936
        r = filter_in(self.get_object_info(obj),
929
        r = filter_in(
930
            self.get_object_info(obj),
937 931
            'X-Object-Sharing',
938 932
            exactMatch=True)
939 933
        reply = {}
......
948 942
                reply[key] = val
949 943
        return reply
950 944

  
951
    def set_object_sharing(self, obj,
952
        read_permition=False,
953
        write_permition=False):
945
    def set_object_sharing(
946
        self, obj,
947
        read_permition=False, write_permition=False):
954 948
        """Give read/write permisions to an object.
955 949

  
956 950
        :param obj: (str) remote object path
......
964 958
           permissions will be removed
965 959
        """
966 960

  
967
        perms = dict(read='' if not read_permition else read_permition,
961
        perms = dict(
962
            read='' if not read_permition else read_permition,
968 963
            write='' if not write_permition else write_permition)
969 964
        r = self.object_post(obj, update=True, permissions=perms)
970 965
        r.release()
......
996 991
        for i in range(nblocks):
997 992
            block = source_file.read(min(blocksize, filesize - offset))
998 993
            offset += len(block)
999
            r = self.object_post(obj,
994
            r = self.object_post(
995
                obj,
1000 996
                update=True,
1001 997
                content_range='bytes */*',
1002 998
                content_type='application/octet-stream',
......
1013 1009

  
1014 1010
        :param upto_bytes: max number of bytes to leave on file
1015 1011
        """
1016
        r = self.object_post(obj,
1012
        r = self.object_post(
1013
            obj,
1017 1014
            update=True,
1018 1015
            content_range='bytes 0-%s/*' % upto_bytes,
1019 1016
            content_type='application/octet-stream',
......
1021 1018
            source_object=path4url(self.container, obj))
1022 1019
        r.release()
1023 1020

  
1024
    def overwrite_object(self,
1025
        obj,
1026
        start,
1027
        end,
1028
        source_file,
1021
    def overwrite_object(
1022
        self, obj, start, end, source_file,
1029 1023
        upload_cb=None):
1030 1024
        """Overwrite a part of an object from local source file
1031 1025

  
......
1061 1055
            upload_gen = upload_cb(nblocks)
1062 1056
            upload_gen.next()
1063 1057
        for i in range(nblocks):
1064
            block = source_file.read(min(blocksize,
1065
                filesize - offset,
1066
                datasize - offset))
1067
            r = self.object_post(obj,
1058
            read_size = min(blocksize, filesize - offset, datasize - offset)
1059
            block = source_file.read(read_size)
1060
            r = self.object_post(
1061
                obj,
1068 1062
                update=True,
1069 1063
                content_type='application/octet-stream',
1070 1064
                content_length=len(block),
......
1078 1072
            if upload_cb:
1079 1073
                upload_gen.next()
1080 1074

  
1081
    def copy_object(self, src_container, src_object, dst_container,
1075
    def copy_object(
1076
        self, src_container, src_object, dst_container,
1082 1077
        dst_object=False,
1083 1078
        source_version=None,
1084 1079
        public=False,
......
1105 1100
        self.container = dst_container
1106 1101
        dst_object = dst_object or src_object
1107 1102
        src_path = path4url(src_container, src_object)
1108
        r = self.object_put(dst_object,
1103
        r = self.object_put(
1104
            dst_object,
1109 1105
            success=201,
1110 1106
            copy_from=src_path,
1111 1107
            content_length=0,
......
1115 1111
            delimiter=delimiter)
1116 1112
        r.release()
1117 1113

  
1118
    def move_object(self, src_container, src_object, dst_container,
1119
        dst_object=False,
1120
        source_version=None,
1121
        public=False,
1122
        content_type=None,
1123
        delimiter=None):
1114
    def move_object(
1115
            self, src_container, src_object, dst_container,
1116
            dst_object=False,
1117
            source_version=None,
1118
            public=False,
1119
            content_type=None,
1120
            delimiter=None):
1124 1121
        """
1125 1122
        :param src_container: (str) source container
1126 1123

  
......
1142 1139
        self.container = dst_container
1143 1140
        dst_object = dst_object or src_object
1144 1141
        src_path = path4url(src_container, src_object)
1145
        r = self.object_put(dst_object,
1142
        r = self.object_put(
1143
            dst_object,
1146 1144
            success=201,
1147 1145
            move_from=src_path,
1148 1146
            content_length=0,

Also available in: Unified diff