# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    """Hash a block the way Pithos+ does: trailing NUL padding is
    stripped before hashing.
    """
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()


def _range_up(start, end, a_range):
    """Clip the block range [start, end] to a_range ('from-to' string).

    Return (0, 0) if the block falls entirely outside the requested range.
    """
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
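
# A quick sanity check of the helper above (hypothetical values):
#
#   _range_up(0, 99, '50-149')   returns (50, 99): block clipped to the range
#   _range_up(100, 199, '0-49')  returns (0, 0):   block outside the range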


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) 'auto' or any other value supported by the
            server

        :param metadata: (dict) Custom user-defined metadata of the form
            { 'name1': 'value1', 'name2': 'value2', ... }

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata)
            return r.headers
        finally:
            self.container = cnt_back_up
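
    # Example usage (a sketch; endpoint, token and names are hypothetical):
    #
    #   client = PithosClient(
    #       'https://example.org/pithos', 'my-token', account='user-uuid')
    #   client.create_container(
    #       container='backups', sizelimit=5 * 1024 ** 3, versioning='auto')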

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if true, f is a json-formatted hashmap
            file, not the raw object data

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                # missing holds the hashes of blocks that still failed
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % uhash for uhash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
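
    # Example usage (a sketch; paths are hypothetical): upload a local file,
    # reusing a container-info cache across multiple uploads:
    #
    #   cache = {}
    #   with open('/tmp/data.bin', 'rb') as f:
    #       client.upload_object(
    #           'data.bin', f,
    #           content_type='application/octet-stream',
    #           container_info_cache=cache)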

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
                fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                # missing holds the hashes of blocks that still failed
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % uhash for uhash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
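
    # Example usage (a sketch): upload in-memory content as a remote object:
    #
    #   client.upload_from_string(
    #       'notes.txt', 'hello world', content_type='text/plain')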

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        #retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            #  the same hash may appear in multiple blocks, so keep a list
            #  of block indices per hash
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """Write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from-to, where from and to are file positions
            (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
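
    # Example usage (a sketch; paths are hypothetical): download an object
    # to a local file:
    #
    #   with open('/tmp/data.bin', 'wb+') as dst:
    #       client.download_object('data.bin', dst)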

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from-to, where from and to are file positions
            (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range_str)
                if start < end:
                    self._watch_thread_limit(flying.values())
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()

    # Command Progress Bar methods
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap, or an empty dict on 304/412
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    #def get_account_versioning(self):
    #    """
    #    :returns: (dict)
    #    """
    #    return filter_in(
    #        self.get_account_info(),
    #        'X-Account-Policy-Versioning',
    #        exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    #def set_account_quota(self, quota):
    #    """
    #    :param quota: (int)
    #    """
    #    self.account_post(update=True, quota=quota)

    #def set_account_versioning(self, versioning):
    #    """
    #    :param versioning: (str)
    #    """
    #    r = self.account_post(update=True, versioning=versioning)
    #    return r.headers

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if given (e.g. '/'), the contained objects
            are deleted too, so a non-empty container can be removed

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) of users and user groups to get
           write permission for this object - False means all previous write
           permissions will be removed

        :returns: (dict) response headers
        """

        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers
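
    # Example usage (a sketch; user names are hypothetical): grant read
    # access to two users and remove all write permissions:
    #
    #   client.set_object_sharing(
    #       'notes.txt',
    #       read_permission=['user1@example.org', 'user2@example.org'],
    #       write_permission=False)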

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file

        :returns: (dict) response headers
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            self._cb_next()
        return headers

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
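
    # Example usage (a sketch; names are hypothetical): copy an object to
    # another container, then move it back under a new name:
    #
    #   client.copy_object('pithos', 'src.txt', 'backups')
    #   client.move_object(
    #       'backups', 'src.txt', 'pithos', dst_object='dst.txt')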

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']