# kamaki/clients/pithos/__init__.py @ 8df239df
# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in, readall


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
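
# Example (illustrative): block hashes are computed over the block content
# with trailing NUL padding stripped, so a short final block hashes the same
# as its unpadded payload:
#
#     _pithos_hash('data\x00\x00\x00', 'sha256')
#     # == the sha256 hex digest of 'data'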


def _range_up(start, end, max_value, a_range):
    """
    :param start: (int) the window bottom

    :param end: (int) the window top

    :param max_value: (int) maximum accepted value

    :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
        where X: x|x-y|-x, with x < y and x, y natural numbers

    :returns: (str) a range string cut-off for the start-end range;
        an empty response means this window is out of range
    """
    assert start >= 0, '_range_up called w. start(%s) < 0' % start
    assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
        end, start)
    assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
        max_value, end)
    if not a_range:
        return '%s-%s' % (start, end)
    selected = []
    for some_range in a_range.split(','):
        v0, sep, v1 = some_range.partition('-')
        if v0:
            v0 = int(v0)
            if sep:
                v1 = int(v1)
                if v1 < start or v0 > end or v1 < v0:
                    continue
                v0 = v0 if v0 > start else start
                v1 = v1 if v1 < end else end
                selected.append('%s-%s' % (v0, v1))
            elif v0 < start:
                continue
            else:
                v1 = v0 if v0 <= end else end
                selected.append('%s-%s' % (start, v1))
        else:
            v1 = int(v1)
            if max_value - v1 > end:
                continue
            v0 = (max_value - v1) if max_value - v1 > start else start
            selected.append('%s-%s' % (v0, end))
    return ','.join(selected)
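
# Examples (illustrative): restrict a user-requested range to the window of
# one block (bytes 10..30) of a 100-byte object:
#
#     _range_up(10, 30, 100, '5-20')   # --> '10-20'
#     _range_up(10, 30, 100, '-80')    # --> '20-30' (suffix uses max_value)
#     _range_up(10, 30, 100, '40-50')  # --> ''  (window out of range)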


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)
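
    # Example (illustrative; the URL and TOKEN are placeholders):
    #
    #     client = PithosClient(
    #         'https://pithos.example.org/object-store/v1', TOKEN,
    #         account='user@example.org', container='pithos')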

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None,
            **kwargs):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) 'auto' or any other value supported by the
            server

        :param metadata: (dict) custom user-defined metadata of the form
            {'name1': 'value1', 'name2': 'value2', ...}

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata,
                **kwargs)
            return r.headers
        finally:
            self.container = cnt_back_up

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = readall(f, size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'
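
    # Note: a block is uploaded with an 'update' POST on the container; the
    # server replies with a json list of the hashes it stored, which is
    # checked against the locally computed hash above.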

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in xrange(nblocks):
            block = readall(fileobj, min(blocksize, size - offset))
            bytes = len(block)
            if bytes <= 0:
                break
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploading blocks: '
               'read bytes(%s) != requested size (%s)' % (offset, size))
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = readall(fileobj, bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        content_type = content_type or 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                try:
                    details = ['%s' % thread.exception for thread in missing]
                except Exception:
                    details = ['Also, failed to read thread exceptions']
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=details)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
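
    # Example (illustrative):
    #
    #     with open('local.bin', 'rb') as f:
    #         headers = client.upload_object('remote.bin', f)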

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
                fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        hashes = []
        hmap = {}
        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                raise ClientError('%s blocks failed to upload' % len(missing))
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
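
    # Example (illustrative):
    #
    #     headers = client.upload_from_string('notes.txt', 'hello world')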

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # the same hash may appear in multiple blocks, so map each hash
            # to the list of block indices where it occurs
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, crange,
            **args):
        if not total_size:
            return
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range = _range_up(start, end, total_size, crange)
                if not data_range:
                    self._cb_next()
                    continue
                args['data_range'] = 'bytes=%s' % data_range
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = readall(fp, size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                if end < key:
                    self._cb_next()
                    continue
                data_range = _range_up(key, end, total_size, filerange)
                if not data_range:
                    self._cb_next()
                    continue
                restargs[
                    'async_headers'] = {'Range': 'bytes=%s' % data_range}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
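
    # Example (illustrative):
    #
    #     with open('local.bin', 'wb+') as dst:
    #         client.download_object('remote.bin', dst, resume=True)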

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range_str = _range_up(start, end, total_size, range_str)
                if data_range_str:
                    self._watch_thread_limit(flying.values())
                    restargs['data_range'] = 'bytes=%s' % data_range_str
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
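
    # Example (illustrative):
    #
    #     text = client.download_to_string('notes.txt', range_str='0-1023')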

    # Command Progress Bar methods
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (dict) the object hashmap, or an empty dict if a 304 or 412
            precondition fails
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
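
    # Example hashmap (illustrative values):
    #
    #     {'block_size': 4194304, 'block_hash': 'sha256',
    #      'bytes': 4241971, 'hashes': ['bb5d...', '037a...']}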

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    #def get_account_versioning(self):
    #    """
    #    :returns: (dict)
    #    """
    #    return filter_in(
    #        self.get_account_info(),
    #        'X-Account-Policy-Versioning',
    #        exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    #def set_account_quota(self, quota):
    #    """
    #    :param quota: (int)
    #    """
    #    self.account_post(update=True, quota=quota)

    #def set_account_versioning(self, versioning):
    #    """
    #    :param versioning: (str)
    #    """
    #    r = self.account_post(update=True, versioning=versioning)
    #    return r.headers

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) set to '/' to empty the container before
            deletion

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, container=None, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        bck_cont = self.container
        try:
            self.container = container or bck_cont
            self._assert_container()
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        finally:
            self.container = bck_cont
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until), 'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until), 'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        return info['x-object-public']

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed

        :returns: (dict) response headers
        """

        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers
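
    # Example (illustrative):
    #
    #     client.set_object_sharing(
    #         'remote.bin',
    #         read_permission=['user1@example.org', 'group1'],
    #         write_permission=['user2@example.org'])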

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave in the file

        :returns: (dict) response headers
        """
        ctype = self.get_object_info(obj)['content-type']
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type=ctype,
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(
            self, obj, start, end, source_file,
            source_version=None, upload_cb=None):
        """Overwrite a part of an object from a local source file
        ATTENTION: content_type must always be application/octet-stream

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        r = self.get_object_info(obj, version=source_version)
        rf_size = int(r['content-length'])
        start, end = int(start), int(end)
        assert rf_size >= start, 'Range start %s exceeds file size %s' % (
            start, rf_size)
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = end - start + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                source_version=source_version,
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            self._cb_next()
        return headers
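
    # Example (illustrative): overwrite bytes 0-1023 of 'remote.bin' with the
    # first KiB of a local file:
    #
    #     with open('patch.bin', 'rb') as f:
    #         client.overwrite_object('remote.bin', 0, 1023, f)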

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
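
    # Example (illustrative):
    #
    #     client.copy_object(
    #         'pithos', 'src.bin', 'backups', dst_object='dst.bin')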

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']