root / kamaki / clients / pithos / __init__.py @ ecf0fc97

# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
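
# Example (a sketch, not part of the module): blocks are hashed with trailing
# NUL padding stripped, so a short final block and the same block zero-padded
# to the container blocksize hash identically:
#
#   _pithos_hash('data', 'sha256') == _pithos_hash(
#       'data' + '\x00' * 100, 'sha256')   # -> True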


def _range_up(start, end, a_range):
    if a_range:
        (rstart, rend) = a_range.split('-')
        (rstart, rend) = (int(rstart), int(rend))
        if rstart > end or rend < start:
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
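
# Worked example (sketch): clipping a block's byte span against a user range
# '1000-9999' keeps only the overlap, or (0, 0) when there is none:
#
#   _range_up(0, 4095, '1000-9999')        # -> (1000, 4095)
#   _range_up(8192, 12287, '1000-9999')    # -> (8192, 9999)
#   _range_up(20000, 24095, '1000-9999')   # -> (0, 0), no overlap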


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be auto or whatever is supported by the
            server

        :param metadata: (dict) Custom user-defined metadata of the form
            { 'name1': 'value1', 'name2': 'value2', ... }

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata)
            return r.headers
        finally:
            self.container = cnt_back_up
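
    # Usage sketch (URL, token and names are hypothetical, not part of this
    # module):
    #
    #   client = PithosClient('https://pithos.example.org/v1', token='t0k3n',
    #                         account='user@example.org')
    #   headers = client.create_container(
    #       'backups', sizelimit=10 ** 9, metadata={'purpose': 'nightly'})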

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool)

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
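
    # Usage sketch (hypothetical path): the whole payload goes in a single
    # PUT request, so this suits small files:
    #
    #   with open('/tmp/notes.txt', 'rb') as f:
    #       meta = client.upload_object_unchunked(
    #           'notes.txt', f, content_type='text/plain')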

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache the container info response
            to avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
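
    # Block arithmetic example (sketch, assuming a 4MiB container block
    # size): a 9MiB file needs 1 + (9 * 2**20 - 1) // (4 * 2**20) = 3 blocks;
    # an empty file yields 0 blocks, since (0 - 1) // blocksize floors to -1.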

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while this object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                #  missing holds the hashes of the blocks that failed
                details = ['block with hash %s' % h for h in missing]
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=details)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
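
    # Usage sketch (hypothetical names): the callbacks are generator
    # factories that the client advances once per hashed/uploaded block, so
    # yielding n + 1 times covers the initial advance plus one per block:
    #
    #   def bar(n):
    #       for i in xrange(n + 1):
    #           yield  # e.g. advance a progress bar here
    #
    #   cache = {}
    #   with open('/tmp/big.iso', 'rb') as f:
    #       client.upload_object(
    #           'big.iso', f, hash_cb=bar, upload_cb=bar,
    #           container_info_cache=cache)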

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while this object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
                fileobj=None, size=len(input_str), cache=container_info_cache)
        if not content_type:
            content_type = 'application/octet-stream'

        hashes = []
        hmap = {}
        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                #  missing holds the hashes of the blocks that failed
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['block with hash %s' % h for h in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
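
    # Usage sketch (hypothetical content): handy for generated content that
    # is already in memory:
    #
    #   import json
    #   report = json.dumps({'status': 'ok'})
    #   client.upload_from_string(
    #       'reports/latest.json', report, content_type='application/json')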

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        #retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            #  the same hash may occur at several positions in an object, so
            #  map each hash to the list of block indices where it appears
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, range,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range)
                args['data_range'] = 'bytes=%s-%s' % (start, end)
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        #  rstrip, to match the server-side hashing in _pithos_hash
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of a threaded rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (start, end)}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date"""
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
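
    # Usage sketch (hypothetical path): blocks are fetched concurrently and
    # written at their real offsets, so dst must be seekable (e.g. opened
    # 'wb+'); resume=True skips blocks whose hashes already match on disk:
    #
    #   with open('/tmp/big.iso', 'wb+') as dst:
    #       client.download_object('big.iso', dst, resume=True)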

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) from, to are file positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range_str)
                if start < end:
                    self._watch_thread_limit(flying.values())
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
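
    # Usage sketch: fine for small objects, since everything is buffered in
    # memory; range_str is an HTTP byte range without the unit prefix,
    # e.g. '0-1023' for the first KiB:
    #
    #   first_kb = client.download_to_string('notes.txt', range_str='0-1023')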

    # Command Progress Bar method
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap, or {} on 304/412
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    #def get_account_versioning(self):
    #    """
    #    :returns: (dict)
    #    """
    #    return filter_in(
    #        self.get_account_info(),
    #        'X-Account-Policy-Versioning',
    #        exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers
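
    # Metadata round-trip sketch (hypothetical key): custom pairs travel as
    # X-Account-Meta-* headers and come back as header pairs:
    #
    #   client.set_account_meta({'color': 'blue'})
    #   client.get_account_meta()   # -> the matching X-Account-Meta-* pair(s)
    #   client.del_account_meta('color')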

    #def set_account_quota(self, quota):
    #    """
    #    :param quota: (int)
    #    """
    #    self.account_post(update=True, quota=quota)

    #def set_account_versioning(self, versioning):
    #    """
    #    :param versioning: (str)
    #    """
    #    r = self.account_post(update=True, versioning=versioning)
    #    return r.headers

    def list_containers(self):
        """
        :returns: (list)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if set to '/', the container contents are
            deleted as well

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers
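
    # Container policy sketch (hypothetical values): limits are plain byte
    # counts, versioning is a server-defined policy name such as 'auto':
    #
    #   client.set_container_limit(5 * 1024 ** 3)   # 5GiB cap
    #   client.set_container_versioning('auto')
    #   client.get_container_limit()  # -> the X-Container-Policy-Quota pair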

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        return info['x-object-public']

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed

        :returns: (dict) response headers
        """

        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)
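
    # Sharing sketch (hypothetical user/group names): permissions are lists
    # of users and/or groups; passing nothing clears them:
    #
    #   client.set_object_sharing(
    #       'notes.txt',
    #       read_permission=['friend@example.org', 'mygroup'])
    #   client.get_object_sharing('notes.txt')   # -> {'read': '...'}
    #   client.del_object_sharing('notes.txt')   # remove all permissions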

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file

        :returns: (dict) response headers
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            self._cb_next()
        return headers

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
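
    # Copy/move sketch (hypothetical names): both are server-side operations
    # on hashmaps, so no object data passes through the client; note that
    # both also switch self.container to the destination container:
    #
    #   client.copy_object('pithos', 'a.txt', 'backups', 'a-copy.txt')
    #   client.move_object('pithos', 'b.txt', 'archive')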

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (list)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']