# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    # Trailing NUL padding is stripped before hashing, so a short final
    # block hashes the same as its zero-padded version
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()
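
# A quick illustration of the padding behaviour (values are made up):
#
#   _pithos_hash('some data' + '\x00' * 7, 'sha256')
#   # == _pithos_hash('some data', 'sha256')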


def _range_up(start, end, a_range):
    """Clip a block's (start, end) byte positions to a requested range.

    a_range is the "<from>-<to>" part of an HTTP Range header; a negative
    end in the result marks a suffix range (the last -end bytes).
    """
    if a_range:
        rstart, sep, rend = a_range.partition('-')
        if rstart:
            if sep:
                rstart, rend = int(rstart), int(rend)
            else:
                rstart, rend = 0, int(rstart)
        elif sep:
            # suffix range ("-N"): signal it with a negative end
            return (0, - int(rend))
        else:
            return (0, 0)
        if rstart > end or rend < start:
            # the block does not overlap the requested range
            return (0, 0)
        if rstart > start:
            start = rstart
        if rend < end:
            end = rend
    return (start, end)
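
# How _range_up clips a 4096-byte block to a requested range
# (illustrative values):
#
#   _range_up(0, 4095, '1000-9999')   # -> (1000, 4095)
#   _range_up(0, 4095, '-100')        # -> (0, -100): the last 100 bytes
#   _range_up(8192, 12287, '0-4095')  # -> (0, 0): block outside the range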


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) 'auto' or any other value supported by the
            server

        :param metadata: (dict) Custom user-defined metadata of the form
            { 'name1': 'value1', 'name2': 'value2', ... }

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata)
            return r.headers
        finally:
            self.container = cnt_back_up
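
    # A minimal usage sketch (endpoint, token and names are illustrative):
    #
    #   client = PithosClient(
    #       'https://example.com/pithos', 's0m3-t0k3n',
    #       account='user@example.com', container='pithos')
    #   client.create_container('backups', sizelimit=10 * 1024 ** 3)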

    def purge_container(self, container=None):
        """Delete an empty container and destroy associated blocks
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if set, f is a json-formatted hashmap file

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not json-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
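
    # Usage sketch (file name is illustrative): upload a small file in a
    # single request, without splitting it into blocks:
    #
    #   with open('notes.txt', 'rb') as f:
    #       headers = client.upload_object_unchunked('notes.txt', f)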

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """upload missing blocks asynchronously"""

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param size: (int) size of data to upload

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This also covers the case where an object with the same path is
            created while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                # missing holds the hashes of the blocks that kept failing
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash for hash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
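
    # Usage sketch (names are illustrative): upload a large file, sharing a
    # container-info cache between calls to avoid repeated HEAD requests:
    #
    #   cache = {}
    #   with open('disk.img', 'rb') as f:
    #       client.upload_object('images/disk.img', f,
    #                            container_info_cache=cache)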

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This also covers the case where an object with the same path is
            created while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
                fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                # missing holds the hashes of the blocks that kept failing
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash for hash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
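
    # Usage sketch (content is illustrative):
    #
    #   headers = client.upload_from_string(
    #       'reports/daily.txt', 'one line of report data\n')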

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # the same hash may appear at multiple block positions
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, crange,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, crange)
                args['data_range'] = 'bytes=%s-%s' % (
                    (start, end) if end >= 0 else ('', - end))
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        # NOTE: strip() also removes leading NUL bytes, unlike the rstrip()
        # used in _pithos_hash on upload
        h.update(block.strip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """write the results of an asynchronous rest call to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0
        if filerange is not None:
            rstart = int(filerange.split('-')[0])
            offset = rstart if blocksize > rstart else rstart % blocksize

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                start, end = _range_up(key, end, filerange)
                if start == end:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {
                    'Range': 'bytes=%s-%s' % (
                        (start, end) if end >= 0 else ('', - end))}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) "from-to", where from and to are file
            positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
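
    # Usage sketch (paths are illustrative); with resume=True, blocks already
    # present in the local file are not downloaded again:
    #
    #   with open('disk.img', 'wb+') as dst:
    #       client.download_object('images/disk.img', dst, resume=True)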

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for http requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) "from-to", where from and to are file
            positions (int) in bytes

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                (start, end) = _range_up(start, end, range_str)
                if start < end or end < 0:
                    self._watch_thread_limit(flying.values())
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
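
    # Usage sketch: fetch only the first kilobyte of an object
    # (path is illustrative):
    #
    #   head = client.download_to_string('logs/app.log', range_str='0-1023')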

    # Command Progress Bar methods
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None,
            data_range=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :param data_range: (str) from-to where from and to are integers
            denoting file positions in bytes

        :returns: (dict) the object hashmap, or {} if a precondition
            failed (304 or 412)
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since,
                data_range=data_range)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json
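
    # The hashmap is shaped like the sketch below; the field names are the
    # ones read by _get_remote_blocks_info, the values are illustrative:
    #
    #   {
    #       'block_size': 4194304,
    #       'block_hash': 'sha256',
    #       'bytes': 6291456,
    #       'hashes': ['7c4e...', '1a2b...']
    #   }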

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    #def get_account_versioning(self):
    #    """
    #    :returns: (dict)
    #    """
    #    return filter_in(
    #        self.get_account_info(),
    #        'X-Account-Policy-Versioning',
    #        exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    #def set_account_quota(self, quota):
    #    """
    #    :param quota: (int)
    #    """
    #    self.account_post(update=True, quota=quota)

    #def set_account_versioning(self, versioning):
    #    """
    #    :param versioning: (str)
    #    """
    #    r = self.account_post(update=True, versioning=versioning)
    #    return r.headers

    def list_containers(self):
        """
        :returns: (list) the account's containers
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if '/', the container contents are deleted
            as well (i.e. the container is emptied)

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers

    def get_container_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Meta')

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1:val1, key2:val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        return info['x-object-public']

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) of users and user groups to get
           write permission for this object - False means all previous write
           permissions will be removed

        :returns: (dict) response headers
        """

        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers
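
    # Usage sketch (user and group names are illustrative):
    #
    #   client.set_object_sharing(
    #       'reports/daily.txt',
    #       read_permission=['alice@example.com', 'group1'],
    #       write_permission=['bob@example.com'])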

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: max number of bytes to leave on file

        :returns: (dict) response headers
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            self._cb_next()
        return headers

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
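
    # Usage sketch (container and object names are illustrative):
    #
    #   client.copy_object('pithos', 'a.txt', 'backups', 'a-copy.txt')
    #   client.move_object('pithos', 'b.txt', 'archive')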

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']