Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos.py @ 9a7efb0d

History | View | Annotate | Download (24.3 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import gevent
35
import gevent.monkey
36
# Monkey-patch everything for gevent early on
37
gevent.monkey.patch_all()
38
import gevent.pool
39

    
40
from os import fstat, path
41
from hashlib import new as newhashlib
42
from time import time, sleep
43
from datetime import datetime
44
import sys
45

    
46
from binascii import hexlify
47
from .pithos_sh_lib.hashmap import HashMap
48

    
49
from .pithos_rest_api import PithosRestAPI
50
from .storage import ClientError
51
from .utils import path4url, filter_in
52

    
53
def pithos_hash(block, blockhash):
54
    h = newhashlib(blockhash)
55
    h.update(block.rstrip('\x00'))
56
    return h.hexdigest()
57

    
58
class PithosClient(PithosRestAPI):
59
    """GRNet Pithos API client"""
60

    
61
    def __init__(self, base_url, token, account=None, container = None):
62
        super(PithosClient, self).__init__(base_url, token, account = account,
63
            container = container)
64
        self.async_pool = None
65

    
66
    def purge_container(self):
67
        self.container_delete(until=unicode(time()))
68
        
69
    def upload_object_unchunked(self, obj, f, withHashFile = False, size=None, etag=None,
70
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
71
        public=None):
72
        # This is a naive implementation, it loads the whole file in memory
73
        #Look in pithos for a nice implementation
74
        self.assert_container()
75

    
76
        if withHashFile:
77
            data = f.read()
78
            try:
79
                import json
80
                data = json.dumps(json.loads(data))
81
            except ValueError:
82
                raise ClientError(message='"%s" is not json-formated'%f.name, status=1)
83
            except SyntaxError:
84
                raise ClientError(message='"%s" is not a valid hashmap file'%f.name, status=1)
85
            from StringIO import StringIO
86
            f = StringIO(data)
87
        data = f.read(size) if size is not None else f.read()
88
        self.object_put(obj, data=data, etag=etag, content_encoding=content_encoding,
89
            content_disposition=content_disposition, content_type=content_type, permitions=sharing,
90
            public=public, success=201)
91
        
92
    def put_block_async(self, data, hash):
93
        class SilentGreenlet(gevent.Greenlet):
94
            def _report_error(self, exc_info):
95
                _stderr = None
96
                try:
97
                    _stderr = sys._stderr
98
                    sys.stderr = StringIO()
99
                    gevent.Greenlet._report_error(self, exc_info)
100
                finally:
101
                    sys.stderr = _stderr
102
        POOL_SIZE = 5
103
        if self.async_pool is None:
104
            self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
105
        g = SilentGreenlet(self.put_block, data, hash)
106
        self.async_pool.start(g)
107
        return g
108

    
109
    def put_block(self, data, hash):
110
        r = self.container_post(update=True, content_type='application/octet-stream',
111
            content_length=len(data), data=data, format='json')
112
        assert r.json[0] == hash, 'Local hash does not match server'
113
        
114

    
115
    def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
116
        content_disposition=None, content_type=None, sharing=None, public=None):
117
        self.assert_container()
118
        obj_content_type = 'application/octet-stream' if content_type is None else content_type
119
        self.object_put(obj, content_length=0, etag=etag, content_encoding=content_encoding,
120
            content_disposition=content_disposition, content_type=content_type, permitions=sharing,
121
            public=public, manifest='%s/%s'%(self.container,obj))
122
       
123
    #upload_* auxiliary methods 
124
    def _get_file_block_info(self, fileobj, size=None):
125
        meta = self.get_container_info()
126
        blocksize = int(meta['x-container-block-size'])
127
        blockhash = meta['x-container-block-hash']
128
        size = size if size is not None else fstat(fileobj.fileno()).st_size
129
        nblocks = 1 + (size - 1) // blocksize
130
        return (blocksize, blockhash, size, nblocks)
131

    
132
    def _get_missing_hashes(self, obj, json, size=None, format='json', hashmap=True,
133
        content_type=None, etag=None, content_encoding=None, content_disposition=None,
134
        permitions=None, public=None, success=(201, 409)):
135
        r = self.object_put(obj, format='json', hashmap=True, content_type=content_type,
136
            json=json, etag=etag, content_encoding=content_encoding,
137
            content_disposition=content_disposition, permitions=permitions, public=public,
138
            success=success)
139
        if r.status_code == 201:
140
            return None
141
        return r.json
142

    
143
    def _caclulate_uploaded_blocks(self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
144
        hash_cb=None):
145
        offset=0
146
        if hash_cb:
147
            hash_gen = hash_cb(nblocks)
148
            hash_gen.next()
149

    
150
        for i in range(nblocks):
151
            block = fileobj.read(min(blocksize, size - offset))
152
            bytes = len(block)
153
            hash = pithos_hash(block, blockhash)
154
            hashes.append(hash)
155
            hmap[hash] = (offset, bytes)
156
            offset += bytes
157
            if hash_cb:
158
                hash_gen.next()
159
        assert offset == size
160

    
161
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
162
        """upload missing blocks asynchronously in a pseudo-parallel fashion (greenlets)
163
        """
164
        if upload_cb:
165
            upload_gen = upload_cb(len(missing))
166
            upload_gen.next()
167

    
168
        flying = []
169
        for hash in missing:
170
            offset, bytes = hmap[hash]
171
            fileobj.seek(offset)
172
            data = fileobj.read(bytes)
173
            r = self.put_block_async(data, hash)
174
            flying.append(r)
175
            for r in flying:
176
                if r.ready():
177
                    if r.exception:
178
                        raise r.exception
179
                    if upload_cb:
180
                        upload_gen.next()
181
            flying = [r for r in flying if not r.ready()]
182
        while upload_cb:
183
            try:
184
                upload_gen.next()
185
            except StopIteration:
186
                break
187
        gevent.joinall(flying)
188

    
189
    def upload_object(self, obj, f, size=None, hash_cb=None, upload_cb=None, etag=None,
190
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
191
        public=None):
192
        self.assert_container()
193

    
194
        #init
195
        block_info = (blocksize, blockhash, size, nblocks) = self._get_file_block_info(f, size)
196
        (hashes, hmap, offset) = ([], {}, 0)
197
        content_type = 'application/octet-stream' if content_type is None else content_type
198

    
199
        self._caclulate_uploaded_blocks(*block_info, hashes=hashes, hmap=hmap, fileobj=f,
200
            hash_cb=hash_cb)
201

    
202
        hashmap = dict(bytes=size, hashes=hashes)
203
        missing = self._get_missing_hashes(obj, hashmap, content_type=content_type, size=size,
204
            etag=etag, content_encoding=content_encoding, content_disposition=content_disposition,
205
            permitions=sharing, public=public)
206

    
207
        if missing is None:
208
            return
209
        self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
210

    
211
        self.object_put(obj, format='json', hashmap=True, content_type=content_type, 
212
            json=hashmap, success=201)
213
    
214
    #download_* auxiliary methods
215
    def _get_remote_blocks_info(self, obj, **restargs):
216
        #retrieve object hashmap
217
        hashmap = self.get_object_hashmap(obj, **restargs)
218
        blocksize = int(hashmap['block_size'])
219
        blockhash = hashmap['block_hash']
220
        total_size = hashmap['bytes']
221
        print('total_size:%s, blocksize:%s, x/y:%s, len:%s'%(total_size, blocksize,
222
            total_size/blocksize + 1, len(hashmap['hashes'])))
223
        #assert total_size/blocksize + 1 == len(hashmap['hashes'])
224
        map_dict = {}
225
        for i, h in enumerate(hashmap['hashes']):
226
            map_dict[h] = i
227
        return (blocksize, blockhash, total_size, hashmap['hashes'], map_dict)
228

    
229
    def _dump_blocks_sync(self, obj, remote_hashes, blocksize, total_size, dst, **restargs):
230
        for blockid, blockhash in enumerate(remote_hashes):
231
            if blockhash == None:
232
                continue
233
            start = blocksize*blockid
234
            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
235
            restargs['data_range'] = 'bytes=%s-%s'%(start, end)
236
            r = self.object_get(obj, success=(200, 206), **restargs)
237
            self._cb_next()
238
            dst.write(r.content)
239
            dst.flush()
240

    
241
    def _filter_out_downloaded_hashses(self, remote_hashes, hash_list, local_file, blocksize,
242
        blockhash):
243
        #load file hashmap
244
        file_hashmap = HashMap(blocksize, blockhash)
245
        file_hashmap.load(local_file, hasattr(self, 'progress_bar_gen'))
246

    
247
        for i, x in enumerate(file_hashmap):
248
            local_hash = hexlify(x)
249
            if local_hash in remote_hashes:
250
                blockid = remote_hashes.pop(local_hash)
251
                hash_list[blockid] = None
252
                self._cb_next()
253
            else:
254
                raise ClientError(message='Local file is substantialy different', status=600)
255

    
256
    def _get_block_async(self, obj, **restargs):
257
        class SilentGreenlet(gevent.Greenlet):
258
            def _report_error(self, exc_info):
259
                _stderr = sys._stderr
260
                try:
261
                    sys.stderr = StringIO()
262
                    gevent.Greenlet._report_error(self, exc_info)
263
                finally:
264
                    sys.stderr = _stderr
265
        if not hasattr(self, 'POOL_SIZE'):
266
            self.POOL_SIZE = 5
267
        if self.async_pool is None:
268
            self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
269
        g = SilentGreenlet(self.object_get, obj, success=(200, 206), **restargs)
270
        self.async_pool.start(g)
271
        return g
272

    
273
    def _greenlet2file(self, flying_greenlets, local_file, broken={}, **restargs):
274
        finished = []
275
        for start, g in flying_greenlets.items():
276
            print('\tIs g ID(%s) ready? %s'%(self.mmaapp[start], g.ready()))
277
            if g.ready():
278
                if g.exception:
279
                    raise g.exception
280
                try:
281
                    block = g.value.content
282
                except AttributeError:
283
                    broken[start] = flying_greenlets.pop(start)
284
                    #g.spawn()
285
                    continue
286
                local_file.seek(start)
287
                print('\tID(%s) [%s...]\n\tg.value:%s\n\tg:%s\n'%(self.mmaapp[start], block[1:10],
288
                    g.value, g))
289
                print('\tID(%s): g.value.request: %s\n---'%(self.mmaapp[start], g.value.request))
290
                local_file.write(block)
291
                #local_file.flush()
292
                self._cb_next()
293
                finished.append(flying_greenlets.pop(start))
294
        local_file.flush()
295
        return finished
296

    
297
    def _dump_blocks_async(self, obj, remote_hashes, blocksize, total_size, local_file, **restargs):
298
        flying_greenlets = {}
299
        finished_greenlets = []
300
        broken = {}
301
        self.mmaapp = {}
302
        for block_hash, blockid in remote_hashes.items():
303
            if len(flying_greenlets) >= self.POOL_SIZE:
304
                finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
305
                    **restargs)
306
            start = blocksize*blockid
307
            self.mmaapp[start] = blockid
308
            end = total_size-1 if start+blocksize > total_size else start+blocksize-1
309
            restargs['async_headers'] = dict(data_range='bytes=%s-%s'%(start, end))
310
            print('ID(%s) get_grnlt {'%blockid)
311
            flying_greenlets[start] = self._get_block_async(obj, **restargs)
312
            print('ID(%s) got_grnlt }'%blockid)
313

    
314
        #check the greenlets
315
        while len(flying_greenlets) > 0:
316
            sleep(0.1)
317
            finished_greenlets += self._greenlet2file(flying_greenlets, local_file, broken,
318
                **restargs)
319

    
320
        gevent.joinall(finished_greenlets)
321

    
322

    
323
    def download_object(self, obj, dst, download_cb=None, version=None, overide=False, resume=False,
324
        range=None, if_match=None, if_none_match=None, if_modified_since=None,
325
        if_unmodified_since=None):
326

    
327
        #init REST api args
328
        restargs=dict(version=version,
329
            data_range = None if range is None else 'bytes=%s'%range,
330
            if_match=if_match,
331
            if_none_match=if_none_match,
332
            if_modified_since=if_modified_since,
333
            if_unmodified_since=if_unmodified_since)
334

    
335
        #1. get remote object hash info
336
        (   blocksize,
337
            blockhash,
338
            total_size,
339
            hash_list, 
340
            remote_hashes) = self._get_remote_blocks_info(obj, **restargs)
341
        assert total_size >= 0
342
        self.POOL_SIZE = 5
343

    
344
        if download_cb:
345
            self.progress_bar_gen = download_cb(len(remote_hashes)+1)
346
            self._cb_next()
347

    
348
        if dst.isatty():
349
            self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
350
        elif resume:
351
            self._filter_out_downloaded_hashses(remote_hashes, hash_list, dst, blocksize, blockhash)
352
            self._dump_blocks_sync(obj, hash_list, blocksize, total_size, dst, **restargs)
353
        else:
354
            self._dump_blocks_async(obj, remote_hashes, blocksize, total_size, dst, **restargs)
355
            dst.truncate(total_size)
356

    
357
        self._complete_cb()
358

    
359
    #Command Progress Bar method
360
    def _cb_next(self):
361
        if hasattr(self, 'progress_bar_gen'):
362
            try:
363
                self.progress_bar_gen.next()
364
            except:
365
                pass
366
    def _complete_cb(self):
367
        while True:
368
            try:
369
                self.progress_bar_gen.next()
370
            except:
371
                break
372

    
373
    def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
374
        if_modified_since=None, if_unmodified_since=None, data_range=None):
375
        try:
376
            r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
377
                if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
378
                if_unmodified_since=if_unmodified_since, data_range=data_range)
379
        except ClientError as err:
380
            if err.status == 304 or err.status == 412:
381
                return {}
382
            raise
383
        return r.json
384

    
385
    def set_account_group(self, group, usernames):
386
        self.account_post(update=True, groups = {group:usernames})
387

    
388
    def del_account_group(self, group):
389
        self.account_post(update=True, groups={group:[]})
390

    
391
    def get_account_info(self, until=None):
392
        r = self.account_head(until=until)
393
        if r.status_code == 401:
394
            raise ClientError("No authorization")
395
        return r.headers
396

    
397
    def get_account_quota(self):
398
        return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch = True)
399

    
400
    def get_account_versioning(self):
401
        return filter_in(self.get_account_info(), 'X-Account-Policy-Versioning', exactMatch = True)
402

    
403
    def get_account_meta(self, until=None):
404
        return filter_in(self.get_account_info(until = until), 'X-Account-Meta-')
405

    
406
    def get_account_group(self):
407
        return filter_in(self.get_account_info(), 'X-Account-Group-')
408

    
409
    def set_account_meta(self, metapairs):
410
        assert(type(metapairs) is dict)
411
        self.account_post(update=True, metadata=metapairs)
412

    
413
    def del_account_meta(self, metakey):
414
        self.account_post(update=True, metadata={metakey:''})
415

    
416
    def set_account_quota(self, quota):
417
        self.account_post(update=True, quota=quota)
418

    
419
    def set_account_versioning(self, versioning):
420
        self.account_post(update=True, versioning = versioning)
421

    
422
    def list_containers(self):
423
        r = self.account_get()
424
        return r.json
425

    
426
    def del_container(self, until=None, delimiter=None):
427
        self.assert_container()
428
        r = self.container_delete(until=until, delimiter=delimiter, success=(204, 404, 409))
429
        if r.status_code == 404:
430
            raise ClientError('Container "%s" does not exist'%self.container, r.status_code)
431
        elif r.status_code == 409:
432
            raise ClientError('Container "%s" is not empty'%self.container, r.status_code)
433

    
434
    def get_container_versioning(self, container):
435
        self.container = container
436
        return filter_in(self.get_container_info(), 'X-Container-Policy-Versioning')
437

    
438
    def get_container_quota(self, container):
439
        self.container = container
440
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
441

    
442
    def get_container_info(self, until = None):
443
        r = self.container_head(until=until)
444
        return r.headers
445

    
446
    def get_container_meta(self, until = None):
447
        return filter_in(self.get_container_info(until=until), 'X-Container-Meta')
448

    
449
    def get_container_object_meta(self, until = None):
450
        return filter_in(self.get_container_info(until=until), 'X-Container-Object-Meta')
451

    
452
    def set_container_meta(self, metapairs):
453
        assert(type(metapairs) is dict)
454
        self.container_post(update=True, metadata=metapairs)
455
        
456
    def del_container_meta(self, metakey):
457
        self.container_post(update=True, metadata={metakey:''})
458

    
459
    def set_container_quota(self, quota):
460
        self.container_post(update=True, quota=quota)
461

    
462
    def set_container_versioning(self, versioning):
463
        self.container_post(update=True, versioning=versioning)
464

    
465
    def del_object(self, obj, until=None, delimiter=None):
466
        self.assert_container()
467
        self.object_delete(obj, until=until, delimiter=delimiter)
468

    
469
    def set_object_meta(self, object, metapairs):
470
        assert(type(metapairs) is dict)
471
        self.object_post(object, update=True, metadata=metapairs)
472

    
473
    def del_object_meta(self, metakey, object):
474
        self.object_post(object, update=True, metadata={metakey:''})
475

    
476
    def publish_object(self, object):
477
        self.object_post(object, update=True, public=True)
478

    
479
    def unpublish_object(self, object):
480
        self.object_post(object, update=True, public=False)
481

    
482
    def get_object_info(self, obj, version=None):
483
        r = self.object_head(obj, version=version)
484
        return r.headers
485

    
486
    def get_object_meta(self, obj, version=None):
487
        return filter_in(self.get_object_info(obj, version=version), 'X-Object-Meta')
488

    
489
    def get_object_sharing(self, object):
490
        r = filter_in(self.get_object_info(object), 'X-Object-Sharing', exactMatch = True)
491
        reply = {}
492
        if len(r) > 0:
493
            perms = r['x-object-sharing'].split(';')
494
            for perm in perms:
495
                try:
496
                    perm.index('=')
497
                except ValueError:
498
                    raise ClientError('Incorrect reply format')
499
                (key, val) = perm.strip().split('=')
500
                reply[key] = val
501
        return reply
502

    
503
    def set_object_sharing(self, object, read_permition = False, write_permition = False):
504
        """Give read/write permisions to an object.
505
           @param object is the object to change sharing permitions onto
506
           @param read_permition is a list of users and user groups that get read permition for this object
507
                False means all previous read permitions will be removed
508
           @param write_perimition is a list of users and user groups to get write permition for this object
509
                False means all previous read permitions will be removed
510
        """
511
        perms = {}
512
        perms['read'] = read_permition if isinstance(read_permition, list) else ''
513
        perms['write'] = write_permition if isinstance(write_permition, list) else ''
514
        self.object_post(object, update=True, permitions=perms)
515

    
516
    def del_object_sharing(self, object):
517
        self.set_object_sharing(object)
518

    
519
    def append_object(self, object, source_file, upload_cb = None):
520
        """@param upload_db is a generator for showing progress of upload
521
            to caller application, e.g. a progress bar. Its next is called
522
            whenever a block is uploaded
523
        """
524
        self.assert_container()
525
        meta = self.get_container_info()
526
        blocksize = int(meta['x-container-block-size'])
527
        filesize = fstat(source_file.fileno()).st_size
528
        nblocks = 1 + (filesize - 1)//blocksize
529
        offset = 0
530
        if upload_cb is not None:
531
            upload_gen = upload_cb(nblocks)
532
        for i in range(nblocks):
533
            block = source_file.read(min(blocksize, filesize - offset))
534
            offset += len(block)
535
            self.object_post(object, update=True, content_range='bytes */*',
536
                content_type='application/octet-stream', content_length=len(block), data=block)
537
            
538
            if upload_cb is not None:
539
                upload_gen.next()
540

    
541
    def truncate_object(self, object, upto_bytes):
542
        self.object_post(object, update=True, content_range='bytes 0-%s/*'%upto_bytes,
543
            content_type='application/octet-stream', object_bytes=upto_bytes,
544
            source_object=path4url(self.container, object))
545

    
546
    def overwrite_object(self, object, start, end, source_file, upload_cb=None):
547
        """Overwrite a part of an object with given source file
548
           @start the part of the remote object to start overwriting from, in bytes
549
           @end the part of the remote object to stop overwriting to, in bytes
550
        """
551
        self.assert_container()
552
        meta = self.get_container_info()
553
        blocksize = int(meta['x-container-block-size'])
554
        filesize = fstat(source_file.fileno()).st_size
555
        datasize = int(end) - int(start) + 1
556
        nblocks = 1 + (datasize - 1)//blocksize
557
        offset = 0
558
        if upload_cb is not None:
559
            upload_gen = upload_cb(nblocks)
560
        for i in range(nblocks):
561
            block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
562
            offset += len(block)
563
            self.object_post(object, update=True, content_type='application/octet-stream', 
564
                content_length=len(block), content_range='bytes %s-%s/*'%(start,end), data=block)
565
            
566
            if upload_cb is not None:
567
                upload_gen.next()
568

    
569
    def copy_object(self, src_container, src_object, dst_container, dst_object=False,
570
        source_version = None, public=False, content_type=None, delimiter=None):
571
        self.assert_account()
572
        self.container = dst_container
573
        dst_object = dst_object or src_object
574
        src_path = path4url(src_container, src_object)
575
        self.object_put(dst_object, success=201, copy_from=src_path, content_length=0,
576
            source_version=source_version, public=public, content_type=content_type,
577
            delimiter=delimiter)
578

    
579
    def move_object(self, src_container, src_object, dst_container, dst_object=False,
580
        source_version = None, public=False, content_type=None, delimiter=None):
581
        self.assert_account()
582
        self.container = dst_container
583
        dst_object = dst_object or src_object
584
        src_path = path4url(src_container, src_object)
585
        self.object_put(dst_object, success=201, move_from=src_path, content_length=0,
586
            source_version=source_version, public=public, content_type=content_type,
587
            delimiter=delimiter)