# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in, readall


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
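
# Illustrative note (not part of the original file): the rstrip('\x00')
# above means a block is hashed with its trailing NUL padding removed,
# so a short final block hashes the same as its zero-padded form, e.g.:
#
#   _pithos_hash('abc' + '\x00' * 5, 'sha256') == _pithos_hash('abc', 'sha256')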


def _range_up(start, end, max_value, a_range):
    """
    :param start: (int) the window bottom

    :param end: (int) the window top

    :param max_value: (int) maximum accepted value

    :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
        where X: x|x-y|-x where x < y and x, y natural numbers

    :returns: (str) a range string cut-off for the start-end range
        an empty response means this window is out of range
    """
    assert start >= 0, '_range_up called w. start(%s) < 0' % start
    assert end >= start, '_range_up called w. end(%s) < start(%s)' % (
        end, start)
    assert end <= max_value, '_range_up called w. max_value(%s) < end(%s)' % (
        max_value, end)
    if not a_range:
        return '%s-%s' % (start, end)
    selected = []
    for some_range in a_range.split(','):
        v0, sep, v1 = some_range.partition('-')
        if v0:
            v0 = int(v0)
            if sep:
                v1 = int(v1)
                if v1 < start or v0 > end or v1 < v0:
                    continue
                v0 = v0 if v0 > start else start
                v1 = v1 if v1 < end else end
                selected.append('%s-%s' % (v0, v1))
            elif v0 < start:
                continue
            else:
                v1 = v0 if v0 <= end else end
                selected.append('%s-%s' % (start, v1))
        else:
            v1 = int(v1)
            if max_value - v1 > end:
                continue
            v0 = (max_value - v1) if max_value - v1 > start else start
            selected.append('%s-%s' % (v0, end))
    return ','.join(selected)
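
# Usage sketch (illustrative, not part of the original file): clipping a
# client-requested range string to one block window. With 100-byte blocks
# in a 200-byte object:
#
#   _range_up(0, 99, 200, '10-40')     # -> '10-40' (overlaps block 0)
#   _range_up(100, 199, 200, '10-40')  # -> ''      (block 1 out of range)
#   _range_up(100, 199, 200, '-50')    # -> '150-199' (the last 50 bytes)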


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None,
            **kwargs):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be auto or whatever supported by server

        :param metadata: (dict) Custom user-defined metadata of the form
            { 'name1': 'value1', 'name2': 'value2', ... }

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata,
                **kwargs)
            return r.headers
        finally:
            self.container = cnt_back_up

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = readall(f, size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
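
    # Illustrative note (not part of the original file): nblocks is a
    # ceiling division, so a one-byte tail still needs a block of its own,
    # e.g. for size = 2 * 4194304 + 1 with a 4194304-byte block size:
    #
    #   1 + (8388609 - 1) // 4194304  # -> 3 blocks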

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = readall(fileobj, min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = readall(fileobj, bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except Exception:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except Exception:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is
            created while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except Exception:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                # missing holds the hashes of the blocks that failed
                details = ['%s' % hash for hash in missing]
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=details)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
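
    # Usage sketch (illustrative; the endpoint, token and names below are
    # placeholders, not part of this file):
    #
    #   client = PithosClient('https://example.org/pithos', 'user-token',
    #                         account='user-uuid', container='pithos')
    #   with open('backup.tar.gz', 'rb') as f:
    #       headers = client.upload_object('backups/backup.tar.gz', f)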

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is
            created while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
                fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        hashes = []
        hmap = {}
        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                # missing holds the hashes of the blocks that failed
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash for hash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        # assert total_size/blocksize + 1 == len(hashmap['hashes'])
        # identical blocks share a hash, so map each hash to all its indices
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, crange,
            **args):
        if not total_size:
            return
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range = _range_up(start, end, total_size, crange)
                if not data_range:
                    self._cb_next()
                    continue
                args['data_range'] = 'bytes=%s' % data_range
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = readall(fp, size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                if end < key:
                    self._cb_next()
                    continue
                data_range = _range_up(key, end, total_size, filerange)
                if not data_range:
                    self._cb_next()
                    continue
                restargs[
                    'async_headers'] = {'Range': 'bytes=%s' % data_range}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
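
    # Usage sketch (illustrative; names are placeholders). dst should be
    # opened 'wb+', so that already-written blocks can be re-hashed and
    # skipped when resume=True:
    #
    #   with open('backup.tar.gz', 'wb+') as dst:
    #       client.download_object('backups/backup.tar.gz', dst, resume=True)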

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range_str = _range_up(start, end, end, range_str)
                if data_range_str:
                    self._watch_thread_limit(flying.values())
                    restargs['data_range'] = 'bytes=%s' % data_range_str
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()

    # Command Progress Bar method
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except Exception:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except Exception:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (dict) the object hashmap, or {} on 304/412
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
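
    # Illustrative shape of the returned hashmap (inferred from how
    # _get_remote_blocks_info consumes it; the values are placeholders):
    #
    #   {'block_size': 4194304, 'block_hash': 'sha256',
    #    'bytes': 8388609, 'hashes': ['7e2d...', '1f09...', 'a3b4...']}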

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    #def get_account_versioning(self):
    #    """
    #    :returns: (dict)
    #    """
    #    return filter_in(
    #        self.get_account_info(),
    #        'X-Account-Policy-Versioning',
    #        exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    #def set_account_quota(self, quota):
    #    """
    #    :param quota: (int)
    #    """
    #    self.account_post(update=True, quota=quota)

    #def set_account_versioning(self, versioning):
    #    """
    #    :param versioning: (str)
    #    """
    #    r = self.account_post(update=True, versioning=versioning)
    #    return r.headers

    def list_containers(self):
        """
        :returns: (list)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if given (e.g. '/'), delete the container
            contents as well

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        return info['x-object-public']

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply
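
    # Illustrative note (the header value below is a placeholder): an
    # 'x-object-sharing' header such as
    #
    #   'read=user1@example.org,group1;write=user2@example.org'
    #
    # is parsed into:
    #
    #   {'read': 'user1@example.org,group1', 'write': 'user2@example.org'}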

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) of users and user groups to get
           write permission for this object - False means all previous write
           permissions will be removed

        :returns: (dict) response headers
        """

        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file

        :returns: (dict) response headers
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            self._cb_next()
        return headers

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
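
    # Usage sketch (illustrative; container and object names are
    # placeholders). Both calls set self.container to the destination
    # container as a side effect:
    #
    #   client.copy_object('pithos', 'a.txt', 'backups', dst_object='a.bak')
    #   client.move_object('pithos', 'b.txt', 'trash')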

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']