# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
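
# Worked example of _range_up (added commentary, not part of the original
# module): clipping the block [0, 4194303] against the range string '10-100'
# gives (10, 100), while a block that lies entirely outside the range
# collapses to (0, 0):
#
#   _range_up(0, 4194303, '10-100')        # -> (10, 100)
#   _range_up(4194304, 8388607, '10-100')  # -> (0, 0)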


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be auto or whatever supported by server

        :param metadata: (dict) Custom user-defined metadata of the form
            { 'name1': 'value1', 'name2': 'value2', ... }

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata)
            return r.headers
        finally:
            self.container = cnt_back_up
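
    # Illustrative usage (the URL, token and names below are placeholders,
    # not values from this module):
    #
    #   client = PithosClient(
    #       'https://pithos.example.com/v1', 'some-token',
    #       account='user@example.com', container='pithos')
    #   client.create_container('backups', sizelimit=1024 ** 3)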

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'
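
    # Note (added commentary): each block is uploaded with an update POST on
    # the container; the server answers with a JSON list of the hashes it
    # computed, and the first entry is checked against the local hash.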

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
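
    # Worked example (added commentary; 4MB is only a typical value, the
    # real one comes from the x-container-block-size header): with
    # blocksize = 4194304, a 10485760-byte (10MB) file needs
    # nblocks = 1 + (10485760 - 1) // 4194304 = 3 blocks, while an empty
    # file yields 1 + (0 - 1) // 4194304 = 0 blocks.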

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers
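
    # Note (added commentary): a 201 response means the server could build
    # the whole object from blocks it already stores, so nothing is missing;
    # a 409 returns the list of block hashes that still have to be uploaded.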

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param size: (int) size of data to upload

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash for hash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
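
    # Illustrative usage (the local path and object name are placeholders):
    #
    #   with open('backup.tar', 'rb') as f:
    #       headers = client.upload_object('backup.tar', f)
    #
    # Only blocks the server does not already store are transferred, so
    # re-uploading an unchanged file costs a single hashmap PUT.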

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
            fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash for hash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            #  the same block hash may appear at multiple positions
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                    local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date"""
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
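
    # Illustrative usage (the object name and local path are placeholders):
    #
    #   with open('backup.tar', 'wb+') as dst:
    #       client.download_object('backup.tar', dst, resume=True)
    #
    # With resume=True, local blocks whose hashes already match the remote
    # hashmap are not downloaded again.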

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range_str)
                if start < end:
                    self._watch_thread_limit(flying.values())
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
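
    # Illustrative usage (the object name is a placeholder):
    #
    #   data = client.download_to_string('notes.txt')
    #
    # Suitable for small objects only, since the whole content is kept in
    # memory.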

    # Command Progress Bar method
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param version: (str) object version

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict)
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    """
    def set_account_quota(self, quota):
        ""
        :param quota: (int)
        ""
        self.account_post(update=True, quota=quota)
    """

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.account_post(update=True, versioning=versioning)
        return r.headers

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if '/', the container contents are deleted
            as well

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers
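
    # Illustrative usage (names are placeholders); each key appears on the
    # object as an X-Object-Meta-<key> header:
    #
    #   client.set_object_meta('notes.txt', {'topic': 'work'})
    #   client.get_object_meta('notes.txt')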

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])
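
    # Note (added commentary): the access url is rebuilt from the scheme and
    # host of base_url plus the x-object-public path; e.g. (placeholders) a
    # base_url of 'https://pithos.example.com/v1' and an x-object-public of
    # 'public/abc123' yield 'https://pithos.example.com/public/abc123'.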

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) of users and user groups to get
           write permission for this object - False means all previous write
           permissions will be removed

        :returns: (dict) response headers
        """

        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers
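
    # Illustrative usage (user names are placeholders):
    #
    #   client.set_object_sharing(
    #       'notes.txt', read_permission=['user1@example.com'])
    #   client.del_object_sharing('notes.txt')  # remove all permissions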

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        return headers.values()

    
1317
    def truncate_object(self, obj, upto_bytes):
1318
        """
1319
        :param obj: (str) remote object path
1320

1321
        :param upto_bytes: max number of bytes to leave on file
1322

1323
        :returns: (dict) response headers
1324
        """
1325
        r = self.object_post(
1326
            obj,
1327
            update=True,
1328
            content_range='bytes 0-%s/*' % upto_bytes,
1329
            content_type='application/octet-stream',
1330
            object_bytes=upto_bytes,
1331
            source_object=path4url(self.container, obj))
1332
        return r.headers

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            self._cb_next()
        return headers

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
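
    # Illustrative usage (container and object names are placeholders); the
    # copy happens server-side, so no object data passes through the client:
    #
    #   client.copy_object('pithos', 'notes.txt', 'backups', 'notes-copy.txt')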

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']