# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
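
# Illustrative note (not part of the original module): Pithos block hashes
# are computed with trailing NUL padding stripped, so a zero-padded final
# block hashes the same as its unpadded content, e.g. (assuming sha256):
#
#   _pithos_hash('abc\x00\x00', 'sha256') == _pithos_hash('abc', 'sha256')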


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
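
# Worked example (illustrative): for a block spanning bytes 0-99 and a
# requested range '50-200', _range_up(0, 99, '50-200') clips the block to
# (50, 99); a block entirely outside the range, e.g. _range_up(300, 399,
# '50-200'), yields (0, 0).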


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be 'auto' or any other value supported
            by the server

        :param metadata: (dict) Custom user-defined metadata of the form
            { 'name1': 'value1', 'name2': 'value2', ... }

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata)
            return r.headers
        finally:
            self.container = cnt_back_up
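
    # Illustrative usage (assumes a reachable Pithos endpoint and valid
    # credentials; BASE_URL, TOKEN and ACCOUNT are placeholders):
    #
    #   client = PithosClient(BASE_URL, TOKEN, account=ACCOUNT)
    #   headers = client.create_container(
    #       'backups', sizelimit=10 ** 9, metadata={'purpose': 'backup'})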

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
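
    # Illustrative usage (placeholder names; the whole object is sent in a
    # single request, so this suits small objects):
    #
    #   with open('notes.txt', 'rb') as f:
    #       client.upload_object_unchunked('docs/notes.txt', f)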

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers
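
    # Illustrative sketch of the manifest pattern (placeholder names): the
    # call creates a zero-length object whose manifest header points to
    # '<self.container>/<obj>', so downloading it streams the concatenation
    # of all objects whose paths share that prefix:
    #
    #   client.upload_from_string('video/part-001', part1_data)
    #   client.upload_from_string('video/part-002', part2_data)
    #   client.create_object_by_manifestation('video/part-')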

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
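
    # Worked example (illustrative): with a typical 4 MiB block size
    # (4194304 bytes), a 10 MiB source gives
    # nblocks = 1 + (10485760 - 1) // 4194304 = 3 (two full blocks plus one
    # partial). The actual values always come from the container info.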

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks: '
               'Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param size: (int) size of data to upload

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the upload is in progress.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                try:
                    details = ['%s' % thread.exception for thread in missing]
                except Exception:
                    details = ['Also, failed to read thread exceptions']
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=details)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
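
    # Illustrative usage (placeholder names): the file is hashed locally,
    # the server reports which blocks it lacks, and only those blocks are
    # uploaded, in parallel:
    #
    #   cache = {}
    #   with open('backup.tar', 'rb') as f:
    #       headers = client.upload_object(
    #           'backups/backup.tar', f,
    #           content_type='application/x-tar',
    #           container_info_cache=cache)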

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the upload is in progress.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
            fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap) = ([], {})
        if not content_type:
            content_type = 'application/octet-stream'

        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    # at this point, missing holds the hashes of failed blocks
                    details=['%s' % miss for miss in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
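
    # Illustrative usage (placeholder names):
    #
    #   client.upload_from_string(
    #       'conf/settings.json', '{"debug": false}',
    #       content_type='application/json')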

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # the same hash may appear in more than one block, so map each
            # hash to the list of block indices where it occurs
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, crange,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, crange)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from-to, where from and to are file positions
            (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
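
    # Illustrative usage (placeholder names; open the destination 'wb+' so
    # blocks fetched out of order can be seek()ed into place):
    #
    #   with open('backup.tar', 'wb+') as dst:
    #       client.download_object('backups/backup.tar', dst)
    #
    #   # only the first KiB:
    #   with open('head.bin', 'wb+') as dst:
    #       client.download_object(
    #           'backups/backup.tar', dst, range_str='0-1023')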

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from-to, where from and to are file positions
            (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range_str)
                if start < end:
                    self._watch_thread_limit(flying.values())
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
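
    # Illustrative usage (placeholder name; the full content is kept in
    # memory, so prefer download_object for large files):
    #
    #   text = client.download_to_string('conf/settings.json')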

    # Command Progress Bar method
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap, or {} on 304/412
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
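
    # Illustrative note: the returned hashmap is a dict shaped like this
    # (values are placeholders):
    #
    #   {'block_size': 4194304, 'block_hash': 'sha256',
    #    'bytes': 10485760, 'hashes': ['7c4e...', 'a1b2...']}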

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    #def get_account_versioning(self):
    #    """
    #    :returns: (dict)
    #    """
    #    return filter_in(
    #        self.get_account_info(),
    #        'X-Account-Policy-Versioning',
    #        exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    #def set_account_quota(self, quota):
    #    """
    #    :param quota: (int)
    #    """
    #    self.account_post(update=True, quota=quota)

    #def set_account_versioning(self, versioning):
    #    """
    #    :param versioning: (str)
    #    """
    #    r = self.account_post(update=True, versioning=versioning)
    #    return r.headers

    def list_containers(self):
        """
        :returns: (list)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if given (e.g. '/'), the container contents
            are emptied as well

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        return info['x-object-public']

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) of users and user groups to get
           write permission for this object - False means all previous write
           permissions will be removed

        :returns: (dict) response headers
        """

        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers
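
    # Illustrative usage (placeholder names):
    #
    #   client.set_object_sharing(
    #       'docs/notes.txt',
    #       read_permission=['user1@example.com', 'group1'],
    #       write_permission=['user2@example.com'])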

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: optional progress.bar object for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file

        :returns: (dict) response headers
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers
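
    # Illustrative usage (placeholder name): keep only the first KiB of the
    # remote object:
    #
    #   client.truncate_object('backups/backup.tar', 1024)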

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: optional progress.bar object for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            self._cb_next()
        return headers
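
    # Illustrative usage (placeholder names): overwrite remote bytes 0-1023
    # with the first KiB of a local file:
    #
    #   with open('patch.bin', 'rb') as f:
    #       client.overwrite_object('backups/backup.tar', 0, 1023, f)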

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
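
    # Illustrative usage (placeholder names; note that both calls set
    # self.container to the destination container):
    #
    #   client.copy_object('pithos', 'docs/notes.txt', 'backup')
    #   client.move_object(
    #       'pithos', 'docs/old.txt', 'pithos', dst_object='docs/new.txt')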

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']