83dd736289311b0f614b00ee51aa47b995e882b8
[kamaki] / kamaki / clients / pithos.py
1 # Copyright 2011-2012 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import hashlib
35 import os
36
37 from time import time
38
39 from .storage import StorageClient, ClientError
40 from .utils import path4url, params4url, prefix_keys, filter_in, filter_out
41
42
43 def pithos_hash(block, blockhash):
44     h = hashlib.new(blockhash)
45     h.update(block.rstrip('\x00'))
46     return h.hexdigest()
47
48
49 class PithosClient(StorageClient):
50     """GRNet Pithos API client"""
51
52     def purge_container(self, container):
53         self.assert_account()
54         path = path4url(self.account, container)+params4url({'until': unicode(time())})
55         self.delete(path, success=204)
56
57     def put_block(self, data, hash):
58         path = path4url(self.account, self.container)+params4url({'update':None})
59         self.set_header('Content-Type', 'application/octet-stream')
60         self.set_header('Content-Length', len(data))
61         r = self.post(path, data=data, success=202)
62         assert r.text.strip() == hash, 'Local hash does not match server'
63
64     def create_object(self, object, f, size=None, hash_cb=None,
65                       upload_cb=None):
66         """Create an object by uploading only the missing blocks
67         hash_cb is a generator function taking the total number of blocks to
68         be hashed as an argument. Its next() will be called every time a block
69         is hashed.
70
71         upload_cb is a generator function with the same properties that is
72         called every time a block is uploaded.
73         """
74         self.assert_container()
75
76         meta = self.get_container_info(self.container)
77         blocksize = int(meta['x-container-block-size'])
78         blockhash = meta['x-container-block-hash']
79
80         size = size if size is not None else os.fstat(f.fileno()).st_size
81         nblocks = 1 + (size - 1) // blocksize
82         hashes = []
83         map = {}
84
85         offset = 0
86
87         if hash_cb:
88             hash_gen = hash_cb(nblocks)
89             hash_gen.next()
90
91         for i in range(nblocks):
92             block = f.read(min(blocksize, size - offset))
93             bytes = len(block)
94             hash = pithos_hash(block, blockhash)
95             hashes.append(hash)
96             map[hash] = (offset, bytes)
97             offset += bytes
98             if hash_cb:
99                 hash_gen.next()
100
101         assert offset == size
102
103         path = path4url(self.account, self.container, object)+params4url(dict(format='json', hashmap=''))
104         hashmap = dict(bytes=size, hashes=hashes)
105         self.set_header('Content-Type', 'application/octet-stream')
106         r = self.put(path, json=hashmap, success=(201, 409))
107
108         if r.status_code == 201:
109             return
110
111         missing = r.json
112
113         if upload_cb:
114             upload_gen = upload_cb(len(missing))
115             upload_gen.next()
116
117         for hash in missing:
118             offset, bytes = map[hash]
119             f.seek(offset)
120             data = f.read(bytes)
121             self.put_block(data, hash)
122             if upload_cb:
123                 upload_gen.next()
124
125         self.put(path, json=hashmap, success=201)
126
127     def set_account_group(self, group, usernames):
128         self.assert_account()
129         path = path4url(self.account)+params4url({'update':None})
130         userstr = ''
131         dlm = ''
132         for user in usernames:
133             userstr = userstr + dlm + user
134             dlm = ','
135         self.set_header('X-Account-Group-'+group, userstr)
136         self.post(path, success=202)
137
138     def del_account_group(self, group):
139         self.assert_account()
140         path = path4url(self.account)+params4url({'update':None})
141         self.set_header('X-Account-Group-'+group, '')
142         r = self.post(path, success=202)
143
144     def get_account_quota(self):
145         return filter_in(self.get_account_info(), 'X-Account-Policy-Quota', exactMatch = True)
146
147     def get_account_versioning(self):
148         return filter_in(self.get_account_info(), 'X-Account-Policy-Versioning', exactMatch = True)
149
150     def get_account_meta(self):
151         return filter_in(self.get_account_info(), 'X-Account-Meta-')
152
153     def get_account_group(self):
154         return filter_in(self.get_account_info(), 'X-Account-Group-')
155
156     def set_account_meta(self, metapairs):
157         assert(type(metapairs) is dict)
158         self.assert_account()
159         path = path4url(self.account)+params4url({'update':None})
160         for key, val in metapairs.items():
161             self.set_header('X-Account-Meta-'+key, val)
162         self.post(path, success=202)
163
164     def set_account_quota(self, quota):
165         self.assert_account()
166         path = path4url(self.account)+params4url({'update':None})
167         self.set_header('X-Account-Policy-Quota', quota)
168         self.post(path, success=202)
169
170     def set_account_versioning(self, versioning):
171         self.assert_account()
172         path = path4url(self.account)+params4url({'update':None})
173         self.set_header('X-Account-Policy-Versioning', versioning)
174         self.post(path, success=202)
175
176     def get_container_versioning(self, container):
177         return filter_in(self.get_container_info(container), 'X-Container-Policy-Versioning')
178
179     def get_container_quota(self, container):
180         return filter_in(self.get_container_info(container), 'X-Container-Policy-Quota')
181
182     def get_container_meta(self, container):
183         return filter_in(self.get_container_info(container), 'X-Container-Meta-')
184
185     def get_container_object_meta(self, container):
186         return filter_in(self.get_container_info(container), 'X-Container-Object-Meta')
187
188     def set_container_meta(self, metapairs):
189         assert(type(metapairs) is dict)
190         self.assert_container()
191         path=path4url(self.account, self.container)+params4url({'update':None})
192         for key, val in metapairs.items():
193             self.set_header('X-Container-Meta-'+key, val)
194         self.post(path, success=202)
195
196     def delete_container_meta(self, metakey):
197         headers = self.get_container_info(self.container)
198         self.headers = filter_out(headers, 'x-container-meta-'+metakey, exactMatch = True)
199         if len(self.headers) == len(headers):
200             raise ClientError('X-Container-Meta-%s not found' % metakey, 404)
201         path = path4url(self.account, self.container)
202         self.post(path, success = 202)
203
204     def replace_container_meta(self, metapairs):
205         self.assert_container()
206         path=path4url(self.account, self.container)
207         for key, val in metapairs.items():
208             self.set_header('X-Container-Meta-'+key, val)
209         self.post(path, success=202)
210
211     def set_container_quota(self, quota):
212         self.assert_container()
213         path = path4url(self.account, self.container)+params4url({'update':None})
214         self.set_header('X-Container-Policy-Quota', quota)
215         self.post(path, success=202)
216
217     def set_container_versioning(self, versioning):
218         self.assert_container()
219         path = path4url(self.account, self.container)+params4url({'update':None})
220         self.set_header('X-Container-Policy-Versioning', versioning)
221         self.post(path, success=202)
222
223     def set_object_meta(self, object, metapairs):
224         assert(type(metapairs) is dict)
225         self.assert_container()
226         path=path4url(self.account, self.container, object)+params4url({'update':None})
227         for key, val in metapairs.items():
228             self.set_header('X-Object-Meta-'+key, val)
229         self.post(path, success=202)
230
231     def delete_object_meta(self, metakey, object):
232         self.assert_container()
233         self.set_header('X-Object-Meta-'+metakey, '')
234         path = path4url(self.account, self.container, object)+params4url({'update':None})
235         self.post(path, success=202)
236
237     def publish_object(self, object):
238         self.assert_container()
239         path = path4url(self.account, self.container, object)+params4url({'update':None})
240         self.set_header('X-Object-Public', True)
241         self.post(path, success=202)
242
243     def unpublish_object(self, object):
244         self.assert_container()
245         path = path4url(self.account, self.container, object)+params4url({'update':None})
246         self.set_header('X-Object-Public', False)
247         self.post(path, success=202)
248
249     def get_object_sharing(self, object):
250         return filter_in(self.get_object_info(object), 'X-Object-Sharing', exactMatch = True)
251
252     def set_object_sharing(self, object, read_permition = False, write_permition = False):
253         """Give read/write permisions to an object.
254            @param object is the object to change sharing permitions onto
255            @param read_permition is a list of users and user groups that get read permition for this object
256                 False means all previous read permitions will be removed
257            @param write_perimition is a list of users and user groups to get write permition for this object
258                 False means all previous read permitions will be removed
259         """
260         self.assert_container()
261         perms = ''
262         if read_permition:
263             dlm = ''
264             perms = 'read='
265             for rperm in read_permition:
266                 perms = perms + dlm + rperm
267                 dlm = ','
268         if write_permition:
269             dlm = ''
270             perms = 'write=' if not read_permition else perms + ';write='
271             for wperm in write_permition:
272                 perms = perms + dlm + wperm
273                 dlm = ','
274         path = path4url(self.account, self.container, object)+params4url({'update':None})
275         self.set_header('X-Object-Sharing', perms)
276         self.post(path, success=(202, 204))
277
278     def del_object_sharing(self, object):
279         self.set_object_sharing(object)
280
281     def append_object(self, object, source_file, upload_cb = None):
282         """@poaram upload_db is a generator for showing progress of upload
283             to caller application, e.g. a progress bar. Its next is called
284             whenever a block is uploaded
285         """
286         self.assert_container()
287         meta = self.get_container_info(self.container)
288         blocksize = int(meta['x-container-block-size'])
289         filesize = os.fstat(source_file.fileno()).st_size
290         nblocks = 1 + (filesize - 1)//blocksize
291         offset = 0
292         self.set_header('Content-Range', 'bytes */*')
293         self.set_header('Content-Type', 'application/octet-stream')
294         path=path4url(self.account, self.container, object)+params4url({'update':None})
295         if upload_cb is not None:
296             upload_gen = upload_cb(nblocks)
297         for i in range(nblocks):
298             block = source_file.read(min(blocksize, filesize - offset))
299             offset += len(block)
300             self.set_header('Content-Length', len(block))
301             self.post(path, data=block, success=(202, 204))
302             if upload_cb is not None:
303                 upload_gen.next()
304
305     def truncate_object(self, object, upto_bytes):
306         self.assert_container()
307         self.set_header('Content-Range', 'bytes 0-%s/*'%upto_bytes)
308         self.set_header('Content-Type', 'application/octet-stream')
309         self.set_header('X-Object-Bytes', upto_bytes)
310         self.set_header('X-Source-Object', path4url(self.container, object))
311         path=path4url(self.account, self.container, object)+params4url({'update':None})
312         self.post(path, success=(202, 204))
313
314     def overwrite_object(self, object, start, end, source_file, upload_cb=None):
315         """Overwrite a part of an object with given source file
316            @start the part of the remote object to start overwriting from, in bytes
317            @end the part of the remote object to stop overwriting to, in bytes
318         """
319         self.assert_container()
320         meta = self.get_container_info(self.container)
321         blocksize = int(meta['x-container-block-size'])
322         filesize = os.fstat(source_file.fileno()).st_size
323         datasize = int(end) - int(start) + 1
324         nblocks = 1 + (datasize - 1)//blocksize
325         offset = 0
326         self.set_header('Content-Range', 'bytes %s-%s/*' % (start, end) )
327         self.set_header('Content-Type', 'application/octet-stream')
328         path=path4url(self.account, self.container, object)+params4url({'update':None})
329         if upload_cb is not None:
330             upload_gen = upload_cb(nblocks)
331         for i in range(nblocks):
332             block = source_file.read(min(blocksize, filesize - offset, datasize - offset))
333             offset += len(block)
334             self.set_header('Content-Length', len(block))
335             self.post(path, data=block, success=(202, 204))
336             if upload_cb is not None:
337                 upload_gen.next()
338