Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos / __init__.py @ 91d443f9

History | View | Annotate | Download (49 kB)

1
# Copyright 2011-2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from threading import enumerate as activethreads
35

    
36
from os import fstat
37
from hashlib import new as newhashlib
38
from time import time
39
from StringIO import StringIO
40

    
41
from binascii import hexlify
42

    
43
from kamaki.clients import SilentEvent, sendlog
44
from kamaki.clients.pithos.rest_api import PithosRestClient
45
from kamaki.clients.storage import ClientError
46
from kamaki.clients.utils import path4url, filter_in
47

    
48

    
49
def _pithos_hash(block, blockhash):
    """Hash one block the way the Pithos+ server does.

    Trailing NUL padding is ignored, so a short final block and its
    zero-padded counterpart hash identically.

    :param block: (str) the block data
    :param blockhash: (str) hash algorithm name (e.g. from
        x-container-block-hash)
    :returns: (str) hex digest of the stripped block
    """
    hasher = newhashlib(blockhash)
    hasher.update(block.rstrip('\x00'))
    return hasher.hexdigest()
53

    
54

    
55
def _range_up(start, end, max_value, a_range):
56
    """
57
    :param start: (int) the window bottom
58

59
    :param end: (int) the window top
60

61
    :param max_value: (int) maximum accepted value
62

63
    :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
64
        where X: x|x-y|-x where x < y and x, y natural numbers
65

66
    :returns: (str) a range string cut-off for the start-end range
67
        an empty response means this window is out of range
68
    """
69
    assert start >= 0, '_range_up called w. start(%s) < 0' % start
70
    assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
71
        end, start)
72
    assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
73
        max_value, end)
74
    if not a_range:
75
        return '%s-%s' % (start, end)
76
    selected = []
77
    for some_range in a_range.split(','):
78
        v0, sep, v1 = some_range.partition('-')
79
        if v0:
80
            v0 = int(v0)
81
            if sep:
82
                v1 = int(v1)
83
                if v1 < start or v0 > end or v1 < v0:
84
                    continue
85
                v0 = v0 if v0 > start else start
86
                v1 = v1 if v1 < end else end
87
                selected.append('%s-%s' % (v0, v1))
88
            elif v0 < start:
89
                continue
90
            else:
91
                v1 = v0 if v0 <= end else end
92
                selected.append('%s-%s' % (start, v1))
93
        else:
94
            v1 = int(v1)
95
            if max_value - v1 > end:
96
                continue
97
            v0 = (max_value - v1) if max_value - v1 > start else start
98
            selected.append('%s-%s' % (v0, end))
99
    return ','.join(selected)
100

    
101

    
102
class PithosClient(PithosRestClient):
103
    """Synnefo Pithos+ API client"""
104

    
105
    def __init__(self, base_url, token, account=None, container=None):
        """
        :param base_url: (str) the Pithos+ service endpoint url

        :param token: (str) user authentication token

        :param account: (str) account identifier, if any

        :param container: (str) default container for subsequent calls, if any
        """
        super(PithosClient, self).__init__(base_url, token, account, container)
107

    
108
    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None,
            **kwargs):
        """Create a container, optionally with quota/versioning/metadata.

        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be auto or whatever supported by server

        :param metadata: (dict) Custom user-defined metadata of the form
            { 'name1': 'value1', 'name2': 'value2', ... }

        :returns: (dict) response headers
        """
        backup = self.container
        try:
            #  temporarily retarget the client, restore it afterwards
            self.container = container or backup
            return self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata,
                **kwargs).headers
        finally:
            self.container = backup
133

    
134
    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks

        :param container: (str) if not given, self.container is used instead

        :returns: (dict) response headers
        """
        backup = self.container
        try:
            #  temporarily retarget the client, restore it afterwards
            self.container = container or backup
            return self.container_delete(until=unicode(time())).headers
        finally:
            self.container = backup
144

    
145
    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Upload an object in a single PUT request (no block splitting).

        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if set, f holds a json hashmap that is
            validated and uploaded instead of plain data

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata

        :raises ClientError: if withHashFile is set and f is not valid json
        """
        self._assert_container()

        if withHashFile:
            #  validate and normalize the hashmap by a json round-trip
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formated' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            #  replace f so the normalized text is what gets uploaded
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
205

    
206
    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """Create a zero-length manifest object referring to other objects.

        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        #  content_length=0: the object carries no data of its own; the
        #  manifest header names '<container>/<obj>' as the data source
        #  (server-side large-object semantics)
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers
244

    
245
    # upload_* auxiliary methods
246
    def _put_block_async(self, data, hash):
        """Upload one block on its own thread; return the running thread."""
        flier = SilentEvent(method=self._put_block, data=data, hash=hash)
        flier.start()
        return flier
250

    
251
    def _put_block(self, data, hash):
        """Upload a single block to the current container.

        :param data: (str) the block contents

        :param hash: (str) the locally computed block hash; the server
            responds with the hash it computed, and both must agree
        """
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        #  r.json is the server-side list of hashes for the posted data
        assert r.json[0] == hash, 'Local hash does not match server'
259

    
260
    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """Resolve container block settings and the upload size/block count.

        :param fileobj: (file descriptor) source; may be None if size is given

        :param size: (int) size of data to upload from source; when None it
            is taken from fstat of fileobj

        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls

        :returns: (blocksize, blockhash, size, nblocks)
        """
        if isinstance(cache, dict):
            #  fill the cache on first use, then always read from it
            if self.container not in cache:
                cache[self.container] = self.get_container_info()
            meta = cache[self.container]
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        #  ceiling division: a partial last block still counts
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
282

    
283
    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        """Submit an object hashmap: create the object or learn what's missing.

        :param obj: (str) remote object path

        :param json: (dict) the hashmap, e.g. {'bytes': ..., 'hashes': [...]}

        :param success: accepted status codes; 201 means the object was
            created from blocks the server already had, 409 means some
            blocks must still be uploaded

        :returns: (missing, headers) -- missing is None when the object was
            created (201); otherwise it is the server's json response,
            listing the hashes of the blocks that must be uploaded

        NOTE(review): the size, format and hashmap parameters are accepted
        but not forwarded -- the call below hardcodes format='json' and
        hashmap=True; confirm before relying on them
        """
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers
310

    
311
    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        """Hash a local file block by block, filling hashes and hmap in-place.

        :param blocksize: (int) container block size in bytes

        :param blockhash: (str) hash algorithm name

        :param size: (int) total bytes to read from fileobj

        :param nblocks: (int) expected number of blocks

        :param hashes: (list) filled in-place with the block hashes, in order

        :param hmap: (dict) filled in-place, hash -> (offset, length);
            duplicate blocks keep only their last position

        :param fileobj: open file descriptor, read sequentially from its
            current position

        :param hash_cb: optional progress.bar generator factory
        """
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        #  BUG FIX: the message used to be split across two statements, so
        #  the second half was a no-op expression and the assert showed a
        #  truncated message
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg
331

    
332
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously

        :param missing: (list) hashes of the blocks the server reported absent

        :param hmap: (dict) hash -> (offset, length) into fileobj

        :param fileobj: open file descriptor to read the block data from

        :param upload_gen: optional progress bar generator, advanced once per
            finished block (advance failures are deliberately ignored)

        :returns: (list) hashes of the blocks that failed to upload
        """
        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            #  block here if too many threads are already running
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        #  502: server overloaded -- shrink the pool to the
                        #  current thread limit
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    #  NOTE(review): this append is overwritten by
                    #  'flying = unfinished' below, so a still-alive thread
                    #  outside 'unfinished' drops out of tracking -- confirm
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        #  wait for the remaining uploads and collect their failures
        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]
373

    
374
    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param size: (int) size of data to upload; defaults to the file size

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Involves the case of an object with the same path is created while
            the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)

        :returns: (dict) created object metadata

        :raises ClientError: if blocks are still missing after all retries
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        #  send the hashmap first: the server either creates the object from
        #  blocks it already has, or answers with the hashes it is missing
        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            #  every block was already on the server: object is complete
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            #  advance the bar for the blocks the server already has
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        #  no progress in this round: burn one retry
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                #  BUG FIX: missing holds hash strings (the return value of
                #  _upload_missing_blocks), not threads; reading
                #  thread.exception off them always failed into the old
                #  fallback message -- report the failed hashes instead
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash_ for hash_ in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        #  re-send the complete hashmap to finalize the object
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
508

    
509
    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            Involves the case of an object with the same path is created while
            the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)

        :returns: (dict) created object metadata

        :raises ClientError: if blocks are still missing after all retries
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
                fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        #  slice the input in blocks and hash each one; unlike upload_object,
        #  hmap values here hold (offset, block data), not (offset, length)
        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        #  send the hashmap first: the server either creates the object from
        #  blocks it already has, or answers with the hashes it is missing
        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            #  advance the bar for the blocks the server already has
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    #  no progress in this round: burn one retry
                    tries -= 1
                old_failures = len(missing)
            if missing:
                #  BUG FIX: missing holds hash strings, not threads; the old
                #  'thread.exception for thread in missing' raised
                #  AttributeError and masked this ClientError
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash_ for hash_ in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        #  re-send the complete hashmap to finalize the object
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
644

    
645
    # download_* auxiliary methods
646
    def _get_remote_blocks_info(self, obj, **restargs):
        """Fetch the remote object hashmap and index it by block hash.

        :param obj: (str) remote object path

        :returns: (blocksize, blockhash, total_size, hashes, map_dict) where
            map_dict maps each block hash to the list of block indices
            carrying it (identical blocks share one hash)
        """
        #  the hashmap request itself must not carry a data range
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            map_dict.setdefault(h, []).append(i)
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
663

    
664
    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, crange,
            **args):
        """Download blocks one at a time and write them to dst, in order.

        :param remote_hashes: (list) ordered block hashes; falsy entries are
            skipped

        :param crange: (str) user-requested range string, or None
        """
        for blockid, blockhash in enumerate(remote_hashes):
            if not blockhash:
                continue
            start = blocksize * blockid
            is_last = start + blocksize > total_size
            end = (total_size - 1) if is_last else (start + blocksize - 1)
            data_range = _range_up(start, end, total_size, crange)
            if not data_range:
                #  this block lies entirely outside the requested range
                self._cb_next()
                continue
            args['data_range'] = 'bytes=%s' % data_range
            r = self.object_get(obj, success=(200, 206), **args)
            self._cb_next()
            dst.write(r.content)
            dst.flush()
681

    
682
    def _get_block_async(self, obj, **args):
        """Fire a GET for one block on its own thread; return the thread."""
        flier = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        flier.start()
        return flier
686

    
687
    def _hash_from_file(self, fp, start, size, blockhash):
        """Compute the Pithos-style hash of one block of a local file.

        :param fp: open file descriptor

        :param start: (int) byte offset of the block within the file

        :param size: (int) block size in bytes

        :param blockhash: (str) hash algorithm name

        :returns: (str) hex digest of the block
        """
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        #  BUG FIX: rstrip, not strip -- the server-side convention (see
        #  _pithos_hash) ignores only TRAILING NULs; stripping leading NULs
        #  too made local hashes of blocks starting with \x00 never match
        #  the remote ones, defeating resume for such blocks
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())
693

    
694
    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param flying: (dict) key -> thread of a pending block GET; finished
            threads are written out and popped, alive ones are left in place

        :param blockids: (dict) key -> list of file positions where the
            fetched block content must be written

        :param offset: the offset of the file up to blocksize
        - e.g. if the range is 10-100, all blocks will be written to
        normal_position - 10

        :raises: the exception of any failed thread
        """
        #  in Python 2, dict.items() returns a list copy, so popping entries
        #  while looping over it is safe here
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            #  the same block content may belong to several file positions
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()
714

    
715
    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        """Download blocks concurrently and write them into local_file.

        :param remote_hashes: (dict) block hash -> list of block indices

        :param blockhash: (str) hash algorithm name, used to verify already
            present blocks when resuming

        :param resume: (bool) if set, blocks already in local_file (verified
            by hash) are not downloaded again

        :param filerange: (str) user-requested range string, or None
        """
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            #  work in byte offsets rather than block indices
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                #  BUG FIX: an 'if key:' guard here treated offset 0 as
                #  falsy, so the block at the start of the object was never
                #  scheduled for download
                self._watch_thread_limit(flying.values())
                #  flush any finished downloads to disk before flying again
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                data_range = _range_up(key, end, total_size, filerange)
                if not data_range:
                    #  this block lies outside the requested range
                    self._cb_next()
                    continue
                restargs[
                    'async_headers'] = {'Range': 'bytes=%s' % data_range}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)
751

    
752
    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        #  a tty cannot seek, so blocks must be fetched and written in order;
        #  otherwise download blocks concurrently, in any order
        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                #  cut off any stale bytes beyond the object size (e.g. when
                #  resuming over a previously larger file)
                dst.truncate(total_size)

        self._complete_cb()
827

    
828
    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :returns: (str) the whole object contents
        """
        # Conditional/range arguments forwarded to every block request
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        # NOTE(review): `blockhash` unpacked here is immediately shadowed by
        # the loop variable below and never used — looks intentional but
        # worth confirming against _get_remote_blocks_info.
        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            # One progress step per remote block
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        # Pre-sized result list: each thread writes its own slot, so blocks
        # may complete out of order but are joined in order at the end.
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                # Intersect this block's span with the user-requested range;
                # empty string means the block is outside the range
                data_range_str = _range_up(start, end, end, range_str)
                if data_range_str:
                    self._watch_thread_limit(flying.values())
                    restargs['data_range'] = 'bytes=%s' % data_range_str
                    flying[blockid] = self._get_block_async(obj, **restargs)
                # Harvest finished downloads; on the last block, join all
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            # Drain worker threads before propagating the interrupt;
            # implicitly returns None in this case
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
907

    
908
    #Command Progress Bar method
909
    def _cb_next(self, step=1):
910
        if hasattr(self, 'progress_bar_gen'):
911
            try:
912
                for i in xrange(step):
913
                    self.progress_bar_gen.next()
914
            except:
915
                pass
916

    
917
    def _complete_cb(self):
918
        while True:
919
            try:
920
                self.progress_bar_gen.next()
921
            except:
922
                break
923

    
924
    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Fetch the block hashmap of a remote object.

        :param obj: (str) remote object path

        :param version: (str) object version

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formated date

        :param if_unmodified_since: (str) formated date

        :returns: (list) the hashmap, or {} if preconditions fail (304/412)
        """
        conditions = dict(
            hashmap=True,
            version=version,
            if_etag_match=if_match,
            if_etag_not_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)
        try:
            return self.object_get(obj, **conditions).json
        except ClientError as err:
            if err.status in (304, 412):
                return {}
            raise
958

    
959
    def set_account_group(self, group, usernames):
        """Assign a list of users to an account group.

        :param group: (str) group name

        :param usernames: (list) members of the group
        """
        return self.account_post(update=True, groups={group: usernames})
967

    
968
    def del_account_group(self, group):
        """Remove an account group by posting it with no members.

        :param group: (str) group name
        """
        no_members = {group: []}
        self.account_post(update=True, groups=no_members)
973

    
974
    def get_account_info(self, until=None):
        """Retrieve the account headers via a HEAD request.

        :param until: (str) formated date

        :returns: (dict) response headers

        :raises ClientError: 401 No authorization
        """
        resp = self.account_head(until=until)
        if resp.status_code != 401:
            return resp.headers
        raise ClientError("No authorization", status=401)
984

    
985
    def get_account_quota(self):
        """Report the account quota policy.

        :returns: (dict) the X-Account-Policy-Quota header, if any
        """
        account_info = self.get_account_info()
        return filter_in(account_info, 'X-Account-Policy-Quota', exactMatch=True)
993

    
994
    #def get_account_versioning(self):
995
    #    """
996
    #    :returns: (dict)
997
    #    """
998
    #    return filter_in(
999
    #        self.get_account_info(),
1000
    #        'X-Account-Policy-Versioning',
1001
    #        exactMatch=True)
1002

    
1003
    def get_account_meta(self, until=None):
        """Collect the user-defined account metadata headers.

        :param until: (str) formated date

        :returns: (dict) headers prefixed with X-Account-Meta-
        """
        info = self.get_account_info(until=until)
        return filter_in(info, 'X-Account-Meta-')
1010

    
1011
    def get_account_group(self):
        """Collect the account group headers.

        :returns: (dict) headers prefixed with X-Account-Group-
        """
        info = self.get_account_info()
        return filter_in(info, 'X-Account-Group-')
1016

    
1017
    def set_account_meta(self, metapairs):
        """Set user-defined metadata on the account.

        :param metapairs: (dict) {key1:val1, key2:val2, ...}

        :returns: (dict) response headers
        """
        # isinstance instead of exact-type comparison: accepts dict subclasses
        # (e.g. OrderedDict) which behave identically here
        assert isinstance(metapairs, dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers
1024

    
1025
    def del_account_meta(self, metakey):
        """Delete one user-defined account metadatum.

        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        # Posting an empty value removes the metadatum
        return self.account_post(update=True, metadata={metakey: ''}).headers
1031

    
1032
    #def set_account_quota(self, quota):
1033
    #    """
1034
    #    :param quota: (int)
1035
    #    """
1036
    #    self.account_post(update=True, quota=quota)
1037

    
1038
    #def set_account_versioning(self, versioning):
1039
    #    """
1040
    #    :param versioning: (str)
1041
    #    """
1042
    #    r = self.account_post(update=True, versioning=versioning)
1043
    #    return r.headers
1044

    
1045
    def list_containers(self):
        """List the containers of the current account.

        :returns: (dict) parsed json listing
        """
        return self.account_get().json
1051

    
1052
    def del_container(self, until=None, delimiter=None):
        """Delete the current container.

        :param until: (str) formated date

        :param delimiter: (str) with / empty container

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty

        :returns: (dict) response headers
        """
        self._assert_container()
        resp = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        # Map the two tolerated failure codes to descriptive errors
        failures = {
            404: 'Container "%s" does not exist' % self.container,
            409: 'Container "%s" is not empty' % self.container}
        if resp.status_code in failures:
            raise ClientError(failures[resp.status_code], resp.status_code)
        return resp.headers
1076

    
1077
    def get_container_versioning(self, container=None):
        """Report the versioning policy of a container.

        :param container: (str) defaults to the current container

        :returns: (dict) the X-Container-Policy-Versioning header, if any
        """
        previous = self.container
        try:
            # Temporarily switch container, always restore afterwards
            self.container = container or previous
            info = self.get_container_info()
            return filter_in(info, 'X-Container-Policy-Versioning')
        finally:
            self.container = previous
1091

    
1092
    def get_container_limit(self, container=None):
        """Report the quota policy of a container.

        :param container: (str) defaults to the current container

        :returns: (dict) the X-Container-Policy-Quota header, if any
        """
        previous = self.container
        try:
            # Temporarily switch container, always restore afterwards
            self.container = container or previous
            info = self.get_container_info()
            return filter_in(info, 'X-Container-Policy-Quota')
        finally:
            self.container = previous
1106

    
1107
    def get_container_info(self, until=None):
        """Retrieve the current container's headers via a HEAD request.

        :param until: (str) formated date

        :returns: (dict) response headers

        :raises ClientError: 404 Container not found (container name appended
            to the error details)
        """
        try:
            return self.container_head(until=until).headers
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
1121

    
1122
    def get_container_meta(self, until=None):
        """Collect the container-level metadata headers.

        :param until: (str) formated date

        :returns: (dict) headers prefixed with X-Container-Meta
        """
        info = self.get_container_info(until=until)
        return filter_in(info, 'X-Container-Meta')
1131

    
1132
    def get_container_object_meta(self, until=None):
        """Collect the object-related container headers.

        :param until: (str) formated date

        :returns: (dict) headers prefixed with X-Container-Object-Meta
        """
        info = self.get_container_info(until=until)
        return filter_in(info, 'X-Container-Object-Meta')
1141

    
1142
    def set_container_meta(self, metapairs):
        """Set user-defined metadata on the current container.

        :param metapairs: (dict) {key1:val1, key2:val2, ...}

        :returns: (dict) response headers
        """
        # isinstance instead of exact-type comparison: accepts dict subclasses
        assert isinstance(metapairs, dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers
1149

    
1150
    def del_container_meta(self, metakey):
        """Delete one container metadatum.

        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        # Posting an empty value removes the metadatum
        return self.container_post(update=True, metadata={metakey: ''}).headers
1158

    
1159
    def set_container_limit(self, limit):
        """Set the quota (size limit) of the current container.

        :param limit: (int)

        :returns: (dict) response headers
        """
        return self.container_post(update=True, quota=limit).headers
1165

    
1166
    def set_container_versioning(self, versioning):
        """Set the versioning policy of the current container.

        :param versioning: (str)

        :returns: (dict) response headers
        """
        return self.container_post(update=True, versioning=versioning).headers
1172

    
1173
    def del_object(self, obj, until=None, delimiter=None):
        """Delete a remote object.

        :param obj: (str) remote object path

        :param until: (str) formated date

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_container()
        resp = self.object_delete(obj, until=until, delimiter=delimiter)
        return resp.headers
1184

    
1185
    def set_object_meta(self, obj, metapairs):
        """Set user-defined metadata on a remote object.

        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}

        :returns: (dict) response headers
        """
        # isinstance instead of exact-type comparison: accepts dict subclasses
        assert isinstance(metapairs, dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers
1194

    
1195
    def del_object_meta(self, obj, metakey):
        """Delete one metadatum of a remote object.

        :param obj: (str) remote object path

        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        # Posting an empty value removes the metadatum
        return self.object_post(obj, update=True, metadata={metakey: ''}).headers
1203

    
1204
    def publish_object(self, obj):
        """Make a remote object publicly accessible.

        :param obj: (str) remote object path

        :returns: (str) access url, as reported by the server in the
            x-object-public header
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        return info['x-object-public']
        # Removed: three unreachable lines after this return that rebuilt the
        # url from self.base_url — dead code left over from an older scheme.
1216

    
1217
    def unpublish_object(self, obj):
        """Withdraw public access from a remote object.

        :param obj: (str) remote object path

        :returns: (dict) response headers
        """
        return self.object_post(obj, update=True, public=False).headers
1223

    
1224
    def get_object_info(self, obj, version=None):
        """Retrieve a remote object's headers via a HEAD request.

        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict) response headers

        :raises ClientError: 404 Object not found
        """
        try:
            return self.object_head(obj, version=version).headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise
1239

    
1240
    def get_object_meta(self, obj, version=None):
        """Collect the user-defined metadata headers of a remote object.

        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict) headers prefixed with X-Object-Meta
        """
        info = self.get_object_info(obj, version=version)
        return filter_in(info, 'X-Object-Meta')
1251

    
1252
    def get_object_sharing(self, obj):
        """Parse the sharing permissions of a remote object.

        The X-Object-Sharing header is a ';'-separated list of key=value
        pairs (e.g. "read=user1,group1;write=user2").

        :param obj: (str) remote object path

        :returns: (dict) permission kind --> comma-separated grantees

        :raises ClientError: if a pair contains no '=' separator
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if r:
            for perm in r['x-object-sharing'].split(';'):
                # partition instead of split: the old split('=') raised an
                # unhandled ValueError when a value itself contained '='
                key, sep, val = perm.strip().partition('=')
                if not sep:
                    raise ClientError('Incorrect reply format')
                reply[key] = val
        return reply
1273

    
1274
    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permisions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) of users and user groups to get
           write permission for this object - False means all previous write
           permissions will be removed

        :returns: (dict) response headers
        """
        # Falsy permission arguments translate to '' (clear all permissions)
        permissions = dict(
            read=read_permission or '',
            write=write_permission or '')
        resp = self.object_post(obj, update=True, permissions=permissions)
        return resp.headers
1295

    
1296
    def del_object_sharing(self, obj):
        """Remove all sharing permissions from a remote object.

        :param obj: (str) remote object path
        """
        # Defaults of set_object_sharing clear both permission kinds
        return self.set_object_sharing(
            obj, read_permission=False, write_permission=False)
1301

    
1302
    def append_object(self, obj, source_file, upload_cb=None):
        """Append the contents of a local file to a remote object, uploading
        container-sized blocks concurrently.

        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading

        :returns: (list) response headers of the block-upload requests
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        # Ceiling division: number of blocks needed for filesize bytes
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                # 'bytes */*' content-range appends the block at object end
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        # NOTE(review): `i < nblocks` is always true inside
                        # this loop (i ranges over range(nblocks)), so alive
                        # threads are never joined here — completion seems to
                        # rely on the sleep() in the finally clause. Confirm
                        # whether `i < nblocks - 1` was intended.
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            # Crude grace period for any threads still running
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()
1358

    
1359
    def truncate_object(self, obj, upto_bytes):
        """Truncate a remote object to a given size.

        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file

        :returns: (dict) response headers
        """
        # The object is used as its own source, limited to upto_bytes
        resp = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return resp.headers
1375

    
1376
    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading

        :returns: (list) response headers, one dict per uploaded block

        :raises ClientError: 416 if the range exceeds the remote file size
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            # Never read past the local file or the requested range
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            # BUG FIX: was `self._cb_next` (a bare attribute access that was
            # never called), so the progress bar never advanced per block
            self._cb_next()
        return headers
1428

    
1429
    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """Server-side copy of an object to another container/path.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path (source name if None)

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        resp = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=path4url(src_container, src_object),
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return resp.headers
1472

    
1473
    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """Server-side move of an object to another container/path.

        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path (source name if falsy)

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        resp = self.object_put(
            dst_object or src_object,
            success=201,
            move_from=path4url(src_container, src_object),
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return resp.headers
1517

    
1518
    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict) parsed json listing
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        # Callers may override the accepted status codes via kwargs
        accepted = kwargs.pop('success', (200, 204))
        return self.get('', *args, success=accepted, **kwargs).json
1537

    
1538
    def get_object_versionlist(self, obj):
        """List all versions of a remote object.

        :param obj: (str) remote object path

        :returns: (list) version entries
        """
        self._assert_container()
        resp = self.object_get(obj, format='json', version='list')
        return resp.json['versions']