# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
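
# Note on block hashing: trailing NUL padding is stripped before hashing, so
# a short final block and its zero-padded equivalent share the same hash.
# A quick illustration ('sha256' is a stand-in for whatever the server
# reports in x-container-block-hash):
#
#   _pithos_hash('data\x00\x00', 'sha256') == _pithos_hash('data', 'sha256')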


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
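
# For example, _range_up(0, 99, '50-120') == (50, 99): the block span [0, 99]
# is clipped to the user-requested range, while a non-overlapping range such
# as '200-300' collapses the span to (0, 0).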


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be 'auto' or any other value supported
            by the server

        :param metadata: (dict) Custom user-defined metadata of the form
            { 'name1': 'value1', 'name2': 'value2', ... }

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata)
            return r.headers
        finally:
            self.container = cnt_back_up
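
    # Usage sketch (endpoint, token and names below are hypothetical):
    #
    #   client = PithosClient('https://example.com/pithos', TOKEN,
    #                         account='user@example.com')
    #   headers = client.create_container('backups', sizelimit=1024 ** 3)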

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers
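
    # The manifest object itself is zero-length; when fetched, the server is
    # expected to serve the concatenation of the objects under the
    # '<container>/<obj>' prefix (Swift-style manifest semantics assumed).
    # A sketch with hypothetical names:
    #
    #   client.upload_object_unchunked('video/part1', fd1)
    #   client.upload_object_unchunked('video/part2', fd2)
    #   client.create_object_by_manifestation('video')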

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
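
    # nblocks is a ceiling division, e.g. with a (hypothetical) 4MB block
    # size: size = 10 ** 6 gives 1 + (10 ** 6 - 1) // (4 * 1024 ** 2) == 1
    # block, while size = 5 * 1024 ** 2 gives 2 blocks.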

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                # missing holds the hashes of the failed blocks at this point
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash for hash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
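
    # Usage sketch (hypothetical path): only blocks the server reports as
    # missing are actually transferred, so re-uploading identical content is
    # nearly free:
    #
    #   with open('/tmp/backup.tar', 'rb') as f:
    #       headers = client.upload_object('backup.tar', f)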

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to if-match header at file
            creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
            fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        hashes = []
        hmap = {}
        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                # missing holds the hashes of the failed blocks at this point
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash for hash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
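
    # Usage sketch: same missing-block negotiation as upload_object, but the
    # source is an in-memory string (hypothetical values):
    #
    #   headers = client.upload_from_string('notes.txt', 'hello world\n')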

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # a hash may occur more than once, so map it to all its block ids
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a greenleted rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date"""
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
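
    # Usage sketch (hypothetical path): blocks are fetched concurrently and
    # written at their proper offsets; with resume=True, blocks whose on-disk
    # hash already matches are skipped:
    #
    #   with open('/tmp/backup.tar', 'wb+') as dst:
    #       client.download_object('backup.tar', dst, resume=True)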

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range_str)
                if start < end:
                    self._watch_thread_limit(flying.values())
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
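
    # Usage sketch: best suited for small objects, since the whole content is
    # assembled in memory (hypothetical path):
    #
    #   text = client.download_to_string('notes.txt')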

    # Command Progress Bar method
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    
880
    def get_object_hashmap(
881
            self, obj,
882
            version=None,
883
            if_match=None,
884
            if_none_match=None,
885
            if_modified_since=None,
886
            if_unmodified_since=None,
887
            data_range=None):
888
        """
889
        :param obj: (str) remote object path
890

891
        :param if_match: (str)
892

893
        :param if_none_match: (str)
894

895
        :param if_modified_since: (str) formated date
896

897
        :param if_unmodified_since: (str) formated date
898

899
        :param data_range: (str) from-to where from and to are integers
900
            denoting file positions in bytes
901

902
        :returns: (list)
903
        """
904
        try:
905
            r = self.object_get(
906
                obj,
907
                hashmap=True,
908
                version=version,
909
                if_etag_match=if_match,
910
                if_etag_not_match=if_none_match,
911
                if_modified_since=if_modified_since,
912
                if_unmodified_since=if_unmodified_since,
913
                data_range=data_range)
914
        except ClientError as err:
915
            if err.status == 304 or err.status == 412:
916
                return {}
917
            raise
918
        return r.json
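
    # The returned hashmap (as consumed by _get_remote_blocks_info) looks
    # roughly like this (hypothetical values):
    #
    #   {'block_size': 4194304, 'block_hash': 'sha256',
    #    'bytes': 5242880, 'hashes': ['7f3a...', '09c1...']}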

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    def get_account_versioning(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Versioning',
            exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    """
    def set_account_quota(self, quota):
        ""
        :param quota: (int)
        ""
        self.account_post(update=True, quota=quota)
    """

    def set_account_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.account_post(update=True, versioning=versioning)
        return r.headers

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) use '/' to empty the container

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        pref, sep, rest = self.base_url.partition('//')
        base = rest.split('/')[0]
        return '%s%s%s/%s' % (pref, sep, base, info['x-object-public'])

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) of users and user groups to get
            write permission for this object - False means all previous write
            permissions will be removed

        :returns: (dict) response headers
        """

        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers
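
    # Usage sketch (hypothetical user names): grant read access to two users
    # and drop any write permissions:
    #
    #   client.set_object_sharing(
    #       'notes.txt',
    #       read_permission=['alice', 'bob'], write_permission=False)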

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file

        :returns: (dict) response headers
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            self._cb_next()
        return headers

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
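
    # Usage sketch (hypothetical names): the copy happens server-side, so no
    # object data passes through the client:
    #
    #   client.copy_object('pithos', 'a.txt', 'backups', dst_object='a.txt')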

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']