Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / context_archipelago.py @ b5636704

History | View | Annotate | Download (5.9 kB)

1 f3525003 Chrysostomos Nanakos
# Copyright 2013 GRNET S.A. All rights reserved.
2 f3525003 Chrysostomos Nanakos
#
3 f3525003 Chrysostomos Nanakos
# Redistribution and use in source and binary forms, with or
4 f3525003 Chrysostomos Nanakos
# without modification, are permitted provided that the following
5 f3525003 Chrysostomos Nanakos
# conditions are met:
6 f3525003 Chrysostomos Nanakos
#
7 f3525003 Chrysostomos Nanakos
#   1. Redistributions of source code must retain the above
8 f3525003 Chrysostomos Nanakos
#      copyright notice, this list of conditions and the following
9 f3525003 Chrysostomos Nanakos
#      disclaimer.
10 f3525003 Chrysostomos Nanakos
#
11 f3525003 Chrysostomos Nanakos
#   2. Redistributions in binary form must reproduce the above
12 f3525003 Chrysostomos Nanakos
#      copyright notice, this list of conditions and the following
13 f3525003 Chrysostomos Nanakos
#      disclaimer in the documentation and/or other materials
14 f3525003 Chrysostomos Nanakos
#      provided with the distribution.
15 f3525003 Chrysostomos Nanakos
#
16 f3525003 Chrysostomos Nanakos
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 f3525003 Chrysostomos Nanakos
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 f3525003 Chrysostomos Nanakos
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 f3525003 Chrysostomos Nanakos
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 f3525003 Chrysostomos Nanakos
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 f3525003 Chrysostomos Nanakos
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 f3525003 Chrysostomos Nanakos
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 f3525003 Chrysostomos Nanakos
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 f3525003 Chrysostomos Nanakos
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 f3525003 Chrysostomos Nanakos
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 f3525003 Chrysostomos Nanakos
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 f3525003 Chrysostomos Nanakos
# POSSIBILITY OF SUCH DAMAGE.
28 f3525003 Chrysostomos Nanakos
#
29 f3525003 Chrysostomos Nanakos
# The views and conclusions contained in the software and
30 f3525003 Chrysostomos Nanakos
# documentation are those of the authors and should not be
31 f3525003 Chrysostomos Nanakos
# interpreted as representing official policies, either expressed
32 f3525003 Chrysostomos Nanakos
# or implied, of GRNET S.A.
33 f3525003 Chrysostomos Nanakos
34 f3525003 Chrysostomos Nanakos
from os import SEEK_CUR, SEEK_SET
35 f3525003 Chrysostomos Nanakos
from archipelago.common import (
36 b5636704 Chrysostomos Nanakos
    Request,
37 b5636704 Chrysostomos Nanakos
    string_at,
38 b5636704 Chrysostomos Nanakos
    )
39 f3525003 Chrysostomos Nanakos
from pithos.workers import monkey
40 f3525003 Chrysostomos Nanakos
monkey.patch_Request()
41 f3525003 Chrysostomos Nanakos
42 f3525003 Chrysostomos Nanakos
_zeros = ''
43 f3525003 Chrysostomos Nanakos
44 f3525003 Chrysostomos Nanakos
45 f3525003 Chrysostomos Nanakos
def zeros(nr):
46 f3525003 Chrysostomos Nanakos
    global _zeros
47 f3525003 Chrysostomos Nanakos
    size = len(_zeros)
48 f3525003 Chrysostomos Nanakos
    if nr == size:
49 f3525003 Chrysostomos Nanakos
        return _zeros
50 f3525003 Chrysostomos Nanakos
51 f3525003 Chrysostomos Nanakos
    if nr > size:
52 f3525003 Chrysostomos Nanakos
        _zeros += '\0' * (nr - size)
53 f3525003 Chrysostomos Nanakos
        return _zeros
54 f3525003 Chrysostomos Nanakos
55 f3525003 Chrysostomos Nanakos
    if nr < size:
56 f3525003 Chrysostomos Nanakos
        _zeros = _zeros[:nr]
57 f3525003 Chrysostomos Nanakos
        return _zeros
58 f3525003 Chrysostomos Nanakos
59 f3525003 Chrysostomos Nanakos
60 f3525003 Chrysostomos Nanakos
def file_sync_write_chunks(archipelagoobject, chunksize, offset,
61 f3525003 Chrysostomos Nanakos
                           chunks, size=None):
62 f3525003 Chrysostomos Nanakos
    """Write given chunks to the given buffered file object.
63 f3525003 Chrysostomos Nanakos
       Writes never span across chunk boundaries.
64 f3525003 Chrysostomos Nanakos
       If size is given stop after or pad until size bytes have been written.
65 f3525003 Chrysostomos Nanakos
    """
66 f3525003 Chrysostomos Nanakos
    padding = 0
67 f3525003 Chrysostomos Nanakos
    cursize = chunksize * offset
68 f3525003 Chrysostomos Nanakos
    archipelagoobject.seek(cursize)
69 f3525003 Chrysostomos Nanakos
    for chunk in chunks:
70 f3525003 Chrysostomos Nanakos
        if padding:
71 f3525003 Chrysostomos Nanakos
            archipelagoobject.sync_write(buffer(zeros(chunksize), 0, padding))
72 f3525003 Chrysostomos Nanakos
        if size is not None and cursize + chunksize >= size:
73 f3525003 Chrysostomos Nanakos
            chunk = chunk[:chunksize - (cursize - size)]
74 f3525003 Chrysostomos Nanakos
            archipelagoobject.sync_write(chunk)
75 f3525003 Chrysostomos Nanakos
            cursize += len(chunk)
76 f3525003 Chrysostomos Nanakos
            break
77 f3525003 Chrysostomos Nanakos
        archipelagoobject.sync_write(chunk)
78 f3525003 Chrysostomos Nanakos
        padding = chunksize - len(chunk)
79 f3525003 Chrysostomos Nanakos
80 f3525003 Chrysostomos Nanakos
    padding = size - cursize if size is not None else 0
81 f3525003 Chrysostomos Nanakos
    if padding <= 0:
82 f3525003 Chrysostomos Nanakos
        return
83 f3525003 Chrysostomos Nanakos
84 f3525003 Chrysostomos Nanakos
    q, r = divmod(padding, chunksize)
85 f3525003 Chrysostomos Nanakos
    for x in xrange(q):
86 f3525003 Chrysostomos Nanakos
        archipelagoobject.sync_write(zeros(chunksize))
87 f3525003 Chrysostomos Nanakos
    archipelagoobject.sync_write(buffer(zeros(chunksize), 0, r))
88 f3525003 Chrysostomos Nanakos
89 f3525003 Chrysostomos Nanakos
90 f3525003 Chrysostomos Nanakos
def file_sync_read_chunks(archipelagoobject, chunksize, nr, offset=0):
91 f3525003 Chrysostomos Nanakos
    """Read and yield groups of chunks from a buffered file object at offset.
92 f3525003 Chrysostomos Nanakos
       Reads never span accros chunksize boundaries.
93 f3525003 Chrysostomos Nanakos
    """
94 f3525003 Chrysostomos Nanakos
    archipelagoobject.seek(offset * chunksize)
95 f3525003 Chrysostomos Nanakos
    while nr:
96 f3525003 Chrysostomos Nanakos
        remains = chunksize
97 f3525003 Chrysostomos Nanakos
        chunk = ''
98 f3525003 Chrysostomos Nanakos
        while 1:
99 f3525003 Chrysostomos Nanakos
            s = archipelagoobject.sync_read(remains)
100 f3525003 Chrysostomos Nanakos
            if not s:
101 f3525003 Chrysostomos Nanakos
                if chunk:
102 f3525003 Chrysostomos Nanakos
                    yield chunk
103 f3525003 Chrysostomos Nanakos
                return
104 f3525003 Chrysostomos Nanakos
            chunk += s
105 f3525003 Chrysostomos Nanakos
            remains -= len(s)
106 f3525003 Chrysostomos Nanakos
            if remains <= 0:
107 f3525003 Chrysostomos Nanakos
                break
108 f3525003 Chrysostomos Nanakos
        yield chunk
109 f3525003 Chrysostomos Nanakos
        nr -= 1
110 f3525003 Chrysostomos Nanakos
111 f3525003 Chrysostomos Nanakos
112 f3525003 Chrysostomos Nanakos
class ArchipelagoObject(object):
113 f3525003 Chrysostomos Nanakos
    __slots__ = ("name", "ioctx_pool", "dst_port", "create", "offset")
114 f3525003 Chrysostomos Nanakos
115 f3525003 Chrysostomos Nanakos
    def __init__(self, name, ioctx_pool, dst_port=None, create=0):
116 f3525003 Chrysostomos Nanakos
        self.name = name
117 f3525003 Chrysostomos Nanakos
        self.ioctx_pool = ioctx_pool
118 f3525003 Chrysostomos Nanakos
        self.create = create
119 f3525003 Chrysostomos Nanakos
        self.dst_port = dst_port
120 f3525003 Chrysostomos Nanakos
        self.offset = 0
121 f3525003 Chrysostomos Nanakos
122 f3525003 Chrysostomos Nanakos
    def __enter__(self):
123 f3525003 Chrysostomos Nanakos
        return self
124 f3525003 Chrysostomos Nanakos
125 f3525003 Chrysostomos Nanakos
    def __exit__(self, exc, arg, trace):
126 f3525003 Chrysostomos Nanakos
        return False
127 f3525003 Chrysostomos Nanakos
128 f3525003 Chrysostomos Nanakos
    def seek(self, offset, whence=SEEK_SET):
129 f3525003 Chrysostomos Nanakos
        if whence == SEEK_CUR:
130 f3525003 Chrysostomos Nanakos
            offset += self.offset
131 f3525003 Chrysostomos Nanakos
        self.offset = offset
132 f3525003 Chrysostomos Nanakos
        return offset
133 f3525003 Chrysostomos Nanakos
134 f3525003 Chrysostomos Nanakos
    def tell(self):
135 f3525003 Chrysostomos Nanakos
        return self.offset
136 f3525003 Chrysostomos Nanakos
137 f3525003 Chrysostomos Nanakos
    def truncate(self, size):
138 f3525003 Chrysostomos Nanakos
        raise NotImplementedError("File truncation is not implemented yet \
139 f3525003 Chrysostomos Nanakos
                                   in archipelago")
140 f3525003 Chrysostomos Nanakos
141 f3525003 Chrysostomos Nanakos
    def sync_write(self, data):
142 b5636704 Chrysostomos Nanakos
        ioctx = self.ioctx_pool.pool_get()
143 f3525003 Chrysostomos Nanakos
        req = Request.get_write_request(ioctx, self.dst_port, self.name,
144 f3525003 Chrysostomos Nanakos
                                        data=data, offset=self.offset,
145 f3525003 Chrysostomos Nanakos
                                        datalen=len(data))
146 f3525003 Chrysostomos Nanakos
        req.submit()
147 f3525003 Chrysostomos Nanakos
        req.wait()
148 f3525003 Chrysostomos Nanakos
        ret = req.success()
149 f3525003 Chrysostomos Nanakos
        req.put()
150 f3525003 Chrysostomos Nanakos
        self.ioctx_pool.pool_put(ioctx)
151 f3525003 Chrysostomos Nanakos
        if ret:
152 f3525003 Chrysostomos Nanakos
            self.offset += len(data)
153 f3525003 Chrysostomos Nanakos
        else:
154 f3525003 Chrysostomos Nanakos
            raise IOError("archipelago: Write request error")
155 f3525003 Chrysostomos Nanakos
156 f3525003 Chrysostomos Nanakos
    def sync_write_chunks(self, chunksize, offset, chunks, size=None):
157 b5636704 Chrysostomos Nanakos
        return file_sync_write_chunks(self, chunksize, offset, chunks, size)
158 f3525003 Chrysostomos Nanakos
159 f3525003 Chrysostomos Nanakos
    def sync_read(self, size):
160 f3525003 Chrysostomos Nanakos
        read = Request.get_read_request
161 f3525003 Chrysostomos Nanakos
        data = ''
162 f3525003 Chrysostomos Nanakos
        datalen = 0
163 f3525003 Chrysostomos Nanakos
        dsize = size
164 f3525003 Chrysostomos Nanakos
        while 1:
165 b5636704 Chrysostomos Nanakos
            ioctx = self.ioctx_pool.pool_get()
166 f3525003 Chrysostomos Nanakos
            req = read(ioctx, self.dst_port,
167 b5636704 Chrysostomos Nanakos
                       self.name, size=dsize-datalen, offset=self.offset)
168 f3525003 Chrysostomos Nanakos
            req.submit()
169 f3525003 Chrysostomos Nanakos
            req.wait()
170 f3525003 Chrysostomos Nanakos
            ret = req.success()
171 f3525003 Chrysostomos Nanakos
            if ret:
172 b5636704 Chrysostomos Nanakos
                s = string_at(req.get_data(), dsize-datalen)
173 f3525003 Chrysostomos Nanakos
            else:
174 f3525003 Chrysostomos Nanakos
                s = None
175 f3525003 Chrysostomos Nanakos
            req.put()
176 f3525003 Chrysostomos Nanakos
            self.ioctx_pool.pool_put(ioctx)
177 f3525003 Chrysostomos Nanakos
            if not s:
178 f3525003 Chrysostomos Nanakos
                break
179 f3525003 Chrysostomos Nanakos
            data += s
180 f3525003 Chrysostomos Nanakos
            datalen += len(s)
181 f3525003 Chrysostomos Nanakos
            self.offset += len(s)
182 f3525003 Chrysostomos Nanakos
            if datalen >= size:
183 f3525003 Chrysostomos Nanakos
                break
184 f3525003 Chrysostomos Nanakos
        return data
185 f3525003 Chrysostomos Nanakos
186 f3525003 Chrysostomos Nanakos
    def sync_read_chunks(self, chunksize, nr, offset=0):
187 f3525003 Chrysostomos Nanakos
        return file_sync_read_chunks(self, chunksize, nr, offset)