Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos / __init__.py @ 4f0576d8

History | View | Annotate | Download (49.1 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from threading import enumerate as activethreads
35

    
36
from os import fstat
37
from hashlib import new as newhashlib
38
from time import time
39
from StringIO import StringIO
40

    
41
from binascii import hexlify
42

    
43
from kamaki.clients import SilentEvent, sendlog
44
from kamaki.clients.pithos.rest_api import PithosRestClient
45
from kamaki.clients.storage import ClientError
46
from kamaki.clients.utils import path4url, filter_in, readall
47

    
48

    
49
def _pithos_hash(block, blockhash):
50
    h = newhashlib(blockhash)
51
    h.update(block.rstrip('\x00'))
52
    return h.hexdigest()
53

    
54

    
55
def _range_up(start, end, max_value, a_range):
56
    """
57
    :param start: (int) the window bottom
58

59
    :param end: (int) the window top
60

61
    :param max_value: (int) maximum accepted value
62

63
    :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64
        where X: x|x-y|-x where x < y and x, y natural numbers
65

66
    :returns: (str) a range string cut-off for the start-end range
67
        an empty response means this window is out of range
68
    """
69
    assert start >= 0, '_range_up called w. start(%s) < 0' % start
70
    assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
71
        end, start)
72
    assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
73
        max_value, end)
74
    if not a_range:
75
        return '%s-%s' % (start, end)
76
    selected = []
77
    for some_range in a_range.split(','):
78
        v0, sep, v1 = some_range.partition('-')
79
        if v0:
80
            v0 = int(v0)
81
            if sep:
82
                v1 = int(v1)
83
                if v1 < start or v0 > end or v1 < v0:
84
                    continue
85
                v0 = v0 if v0 > start else start
86
                v1 = v1 if v1 < end else end
87
                selected.append('%s-%s' % (v0, v1))
88
            elif v0 < start:
89
                continue
90
            else:
91
                v1 = v0 if v0 <= end else end
92
                selected.append('%s-%s' % (start, v1))
93
        else:
94
            v1 = int(v1)
95
            if max_value - v1 > end:
96
                continue
97
            v0 = (max_value - v1) if max_value - v1 > start else start
98
            selected.append('%s-%s' % (v0, end))
99
    return ','.join(selected)
100

    
101

    
102
class PithosClient(PithosRestClient):
103
    """Synnefo Pithos+ API client"""
104

    
105
    def __init__(self, base_url, token, account=None, container=None):
        """Initialize a Pithos+ client.

        :param base_url: (str) Pithos+ service endpoint URL
        :param token: (str) authentication token
        :param account: (str) account identifier; may be set later
        :param container: (str) default container; may be set later
        """
        super(PithosClient, self).__init__(base_url, token, account, container)
107

    
108
    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None,
            **kwargs):
        """Create a container (a PUT on the container path).

        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be auto or whatever supported by server

        :param metadata: (dict) Custom user-defined metadata of the form
            { 'name1': 'value1', 'name2': 'value2', ... }

        :returns: (dict) response headers
        """
        original_container = self.container
        try:
            # Temporarily retarget the client, if another container was asked
            if container:
                self.container = container
            response = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata,
                **kwargs)
            return response.headers
        finally:
            self.container = original_container
133

    
134
    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        original_container = self.container
        try:
            # Temporarily retarget the client, if another container was asked
            if container:
                self.container = container
            # Deleting "until now" destroys all versions, i.e. a real purge
            response = self.container_delete(until=unicode(time()))
        finally:
            self.container = original_container
        return response.headers
143

    
144
    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object in a single PUT request (no block splitting).

        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if set, f holds a json hashmap, not data

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            # The file is a hashmap description: validate and normalize
            # the json before sending it as the request body
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formated' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            # Read exactly `size` bytes if given, otherwise whole file
            data = readall(f, size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
204

    
205
    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Create a zero-length manifest object whose content is served by
        concatenating the objects prefixed with <container>/<obj>.

        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        response = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return response.headers
243

    
244
    # upload_* auxiliary methods
245
    def _put_block_async(self, data, hash):
        """Fire a _put_block call on its own thread.

        :returns: (SilentEvent) the started thread, for the caller to join
        """
        flier = SilentEvent(method=self._put_block, data=data, hash=hash)
        flier.start()
        return flier
249

    
250
    def _put_block(self, data, hash):
        """Upload one block (a POST update on the container) and verify
        the server computed the same hash for it.

        :param data: block contents
        :param hash: (str) locally computed Pithos+ hash of data
        """
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        # The json response body holds the server-side hash(es)
        assert r.json[0] == hash, 'Local hash does not match server'
258

    
259
    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """Compute the block layout for an upload source.

        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
        avoid redundant calls

        :returns: (tuple) (blocksize, blockhash, size, nblocks)
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                cache[self.container] = meta = self.get_container_info()
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        if size is None:
            # No explicit size: use the on-disk size of the source
            size = fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
281

    
282
    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        """Try to create an object from its hashmap alone.

        If every block is already on the server the object is created
        (201) and (None, headers) is returned; otherwise the server
        responds with the missing block hashes, returned as
        (missing, headers).

        :param obj: (str) remote object path
        :param json: (dict) hashmap description {bytes: ..., hashes: [...]}
        :returns: (missing hashes or None, response headers)

        NOTE(review): the size, format and hashmap parameters are
        accepted but not forwarded -- format and hashmap are hard-coded
        in the object_put call below; confirm before relying on them.
        """
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        # 201: fully created; otherwise (409) the body lists missing hashes
        return (None if r.status_code == 201 else r.json), r.headers
309

    
310
    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        """Hash the source file block by block, filling the caller's
        hashes list and hash->(offset, length) map in place.

        :param blocksize: (int) container block size
        :param blockhash: (str) hash algorithm name
        :param size: (int) total bytes to read from fileobj
        :param nblocks: (int) expected number of blocks
        :param hashes: (list) output - block hashes, in file order
        :param hmap: (dict) output - hash: (offset, length)
        :param fileobj: open file descriptor, positioned at data start
        :param hash_cb: optional progress.bar generator factory
        """
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            # Prime the generator (Python 2 generator protocol)
            hash_gen.next()

        for i in range(nblocks):
            # The last block may be shorter than blocksize
            block = readall(fileobj, min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        # Sanity check: we must have consumed exactly `size` bytes
        assert offset == size, msg
330

    
331
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously

        :param missing: (list) hashes of blocks the server does not have
        :param hmap: (dict) hash: (offset, length) into fileobj
        :param fileobj: open file descriptor to read block data from
        :param upload_gen: optional progress.bar generator (one next() per
            finished block)
        :returns: (list) hashes of blocks that failed to upload
        """

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = readall(fileobj, bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            # Block here until the number of active threads drops below
            # the limit; `unfinished` holds the still-running threads
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        # Server overloaded: shrink the connection pool
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    # NOTE(review): the append below targets the old list,
                    # which is discarded by `flying = unfinished` right
                    # after -- such threads are only re-joined at the end
                    # via the final loop if still listed; confirm intent.
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        # Wait for the remaining uploads and collect their outcomes
        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]
372

    
373
    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Involves the case of an object with the same path is created while
            the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        # Block layout of the source; size may be refined by fstat
        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        # Fill `hashes` (ordered) and `hmap` (hash -> offset, length)
        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        # First try to create the object from hashes alone; the server
        # answers with the block hashes it does not already store
        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            # All blocks were already on the server - done
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            # Credit the progress bar for blocks already stored remotely
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        # Retry uploading; only decrement retries when a full pass makes
        # no progress (same number of blocks still missing)
        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                try:
                    details = ['%s' % thread.exception for thread in missing]
                except Exception:
                    details = ['Also, failed to read thread exceptions']
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=details)
        except KeyboardInterrupt:
            # Let in-flight uploads finish before propagating the interrupt
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        # Finalize: create the object from the now-complete hashmap
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
507

    
508
    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Involves the case of an object with the same path is created while
            the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
                fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        # Hash each block; hmap keeps the block data itself (not offsets,
        # unlike upload_object) so it can be re-sent on retries
        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        # Try to create the object from hashes alone; the server answers
        # with the hashes of blocks it does not already store
        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            # Credit the progress bar for blocks already stored remotely
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        # Retry uploading; only decrement tries when a full pass makes no
        # progress (same number of failures as the previous pass)
        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                # BUGFIX: `missing` holds hash strings, not threads, so the
                # previous `thread.exception` lookup raised AttributeError
                # instead of this ClientError; report the failed hashes
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash for hash in missing])
        except KeyboardInterrupt:
            # Let in-flight uploads finish before propagating the interrupt
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        # Finalize: create the object from the now-complete hashmap
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
643

    
644
    # download_* auxiliary methods
645
    def _get_remote_blocks_info(self, obj, **restargs):
        """Fetch the remote object hashmap and index it by block hash.

        :returns: (tuple) (blocksize, blockhash, total_size, hash list,
            {hash: [block positions]})
        """
        # The hashmap request must not carry the data range header
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        # Identical blocks share a hash, so each hash maps to the list
        # of positions where its block occurs
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            map_dict.setdefault(h, []).append(i)
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
662

    
663
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, crange,
            **args):
        """Download the object's blocks one at a time, in order, writing
        each to dst as it arrives."""
        if not total_size:
            return
        for blockid, blockhash in enumerate(remote_hashes):
            if not blockhash:
                continue
            start = blocksize * blockid
            is_last = start + blocksize > total_size
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            # Clip the block window against the user-requested range
            data_range = _range_up(start, end, total_size, crange)
            if not data_range:
                self._cb_next()
                continue
            args['data_range'] = 'bytes=%s' % data_range
            r = self.object_get(obj, success=(200, 206), **args)
            self._cb_next()
            dst.write(r.content)
            dst.flush()
682

    
683
    def _get_block_async(self, obj, **args):
        """Start an object GET on its own thread and return the thread."""
        fetcher = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        fetcher.start()
        return fetcher
687

    
688
    def _hash_from_file(self, fp, start, size, blockhash):
        """Compute the Pithos+ hash of a block stored in a local file.

        Used by resume logic to check whether a local block already
        matches the remote one.

        :param fp: open file descriptor
        :param start: (int) offset of the block in the file
        :param size: (int) block size in bytes
        :param blockhash: (str) hash algorithm name
        :returns: (str) hex digest of the block
        """
        fp.seek(start)
        block = readall(fp, size)
        h = newhashlib(blockhash)
        # Strip only trailing NUL padding, as _pithos_hash does for the
        # server-side hash; the previous strip() also removed leading
        # NULs, so such local blocks could never match remote hashes
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())
694

    
695
    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param flying: (dict) key: SilentEvent of a pending block GET;
            finished entries are written out and removed in place
        :param blockids: (dict) key: list of file positions where the
            block belongs; drained in step with flying
        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10
        """
        # NOTE: dict.items() returns a list in Python 2, so popping
        # entries inside the loop is safe here
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            # The same block may belong at several positions in the file
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()
715

    
716
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        """Download blocks concurrently and write them to a seekable file.

        :param remote_hashes: (dict) hash: [block positions], as built by
            _get_remote_blocks_info
        :param local_file: open seekable file descriptor (wb+)
        :param blockhash: (str) hash algorithm name, for resume checks
        :param resume: (bool) if set, skip blocks already present locally
        :param filerange: (str) user-requested range to clip blocks to
        """
        # When resuming, only bytes already on disk can be verified
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            # Convert block indices to byte offsets
            blockids = [blk * blocksize for blk in blockids]
            # Skip positions whose local content already hashes correctly
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                # Respect the thread limit, draining finished downloads
                # to the file before starting a new one
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                if end < key:
                    self._cb_next()
                    continue
                # Clip the block window against the user-requested range
                data_range = _range_up(key, end, total_size, filerange)
                if not data_range:
                    self._cb_next()
                    continue
                restargs[
                    'async_headers'] = {'Range': 'bytes=%s' % data_range}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        # Wait for the remaining downloads and flush them to the file
        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
754

    
755
    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date"""
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        # A tty cannot seek, so blocks must arrive in order; otherwise
        # download blocks concurrently and write them at their offsets
        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            # Trim any leftover bytes from a previous, larger local copy
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
830

    
831
    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :returns: (str) the whole object contents
        """
        # Conditional/range headers are shared by every block request
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        # One result slot per remote block, filled as threads complete
        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            # NOTE: the loop variable shadows the `blockhash` unpacked above
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                # Intersect this block's span with the user-requested range;
                # empty intersection means the block is skipped entirely
                data_range_str = _range_up(start, end, end, range_str)
                if data_range_str:
                    self._watch_thread_limit(flying.values())
                    restargs['data_range'] = 'bytes=%s' % data_range_str
                    flying[blockid] = self._get_block_async(obj, **restargs)
                # Harvest finished downloads; on the last block, join all
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            # Wait for outstanding requests before propagating the interrupt
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
910

    
911
    #Command Progress Bar method
912
    def _cb_next(self, step=1):
913
        if hasattr(self, 'progress_bar_gen'):
914
            try:
915
                for i in xrange(step):
916
                    self.progress_bar_gen.next()
917
            except:
918
                pass
919

    
920
    def _complete_cb(self):
921
        while True:
922
            try:
923
                self.progress_bar_gen.next()
924
            except:
925
                break
926

    
927
    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Fetch the hashmap of a remote object.

        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :returns: (list)
        """
        try:
            response = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since)
        except ClientError as err:
            # 304 (not modified) / 412 (precondition failed) mean
            # "nothing to report", not a failure
            if err.status in (304, 412):
                return {}
            raise
        return response.json
961

    
962
    def set_account_group(self, group, usernames):
        """Assign a list of users to an account group.

        :param group: (str)

        :param usernames: (list)
        """
        return self.account_post(update=True, groups={group: usernames})
970

    
971
    def del_account_group(self, group):
        """Delete an account group (by assigning it an empty user list).

        :param group: (str)

        :returns: (dict) response headers
        """
        # Return the response headers, consistent with del_account_meta
        # and the other del_* methods (previously returned None)
        r = self.account_post(update=True, groups={group: []})
        return r.headers
976

    
977
    def get_account_info(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict) account headers

        :raises ClientError: 401 No authorization
        """
        response = self.account_head(until=until)
        if response.status_code == 401:
            raise ClientError("No authorization", status=401)
        return response.headers
987

    
988
    def get_account_quota(self):
        """
        :returns: (dict) the X-Account-Policy-Quota header, if present
        """
        account_info = self.get_account_info()
        return filter_in(
            account_info, 'X-Account-Policy-Quota', exactMatch=True)
996

    
997
    #def get_account_versioning(self):
998
    #    """
999
    #    :returns: (dict)
1000
    #    """
1001
    #    return filter_in(
1002
    #        self.get_account_info(),
1003
    #        'X-Account-Policy-Versioning',
1004
    #        exactMatch=True)
1005

    
1006
    def get_account_meta(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict) the X-Account-Meta-* headers
        """
        info = self.get_account_info(until=until)
        return filter_in(info, 'X-Account-Meta-')
1013

    
1014
    def get_account_group(self):
        """
        :returns: (dict) the X-Account-Group-* headers
        """
        account_headers = self.get_account_info()
        return filter_in(account_headers, 'X-Account-Group-')
1019

    
1020
    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}

        :returns: (dict) response headers
        """
        # isinstance (not `type(...) is dict`) also accepts dict subclasses;
        # the exception type (AssertionError) is unchanged
        assert isinstance(metapairs, dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers
1027

    
1028
    def del_account_meta(self, metakey):
        """Remove a single account metadatum.

        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        # Posting an empty value deletes the metadatum server-side
        return self.account_post(
            update=True, metadata={metakey: ''}).headers
1034

    
1035
    #def set_account_quota(self, quota):
1036
    #    """
1037
    #    :param quota: (int)
1038
    #    """
1039
    #    self.account_post(update=True, quota=quota)
1040

    
1041
    #def set_account_versioning(self, versioning):
1042
    #    """
1043
    #    :param versioning: (str)
1044
    #    """
1045
    #    r = self.account_post(update=True, versioning=versioning)
1046
    #    return r.headers
1047

    
1048
    def list_containers(self):
        """
        :returns: (dict) the account listing, decoded from json
        """
        return self.account_get().json
1054

    
1055
    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formated date

        :param delimiter: (str) with / empty container

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        response = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        # Map the two accepted-but-failed statuses to descriptive errors
        errors = {
            404: 'Container "%s" does not exist' % self.container,
            409: 'Container "%s" is not empty' % self.container}
        if response.status_code in errors:
            raise ClientError(
                errors[response.status_code], response.status_code)
        return response.headers
1079

    
1080
    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        # Temporarily switch container; restore it whatever happens
        original_container = self.container
        try:
            if container:
                self.container = container
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = original_container
1094

    
1095
    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        # Temporarily switch container; restore it whatever happens
        saved_container = self.container
        try:
            if container:
                self.container = container
            return filter_in(
                self.get_container_info(), 'X-Container-Policy-Quota')
        finally:
            self.container = saved_container
1109

    
1110
    def get_container_info(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            return self.container_head(until=until).headers
        except ClientError as err:
            # Enrich the error with the container name before re-raising
            err.details.append('for container %s' % self.container)
            raise err
1124

    
1125
    def get_container_meta(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict) the X-Container-Meta* headers
        """
        info = self.get_container_info(until=until)
        return filter_in(info, 'X-Container-Meta')
1134

    
1135
    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formated date

        :returns: (dict) the X-Container-Object-Meta* headers
        """
        info = self.get_container_info(until=until)
        return filter_in(info, 'X-Container-Object-Meta')
1144

    
1145
    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}

        :returns: (dict) response headers
        """
        # isinstance (not `type(...) is dict`) also accepts dict subclasses;
        # the exception type (AssertionError) is unchanged
        assert isinstance(metapairs, dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers
1152

    
1153
    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        # Posting an empty value deletes the metadatum server-side
        return self.container_post(
            update=True, metadata={metakey: ''}).headers
1161

    
1162
    def set_container_limit(self, limit):
        """
        :param limit: (int)

        :returns: (dict) response headers
        """
        return self.container_post(update=True, quota=limit).headers
1168

    
1169
    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)

        :returns: (dict) response headers
        """
        return self.container_post(
            update=True, versioning=versioning).headers
1175

    
1176
    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formated date

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_container()
        response = self.object_delete(obj, until=until, delimiter=delimiter)
        return response.headers
1187

    
1188
    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}

        :returns: (dict) response headers
        """
        # isinstance (not `type(...) is dict`) also accepts dict subclasses;
        # the exception type (AssertionError) is unchanged
        assert isinstance(metapairs, dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers
1197

    
1198
    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        # Posting an empty value deletes the metadatum server-side
        return self.object_post(
            obj, update=True, metadata={metakey: ''}).headers
1206

    
1207
    def publish_object(self, obj):
        """Make an object publicly accessible and return its public URL.

        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        # The server reports the access url in the x-object-public header.
        # (Removed unreachable leftover code that rebuilt the url from
        # base_url after this return statement.)
        return info['x-object-public']
1219

    
1220
    def unpublish_object(self, obj):
        """Withdraw public access from an object.

        :param obj: (str) remote object path

        :returns: (dict) response headers
        """
        return self.object_post(obj, update=True, public=False).headers
1226

    
1227
    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)

        :raises ClientError: 404 Object not found
        """
        try:
            return self.object_head(obj, version=version).headers
        except ClientError as ce:
            if ce.status == 404:
                # Replace the generic 404 with an object-specific message
                raise ClientError('Object %s not found' % obj, status=404)
            raise
1242

    
1243
    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict) the X-Object-Meta* headers
        """
        info = self.get_object_info(obj, version=version)
        return filter_in(info, 'X-Object-Meta')
1254

    
1255
    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict) parsed X-Object-Sharing header,
            e.g. {'read': 'user1,group1', 'write': 'user2'}

        :raises ClientError: if the X-Object-Sharing header is malformed
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                # Split on the first '=' only, so values that themselves
                # contain '=' no longer raise an unhandled ValueError
                (key, val) = perm.strip().split('=', 1)
                reply[key] = val
        return reply
1276

    
1277
    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permisions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) of users and user groups to get
           write permission for this object - False means all previous write
           permissions will be removed

        :returns: (dict) response headers
        """
        # A falsy value maps to '' which clears that permission list
        permissions = dict(
            read=read_permission or '',
            write=write_permission or '')
        response = self.object_post(
            obj, update=True, permissions=permissions)
        return response.headers
1298

    
1299
    def del_object_sharing(self, obj):
        """Remove all read/write permissions from an object.

        :param obj: (str) remote object path

        :returns: (dict) response headers
        """
        # Default arguments (False) of set_object_sharing clear both lists
        return self.set_object_sharing(obj)
1304

    
1305
    def append_object(self, obj, source_file, upload_cb=None):
        """Append the contents of a local file to a remote object, block by
        block, using one SilentEvent thread per block POST.

        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading

        :returns: the collected response headers of the block requests
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # ceil(filesize / blocksize) without floating point
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                # 'bytes */*' content_range makes the server append the data
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                # Harvest finished threads; keep still-running ones flying
                for key, thread in flying.items():
                    if thread.isAlive():
                        # NOTE(review): `i < nblocks` is always true inside
                        # range(nblocks), so the join() below appears
                        # unreachable and threads still alive after the last
                        # iteration rely on the finally-sleep - confirm
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            # Crude back-off to let any remaining threads drain
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()
1361

    
1362
    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file

        :returns: (dict) response headers
        """
        # Truncation is an update from the object itself, limited to
        # its first upto_bytes bytes
        response = self.object_post(
            obj,
            update=True,
            content_type='application/octet-stream',
            content_range='bytes 0-%s/*' % upto_bytes,
            source_object=path4url(self.container, obj),
            object_bytes=upto_bytes)
        return response.headers
1378

    
1379
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading

        :raises ClientError: 416 if the requested range exceeds the remote
            file size

        :returns: (list) one response-header dict per uploaded block
        """
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # Total bytes to overwrite (range is inclusive at both ends)
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            # Never read past the local file or past the requested range
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            # BUG FIX: was `self._cb_next` (bare attribute access, never
            # invoked), so the progress bar never advanced
            self._cb_next()
        return headers
1431

    
1432
    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """Copy an object, possibly across containers and accounts.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        # All subsequent object operations target the destination container
        self.container = dst_container
        response = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=path4url(src_container, src_object),
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return response.headers
1475

    
1476
    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """Move an object, possibly across containers and accounts.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        # All subsequent object operations target the destination container
        self.container = dst_container
        response = self.object_put(
            dst_object or src_object,
            success=201,
            move_from=path4url(src_container, src_object),
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return response.headers
1520

    
1521
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        # Request a json listing, with optional paging parameters
        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        accepted = kwargs.pop('success', (200, 204))
        response = self.get('', *args, success=accepted, **kwargs)
        return response.json
1540

    
1541
    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list) the known versions of the object
        """
        self._assert_container()
        response = self.object_get(obj, format='json', version='list')
        return response.json['versions']