Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos.py @ afd9d603

History | View | Annotate | Download (24.4 kB)

1 0c6d7489 Giorgos Verigakis
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2 d2cea1e2 Giorgos Verigakis
#
3 d2cea1e2 Giorgos Verigakis
# Redistribution and use in source and binary forms, with or
4 d2cea1e2 Giorgos Verigakis
# without modification, are permitted provided that the following
5 d2cea1e2 Giorgos Verigakis
# conditions are met:
6 d2cea1e2 Giorgos Verigakis
#
7 d2cea1e2 Giorgos Verigakis
#   1. Redistributions of source code must retain the above
8 d2cea1e2 Giorgos Verigakis
#      copyright notice, this list of conditions and the following
9 d2cea1e2 Giorgos Verigakis
#      disclaimer.
10 d2cea1e2 Giorgos Verigakis
#
11 d2cea1e2 Giorgos Verigakis
#   2. Redistributions in binary form must reproduce the above
12 d2cea1e2 Giorgos Verigakis
#      copyright notice, this list of conditions and the following
13 d2cea1e2 Giorgos Verigakis
#      disclaimer in the documentation and/or other materials
14 d2cea1e2 Giorgos Verigakis
#      provided with the distribution.
15 d2cea1e2 Giorgos Verigakis
#
16 d2cea1e2 Giorgos Verigakis
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 d2cea1e2 Giorgos Verigakis
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 d2cea1e2 Giorgos Verigakis
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 d2cea1e2 Giorgos Verigakis
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 d2cea1e2 Giorgos Verigakis
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 d2cea1e2 Giorgos Verigakis
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 d2cea1e2 Giorgos Verigakis
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 d2cea1e2 Giorgos Verigakis
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 d2cea1e2 Giorgos Verigakis
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 d2cea1e2 Giorgos Verigakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 d2cea1e2 Giorgos Verigakis
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 d2cea1e2 Giorgos Verigakis
# POSSIBILITY OF SUCH DAMAGE.
28 d2cea1e2 Giorgos Verigakis
#
29 d2cea1e2 Giorgos Verigakis
# The views and conclusions contained in the software and
30 d2cea1e2 Giorgos Verigakis
# documentation are those of the authors and should not be
31 d2cea1e2 Giorgos Verigakis
# interpreted as representing official policies, either expressed
32 d2cea1e2 Giorgos Verigakis
# or implied, of GRNET S.A.
33 d2cea1e2 Giorgos Verigakis
34 435008b6 Stavros Sachtouris
import gevent
35 435008b6 Stavros Sachtouris
import gevent.monkey
36 435008b6 Stavros Sachtouris
# Monkey-patch everything for gevent early on
37 435008b6 Stavros Sachtouris
gevent.monkey.patch_all()
38 64ab4c13 Stavros Sachtouris
import gevent.pool
39 435008b6 Stavros Sachtouris
40 64ab4c13 Stavros Sachtouris
from os import fstat, path
41 64ab4c13 Stavros Sachtouris
from hashlib import new as newhashlib
42 a91e0293 Giorgos Verigakis
from time import time
43 5b263ba2 Stavros Sachtouris
from datetime import datetime
44 f0598cb2 Stavros Sachtouris
import sys
45 a91e0293 Giorgos Verigakis
46 64ab4c13 Stavros Sachtouris
from binascii import hexlify
47 64ab4c13 Stavros Sachtouris
from .pithos_sh_lib.hashmap import HashMap
48 6a0b1658 Giorgos Verigakis
49 64ab4c13 Stavros Sachtouris
from .pithos_rest_api import PithosRestAPI
50 64ab4c13 Stavros Sachtouris
from .storage import ClientError
51 64ab4c13 Stavros Sachtouris
from .utils import path4url, filter_in
52 6a0b1658 Giorgos Verigakis
53 6a0b1658 Giorgos Verigakis
def pithos_hash(block, blockhash):
54 64ab4c13 Stavros Sachtouris
    h = newhashlib(blockhash)
55 6a0b1658 Giorgos Verigakis
    h.update(block.rstrip('\x00'))
56 6a0b1658 Giorgos Verigakis
    return h.hexdigest()
57 6a0b1658 Giorgos Verigakis
58 64ab4c13 Stavros Sachtouris
class PithosClient(PithosRestAPI):
59 d2cea1e2 Giorgos Verigakis
    """GRNet Pithos API client"""
60 a91e0293 Giorgos Verigakis
61 435008b6 Stavros Sachtouris
    def __init__(self, base_url, token, account=None, container = None):
62 64ab4c13 Stavros Sachtouris
        super(PithosClient, self).__init__(base_url, token, account = account,
63 64ab4c13 Stavros Sachtouris
            container = container)
64 435008b6 Stavros Sachtouris
        self.async_pool = None
65 435008b6 Stavros Sachtouris
66 17edd3f4 Stavros Sachtouris
    def purge_container(self):
67 64ab4c13 Stavros Sachtouris
        self.container_delete(until=unicode(time()))
68 f0598cb2 Stavros Sachtouris
        
69 65a45524 Stavros Sachtouris
    def upload_object_unchunked(self, obj, f, withHashFile = False, size=None, etag=None,
70 65a45524 Stavros Sachtouris
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
71 65a45524 Stavros Sachtouris
        public=None):
72 65a45524 Stavros Sachtouris
        # This is a naive implementation, it loads the whole file in memory
73 65a45524 Stavros Sachtouris
        #Look in pithos for a nice implementation
74 65a45524 Stavros Sachtouris
        self.assert_container()
75 65a45524 Stavros Sachtouris
76 65a45524 Stavros Sachtouris
        if withHashFile:
77 65a45524 Stavros Sachtouris
            data = f.read()
78 65a45524 Stavros Sachtouris
            try:
79 65a45524 Stavros Sachtouris
                import json
80 65a45524 Stavros Sachtouris
                data = json.dumps(json.loads(data))
81 65a45524 Stavros Sachtouris
            except ValueError:
82 65a45524 Stavros Sachtouris
                raise ClientError(message='"%s" is not json-formated'%f.name, status=1)
83 65a45524 Stavros Sachtouris
            except SyntaxError:
84 65a45524 Stavros Sachtouris
                raise ClientError(message='"%s" is not a valid hashmap file'%f.name, status=1)
85 65a45524 Stavros Sachtouris
            from StringIO import StringIO
86 65a45524 Stavros Sachtouris
            f = StringIO(data)
87 65a45524 Stavros Sachtouris
        data = f.read(size) if size is not None else f.read()
88 64ab4c13 Stavros Sachtouris
        self.object_put(obj, data=data, etag=etag, content_encoding=content_encoding,
89 65a45524 Stavros Sachtouris
            content_disposition=content_disposition, content_type=content_type, permitions=sharing,
90 65a45524 Stavros Sachtouris
            public=public, success=201)
91 f0598cb2 Stavros Sachtouris
        
92 435008b6 Stavros Sachtouris
    def put_block_async(self, data, hash):
93 435008b6 Stavros Sachtouris
        class SilentGreenlet(gevent.Greenlet):
94 435008b6 Stavros Sachtouris
            def _report_error(self, exc_info):
95 64ab4c13 Stavros Sachtouris
                _stderr = None
96 435008b6 Stavros Sachtouris
                try:
97 64ab4c13 Stavros Sachtouris
                    _stderr = sys._stderr
98 435008b6 Stavros Sachtouris
                    sys.stderr = StringIO()
99 435008b6 Stavros Sachtouris
                    gevent.Greenlet._report_error(self, exc_info)
100 435008b6 Stavros Sachtouris
                finally:
101 435008b6 Stavros Sachtouris
                    sys.stderr = _stderr
102 435008b6 Stavros Sachtouris
        POOL_SIZE = 5
103 435008b6 Stavros Sachtouris
        if self.async_pool is None:
104 435008b6 Stavros Sachtouris
            self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
105 435008b6 Stavros Sachtouris
        g = SilentGreenlet(self.put_block, data, hash)
106 435008b6 Stavros Sachtouris
        self.async_pool.start(g)
107 435008b6 Stavros Sachtouris
        return g
108 435008b6 Stavros Sachtouris
109 53129af9 Giorgos Verigakis
    def put_block(self, data, hash):
110 4adfa919 Stavros Sachtouris
        r = self.container_post(update=True, content_type='application/octet-stream',
111 c2236544 Stavros Sachtouris
            content_length=len(data), data=data, format='json')
112 c2236544 Stavros Sachtouris
        assert r.json[0] == hash, 'Local hash does not match server'
113 f0598cb2 Stavros Sachtouris
        
114 44b8928d Giorgos Verigakis
115 65a45524 Stavros Sachtouris
    def create_object_by_manifestation(self, obj, etag=None, content_encoding=None,
116 65a45524 Stavros Sachtouris
        content_disposition=None, content_type=None, sharing=None, public=None):
117 65a45524 Stavros Sachtouris
        self.assert_container()
118 65a45524 Stavros Sachtouris
        obj_content_type = 'application/octet-stream' if content_type is None else content_type
119 64ab4c13 Stavros Sachtouris
        self.object_put(obj, content_length=0, etag=etag, content_encoding=content_encoding,
120 65a45524 Stavros Sachtouris
            content_disposition=content_disposition, content_type=content_type, permitions=sharing,
121 65a45524 Stavros Sachtouris
            public=public, manifest='%s/%s'%(self.container,obj))
122 64ab4c13 Stavros Sachtouris
       
123 64ab4c13 Stavros Sachtouris
    #upload_* auxiliary methods 
124 64ab4c13 Stavros Sachtouris
    def _get_file_block_info(self, fileobj, size=None):
125 2f749e6e Stavros Sachtouris
        meta = self.get_container_info()
126 435008b6 Stavros Sachtouris
        blocksize = int(meta['x-container-block-size'])
127 435008b6 Stavros Sachtouris
        blockhash = meta['x-container-block-hash']
128 64ab4c13 Stavros Sachtouris
        size = size if size is not None else fstat(fileobj.fileno()).st_size
129 435008b6 Stavros Sachtouris
        nblocks = 1 + (size - 1) // blocksize
130 64ab4c13 Stavros Sachtouris
        return (blocksize, blockhash, size, nblocks)
131 64ab4c13 Stavros Sachtouris
132 64ab4c13 Stavros Sachtouris
    def _get_missing_hashes(self, obj, json, size=None, format='json', hashmap=True,
133 64ab4c13 Stavros Sachtouris
        content_type=None, etag=None, content_encoding=None, content_disposition=None,
134 64ab4c13 Stavros Sachtouris
        permitions=None, public=None, success=(201, 409)):
135 64ab4c13 Stavros Sachtouris
        r = self.object_put(obj, format='json', hashmap=True, content_type=content_type,
136 64ab4c13 Stavros Sachtouris
            json=json, etag=etag, content_encoding=content_encoding,
137 64ab4c13 Stavros Sachtouris
            content_disposition=content_disposition, permitions=permitions, public=public,
138 64ab4c13 Stavros Sachtouris
            success=success)
139 64ab4c13 Stavros Sachtouris
        if r.status_code == 201:
140 64ab4c13 Stavros Sachtouris
            return None
141 64ab4c13 Stavros Sachtouris
        return r.json
142 435008b6 Stavros Sachtouris
143 64ab4c13 Stavros Sachtouris
    def _caclulate_uploaded_blocks(self, blocksize, blockhash, size, nblocks, hashes, hmap, fileobj,
144 64ab4c13 Stavros Sachtouris
        hash_cb=None):
145 64ab4c13 Stavros Sachtouris
        offset=0
146 435008b6 Stavros Sachtouris
        if hash_cb:
147 435008b6 Stavros Sachtouris
            hash_gen = hash_cb(nblocks)
148 435008b6 Stavros Sachtouris
            hash_gen.next()
149 435008b6 Stavros Sachtouris
150 435008b6 Stavros Sachtouris
        for i in range(nblocks):
151 64ab4c13 Stavros Sachtouris
            block = fileobj.read(min(blocksize, size - offset))
152 435008b6 Stavros Sachtouris
            bytes = len(block)
153 435008b6 Stavros Sachtouris
            hash = pithos_hash(block, blockhash)
154 435008b6 Stavros Sachtouris
            hashes.append(hash)
155 5b263ba2 Stavros Sachtouris
            hmap[hash] = (offset, bytes)
156 435008b6 Stavros Sachtouris
            offset += bytes
157 435008b6 Stavros Sachtouris
            if hash_cb:
158 435008b6 Stavros Sachtouris
                hash_gen.next()
159 435008b6 Stavros Sachtouris
        assert offset == size
160 435008b6 Stavros Sachtouris
161 64ab4c13 Stavros Sachtouris
    def _upload_missing_blocks(self, missing, hmap, fileobj, upload_cb=None):
162 64ab4c13 Stavros Sachtouris
        """upload missing blocks asynchronously in a pseudo-parallel fashion (greenlets)
163 64ab4c13 Stavros Sachtouris
        """
164 435008b6 Stavros Sachtouris
        if upload_cb:
165 435008b6 Stavros Sachtouris
            upload_gen = upload_cb(len(missing))
166 435008b6 Stavros Sachtouris
            upload_gen.next()
167 435008b6 Stavros Sachtouris
168 435008b6 Stavros Sachtouris
        flying = []
169 435008b6 Stavros Sachtouris
        for hash in missing:
170 5b263ba2 Stavros Sachtouris
            offset, bytes = hmap[hash]
171 64ab4c13 Stavros Sachtouris
            fileobj.seek(offset)
172 64ab4c13 Stavros Sachtouris
            data = fileobj.read(bytes)
173 435008b6 Stavros Sachtouris
            r = self.put_block_async(data, hash)
174 435008b6 Stavros Sachtouris
            flying.append(r)
175 435008b6 Stavros Sachtouris
            for r in flying:
176 435008b6 Stavros Sachtouris
                if r.ready():
177 435008b6 Stavros Sachtouris
                    if r.exception:
178 435008b6 Stavros Sachtouris
                        raise r.exception
179 435008b6 Stavros Sachtouris
                    if upload_cb:
180 435008b6 Stavros Sachtouris
                        upload_gen.next()
181 435008b6 Stavros Sachtouris
            flying = [r for r in flying if not r.ready()]
182 64ab4c13 Stavros Sachtouris
        while upload_cb:
183 64ab4c13 Stavros Sachtouris
            try:
184 64ab4c13 Stavros Sachtouris
                upload_gen.next()
185 64ab4c13 Stavros Sachtouris
            except StopIteration:
186 64ab4c13 Stavros Sachtouris
                break
187 435008b6 Stavros Sachtouris
        gevent.joinall(flying)
188 56f0908a Stavros Sachtouris
189 64ab4c13 Stavros Sachtouris
    def upload_object(self, obj, f, size=None, hash_cb=None, upload_cb=None, etag=None,
190 64ab4c13 Stavros Sachtouris
        content_encoding=None, content_disposition=None, content_type=None, sharing=None,
191 64ab4c13 Stavros Sachtouris
        public=None):
192 56f0908a Stavros Sachtouris
        self.assert_container()
193 56f0908a Stavros Sachtouris
194 64ab4c13 Stavros Sachtouris
        #init
195 64ab4c13 Stavros Sachtouris
        block_info = (blocksize, blockhash, size, nblocks) = self._get_file_block_info(f, size)
196 64ab4c13 Stavros Sachtouris
        (hashes, hmap, offset) = ([], {}, 0)
197 64ab4c13 Stavros Sachtouris
        content_type = 'application/octet-stream' if content_type is None else content_type
198 64ab4c13 Stavros Sachtouris
199 64ab4c13 Stavros Sachtouris
        self._caclulate_uploaded_blocks(*block_info, hashes=hashes, hmap=hmap, fileobj=f,
200 64ab4c13 Stavros Sachtouris
            hash_cb=hash_cb)
201 64ab4c13 Stavros Sachtouris
202 64ab4c13 Stavros Sachtouris
        hashmap = dict(bytes=size, hashes=hashes)
203 64ab4c13 Stavros Sachtouris
        missing = self._get_missing_hashes(obj, hashmap, content_type=content_type, size=size,
204 64ab4c13 Stavros Sachtouris
            etag=etag, content_encoding=content_encoding, content_disposition=content_disposition,
205 64ab4c13 Stavros Sachtouris
            permitions=sharing, public=public)
206 64ab4c13 Stavros Sachtouris
207 64ab4c13 Stavros Sachtouris
        if missing is None:
208 64ab4c13 Stavros Sachtouris
            return
209 64ab4c13 Stavros Sachtouris
        self._upload_missing_blocks(missing, hmap, f, upload_cb=upload_cb)
210 64ab4c13 Stavros Sachtouris
211 64ab4c13 Stavros Sachtouris
        self.object_put(obj, format='json', hashmap=True, content_type=content_type, 
212 64ab4c13 Stavros Sachtouris
            json=hashmap, success=201)
213 64ab4c13 Stavros Sachtouris
      
214 64ab4c13 Stavros Sachtouris
    #download_* auxiliary methods
215 64ab4c13 Stavros Sachtouris
    def _get_object_block_info(self,obj, **kwargs):
216 56f0908a Stavros Sachtouris
        #retrieve object hashmap
217 64ab4c13 Stavros Sachtouris
        hashmap = self.get_object_hashmap(obj, **kwargs)
218 56f0908a Stavros Sachtouris
        blocksize = int(hashmap['block_size'])
219 56f0908a Stavros Sachtouris
        blockhash = hashmap['block_hash']
220 56f0908a Stavros Sachtouris
        total_size = hashmap['bytes']
221 64ab4c13 Stavros Sachtouris
        hmap = hashmap['hashes']
222 56f0908a Stavros Sachtouris
        map_dict = {}
223 64ab4c13 Stavros Sachtouris
        for h in hmap:
224 56f0908a Stavros Sachtouris
            map_dict[h] = True
225 64ab4c13 Stavros Sachtouris
        return (blocksize, blockhash, total_size, hmap, map_dict)
226 64ab4c13 Stavros Sachtouris
227 64ab4c13 Stavros Sachtouris
    def _get_range_limits(self, range):
228 64ab4c13 Stavros Sachtouris
        try:
229 64ab4c13 Stavros Sachtouris
            (custom_start, custom_end) = range.split('-')
230 64ab4c13 Stavros Sachtouris
            (custom_start, custom_end) = (int(custom_start), int(custom_end))
231 64ab4c13 Stavros Sachtouris
        except ValueError:
232 64ab4c13 Stavros Sachtouris
            raise ClientError(message='Invalid range string', status=601)
233 64ab4c13 Stavros Sachtouris
        if custom_start > custom_end or custom_start < 0:
234 64ab4c13 Stavros Sachtouris
            raise ClientError(message='Negative range', status=601)
235 64ab4c13 Stavros Sachtouris
        elif custom_start == custom_end:
236 64ab4c13 Stavros Sachtouris
            return
237 64ab4c13 Stavros Sachtouris
        elif custom_end > total_size:
238 64ab4c13 Stavros Sachtouris
            raise ClientError(message='Range exceeds file size', status=601)
239 64ab4c13 Stavros Sachtouris
        return (custom_start, custom_end)
240 64ab4c13 Stavros Sachtouris
241 64ab4c13 Stavros Sachtouris
    def _get_downloaded_blocks(self, hmap, fileobj, blocksize, blockhash, map_dict,
242 64ab4c13 Stavros Sachtouris
        overide=False, download_gen=None):
243 64ab4c13 Stavros Sachtouris
        if fileobj.isatty() or not path.exists(fileobj.name):
244 64ab4c13 Stavros Sachtouris
            return {}
245 64ab4c13 Stavros Sachtouris
        h = HashMap(blocksize, blockhash)
246 64ab4c13 Stavros Sachtouris
        with_progress_bar = False if download_gen is None else True
247 64ab4c13 Stavros Sachtouris
        h.load(fileobj, with_progress_bar)
248 64ab4c13 Stavros Sachtouris
        resumed = {}
249 64ab4c13 Stavros Sachtouris
        for i, x in enumerate(h):
250 64ab4c13 Stavros Sachtouris
            existing_hash = hexlify(x)
251 64ab4c13 Stavros Sachtouris
            if existing_hash in map_dict:
252 64ab4c13 Stavros Sachtouris
        #resume if some blocks have been downloaded
253 64ab4c13 Stavros Sachtouris
                resumed[existing_hash] = i
254 64ab4c13 Stavros Sachtouris
                if with_progress_bar:
255 64ab4c13 Stavros Sachtouris
                    download_gen.next()
256 64ab4c13 Stavros Sachtouris
            elif not overide:
257 64ab4c13 Stavros Sachtouris
                raise ClientError(message='Local file is substantialy different',
258 64ab4c13 Stavros Sachtouris
                    status=600)
259 64ab4c13 Stavros Sachtouris
        return resumed
260 64ab4c13 Stavros Sachtouris
261 64ab4c13 Stavros Sachtouris
    def download_object(self, obj, f, download_cb=None, version=None, overide=False, range=None,
262 64ab4c13 Stavros Sachtouris
        if_match=None, if_none_match=None, if_modified_since=None, if_unmodified_since=None):
263 64ab4c13 Stavros Sachtouris
        """overide is forcing the local file to become exactly as the remote, even if it is
264 64ab4c13 Stavros Sachtouris
        substantialy different
265 64ab4c13 Stavros Sachtouris
        """
266 64ab4c13 Stavros Sachtouris
267 64ab4c13 Stavros Sachtouris
        self.assert_container()
268 64ab4c13 Stavros Sachtouris
        islocalfile = False if f.isatty() else True
269 64ab4c13 Stavros Sachtouris
        (blocksize, blockhash, total_size, hmap, map_dict) = self._get_object_block_info(obj,
270 64ab4c13 Stavros Sachtouris
            version=version, if_match=if_match, if_none_match=if_none_match,
271 64ab4c13 Stavros Sachtouris
            if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
272 56f0908a Stavros Sachtouris
273 d804de82 Stavros Sachtouris
        if total_size <= 0:
274 d804de82 Stavros Sachtouris
            return
275 d804de82 Stavros Sachtouris
276 64ab4c13 Stavros Sachtouris
        if range is not None:
277 64ab4c13 Stavros Sachtouris
            (custom_start, custom_end) = self._get_range_limits(range)
278 64ab4c13 Stavros Sachtouris
279 56f0908a Stavros Sachtouris
        #load progress bar
280 56f0908a Stavros Sachtouris
        if download_cb is not None:
281 56f0908a Stavros Sachtouris
            download_gen = download_cb(total_size/blocksize + 1)
282 56f0908a Stavros Sachtouris
            download_gen.next()
283 64ab4c13 Stavros Sachtouris
        resumed = self._get_downloaded_blocks(hmap, f, blocksize, blockhash, map_dict,
284 64ab4c13 Stavros Sachtouris
            overide=overide, download_gen=download_gen)
285 56f0908a Stavros Sachtouris
286 b1713259 Stavros Sachtouris
        #download and save/print
287 5b263ba2 Stavros Sachtouris
        flying = []
288 64ab4c13 Stavros Sachtouris
        for i, h in enumerate(hmap):
289 64ab4c13 Stavros Sachtouris
            if h in resumed:
290 56f0908a Stavros Sachtouris
                continue
291 56f0908a Stavros Sachtouris
            if download_cb is not None:
292 74275b1a Stavros Sachtouris
                try:
293 74275b1a Stavros Sachtouris
                    download_gen.next()
294 74275b1a Stavros Sachtouris
                except StopIteration:
295 74275b1a Stavros Sachtouris
                    pass
296 56f0908a Stavros Sachtouris
            start = i*blocksize
297 d804de82 Stavros Sachtouris
            if range is not None:
298 d804de82 Stavros Sachtouris
                if start < custom_start:
299 d804de82 Stavros Sachtouris
                    start = custom_start
300 d804de82 Stavros Sachtouris
                elif start > custom_end:
301 d804de82 Stavros Sachtouris
                    continue
302 56f0908a Stavros Sachtouris
            end = start + blocksize -1 if start+blocksize < total_size else total_size -1
303 d804de82 Stavros Sachtouris
            if range is not None and end > custom_end:
304 d804de82 Stavros Sachtouris
                end = custom_end
305 56f0908a Stavros Sachtouris
            data_range = 'bytes=%s-%s'%(start, end)
306 5b263ba2 Stavros Sachtouris
            result_array = []
307 b1713259 Stavros Sachtouris
            if islocalfile:
308 5b263ba2 Stavros Sachtouris
                handler = self._get_block_async(obj, data_range=data_range, version=version,
309 5b263ba2 Stavros Sachtouris
                    if_etag_match=if_match, if_etag_not_match=if_none_match,
310 5b263ba2 Stavros Sachtouris
                    if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
311 f0598cb2 Stavros Sachtouris
                flying.append({'handler':handler, 'start':start, 'data_range':data_range})
312 5b263ba2 Stavros Sachtouris
                newflying = []
313 5b263ba2 Stavros Sachtouris
                for v in flying:
314 5b263ba2 Stavros Sachtouris
                    h = v['handler']
315 5b263ba2 Stavros Sachtouris
                    if h.ready():
316 5b263ba2 Stavros Sachtouris
                        if h.exception:
317 5b263ba2 Stavros Sachtouris
                            h.release()
318 5b263ba2 Stavros Sachtouris
                            raise h.exception
319 5b263ba2 Stavros Sachtouris
                        f.seek(v['start'])
320 5b263ba2 Stavros Sachtouris
                        f.write(h.value.content)
321 5b263ba2 Stavros Sachtouris
                        f.flush()
322 64ab4c13 Stavros Sachtouris
                        #h.value.release()
323 5b263ba2 Stavros Sachtouris
                    else:
324 5b263ba2 Stavros Sachtouris
                        newflying.append(v)
325 5b263ba2 Stavros Sachtouris
                flying = newflying
326 5b263ba2 Stavros Sachtouris
            else:
327 5b263ba2 Stavros Sachtouris
                r = self._get_block(obj, data_range=data_range, version=version,
328 5b263ba2 Stavros Sachtouris
                    if_etag_match=if_match, if_etag_not_match=if_none_match,
329 5b263ba2 Stavros Sachtouris
                    if_modified_since=if_modified_since, if_unmodified_since=if_unmodified_since)
330 5b263ba2 Stavros Sachtouris
                f.write(r.content)
331 5b263ba2 Stavros Sachtouris
                f.flush()
332 f0598cb2 Stavros Sachtouris
                
333 5b263ba2 Stavros Sachtouris
        #write the last results and exit
334 5b263ba2 Stavros Sachtouris
        if islocalfile:
335 5b263ba2 Stavros Sachtouris
            from time import sleep
336 5b263ba2 Stavros Sachtouris
            while len(flying) > 0:
337 5b263ba2 Stavros Sachtouris
                result_array=[]
338 5b263ba2 Stavros Sachtouris
                newflying = []
339 5b263ba2 Stavros Sachtouris
                for v in flying:
340 5b263ba2 Stavros Sachtouris
                    h = v['handler']
341 5b263ba2 Stavros Sachtouris
                    if h.ready():
342 5b263ba2 Stavros Sachtouris
                        if h.exception:
343 5b263ba2 Stavros Sachtouris
                            h.release()
344 5b263ba2 Stavros Sachtouris
                            raise h.exception
345 5b263ba2 Stavros Sachtouris
                        f.seek(v['start'])
346 5b263ba2 Stavros Sachtouris
                        f.write(h.value.content)
347 5b263ba2 Stavros Sachtouris
                        f.flush()
348 64ab4c13 Stavros Sachtouris
                        #h.value.release()
349 5b263ba2 Stavros Sachtouris
                    else:
350 5b263ba2 Stavros Sachtouris
                        sleep(.2)
351 5b263ba2 Stavros Sachtouris
                        newflying.append(v)
352 5b263ba2 Stavros Sachtouris
                flying = newflying
353 d804de82 Stavros Sachtouris
            f.truncate(total_size)
354 d804de82 Stavros Sachtouris
355 64ab4c13 Stavros Sachtouris
        gevent.joinall(flying)
356 64ab4c13 Stavros Sachtouris
357 5b263ba2 Stavros Sachtouris
    def _get_block(self, obj, **kwargs):
358 64ab4c13 Stavros Sachtouris
        return self.object_get(obj, success=(200, 206), binary=True, **kwargs)
359 5b263ba2 Stavros Sachtouris
360 5b263ba2 Stavros Sachtouris
    def _get_block_async(self, obj, **kwargs):
361 5b263ba2 Stavros Sachtouris
        class SilentGreenlet(gevent.Greenlet):
362 5b263ba2 Stavros Sachtouris
            def _report_error(self, exc_info):
363 5b263ba2 Stavros Sachtouris
                _stderr = sys._stderr
364 5b263ba2 Stavros Sachtouris
                try:
365 5b263ba2 Stavros Sachtouris
                    sys.stderr = StringIO()
366 5b263ba2 Stavros Sachtouris
                    gevent.Greenlet._report_error(self, exc_info)
367 5b263ba2 Stavros Sachtouris
                finally:
368 5b263ba2 Stavros Sachtouris
                    sys.stderr = _stderr
369 5b263ba2 Stavros Sachtouris
        POOL_SIZE = 5
370 5b263ba2 Stavros Sachtouris
        if self.async_pool is None:
371 5b263ba2 Stavros Sachtouris
            self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
372 5b263ba2 Stavros Sachtouris
        g = SilentGreenlet(self._get_block, obj, **kwargs)
373 5b263ba2 Stavros Sachtouris
        self.async_pool.start(g)
374 5b263ba2 Stavros Sachtouris
        return g
375 b1713259 Stavros Sachtouris
376 d804de82 Stavros Sachtouris
    def get_object_hashmap(self, obj, version=None, if_match=None, if_none_match=None,
377 d804de82 Stavros Sachtouris
        if_modified_since=None, if_unmodified_since=None):
378 d804de82 Stavros Sachtouris
        try:
379 d804de82 Stavros Sachtouris
            r = self.object_get(obj, hashmap=True, version=version, if_etag_match=if_match,
380 d804de82 Stavros Sachtouris
                if_etag_not_match=if_none_match, if_modified_since=if_modified_since,
381 d804de82 Stavros Sachtouris
                if_unmodified_since=if_unmodified_since)
382 d804de82 Stavros Sachtouris
        except ClientError as err:
383 f0598cb2 Stavros Sachtouris
            
384 d804de82 Stavros Sachtouris
            if err.status == 304 or err.status == 412:
385 d804de82 Stavros Sachtouris
                return {}
386 d804de82 Stavros Sachtouris
            raise
387 64ab4c13 Stavros Sachtouris
        return r.json
388 56f0908a Stavros Sachtouris
389 3a9e54b0 Stavros Sachtouris
    def set_account_group(self, group, usernames):
390 64ab4c13 Stavros Sachtouris
        self.account_post(update=True, groups = {group:usernames})
391 eb903ba7 Stavros Sachtouris
392 c2867610 Stavros Sachtouris
    def del_account_group(self, group):
393 64ab4c13 Stavros Sachtouris
        self.account_post(update=True, groups={group:[]})
394 c2867610 Stavros Sachtouris
395 8af4cc0b Stavros Sachtouris
    def get_account_info(self, until=None):
396 8af4cc0b Stavros Sachtouris
        r = self.account_head(until=until)
397 6657ec8c Stavros Sachtouris
        if r.status_code == 401:
398 6657ec8c Stavros Sachtouris
            raise ClientError("No authorization")
399 64ab4c13 Stavros Sachtouris
        return r.headers
400 6657ec8c Stavros Sachtouris
401 d1856abf Stavros Sachtouris
    def get_account_quota(self):
402 eb903ba7 Stavros Sachtouris
        return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch = True)
403 d1856abf Stavros Sachtouris
404 d1856abf Stavros Sachtouris
    def get_account_versioning(self):
405 eb903ba7 Stavros Sachtouris
        return filter_in(self.get_account_info(), 'X-Account-Policy-Versioning', exactMatch = True)
406 e6b39366 Stavros Sachtouris
407 8af4cc0b Stavros Sachtouris
    def get_account_meta(self, until=None):
408 8af4cc0b Stavros Sachtouris
        return filter_in(self.get_account_info(until = until), 'X-Account-Meta-')
409 af3b2b36 Stavros Sachtouris
410 c2867610 Stavros Sachtouris
    def get_account_group(self):
411 c2867610 Stavros Sachtouris
        return filter_in(self.get_account_info(), 'X-Account-Group-')
412 c2867610 Stavros Sachtouris
413 af3b2b36 Stavros Sachtouris
    def set_account_meta(self, metapairs):
414 af3b2b36 Stavros Sachtouris
        assert(type(metapairs) is dict)
415 64ab4c13 Stavros Sachtouris
        self.account_post(update=True, metadata=metapairs)
416 af3b2b36 Stavros Sachtouris
417 379cd4bb Stavros Sachtouris
    def del_account_meta(self, metakey):
418 64ab4c13 Stavros Sachtouris
        self.account_post(update=True, metadata={metakey:''})
419 379cd4bb Stavros Sachtouris
420 d1856abf Stavros Sachtouris
    def set_account_quota(self, quota):
421 64ab4c13 Stavros Sachtouris
        self.account_post(update=True, quota=quota)
422 d1856abf Stavros Sachtouris
423 d1856abf Stavros Sachtouris
    def set_account_versioning(self, versioning):
424 64ab4c13 Stavros Sachtouris
        self.account_post(update=True, versioning = versioning)
425 d1856abf Stavros Sachtouris
426 4fd88feb Stavros Sachtouris
    def list_containers(self):
427 4fd88feb Stavros Sachtouris
        r = self.account_get()
428 64ab4c13 Stavros Sachtouris
        return r.json
429 b758e547 Stavros Sachtouris
430 a298f2ab Stavros Sachtouris
    def del_container(self, until=None, delimiter=None):
431 a298f2ab Stavros Sachtouris
        self.assert_container()
432 a298f2ab Stavros Sachtouris
        r = self.container_delete(until=until, delimiter=delimiter, success=(204, 404, 409))
433 a298f2ab Stavros Sachtouris
        if r.status_code == 404:
434 a298f2ab Stavros Sachtouris
            raise ClientError('Container "%s" does not exist'%self.container, r.status_code)
435 a298f2ab Stavros Sachtouris
        elif r.status_code == 409:
436 a298f2ab Stavros Sachtouris
            raise ClientError('Container "%s" is not empty'%self.container, r.status_code)
437 a298f2ab Stavros Sachtouris
438 d1856abf Stavros Sachtouris
    def get_container_versioning(self, container):
439 2f749e6e Stavros Sachtouris
        self.container = container
440 2f749e6e Stavros Sachtouris
        return filter_in(self.get_container_info(), 'X-Container-Policy-Versioning')
441 d1856abf Stavros Sachtouris
442 d1856abf Stavros Sachtouris
    def get_container_quota(self, container):
443 2f749e6e Stavros Sachtouris
        self.container = container
444 2f749e6e Stavros Sachtouris
        return filter_in(self.get_container_info(), 'X-Container-Policy-Quota')
445 e6b39366 Stavros Sachtouris
446 8af4cc0b Stavros Sachtouris
    def get_container_info(self, until = None):
447 8af4cc0b Stavros Sachtouris
        r = self.container_head(until=until)
448 8af4cc0b Stavros Sachtouris
        return r.headers
449 8af4cc0b Stavros Sachtouris
450 8af4cc0b Stavros Sachtouris
    def get_container_meta(self, until = None):
451 8af4cc0b Stavros Sachtouris
        return filter_in(self.get_container_info(until=until), 'X-Container-Meta')
452 e6b39366 Stavros Sachtouris
453 8af4cc0b Stavros Sachtouris
    def get_container_object_meta(self, until = None):
454 8af4cc0b Stavros Sachtouris
        return filter_in(self.get_container_info(until=until), 'X-Container-Object-Meta')
455 e6b39366 Stavros Sachtouris
456 af3b2b36 Stavros Sachtouris
    def set_container_meta(self, metapairs):
457 af3b2b36 Stavros Sachtouris
        assert(type(metapairs) is dict)
458 64ab4c13 Stavros Sachtouris
        self.container_post(update=True, metadata=metapairs)
459 f0598cb2 Stavros Sachtouris
        
460 3e544e5b Stavros Sachtouris
    def del_container_meta(self, metakey):
461 64ab4c13 Stavros Sachtouris
        self.container_post(update=True, metadata={metakey:''})
462 e6b39366 Stavros Sachtouris
463 d1856abf Stavros Sachtouris
    def set_container_quota(self, quota):
464 64ab4c13 Stavros Sachtouris
        self.container_post(update=True, quota=quota)
465 d1856abf Stavros Sachtouris
466 d1856abf Stavros Sachtouris
    def set_container_versioning(self, versioning):
467 64ab4c13 Stavros Sachtouris
        self.container_post(update=True, versioning=versioning)
468 d1856abf Stavros Sachtouris
469 a298f2ab Stavros Sachtouris
    def del_object(self, obj, until=None, delimiter=None):
470 a298f2ab Stavros Sachtouris
        self.assert_container()
471 64ab4c13 Stavros Sachtouris
        self.object_delete(obj, until=until, delimiter=delimiter)
472 a298f2ab Stavros Sachtouris
473 af3b2b36 Stavros Sachtouris
    def set_object_meta(self, object, metapairs):
474 af3b2b36 Stavros Sachtouris
        assert(type(metapairs) is dict)
475 64ab4c13 Stavros Sachtouris
        self.object_post(object, update=True, metadata=metapairs)
476 87688514 Stavros Sachtouris
477 89c2e77b Stavros Sachtouris
    def del_object_meta(self, metakey, object):
478 64ab4c13 Stavros Sachtouris
        self.object_post(object, update=True, metadata={metakey:''})
479 6de1f262 Stavros Sachtouris
480 87688514 Stavros Sachtouris
    def publish_object(self, object):
481 64ab4c13 Stavros Sachtouris
        self.object_post(object, update=True, public=True)
482 87688514 Stavros Sachtouris
483 87688514 Stavros Sachtouris
    def unpublish_object(self, object):
484 64ab4c13 Stavros Sachtouris
        self.object_post(object, update=True, public=False)
485 28470086 Stavros Sachtouris
486 8af4cc0b Stavros Sachtouris
    def get_object_info(self, obj, version=None):
487 8af4cc0b Stavros Sachtouris
        r = self.object_head(obj, version=version)
488 64ab4c13 Stavros Sachtouris
        return r.headers
489 8af4cc0b Stavros Sachtouris
490 8af4cc0b Stavros Sachtouris
    def get_object_meta(self, obj, version=None):
491 8af4cc0b Stavros Sachtouris
        return filter_in(self.get_object_info(obj, version=version), 'X-Object-Meta')
492 8af4cc0b Stavros Sachtouris
493 f49084df Stavros Sachtouris
    def get_object_sharing(self, object):
494 f70616fc Stavros Sachtouris
        r = filter_in(self.get_object_info(object), 'X-Object-Sharing', exactMatch = True)
495 f70616fc Stavros Sachtouris
        reply = {}
496 f70616fc Stavros Sachtouris
        if len(r) > 0:
497 f70616fc Stavros Sachtouris
            perms = r['x-object-sharing'].split(';')
498 f70616fc Stavros Sachtouris
            for perm in perms:
499 f70616fc Stavros Sachtouris
                try:
500 f70616fc Stavros Sachtouris
                    perm.index('=')
501 f70616fc Stavros Sachtouris
                except ValueError:
502 f70616fc Stavros Sachtouris
                    raise ClientError('Incorrect reply format')
503 f70616fc Stavros Sachtouris
                (key, val) = perm.strip().split('=')
504 f70616fc Stavros Sachtouris
                reply[key] = val
505 f70616fc Stavros Sachtouris
        return reply
506 f49084df Stavros Sachtouris
507 28470086 Stavros Sachtouris
    def set_object_sharing(self, object, read_permition = False, write_permition = False):
508 28470086 Stavros Sachtouris
        """Give read/write permisions to an object.
509 28470086 Stavros Sachtouris
           @param object is the object to change sharing permitions onto
510 28470086 Stavros Sachtouris
           @param read_permition is a list of users and user groups that get read permition for this object
511 28470086 Stavros Sachtouris
                False means all previous read permitions will be removed
512 28470086 Stavros Sachtouris
           @param write_perimition is a list of users and user groups to get write permition for this object
513 28470086 Stavros Sachtouris
                False means all previous read permitions will be removed
514 28470086 Stavros Sachtouris
        """
515 4adfa919 Stavros Sachtouris
        perms = {}
516 4adfa919 Stavros Sachtouris
        perms['read'] = read_permition if isinstance(read_permition, list) else ''
517 4adfa919 Stavros Sachtouris
        perms['write'] = write_permition if isinstance(write_permition, list) else ''
518 64ab4c13 Stavros Sachtouris
        self.object_post(object, update=True, permitions=perms)
519 28470086 Stavros Sachtouris
520 f49084df Stavros Sachtouris
    def del_object_sharing(self, object):
521 28470086 Stavros Sachtouris
        self.set_object_sharing(object)
522 f49084df Stavros Sachtouris
523 bcabbc35 Stavros Sachtouris
    def append_object(self, object, source_file, upload_cb = None):
524 9f74ca46 Stavros Sachtouris
        """@param upload_db is a generator for showing progress of upload
525 bcabbc35 Stavros Sachtouris
            to caller application, e.g. a progress bar. Its next is called
526 bcabbc35 Stavros Sachtouris
            whenever a block is uploaded
527 bcabbc35 Stavros Sachtouris
        """
528 ab474306 Stavros Sachtouris
        self.assert_container()
529 2f749e6e Stavros Sachtouris
        meta = self.get_container_info()
530 ab474306 Stavros Sachtouris
        blocksize = int(meta['x-container-block-size'])
531 64ab4c13 Stavros Sachtouris
        filesize = fstat(source_file.fileno()).st_size
532 ab474306 Stavros Sachtouris
        nblocks = 1 + (filesize - 1)//blocksize
533 ab474306 Stavros Sachtouris
        offset = 0
534 bcabbc35 Stavros Sachtouris
        if upload_cb is not None:
535 bcabbc35 Stavros Sachtouris
            upload_gen = upload_cb(nblocks)
536 ab474306 Stavros Sachtouris
        for i in range(nblocks):
537 ab474306 Stavros Sachtouris
            block = source_file.read(min(blocksize, filesize - offset))
538 ab474306 Stavros Sachtouris
            offset += len(block)
539 64ab4c13 Stavros Sachtouris
            self.object_post(object, update=True, content_range='bytes */*',
540 e92440bd Stavros Sachtouris
                content_type='application/octet-stream', content_length=len(block), data=block)
541 f0598cb2 Stavros Sachtouris
            
542 bcabbc35 Stavros Sachtouris
            if upload_cb is not None:
543 bcabbc35 Stavros Sachtouris
                upload_gen.next()
544 561116a6 Stavros Sachtouris
545 561116a6 Stavros Sachtouris
    def truncate_object(self, object, upto_bytes):
546 64ab4c13 Stavros Sachtouris
        self.object_post(object, update=True, content_range='bytes 0-%s/*'%upto_bytes,
547 4adfa919 Stavros Sachtouris
            content_type='application/octet-stream', object_bytes=upto_bytes,
548 4adfa919 Stavros Sachtouris
            source_object=path4url(self.container, object))
549 ee62607e Stavros Sachtouris
550 bcabbc35 Stavros Sachtouris
    def overwrite_object(self, object, start, end, source_file, upload_cb=None):
551 ee62607e Stavros Sachtouris
        """Overwrite a part of an object with given source file
552 ee62607e Stavros Sachtouris
           @start the part of the remote object to start overwriting from, in bytes
553 ee62607e Stavros Sachtouris
           @end the part of the remote object to stop overwriting to, in bytes
554 ee62607e Stavros Sachtouris
        """
555 ee62607e Stavros Sachtouris
        self.assert_container()
556 2f749e6e Stavros Sachtouris
        meta = self.get_container_info()
557 ee62607e Stavros Sachtouris
        blocksize = int(meta['x-container-block-size'])
558 64ab4c13 Stavros Sachtouris
        filesize = fstat(source_file.fileno()).st_size
559 ee62607e Stavros Sachtouris
        datasize = int(end) - int(start) + 1
560 ee62607e Stavros Sachtouris
        nblocks = 1 + (datasize - 1)//blocksize
561 ee62607e Stavros Sachtouris
        offset = 0
562 bcabbc35 Stavros Sachtouris
        if upload_cb is not None:
563 bcabbc35 Stavros Sachtouris
            upload_gen = upload_cb(nblocks)
564 ee62607e Stavros Sachtouris
        for i in range(nblocks):
565 ee62607e Stavros Sachtouris
            block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
566 ee62607e Stavros Sachtouris
            offset += len(block)
567 64ab4c13 Stavros Sachtouris
            self.object_post(object, update=True, content_type='application/octet-stream', 
568 4adfa919 Stavros Sachtouris
                content_length=len(block), content_range='bytes %s-%s/*'%(start,end), data=block)
569 f0598cb2 Stavros Sachtouris
            
570 bcabbc35 Stavros Sachtouris
            if upload_cb is not None:
571 bcabbc35 Stavros Sachtouris
                upload_gen.next()
572 7d420701 Stavros Sachtouris
573 7d420701 Stavros Sachtouris
    def copy_object(self, src_container, src_object, dst_container, dst_object=False,
574 7d420701 Stavros Sachtouris
        source_version = None, public=False, content_type=None, delimiter=None):
575 7d420701 Stavros Sachtouris
        self.assert_account()
576 7d420701 Stavros Sachtouris
        self.container = dst_container
577 7d420701 Stavros Sachtouris
        dst_object = dst_object or src_object
578 7d420701 Stavros Sachtouris
        src_path = path4url(src_container, src_object)
579 64ab4c13 Stavros Sachtouris
        self.object_put(dst_object, success=201, copy_from=src_path, content_length=0,
580 7d420701 Stavros Sachtouris
            source_version=source_version, public=public, content_type=content_type,
581 7d420701 Stavros Sachtouris
            delimiter=delimiter)
582 a5e0629d Stavros Sachtouris
583 a5e0629d Stavros Sachtouris
    def move_object(self, src_container, src_object, dst_container, dst_object=False,
584 a5e0629d Stavros Sachtouris
        source_version = None, public=False, content_type=None, delimiter=None):
585 a5e0629d Stavros Sachtouris
        self.assert_account()
586 a5e0629d Stavros Sachtouris
        self.container = dst_container
587 a5e0629d Stavros Sachtouris
        dst_object = dst_object or src_object
588 a5e0629d Stavros Sachtouris
        src_path = path4url(src_container, src_object)
589 64ab4c13 Stavros Sachtouris
        self.object_put(dst_object, success=201, move_from=src_path, content_length=0,
590 a5e0629d Stavros Sachtouris
            source_version=source_version, public=public, content_type=content_type,
591 64ab4c13 Stavros Sachtouris
            delimiter=delimiter)