kamaki / clients / pithos / __init__.py @ 3ec5c230

# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
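
# Pithos hashes each block with its trailing NUL padding stripped, so a short
# final block and its zero-padded equivalent produce the same hash. A minimal
# sketch, assuming a container whose block hash algorithm is sha256:
#
#   assert _pithos_hash('data\x00\x00', 'sha256') == \
#       _pithos_hash('data', 'sha256')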


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
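
# _range_up clips a block's [start, end] byte span to a user-requested range
# string 'from-to', returning (0, 0) when the two do not overlap. For example,
# a block spanning bytes 8-11, clipped to a request for bytes 10-100:
#
#   >>> _range_up(8, 11, '10-100')
#   (10, 11)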


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) 'auto' or any other value supported by the
            server

        :param metadata: (dict) custom user-defined metadata of the form
            {'name1': 'value1', 'name2': 'value2', ...}

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata)
            return r.headers
        finally:
            self.container = cnt_back_up
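
    # A hypothetical usage sketch: create a 5GB, auto-versioned container;
    # self.container is restored afterwards by the try/finally above.
    #
    #   client.create_container(
    #       container='backups', sizelimit=5 * 1024 ** 3, versioning='auto')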

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if true, f is a json-formatted hashmap
            file, not the object data

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
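
    # A hypothetical sketch: upload a small file in a single request, or
    # register a pre-computed json hashmap file instead of the data itself:
    #
    #   with open('notes.txt', 'rb') as f:
    #       client.upload_object_unchunked('notes.txt', f)
    #
    #   with open('big.iso.hashmap') as f:
    #       client.upload_object_unchunked('big.iso', f, withHashFile=True)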

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers
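
    # The zero-length object is created with a manifest of '<container>/<obj>';
    # reading it serves the concatenation of every object whose path starts
    # with that prefix. A hypothetical sketch, assuming parts were uploaded as
    # 'movie.mkv/part-000', 'movie.mkv/part-001', and so on:
    #
    #   client.create_object_by_manifestation('movie.mkv')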

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        # Blocks are uploaded by POSTing raw data to the container in update
        # mode; the server responds with the hash(es) of the stored data.
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache the container info response
            to avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize  # i.e., ceil(size / blocksize)
        return (blocksize, blockhash, size, nblocks)

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers
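
    # The hashmap PUT protocol used above: the client sends a JSON body like
    #
    #   {"bytes": 8388608, "hashes": ["7d6a...", "c0ff..."]}
    #
    # with hashmap=true. A 201 means the server already had every block and
    # created the object; a 409 carries the list of missing block hashes,
    # which must then be uploaded with _put_block.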

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) if true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            the server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                # missing holds the hashes of blocks that still failed
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % h for h in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
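
    # A hypothetical usage sketch. The *_cb arguments are generator factories:
    # called with a step count, they must return a generator that yields once
    # per completed step (this is how kamaki's progress.bar objects behave):
    #
    #   def bar(n):
    #       for i in range(n):
    #           print 'step %s of %s' % (i + 1, n)
    #           yield
    #       yield
    #
    #   with open('big.iso', 'rb') as f:
    #       headers = client.upload_object('big.iso', f, upload_cb=bar)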

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) if true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            the server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
                fileobj=None, size=len(input_str), cache=container_info_cache)
        if not content_type:
            content_type = 'application/octet-stream'

        num_of_blocks, blockmod = size / blocksize, size % blocksize
        num_of_blocks += 1 if blockmod else 0  # extra block for the tail

        hashes = []
        hmap = {}
        for blockid in range(num_of_blocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(num_of_blocks)
            for i in range(num_of_blocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                # missing holds the hashes of blocks that still failed
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % h for h in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
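
    # A hypothetical sketch:
    #
    #   client.upload_from_string(
    #       'hello.txt', 'hello world\n', content_type='text/plain')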

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # the same hash may appear at multiple positions (identical
            # blocks), so map each hash to the list of its block indices
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) 'from-to', where from and to are file
            positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
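
    # Hypothetical usage sketch: a resumable download to a local file
    # (tty destinations fall back to sequential, in-order block fetching):
    #
    #   with open('big.iso', 'wb+') as dst:
    #       client.download_object('big.iso', dst, resume=True)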

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) 'from-to', where from and to are file
            positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range_str)
                if start < end:
                    self._watch_thread_limit(flying.values())
                    # request only this block's byte span, not the full range
                    restargs['data_range'] = 'bytes=%s-%s' % (start, end)
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
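
    # Sketch: fetch a byte range of an object straight into memory:
    #
    #   first_kb = client.download_to_string('big.iso', range_str='0-1023')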

    # Command Progress Bar method
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to, where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
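
    # The returned hashmap is a dict of the form (example values are
    # illustrative):
    #
    #   {'block_size': 4194304, 'block_hash': 'sha256',
    #    'bytes': 8388608, 'hashes': ['7d6a...', 'c0ff...']}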

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    """
    def set_account_quota(self, quota):
        ""
        :param quota: (int)
        ""
        self.account_post(update=True, quota=quota)
    """

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.account_post(update=True, versioning=versioning)
        return r.headers

    def list_containers(self):
        """
        :returns: (list) the account's containers
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if '/' is given, the container contents are
            deleted as well

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permition=False, write_permition=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permition: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permition: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed

        :returns: (dict) response headers
        """

        perms = dict(read=read_permition or '', write=write_permition or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers
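
    # Hypothetical sketch (user and group names are illustrative):
    #
    #   client.set_object_sharing(
    #       'doc.txt',
    #       read_permition=['user1@example.com', 'group1'],
    #       write_permition=['user1@example.com'])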

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(1.1 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: (int) maximum number of bytes to keep in the file

        :returns: (dict) response headers
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            self._cb_next()
        return headers

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
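
    # Sketch: copy 'pics/a.jpg' to 'backup/a-copy.jpg' in the same account;
    # note that self.container is switched to the destination container:
    #
    #   client.copy_object('pics', 'a.jpg', 'backup', dst_object='a-copy.jpg')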

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']