# kamaki/clients/pithos/__init__.py @ edc1182f

# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in, readall


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
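
# Illustrative example (added; not in the original source): block hashes are
# computed with trailing NUL padding stripped, so a zero-padded block hashes
# the same as its unpadded content. With sha256 as the container block hash:
#
#   >>> _pithos_hash('hello' + '\x00' * 16, 'sha256')
#   '2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824'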


def _range_up(start, end, max_value, a_range):
    """
    :param start: (int) the window bottom

    :param end: (int) the window top

    :param max_value: (int) maximum accepted value

    :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
        where X: x|x-y|-x where x < y and x, y natural numbers

    :returns: (str) a range string cut off to fit the start-end window;
        an empty string means this window is out of range
    """
    assert start >= 0, '_range_up was called with start < 0'
    assert end >= start, '_range_up was called with end < start'
    assert end <= max_value, '_range_up was called with max_value < end'
    if not a_range:
        return '%s-%s' % (start, end)
    selected = []
    for some_range in a_range.split(','):
        v0, sep, v1 = some_range.partition('-')
        if v0:
            v0 = int(v0)
            if sep:
                v1 = int(v1)
                if v1 < start or v0 > end or v1 < v0:
                    continue
                v0 = v0 if v0 > start else start
                v1 = v1 if v1 < end else end
                selected.append('%s-%s' % (v0, v1))
            elif v0 < start:
                continue
            else:
                v1 = v0 if v0 <= end else end
                selected.append('%s-%s' % (start, v1))
        else:
            v1 = int(v1)
            if max_value - v1 > end:
                continue
            v0 = (max_value - v1) if max_value - v1 > start else start
            selected.append('%s-%s' % (v0, end))
    return ','.join(selected)
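
# Illustrative behavior of _range_up (examples added for clarity; not in the
# original source):
#
#   _range_up(0, 99, 1000, '')        -> '0-99'    (no range: whole window)
#   _range_up(0, 99, 1000, '50-150')  -> '50-99'   (clipped to the window)
#   _range_up(200, 299, 1000, '-100') -> ''        (suffix range misses it)
#   _range_up(900, 999, 1000, '-150') -> '900-999' (suffix range overlaps)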


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None,
            **kwargs):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be auto or whatever is supported by the
            server

        :param metadata: (dict) Custom user-defined metadata of the form
            { 'name1': 'value1', 'name2': 'value2', ... }

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata,
                **kwargs)
            return r.headers
        finally:
            self.container = cnt_back_up

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not JSON-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = readall(f, size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache the container info response
            to avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
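
    # Worked example (added for clarity): with a 4 MiB container block size,
    # a 10 MiB source gives nblocks = 1 + (10485760 - 1) // 4194304 = 3,
    # i.e. two full blocks plus one partial block.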

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers
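
    # Note (added for clarity): the hashmap PUT above either creates the
    # object at once (201, all blocks already known to the server) or
    # returns 409 together with a JSON list of the block hashes the server
    # is missing, which the caller uploads before re-sending the hashmap.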

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = readall(fileobj, min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = readall(fileobj, bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except Exception:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except Exception:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param size: (int) size of data to upload

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            the server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except Exception:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                # missing now holds the hashes of blocks that kept failing
                details = ['%s' % failed_hash for failed_hash in missing]
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=details)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
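
    # Usage sketch (illustrative; file names are made up): reusing one cache
    # dict across calls avoids a container info request per upload.
    #
    #   cache = {}
    #   for name in ('a.bin', 'b.bin'):
    #       with open(name, 'rb') as f:
    #           client.upload_object(name, f, container_info_cache=cache)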

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read': [user and/or group names],
            'write': [user and/or group names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            the server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
                fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap) = ([], {})
        if not content_type:
            content_type = 'application/octet-stream'

        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                # missing holds the hashes of the blocks that kept failing
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % failed_hash for failed_hash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # Retrieve the object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        # assert total_size / blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # A hash may appear multiple times, so map it to all its indices
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, crange,
            **args):
        if not total_size:
            return
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range = _range_up(start, end, total_size, crange)
                if not data_range:
                    self._cb_next()
                    continue
                args['data_range'] = 'bytes=%s' % data_range
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = readall(fp, size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """Write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize,
            e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                data_range = _range_up(key, end, total_size, filerange)
                if not data_range:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {'Range': 'bytes=%s' % data_range}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
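
    # Usage sketch (illustrative; names are made up):
    #
    #   with open('local.bin', 'wb+') as dst:
    #       client.download_object('remote.bin', dst)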

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range_str = _range_up(start, end, end, range_str)
                if data_range_str:
                    self._watch_thread_limit(flying.values())
                    restargs['data_range'] = 'bytes=%s' % data_range_str
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()

    # Command progress bar methods
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except Exception:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except Exception:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """
        :param obj: (str) remote object path

        :param version: (str) file version

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (dict) the object hashmap
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
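
    # The returned hashmap is a dict of the form (illustrative values):
    #
    #   {'block_size': 4194304, 'block_hash': 'sha256', 'bytes': 10485760,
    #    'hashes': ['7d3d2...', 'a9b1f...', '03c7e...']}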

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    #def get_account_versioning(self):
    #    """
    #    :returns: (dict)
    #    """
    #    return filter_in(
    #        self.get_account_info(),
    #        'X-Account-Policy-Versioning',
    #        exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    #def set_account_quota(self, quota):
    #    """
    #    :param quota: (int)
    #    """
    #    self.account_post(update=True, quota=quota)

    #def set_account_versioning(self, versioning):
    #    """
    #    :param versioning: (str)
    #    """
    #    r = self.account_post(update=True, versioning=versioning)
    #    return r.headers

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if '/', the container contents are also
            deleted

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        return info['x-object-public']

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed

        :returns: (dict) response headers
        """

        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: (int) max number of bytes to leave in the file

        :returns: (dict) response headers
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            self._cb_next()
        return headers

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
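
    # Usage sketch (illustrative; container and object names are made up):
    #
    #   client.copy_object(
    #       'src-container', 'docs/a.txt', 'dst-container',
    #       dst_object='docs/a-copy.txt')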

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']