Statistics
| Branch: | Tag: | Revision:

root / kamaki / clients / pithos.py @ d5832d3b

History | View | Annotate | Download (6.8 kB)

1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import hashlib
35
import os
36

    
37
from time import time
38

    
39
from .storage import StorageClient
40
from .utils import path4url, params4url, prefix_keys, filter_in, filter_out
41

    
42

    
43
def pithos_hash(block, blockhash):
44
    h = hashlib.new(blockhash)
45
    h.update(block.rstrip('\x00'))
46
    return h.hexdigest()
47

    
48

    
49
class PithosClient(StorageClient):
50
    """GRNet Pithos API client"""
51

    
52
    def purge_container(self, container):
53
        self.assert_account()
54
        path = '/%s/%s' % (self.account, container)
55
        params = {'until': int(time())}
56
        self.delete(path, params=params, success=204)
57

    
58
    def put_block(self, data, hash):
59
        path = '/%s/%s' % (self.account, self.container)
60
        params = {'update': ''}
61
        headers = {'Content-Type': 'application/octet-stream',
62
                   'Content-Length': str(len(data))}
63
        r = self.post(path, params=params, data=data, headers=headers,
64
                      success=202)
65
        assert r.text.strip() == hash, 'Local hash does not match server'
66

    
67
    def create_object(self, object, f, size=None, hash_cb=None,
68
                      upload_cb=None):
69
        """Create an object by uploading only the missing blocks
70

71
        hash_cb is a generator function taking the total number of blocks to
72
        be hashed as an argument. Its next() will be called every time a block
73
        is hashed.
74

75
        upload_cb is a generator function with the same properties that is
76
        called every time a block is uploaded.
77
        """
78
        self.assert_container()
79

    
80
        meta = self.get_container_info(self.container)
81
        print(unicode(meta))
82
        blocksize = int(meta['x-container-block-size'])
83
        blockhash = meta['x-container-block-hash']
84

    
85
        size = size if size is not None else os.fstat(f.fileno()).st_size
86
        nblocks = 1 + (size - 1) // blocksize
87
        hashes = []
88
        map = {}
89

    
90
        offset = 0
91

    
92
        if hash_cb:
93
            hash_gen = hash_cb(nblocks)
94
            hash_gen.next()
95

    
96
        for i in range(nblocks):
97
            block = f.read(min(blocksize, size - offset))
98
            bytes = len(block)
99
            hash = pithos_hash(block, blockhash)
100
            hashes.append(hash)
101
            map[hash] = (offset, bytes)
102
            offset += bytes
103
            if hash_cb:
104
                hash_gen.next()
105

    
106
        assert offset == size
107

    
108
        path = '/%s/%s/%s' % (self.account, self.container, object)
109
        params = dict(format='json', hashmap='')
110
        hashmap = dict(bytes=size, hashes=hashes)
111
        headers = {'Content-Type': 'application/octet-stream'}
112
        r = self.put(path, params=params, headers=headers, json=hashmap,
113
                     success=(201, 409))
114

    
115
        if r.status_code == 201:
116
            return
117

    
118
        missing = r.json
119

    
120
        if upload_cb:
121
            upload_gen = upload_cb(len(missing))
122
            upload_gen.next()
123

    
124
        for hash in missing:
125
            offset, bytes = map[hash]
126
            f.seek(offset)
127
            data = f.read(bytes)
128
            self.put_block(data, hash)
129
            if upload_cb:
130
                upload_gen.next()
131

    
132
        self.put(path, params=params, headers=headers, json=hashmap,
133
                 success=201)
134

    
135
    def get_account_policy(self):
136
        return filter_in(self.get_account_info(), 'X-Account-Policy-')
137

    
138
    def get_account_meta(self):
139
        return filter_in(self.get_account_info(), 'X-Account-Meta-')
140

    
141
    def set_account_meta(self, metapairs):
142
        assert(type(metapairs) is dict)
143
        self.assert_account()
144
        path = path4url(self.account)+params4url({'update':None})
145
        meta = prefix_keys(metapairs, 'X-Account-Meta-')
146
        self.post(path, meta=meta, success=202)
147

    
148
    def get_container_policy(self, container):
149
        return filter_in(self.get_container_info(container), 'X-Container-Policy-')
150

    
151
    def get_container_meta(self, container):
152
        return filter_in(self.get_container_info(container), 'X-Container-Meta-')
153

    
154
    def get_container_object_meta(self, container):
155
        return filter_in(self.get_container_info(container), 'X-Container-Object-Meta')
156

    
157
    def set_container_meta(self, metapairs):
158
        assert(type(metapairs) is dict)
159
        self.assert_container()
160
        path=path4url(self.account, self.container)+params4url({'update':None})
161
        meta = prefix_keys(metapairs, 'X-Container-Meta-')
162
        self.post(path, meta=meta, success=202)
163

    
164
    def delete_container_meta(self, metakey):
165
        headers = self.get_container_info(self.container)
166
        new_headers = filter_out(headers, 'x-container-meta-'+metakey, exactMatch = True)
167
        if len(new_headers) == len(headers):
168
            raise ClientError('X-Container-Meta-%s not found' % metakey, 404)
169
        path = path4url(self.account, self.container)
170
        self.post(path, headers=new_headers, success = 202)
171

    
172
    def replace_container_meta(self, metapairs):
173
        self.assert_container()
174
        path=path4url(self.account, self.container)
175
        meta = prefix_keys(metapairs, 'X-Container-Meta-')
176
        self.post(path, meta=meta, success=202)
177

    
178
    def set_object_meta(self, object, metapairs):
179
        assert(type(metapairs) is dict)
180
        self.assert_container()
181
        path=path4url(self.account, self.container, object)+params4url({'update':None})
182
        meta = prefix_keys(metapairs, 'X-Object-Meta-')
183
        self.post(path, meta=meta, success=202)