# kamaki/clients/pithos/__init__.py @ 776b275c
# Copyright 2011-2013 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.

from threading import enumerate as activethreads

from os import fstat
from hashlib import new as newhashlib
from time import time
from StringIO import StringIO

from binascii import hexlify

from kamaki.clients import SilentEvent, sendlog
from kamaki.clients.pithos.rest_api import PithosRestClient
from kamaki.clients.storage import ClientError
from kamaki.clients.utils import path4url, filter_in


def _pithos_hash(block, blockhash):
    h = newhashlib(blockhash)
    h.update(block.rstrip('\x00'))
    return h.hexdigest()


def _range_up(start, end, max_value, a_range):
    """
    :param start: (int) the window bottom

    :param end: (int) the window top

    :param max_value: (int) maximum accepted value

    :param a_range: (str) a range string in the form X[,X'[,X''[...]]]
        where X: x|x-y|-x where x < y and x, y natural numbers

    :returns: (str) a range string cut down to the start-end window;
        an empty string means the window is out of range
    """
    assert start >= 0, '_range_up was called with start < 0'
    assert end >= start, '_range_up was called with end < start'
    assert end <= max_value, '_range_up was called with max_value < end'
    if not a_range:
        return '%s-%s' % (start, end)
    selected = []
    for some_range in a_range.split(','):
        v0, sep, v1 = some_range.partition('-')
        if v0:
            v0 = int(v0)
            if sep:
                v1 = int(v1)
                if v1 < start or v0 > end or v1 < v0:
                    continue
                v0 = v0 if v0 > start else start
                v1 = v1 if v1 < end else end
                selected.append('%s-%s' % (v0, v1))
            elif v0 < start:
                continue
            else:
                v1 = v0 if v0 <= end else end
                selected.append('%s-%s' % (start, v1))
        else:
            v1 = int(v1)
            if max_value - v1 > end:
                continue
            v0 = (max_value - v1) if max_value - v1 > start else start
            selected.append('%s-%s' % (v0, end))
    return ','.join(selected)
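
# Illustrative examples (added; not part of the original module):
#   _range_up(0, 99, 1000, '50-150')    -> '50-99'
#   _range_up(100, 199, 1000, '50-150') -> '100-150'
#   _range_up(900, 999, 1000, '-50')    -> '950-999'
#   _range_up(0, 99, 1000, '500-600')   -> ''  (window out of range)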


class PithosClient(PithosRestClient):
    """Synnefo Pithos+ API client"""

    def __init__(self, base_url, token, account=None, container=None):
        super(PithosClient, self).__init__(base_url, token, account, container)
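
    # Usage sketch (added; illustrative only -- the URL, token, account and
    # container below are placeholder values, not part of the original code):
    #
    #   client = PithosClient(
    #       'https://pithos.example.org/v1', 'user-token',
    #       account='user@example.org', container='pithos')
    #   client.list_containers()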

    def create_container(
            self,
            container=None, sizelimit=None, versioning=None, metadata=None):
        """
        :param container: (str) if not given, self.container is used instead

        :param sizelimit: (int) container total size limit in bytes

        :param versioning: (str) can be 'auto' or any other value supported
            by the server

        :param metadata: (dict) custom user-defined metadata of the form
            {'name1': 'value1', 'name2': 'value2', ...}

        :returns: (dict) response headers
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_put(
                quota=sizelimit, versioning=versioning, metadata=metadata)
            return r.headers
        finally:
            self.container = cnt_back_up

    def purge_container(self, container=None):
        """Delete an empty container and destroy its associated blocks"""
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            r = self.container_delete(until=unicode(time()))
        finally:
            self.container = cnt_back_up
        return r.headers

    def upload_object_unchunked(
            self, obj, f,
            withHashFile=False,
            size=None,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param f: open file descriptor

        :param withHashFile: (bool) if true, f is a JSON-formatted hashmap
            file rather than the object data

        :param size: (int) size of data to upload

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()

        if withHashFile:
            data = f.read()
            try:
                import json
                data = json.dumps(json.loads(data))
            except ValueError:
                raise ClientError('"%s" is not JSON-formatted' % f.name, 1)
            except SyntaxError:
                msg = '"%s" is not a valid hashmap file' % f.name
                raise ClientError(msg, 1)
            f = StringIO(data)
        else:
            data = f.read(size) if size else f.read()
        r = self.object_put(
            obj,
            data=data,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers

    def create_object_by_manifestation(
            self, obj,
            etag=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None):
        """
        :param obj: (str) remote object path

        :param etag: (str)

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :returns: (dict) created object metadata
        """
        self._assert_container()
        r = self.object_put(
            obj,
            content_length=0,
            etag=etag,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            content_type=content_type,
            permissions=sharing,
            public=public,
            manifest='%s/%s' % (self.container, obj))
        return r.headers
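
    # Note (added for clarity; based on the OpenStack/Pithos+ large-object
    # convention): the created object carries a manifest pointing at
    # '<container>/<obj>', so reading it streams the concatenation of all
    # objects whose paths start with that prefix.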

    # upload_* auxiliary methods
    def _put_block_async(self, data, hash):
        event = SilentEvent(method=self._put_block, data=data, hash=hash)
        event.start()
        return event

    def _put_block(self, data, hash):
        r = self.container_post(
            update=True,
            content_type='application/octet-stream',
            content_length=len(data),
            data=data,
            format='json')
        assert r.json[0] == hash, 'Local hash does not match server'

    def _get_file_block_info(self, fileobj, size=None, cache=None):
        """
        :param fileobj: (file descriptor) source

        :param size: (int) size of data to upload from source

        :param cache: (dict) if provided, cache container info response to
            avoid redundant calls
        """
        if isinstance(cache, dict):
            try:
                meta = cache[self.container]
            except KeyError:
                meta = self.get_container_info()
                cache[self.container] = meta
        else:
            meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        blockhash = meta['x-container-block-hash']
        size = size if size is not None else fstat(fileobj.fileno()).st_size
        nblocks = 1 + (size - 1) // blocksize
        return (blocksize, blockhash, size, nblocks)
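
    # Worked example (added for clarity): with the common 4MB container
    # block size, a 10MB file yields nblocks = 1 + (10485760 - 1) // 4194304,
    # i.e. 3 blocks (two full blocks plus a final partial one).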

    def _create_object_or_get_missing_hashes(
            self, obj, json,
            size=None,
            format='json',
            hashmap=True,
            content_type=None,
            if_etag_match=None,
            if_etag_not_match=None,
            content_encoding=None,
            content_disposition=None,
            permissions=None,
            public=None,
            success=(201, 409)):
        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            json=json,
            if_etag_match=if_etag_match,
            if_etag_not_match=if_etag_not_match,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=permissions,
            public=public,
            success=success)
        return (None if r.status_code == 201 else r.json), r.headers
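
    # Note (added for clarity): a 201 means the server assembled the object
    # entirely from blocks it already stores, so the first tuple member is
    # None and nothing needs uploading; a 409 returns the list of block
    # hashes the server is missing.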

    def _calculate_blocks_for_upload(
            self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
            hash_cb=None):
        offset = 0
        if hash_cb:
            hash_gen = hash_cb(nblocks)
            hash_gen.next()

        for i in range(nblocks):
            block = fileobj.read(min(blocksize, size - offset))
            bytes = len(block)
            hash = _pithos_hash(block, blockhash)
            hashes.append(hash)
            hmap[hash] = (offset, bytes)
            offset += bytes
            if hash_cb:
                hash_gen.next()
        msg = ('Failed to calculate uploaded blocks:'
               ' Offset and object size do not match')
        assert offset == size, msg

    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_gen=None):
        """Upload missing blocks asynchronously

        :returns: (list) the hashes of the blocks that failed to upload
        """

        self._init_thread_limit()

        flying = []
        failures = []
        for hash in missing:
            offset, bytes = hmap[hash]
            fileobj.seek(offset)
            data = fileobj.read(bytes)
            r = self._put_block_async(data, hash)
            flying.append(r)
            unfinished = self._watch_thread_limit(flying)
            for thread in set(flying).difference(unfinished):
                if thread.exception:
                    failures.append(thread)
                    if isinstance(
                            thread.exception,
                            ClientError) and thread.exception.status == 502:
                        self.POOLSIZE = self._thread_limit
                elif thread.isAlive():
                    flying.append(thread)
                elif upload_gen:
                    try:
                        upload_gen.next()
                    except:
                        pass
            flying = unfinished

        for thread in flying:
            thread.join()
            if thread.exception:
                failures.append(thread)
            elif upload_gen:
                try:
                    upload_gen.next()
                except:
                    pass

        return [failure.kwargs['hash'] for failure in failures]

    def upload_object(
            self, obj, f,
            size=None,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param f: open file descriptor (rb)

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        block_info = (
            blocksize, blockhash, size, nblocks) = self._get_file_block_info(
                f, size, container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        self._calculate_blocks_for_upload(
            *block_info,
            hashes=hashes,
            hmap=hmap,
            fileobj=f,
            hash_cb=hash_cb)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)

        if missing is None:
            return obj_headers

        if upload_cb:
            upload_gen = upload_cb(len(missing))
            for i in range(len(missing), len(hashmap['hashes']) + 1):
                try:
                    upload_gen.next()
                except:
                    upload_gen = None
        else:
            upload_gen = None

        retries = 7
        try:
            while retries:
                sendlog.info('%s blocks missing' % len(missing))
                num_of_blocks = len(missing)
                missing = self._upload_missing_blocks(
                    missing,
                    hmap,
                    f,
                    upload_gen)
                if missing:
                    if num_of_blocks == len(missing):
                        retries -= 1
                    else:
                        num_of_blocks = len(missing)
                else:
                    break
            if missing:
                # missing holds the hashes of the blocks that failed to upload
                details = ['%s' % hash for hash in missing]
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=details)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
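
    # Usage sketch (added; illustrative -- file name and object path are
    # placeholders):
    #
    #   with open('backup.tar.gz', 'rb') as f:
    #       headers = client.upload_object('backups/backup.tar.gz', f)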

    def upload_from_string(
            self, obj, input_str,
            hash_cb=None,
            upload_cb=None,
            etag=None,
            if_etag_match=None,
            if_not_exist=None,
            content_encoding=None,
            content_disposition=None,
            content_type=None,
            sharing=None,
            public=None,
            container_info_cache=None):
        """Upload an object using multiple connections (threads)

        :param obj: (str) remote object path

        :param input_str: (str) upload content

        :param hash_cb: optional progress.bar object for calculating hashes

        :param upload_cb: optional progress.bar object for uploading

        :param etag: (str)

        :param if_etag_match: (str) Push that value to the if-match header at
            file creation

        :param if_not_exist: (bool) If true, the file will be uploaded ONLY if
            it does not exist remotely, otherwise the operation will fail.
            This covers the case where an object with the same path is created
            while the object is being uploaded.

        :param content_encoding: (str)

        :param content_disposition: (str)

        :param content_type: (str)

        :param sharing: {'read':[user and/or grp names],
            'write':[usr and/or grp names]}

        :param public: (bool)

        :param container_info_cache: (dict) if given, avoid redundant calls to
            server for container info (block size and hash information)
        """
        self._assert_container()

        blocksize, blockhash, size, nblocks = self._get_file_block_info(
                fileobj=None, size=len(input_str), cache=container_info_cache)
        (hashes, hmap, offset) = ([], {}, 0)
        if not content_type:
            content_type = 'application/octet-stream'

        for blockid in range(nblocks):
            start = blockid * blocksize
            block = input_str[start: (start + blocksize)]
            hashes.append(_pithos_hash(block, blockhash))
            hmap[hashes[blockid]] = (start, block)

        hashmap = dict(bytes=size, hashes=hashes)
        missing, obj_headers = self._create_object_or_get_missing_hashes(
            obj, hashmap,
            content_type=content_type,
            size=size,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            content_encoding=content_encoding,
            content_disposition=content_disposition,
            permissions=sharing,
            public=public)
        if missing is None:
            return obj_headers
        num_of_missing = len(missing)

        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            for i in range(nblocks + 1 - num_of_missing):
                self._cb_next()

        tries = 7
        old_failures = 0
        try:
            while tries and missing:
                flying = []
                failures = []
                for hash in missing:
                    offset, block = hmap[hash]
                    bird = self._put_block_async(block, hash)
                    flying.append(bird)
                    unfinished = self._watch_thread_limit(flying)
                    for thread in set(flying).difference(unfinished):
                        if thread.exception:
                            failures.append(thread.kwargs['hash'])
                        if thread.isAlive():
                            flying.append(thread)
                        else:
                            self._cb_next()
                    flying = unfinished
                for thread in flying:
                    thread.join()
                    if thread.exception:
                        failures.append(thread.kwargs['hash'])
                    self._cb_next()
                missing = failures
                if missing and len(missing) == old_failures:
                    tries -= 1
                old_failures = len(missing)
            if missing:
                # missing holds the hashes of the blocks that failed to upload
                raise ClientError(
                    '%s blocks failed to upload' % len(missing),
                    details=['%s' % hash for hash in missing])
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
            raise

        r = self.object_put(
            obj,
            format='json',
            hashmap=True,
            content_type=content_type,
            content_encoding=content_encoding,
            if_etag_match=if_etag_match,
            if_etag_not_match='*' if if_not_exist else None,
            etag=etag,
            json=hashmap,
            permissions=sharing,
            public=public,
            success=201)
        return r.headers
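
    # Usage sketch (added; illustrative -- object path and content are
    # placeholders):
    #
    #   headers = client.upload_from_string('notes.txt', 'hello pithos')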

    # download_* auxiliary methods
    def _get_remote_blocks_info(self, obj, **restargs):
        # retrieve object hashmap
        myrange = restargs.pop('data_range', None)
        hashmap = self.get_object_hashmap(obj, **restargs)
        restargs['data_range'] = myrange
        blocksize = int(hashmap['block_size'])
        blockhash = hashmap['block_hash']
        total_size = hashmap['bytes']
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
        map_dict = {}
        for i, h in enumerate(hashmap['hashes']):
            # the same hash may appear in multiple blocks, so map each hash
            # to the list of block indices that carry it
            if h in map_dict:
                map_dict[h].append(i)
            else:
                map_dict[h] = [i]
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)

    def _dump_blocks_sync(
            self, obj, remote_hashes, blocksize, total_size, dst, crange,
            **args):
        for blockid, blockhash in enumerate(remote_hashes):
            if blockhash:
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range = _range_up(start, end, total_size, crange)
                if not data_range:
                    self._cb_next()
                    continue
                args['data_range'] = 'bytes=%s' % data_range
                r = self.object_get(obj, success=(200, 206), **args)
                self._cb_next()
                dst.write(r.content)
                dst.flush()

    def _get_block_async(self, obj, **args):
        event = SilentEvent(self.object_get, obj, success=(200, 206), **args)
        event.start()
        return event

    def _hash_from_file(self, fp, start, size, blockhash):
        fp.seek(start)
        block = fp.read(size)
        h = newhashlib(blockhash)
        # match _pithos_hash: only trailing NULs are stripped before hashing
        h.update(block.rstrip('\x00'))
        return hexlify(h.digest())

    def _thread2file(self, flying, blockids, local_file, offset=0, **restargs):
        """Write the results of finished asynchronous REST calls to a file

        :param offset: the offset of the file up to blocksize
            - e.g. if the range is 10-100, all blocks will be written to
            normal_position - 10
        """
        for key, g in flying.items():
            if g.isAlive():
                continue
            if g.exception:
                raise g.exception
            block = g.value.content
            for block_start in blockids[key]:
                local_file.seek(block_start + offset)
                local_file.write(block)
                self._cb_next()
            flying.pop(key)
            blockids.pop(key)
        local_file.flush()

    def _dump_blocks_async(
            self, obj, remote_hashes, blocksize, total_size, local_file,
            blockhash=None, resume=False, filerange=None, **restargs):
        file_size = fstat(local_file.fileno()).st_size if resume else 0
        flying = dict()
        blockid_dict = dict()
        offset = 0

        self._init_thread_limit()
        for block_hash, blockids in remote_hashes.items():
            blockids = [blk * blocksize for blk in blockids]
            unsaved = [blk for blk in blockids if not (
                blk < file_size and block_hash == self._hash_from_file(
                        local_file, blk, blocksize, blockhash))]
            self._cb_next(len(blockids) - len(unsaved))
            if unsaved:
                key = unsaved[0]
                self._watch_thread_limit(flying.values())
                self._thread2file(
                    flying, blockid_dict, local_file, offset,
                    **restargs)
                end = total_size - 1 if (
                    key + blocksize > total_size) else key + blocksize - 1
                data_range = _range_up(key, end, total_size, filerange)
                if not data_range:
                    self._cb_next()
                    continue
                restargs['async_headers'] = {'Range': 'bytes=%s' % data_range}
                flying[key] = self._get_block_async(obj, **restargs)
                blockid_dict[key] = unsaved

        for thread in flying.values():
            thread.join()
        self._thread2file(flying, blockid_dict, local_file, offset, **restargs)

    def download_object(
            self, obj, dst,
            download_cb=None,
            version=None,
            resume=False,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object (multiple connections, random blocks)

        :param obj: (str) remote object path

        :param dst: open file descriptor (wb+)

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param resume: (bool) if set, preserve already downloaded file parts

        :param range_str: (str) a range string of the form 'from-to', where
            from and to are byte positions (int)

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        if dst.isatty():
            self._dump_blocks_sync(
                obj,
                hash_list,
                blocksize,
                total_size,
                dst,
                range_str,
                **restargs)
        else:
            self._dump_blocks_async(
                obj,
                remote_hashes,
                blocksize,
                total_size,
                dst,
                blockhash,
                resume,
                range_str,
                **restargs)
            if not range_str:
                dst.truncate(total_size)

        self._complete_cb()
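
    # Usage sketch (added; illustrative -- paths are placeholders):
    #
    #   with open('local.copy', 'wb+') as dst:
    #       client.download_object('remote.obj', dst)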

    def download_to_string(
            self, obj,
            download_cb=None,
            version=None,
            range_str=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """Download an object to a string (multiple connections). This method
        uses threads for HTTP requests, but stores all content in memory.

        :param obj: (str) remote object path

        :param download_cb: optional progress.bar object for downloading

        :param version: (str) file version

        :param range_str: (str) a range string of the form 'from-to', where
            from and to are byte positions (int)

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (str) the whole object contents
        """
        restargs = dict(
            version=version,
            data_range=None if range_str is None else 'bytes=%s' % range_str,
            if_match=if_match,
            if_none_match=if_none_match,
            if_modified_since=if_modified_since,
            if_unmodified_since=if_unmodified_since)

        (
            blocksize,
            blockhash,
            total_size,
            hash_list,
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
        assert total_size >= 0

        if download_cb:
            self.progress_bar_gen = download_cb(len(hash_list))
            self._cb_next()

        num_of_blocks = len(remote_hashes)
        ret = [''] * num_of_blocks
        self._init_thread_limit()
        flying = dict()
        try:
            for blockid, blockhash in enumerate(remote_hashes):
                start = blocksize * blockid
                is_last = start + blocksize > total_size
                end = (total_size - 1) if is_last else (start + blocksize - 1)
                data_range_str = _range_up(start, end, end, range_str)
                if data_range_str:
                    self._watch_thread_limit(flying.values())
                    restargs['data_range'] = 'bytes=%s' % data_range_str
                    flying[blockid] = self._get_block_async(obj, **restargs)
                for runid, thread in flying.items():
                    if (blockid + 1) == num_of_blocks:
                        thread.join()
                    elif thread.isAlive():
                        continue
                    if thread.exception:
                        raise thread.exception
                    ret[runid] = thread.value.content
                    self._cb_next()
                    flying.pop(runid)
            return ''.join(ret)
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()

    # Command Progress Bar methods
    def _cb_next(self, step=1):
        if hasattr(self, 'progress_bar_gen'):
            try:
                for i in xrange(step):
                    self.progress_bar_gen.next()
            except:
                pass

    def _complete_cb(self):
        while True:
            try:
                self.progress_bar_gen.next()
            except:
                break

    def get_object_hashmap(
            self, obj,
            version=None,
            if_match=None,
            if_none_match=None,
            if_modified_since=None,
            if_unmodified_since=None):
        """
        :param obj: (str) remote object path

        :param if_match: (str)

        :param if_none_match: (str)

        :param if_modified_since: (str) formatted date

        :param if_unmodified_since: (str) formatted date

        :returns: (dict) the object hashmap, or {} if a precondition
            failed (status 304 or 412)
        """
        try:
            r = self.object_get(
                obj,
                hashmap=True,
                version=version,
                if_etag_match=if_match,
                if_etag_not_match=if_none_match,
                if_modified_since=if_modified_since,
                if_unmodified_since=if_unmodified_since)
        except ClientError as err:
            if err.status == 304 or err.status == 412:
                return {}
            raise
        return r.json

    def set_account_group(self, group, usernames):
        """
        :param group: (str)

        :param usernames: (list)
        """
        r = self.account_post(update=True, groups={group: usernames})
        return r

    def del_account_group(self, group):
        """
        :param group: (str)
        """
        self.account_post(update=True, groups={group: []})

    def get_account_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        r = self.account_head(until=until)
        if r.status_code == 401:
            raise ClientError("No authorization", status=401)
        return r.headers

    def get_account_quota(self):
        """
        :returns: (dict)
        """
        return filter_in(
            self.get_account_info(),
            'X-Account-Policy-Quota',
            exactMatch=True)

    #def get_account_versioning(self):
    #    """
    #    :returns: (dict)
    #    """
    #    return filter_in(
    #        self.get_account_info(),
    #        'X-Account-Policy-Versioning',
    #        exactMatch=True)

    def get_account_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(self.get_account_info(until=until), 'X-Account-Meta-')

    def get_account_group(self):
        """
        :returns: (dict)
        """
        return filter_in(self.get_account_info(), 'X-Account-Group-')

    def set_account_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.account_post(update=True, metadata=metapairs)
        return r.headers

    def del_account_meta(self, metakey):
        """
        :param metakey: (str) metadatum key
        """
        r = self.account_post(update=True, metadata={metakey: ''})
        return r.headers

    #def set_account_quota(self, quota):
    #    """
    #    :param quota: (int)
    #    """
    #    self.account_post(update=True, quota=quota)

    #def set_account_versioning(self, versioning):
    #    """
    #    :param versioning: (str)
    #    """
    #    r = self.account_post(update=True, versioning=versioning)
    #    return r.headers

    def list_containers(self):
        """
        :returns: (dict)
        """
        r = self.account_get()
        return r.json

    def del_container(self, until=None, delimiter=None):
        """
        :param until: (str) formatted date

        :param delimiter: (str) if '/', delete the container contents instead
            (i.e. empty the container)

        :raises ClientError: 404 Container does not exist

        :raises ClientError: 409 Container is not empty
        """
        self._assert_container()
        r = self.container_delete(
            until=until,
            delimiter=delimiter,
            success=(204, 404, 409))
        if r.status_code == 404:
            raise ClientError(
                'Container "%s" does not exist' % self.container,
                r.status_code)
        elif r.status_code == 409:
            raise ClientError(
                'Container "%s" is not empty' % self.container,
                r.status_code)
        return r.headers

    def get_container_versioning(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Versioning')
        finally:
            self.container = cnt_back_up

    def get_container_limit(self, container=None):
        """
        :param container: (str)

        :returns: (dict)
        """
        cnt_back_up = self.container
        try:
            self.container = container or cnt_back_up
            return filter_in(
                self.get_container_info(),
                'X-Container-Policy-Quota')
        finally:
            self.container = cnt_back_up

    def get_container_info(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)

        :raises ClientError: 404 Container not found
        """
        try:
            r = self.container_head(until=until)
        except ClientError as err:
            err.details.append('for container %s' % self.container)
            raise err
        return r.headers
    def get_container_meta(self, until=None):
1117
        """
1118
        :param until: (str) formated date
1119

1120
        :returns: (dict)
1121
        """
1122
        return filter_in(
1123
            self.get_container_info(until=until),
1124
            'X-Container-Meta')
1125

    

    def get_container_object_meta(self, until=None):
        """
        :param until: (str) formatted date

        :returns: (dict)
        """
        return filter_in(
            self.get_container_info(until=until),
            'X-Container-Object-Meta')

    def set_container_meta(self, metapairs):
        """
        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.container_post(update=True, metadata=metapairs)
        return r.headers

    def del_container_meta(self, metakey):
        """
        :param metakey: (str) metadatum key

        :returns: (dict) response headers
        """
        r = self.container_post(update=True, metadata={metakey: ''})
        return r.headers

    def set_container_limit(self, limit):
        """
        :param limit: (int)
        """
        r = self.container_post(update=True, quota=limit)
        return r.headers

    def set_container_versioning(self, versioning):
        """
        :param versioning: (str)
        """
        r = self.container_post(update=True, versioning=versioning)
        return r.headers

    def del_object(self, obj, until=None, delimiter=None):
        """
        :param obj: (str) remote object path

        :param until: (str) formatted date

        :param delimiter: (str)
        """
        self._assert_container()
        r = self.object_delete(obj, until=until, delimiter=delimiter)
        return r.headers

    def set_object_meta(self, obj, metapairs):
        """
        :param obj: (str) remote object path

        :param metapairs: (dict) {key1: val1, key2: val2, ...}
        """
        assert(type(metapairs) is dict)
        r = self.object_post(obj, update=True, metadata=metapairs)
        return r.headers

    def del_object_meta(self, obj, metakey):
        """
        :param obj: (str) remote object path

        :param metakey: (str) metadatum key
        """
        r = self.object_post(obj, update=True, metadata={metakey: ''})
        return r.headers

    def publish_object(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (str) access url
        """
        self.object_post(obj, update=True, public=True)
        info = self.get_object_info(obj)
        return info['x-object-public']

    def unpublish_object(self, obj):
        """
        :param obj: (str) remote object path
        """
        r = self.object_post(obj, update=True, public=False)
        return r.headers

    def get_object_info(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        try:
            r = self.object_head(obj, version=version)
            return r.headers
        except ClientError as ce:
            if ce.status == 404:
                raise ClientError('Object %s not found' % obj, status=404)
            raise

    def get_object_meta(self, obj, version=None):
        """
        :param obj: (str) remote object path

        :param version: (str)

        :returns: (dict)
        """
        return filter_in(
            self.get_object_info(obj, version=version),
            'X-Object-Meta')

    def get_object_sharing(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (dict)
        """
        r = filter_in(
            self.get_object_info(obj),
            'X-Object-Sharing',
            exactMatch=True)
        reply = {}
        if len(r) > 0:
            perms = r['x-object-sharing'].split(';')
            for perm in perms:
                try:
                    perm.index('=')
                except ValueError:
                    raise ClientError('Incorrect reply format')
                (key, val) = perm.strip().split('=')
                reply[key] = val
        return reply

    def set_object_sharing(
            self, obj,
            read_permission=False, write_permission=False):
        """Give read/write permissions to an object.

        :param obj: (str) remote object path

        :param read_permission: (list - bool) users and user groups that get
            read permission for this object - False means all previous read
            permissions will be removed

        :param write_permission: (list - bool) users and user groups that get
            write permission for this object - False means all previous write
            permissions will be removed

        :returns: (dict) response headers
        """

        perms = dict(read=read_permission or '', write=write_permission or '')
        r = self.object_post(obj, update=True, permissions=perms)
        return r.headers
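
    # Usage sketch (added; illustrative -- user ids are placeholders):
    #
    #   client.set_object_sharing(
    #       'shared.obj', read_permission=['a_user_id', 'another_user_id'])
    #   client.del_object_sharing('shared.obj')  # revoke all permissions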

    def del_object_sharing(self, obj):
        """
        :param obj: (str) remote object path
        """
        return self.set_object_sharing(obj)

    def append_object(self, obj, source_file, upload_cb=None):
        """
        :param obj: (str) remote object path

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        nblocks = 1 + (filesize - 1) // blocksize
        offset = 0
        headers = {}
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        flying = {}
        self._init_thread_limit()
        try:
            for i in range(nblocks):
                block = source_file.read(min(blocksize, filesize - offset))
                offset += len(block)

                self._watch_thread_limit(flying.values())
                unfinished = {}
                flying[i] = SilentEvent(
                    method=self.object_post,
                    obj=obj,
                    update=True,
                    content_range='bytes */*',
                    content_type='application/octet-stream',
                    content_length=len(block),
                    data=block)
                flying[i].start()

                for key, thread in flying.items():
                    if thread.isAlive():
                        if i < nblocks:
                            unfinished[key] = thread
                            continue
                        thread.join()
                    if thread.exception:
                        raise thread.exception
                    headers[key] = thread.value.headers
                    self._cb_next()
                flying = unfinished
        except KeyboardInterrupt:
            sendlog.info('- - - wait for threads to finish')
            for thread in activethreads():
                thread.join()
        finally:
            from time import sleep
            sleep(2 * len(activethreads()))
        return headers.values()

    def truncate_object(self, obj, upto_bytes):
        """
        :param obj: (str) remote object path

        :param upto_bytes: (int) maximum number of bytes to keep in the file

        :returns: (dict) response headers
        """
        r = self.object_post(
            obj,
            update=True,
            content_range='bytes 0-%s/*' % upto_bytes,
            content_type='application/octet-stream',
            object_bytes=upto_bytes,
            source_object=path4url(self.container, obj))
        return r.headers

    def overwrite_object(self, obj, start, end, source_file, upload_cb=None):
        """Overwrite a part of an object from a local source file

        :param obj: (str) remote object path

        :param start: (int) position in bytes to start overwriting from

        :param end: (int) position in bytes to stop overwriting at

        :param source_file: open file descriptor

        :param upload_cb: progress.bar for uploading
        """

        r = self.get_object_info(obj)
        rf_size = int(r['content-length'])
        if rf_size < int(start):
            raise ClientError(
                'Range start exceeds file size',
                status=416)
        elif rf_size < int(end):
            raise ClientError(
                'Range end exceeds file size',
                status=416)
        self._assert_container()
        meta = self.get_container_info()
        blocksize = int(meta['x-container-block-size'])
        filesize = fstat(source_file.fileno()).st_size
        datasize = int(end) - int(start) + 1
        nblocks = 1 + (datasize - 1) // blocksize
        offset = 0
        if upload_cb:
            self.progress_bar_gen = upload_cb(nblocks)
            self._cb_next()
        headers = []
        for i in range(nblocks):
            read_size = min(blocksize, filesize - offset, datasize - offset)
            block = source_file.read(read_size)
            r = self.object_post(
                obj,
                update=True,
                content_type='application/octet-stream',
                content_length=len(block),
                content_range='bytes %s-%s/*' % (
                    start + offset,
                    start + offset + len(block) - 1),
                data=block)
            headers.append(dict(r.headers))
            offset += len(block)

            self._cb_next()
        return headers

    def copy_object(
            self, src_container, src_object, dst_container,
            dst_object=None,
            source_version=None,
            source_account=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_version: (str) source object version

        :param source_account: (str) account to copy from

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object or src_object,
            success=201,
            copy_from=src_path,
            content_length=0,
            source_version=source_version,
            source_account=source_account,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers

    def move_object(
            self, src_container, src_object, dst_container,
            dst_object=False,
            source_account=None,
            source_version=None,
            public=False,
            content_type=None,
            delimiter=None):
        """
        :param src_container: (str) source container

        :param src_object: (str) source object path

        :param dst_container: (str) destination container

        :param dst_object: (str) destination object path

        :param source_account: (str) account to move from

        :param source_version: (str) source object version

        :param public: (bool)

        :param content_type: (str)

        :param delimiter: (str)

        :returns: (dict) response headers
        """
        self._assert_account()
        self.container = dst_container
        dst_object = dst_object or src_object
        src_path = path4url(src_container, src_object)
        r = self.object_put(
            dst_object,
            success=201,
            move_from=src_path,
            content_length=0,
            source_account=source_account,
            source_version=source_version,
            public=public,
            content_type=content_type,
            delimiter=delimiter)
        return r.headers
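
    # Usage sketch (added; illustrative -- container and object names are
    # placeholders):
    #
    #   client.copy_object('pithos', 'src.obj', 'backups', 'src.obj.bak')
    #   client.move_object('pithos', 'tmp.obj', 'trash')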

    def get_sharing_accounts(self, limit=None, marker=None, *args, **kwargs):
        """Get accounts that share with self.account

        :param limit: (str)

        :param marker: (str)

        :returns: (dict)
        """
        self._assert_account()

        self.set_param('format', 'json')
        self.set_param('limit', limit, iff=limit is not None)
        self.set_param('marker', marker, iff=marker is not None)

        path = ''
        success = kwargs.pop('success', (200, 204))
        r = self.get(path, *args, success=success, **kwargs)
        return r.json

    def get_object_versionlist(self, obj):
        """
        :param obj: (str) remote object path

        :returns: (list)
        """
        self._assert_container()
        r = self.object_get(obj, format='json', version='list')
        return r.json['versions']