# kamaki/clients/pithos/__init__.py @ 00336c85
# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
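
# Note (illustrative, not part of the original module): Pithos block hashes
# are computed over the block data with trailing zero-bytes stripped, using
# the hash algorithm the container advertises (e.g. 'sha256'):
#
#     >>> from hashlib import sha256
#     >>> _pithos_hash('hello\x00\x00\x00', 'sha256') == sha256(
#     ...     'hello').hexdigest()
#     True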


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
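
# Note (illustrative): _range_up clips a block's [start, end] byte span to a
# user-requested 'rstart-rend' range, returning (0, 0) when they do not
# overlap. For example, with 64-byte blocks and a range of '10-100':
#
#     >>> _range_up(0, 63, '10-100')     # block starts below the range
#     (10, 63)
#     >>> _range_up(64, 127, '10-100')   # block extends past the range
#     (64, 100)
#     >>> _range_up(128, 191, '10-100')  # block is entirely out of range
#     (0, 0)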


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be 'auto' or whatever else the server
            supports

        :param metadata: (dict) Custom user-defined metadata of the form
            {'name1': 'value1', 'name2': 'value2', ...}

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata)
            return r.headers
        finally:
            self.container = cnt_back_up
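
    # Usage sketch (illustrative; the endpoint, token and names below are
    # placeholders, not part of the original module):
    #
    #     client = PithosClient(
    #         'https://pithos.example.com/object-store/v1', 'my-token',
    #         account='user@example.com', container='pithos')
    #     headers = client.create_container(
    #         'backups', sizelimit=10 * 1024 ** 3,  # 10GB total size limit
    #         metadata={'purpose': 'nightly backups'})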

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if True, the file contents are a
            json-formatted hashmap

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
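
    # Usage sketch (illustrative): upload a small file in a single request;
    # for large files, upload_object() below hashes and uploads per block.
    #
    #     with open('notes.txt', 'rb') as f:
    #         headers = client.upload_object_unchunked(
    #             'notes.txt', f, content_type='text/plain')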

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers
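
    # Usage sketch (illustrative): a manifestation object holds no data of
    # its own; the server concatenates all objects whose paths share the
    # '<container>/<obj>' prefix when the manifest object is downloaded.
    #
    #     client.upload_object_unchunked('video/0001', part1)
    #     client.upload_object_unchunked('video/0002', part2)
    #     client.create_object_by_manifestation(
    #         'video', content_type='video/mp4')
    #     # a GET on 'video' now serves the parts, concatenated in name order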

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
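
    # Note (illustrative): `1 + (size - 1) // blocksize` is ceiling division,
    # i.e. the number of blocks needed to cover `size` bytes. With 4MB blocks:
    #
    #     >>> blocksize = 4 * 1024 ** 2
    #     >>> 1 + (blocksize - 1) // blocksize      # exactly one block
    #     1
    #     >>> 1 + (blocksize + 1 - 1) // blocksize  # one byte over: 2 blocks
    #     2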

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param size: (int) size of data to upload

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            the server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                # missing now holds the hashes of the failed blocks
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash for hash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
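
    # Usage sketch (illustrative): the *_cb arguments are callables that take
    # a block count and return a generator; the client advances it once per
    # hashed or uploaded block.
    #
    #     def make_progress(label):
    #         def bar(nblocks):
    #             for i in xrange(nblocks + 1):
    #                 print '%s: %s / %s blocks' % (label, i, nblocks)
    #                 yield
    #         return bar
    #
    #     cache = {}
    #     with open('backup.tar', 'rb') as f:
    #         headers = client.upload_object(
    #             'backup.tar', f,
    #             hash_cb=make_progress('hashing'),
    #             upload_cb=make_progress('uploading'),
    #             container_info_cache=cache,
    #             if_not_exist=True)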

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            the server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
                fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        num_of_blocks, blockmod = size / blocksize, size % blocksize
        num_of_blocks += (1 if blockmod else 0) if num_of_blocks else blockmod

        hashes = []
        hmap = {}
        for blockid in range(num_of_blocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(num_of_blocks)
            for i in range(num_of_blocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                # missing holds the hashes of the failed blocks
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash for hash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
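
    # Usage sketch (illustrative): upload in-memory content without touching
    # the local filesystem.
    #
    #     report = '\n'.join(log_lines)
    #     headers = client.upload_from_string(
    #         'reports/2013-06-01.txt', report, content_type='text/plain')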

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # identical blocks share a hash, so map each hash to the list of
            # block indices where it appears
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from-to, where from and to are file positions
            (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
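
    # Usage sketch (illustrative): download to a local file, skipping blocks
    # that are already present and intact; or fetch only a byte range.
    #
    #     with open('backup.tar', 'rb+') as dst:  # existing partial download
    #         client.download_object('backup.tar', dst, resume=True)
    #
    #     with open('head.bin', 'wb+') as dst:    # first kilobyte only
    #         client.download_object('backup.tar', dst, range_str='0-1023')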

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from-to, where from and to are file positions
            (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range_str)
                if start < end:
                    self._watch_thread_limit(flying.values())
                    # request only this block's byte span, not the whole range
                    restargs['data_range'] = 'bytes=%s-%s' % (start, end)
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
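
    # Usage sketch (illustrative): fetch a small object straight into memory.
    #
    #     settings = client.download_to_string('config/settings.json')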

    # Command Progress Bar methods
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to, where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap, or {} on 304/412
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    """
    def set_account_quota(self, quota):
        ""
        :param quota: (int)
        ""
        self.account_post(update=True, quota=quota)
    """

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.account_post(update=True, versioning=versioning)
        return r.headers

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if given as '/', the container contents are
            deleted, emptying the container

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed

        :returns: (dict) response headers
        """

        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)
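
    # Usage sketch (illustrative): grant read access, then revoke all
    # sharing. Permission values are lists of user identifiers (and, by
    # Pithos convention, 'account:group' references - an assumption here).
    #
    #     client.set_object_sharing(
    #         'reports/q1.pdf',
    #         read_permission=['friend@example.com'])
    #     client.del_object_sharing('reports/q1.pdf')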

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        return headers.values()
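
    # Usage sketch (illustrative): append a local file to an existing remote
    # object, block by block.
    #
    #     with open('today.log', 'rb') as f:
    #         client.append_object('logs/app.log', f)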

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: (int) maximum number of bytes to keep in the file

        :returns: (dict) response headers
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object with data from a local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            self._cb_next()
        return headers
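
    # Usage sketch (illustrative): patch bytes 1024-2047 of a remote object
    # with the first kilobyte of a local file.
    #
    #     with open('patch.bin', 'rb') as f:
    #         client.overwrite_object('disk.img', 1024, 2047, f)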

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
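
    # Usage sketch (illustrative): server-side copy, and a rename expressed
    # as a move within the same container.
    #
    #     client.copy_object('pithos', 'notes.txt', 'backups')
    #     client.move_object(
    #         'pithos', 'draft.txt', 'pithos', dst_object='final.txt')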

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']