# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in, readall


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()


def _range_up(start, end, max_value, a_range):
    """
    :param start: (int) the window bottom

    :param end: (int) the window top

    :param max_value: (int) the maximum accepted value

    :param a_range: (str) a range string of the form X[,X'[,X''[...]]]
        where X: x|x-y|-x and x < y, with x, y natural numbers

    :returns: (str) a range string clipped to the start-end window;
        an empty string means the window is out of range
    """
    assert start >= 0, '_range_up called with start(%s) < 0' % start
    assert end >= start, '_range_up called with end(%s) < start(%s)' % (
        end, start)
    assert end <= max_value, (
        '_range_up called with max_value(%s) < end(%s)' % (max_value, end))
    if not a_range:
        return '%s-%s' % (start, end)
    selected = []
    for some_range in a_range.split(','):
        v0, sep, v1 = some_range.partition('-')
        if v0:
            v0 = int(v0)
            if sep:
                v1 = int(v1)
                if v1 < start or v0 > end or v1 < v0:
                    continue
                v0 = v0 if v0 > start else start
                v1 = v1 if v1 < end else end
                selected.append('%s-%s' % (v0, v1))
            elif v0 < start:
                continue
            else:
                v1 = v0 if v0 <= end else end
                selected.append('%s-%s' % (start, v1))
        else:
            v1 = int(v1)
            if max_value - v1 > end:
                continue
            v0 = (max_value - v1) if max_value - v1 > start else start
            selected.append('%s-%s' % (v0, end))
    return ','.join(selected)
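
# For orientation, a few illustrative calls (not part of the original module,
# values chosen only to exercise the branches above):
#   _range_up(0, 99, 1000, None)       -> '0-99'
#   _range_up(0, 99, 1000, '50-200')   -> '50-99'   (clipped to the window)
#   _range_up(0, 99, 1000, '200-300')  -> ''        (window out of range)
#   _range_up(900, 999, 1000, '-50')   -> '950-999' (suffix range)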


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None,
            **kwargs):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) 'auto' or any other value supported by the
            server

        :param metadata: (dict) custom user-defined metadata of the form
            {'name1': 'value1', 'name2': 'value2', ...}

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata,
                **kwargs)
            return r.headers
        finally:
            self.container = cnt_back_up
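
    # A minimal usage sketch (not part of the original module; the endpoint,
    # TOKEN, ACCOUNT and container name below are placeholders):
    #
    #   client = PithosClient('https://example.org/pithos', TOKEN, ACCOUNT)
    #   client.create_container('backups', sizelimit=5 * 1024 ** 3)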

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not JSON-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = readall(f, size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache the container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
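
    # Block arithmetic, with illustrative numbers (not from the original
    # module): for a 10 MiB source and a 4 MiB container block size,
    #   nblocks = 1 + (10 * 1024 ** 2 - 1) // (4 * 1024 ** 2) = 3
    # i.e. two full blocks plus one partial block.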

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in xrange(nblocks):
            block = readall(fileobj, min(blocksize, size - offset))
            bytes = len(block)
            if bytes <= 0:
                break
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploading blocks: '
               'read bytes(%s) != requested size (%s)' % (offset, size))
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = readall(fileobj, bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param size: (int) size of data to upload

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) if true, the file is uploaded ONLY if it
            does not exist remotely, otherwise the operation fails. This also
            covers the case where an object with the same path is created
            while the upload is in progress.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            the server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                try:
                    details = ['%s' % thread.exception for thread in missing]
                except Exception:
                    details = ['Also, failed to read thread exceptions']
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=details)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
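
    # A minimal usage sketch (not part of the original module; TOKEN, ACCOUNT
    # and paths are placeholders):
    #
    #   client = PithosClient('https://example.org/pithos', TOKEN, ACCOUNT,
    #                         container='backups')
    #   with open('/tmp/data.bin', 'rb') as f:
    #       headers = client.upload_object('data.bin', f)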

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) if true, the file is uploaded ONLY if it
            does not exist remotely, otherwise the operation fails. This also
            covers the case where an object with the same path is created
            while the upload is in progress.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            the server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
            fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % thread.exception for thread in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
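
    # A minimal usage sketch (not part of the original module; names are
    # placeholders):
    #
    #   headers = client.upload_from_string(
    #       'notes.txt', 'hello world', content_type='text/plain')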

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # the same hash may appear in multiple blocks, so map each hash
            # to the list of block indices where it occurs
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, crange,
            **args):
        if not total_size:
            return
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range = _range_up(start, end, total_size, crange)
                if not data_range:
                    self._cb_next()
                    continue
                args['data_range'] = 'bytes=%s' % data_range
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = readall(fp, size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                    local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                if end < key:
                    self._cb_next()
                    continue
                data_range = _range_up(key, end, total_size, filerange)
                if not data_range:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {'Range': 'bytes=%s' % data_range}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from-to, where from and to are file positions
            (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
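
    # A minimal usage sketch (not part of the original module; paths are
    # placeholders):
    #
    #   with open('/tmp/data.bin', 'wb+') as dst:
    #       client.download_object('data.bin', dst, resume=True)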

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from-to, where from and to are file positions
            (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range_str = _range_up(start, end, end, range_str)
                if data_range_str:
                    self._watch_thread_limit(flying.values())
                    restargs['data_range'] = 'bytes=%s' % data_range_str
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
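
    # A minimal usage sketch (not part of the original module; the object
    # name is a placeholder):
    #
    #   text = client.download_to_string('notes.txt')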

    # Command Progress Bar methods
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (dict) the object hashmap
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
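
    # The returned hashmap has the shape consumed by _get_remote_blocks_info
    # above; illustrative values (hashes truncated, not from a real server):
    #
    #   {'bytes': 10485760,
    #    'block_size': 4194304,
    #    'block_hash': 'sha256',
    #    'hashes': ['7c21...', '4d0f...', 'a9e3...']}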

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    #def get_account_versioning(self):
    #    """
    #    :returns: (dict)
    #    """
    #    return filter_in(
    #        self.get_account_info(),
    #        'X-Account-Policy-Versioning',
    #        exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    #def set_account_quota(self, quota):
    #    """
    #    :param quota: (int)
    #    """
    #    self.account_post(update=True, quota=quota)

    #def set_account_versioning(self, versioning):
    #    """
    #    :param versioning: (str)
    #    """
    #    r = self.account_post(update=True, versioning=versioning)
    #    return r.headers

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if '/', the contents of the container are
            deleted along with it

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, container=None, until=None):
        """
        :param container: (str)

        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        bck_cont = self.container
        try:
            self.container = container or bck_cont
            self._assert_container()
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        finally:
            self.container = bck_cont
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until), 'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until), 'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        return info['x-object-public']

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed

        :returns: (dict) response headers
        """

        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers
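
    # A minimal usage sketch (not part of the original module; user and group
    # names are placeholders):
    #
    #   client.set_object_sharing(
    #       'notes.txt',
    #       read_permission=['user1', 'group1'],
    #       write_permission=['user2'])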

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: (int) max number of bytes to leave in the file

        :returns: (dict) response headers
        """
        ctype = self.get_object_info(obj)['content-type']
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type=ctype,
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(
            self, obj, start, end, source_file,
            upload_cb=None):
        """Overwrite a part of an object from a local source file
        ATTENTION: content_type must always be application/octet-stream

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        start, end = int(start), int(end)
        assert rf_size >= start, 'Range start %s exceeds file size %s' % (
            start, rf_size)
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = end - start + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            self._cb_next()
        return headers
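
    # Content-Range arithmetic, with illustrative numbers (not from the
    # original module): overwriting bytes 100-299 under a 4 MiB block size
    # needs a single POST carrying
    #   Content-Range: bytes 100-299/*
    # since datasize = 299 - 100 + 1 = 200 fits in one block.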

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
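
    # A minimal usage sketch (not part of the original module; container and
    # object names are placeholders):
    #
    #   client.copy_object('pithos', 'notes.txt', 'backups')
    #   client.move_object(
    #       'pithos', 'old.txt', 'archive', dst_object='2013/old.txt')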

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']