root / snf-pithos-backend / pithos / backends / lib / hashfiler / context_archipelago.py @ 3759eddb
History | View | Annotate | Download (5.9 kB)
1 | f3525003 | Chrysostomos Nanakos | # Copyright 2013 GRNET S.A. All rights reserved.
|
---|---|---|---|
2 | f3525003 | Chrysostomos Nanakos | #
|
3 | f3525003 | Chrysostomos Nanakos | # Redistribution and use in source and binary forms, with or
|
4 | f3525003 | Chrysostomos Nanakos | # without modification, are permitted provided that the following
|
5 | f3525003 | Chrysostomos Nanakos | # conditions are met:
|
6 | f3525003 | Chrysostomos Nanakos | #
|
7 | f3525003 | Chrysostomos Nanakos | # 1. Redistributions of source code must retain the above
|
8 | f3525003 | Chrysostomos Nanakos | # copyright notice, this list of conditions and the following
|
9 | f3525003 | Chrysostomos Nanakos | # disclaimer.
|
10 | f3525003 | Chrysostomos Nanakos | #
|
11 | f3525003 | Chrysostomos Nanakos | # 2. Redistributions in binary form must reproduce the above
|
12 | f3525003 | Chrysostomos Nanakos | # copyright notice, this list of conditions and the following
|
13 | f3525003 | Chrysostomos Nanakos | # disclaimer in the documentation and/or other materials
|
14 | f3525003 | Chrysostomos Nanakos | # provided with the distribution.
|
15 | f3525003 | Chrysostomos Nanakos | #
|
16 | f3525003 | Chrysostomos Nanakos | # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
|
17 | f3525003 | Chrysostomos Nanakos | # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
18 | f3525003 | Chrysostomos Nanakos | # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
|
19 | f3525003 | Chrysostomos Nanakos | # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
|
20 | f3525003 | Chrysostomos Nanakos | # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
21 | f3525003 | Chrysostomos Nanakos | # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
22 | f3525003 | Chrysostomos Nanakos | # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
|
23 | f3525003 | Chrysostomos Nanakos | # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
|
24 | f3525003 | Chrysostomos Nanakos | # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
25 | f3525003 | Chrysostomos Nanakos | # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
26 | f3525003 | Chrysostomos Nanakos | # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
27 | f3525003 | Chrysostomos Nanakos | # POSSIBILITY OF SUCH DAMAGE.
|
28 | f3525003 | Chrysostomos Nanakos | #
|
29 | f3525003 | Chrysostomos Nanakos | # The views and conclusions contained in the software and
|
30 | f3525003 | Chrysostomos Nanakos | # documentation are those of the authors and should not be
|
31 | f3525003 | Chrysostomos Nanakos | # interpreted as representing official policies, either expressed
|
32 | f3525003 | Chrysostomos Nanakos | # or implied, of GRNET S.A.
|
33 | f3525003 | Chrysostomos Nanakos | |
34 | f3525003 | Chrysostomos Nanakos | from os import SEEK_CUR, SEEK_SET |
35 | f3525003 | Chrysostomos Nanakos | from archipelago.common import ( |
36 | b5636704 | Chrysostomos Nanakos | Request, |
37 | b5636704 | Chrysostomos Nanakos | string_at, |
38 | b5636704 | Chrysostomos Nanakos | ) |
39 | f3525003 | Chrysostomos Nanakos | from pithos.workers import monkey |
40 | f3525003 | Chrysostomos Nanakos | monkey.patch_Request() |
41 | f3525003 | Chrysostomos Nanakos | |
42 | f3525003 | Chrysostomos Nanakos | _zeros = ''
|
43 | f3525003 | Chrysostomos Nanakos | |
44 | f3525003 | Chrysostomos Nanakos | |
45 | f3525003 | Chrysostomos Nanakos | def zeros(nr): |
46 | f3525003 | Chrysostomos Nanakos | global _zeros
|
47 | f3525003 | Chrysostomos Nanakos | size = len(_zeros)
|
48 | f3525003 | Chrysostomos Nanakos | if nr == size:
|
49 | f3525003 | Chrysostomos Nanakos | return _zeros
|
50 | f3525003 | Chrysostomos Nanakos | |
51 | f3525003 | Chrysostomos Nanakos | if nr > size:
|
52 | f3525003 | Chrysostomos Nanakos | _zeros += '\0' * (nr - size)
|
53 | f3525003 | Chrysostomos Nanakos | return _zeros
|
54 | f3525003 | Chrysostomos Nanakos | |
55 | f3525003 | Chrysostomos Nanakos | if nr < size:
|
56 | f3525003 | Chrysostomos Nanakos | _zeros = _zeros[:nr] |
57 | f3525003 | Chrysostomos Nanakos | return _zeros
|
58 | f3525003 | Chrysostomos Nanakos | |
59 | f3525003 | Chrysostomos Nanakos | |
60 | f3525003 | Chrysostomos Nanakos | def file_sync_write_chunks(archipelagoobject, chunksize, offset, |
61 | f3525003 | Chrysostomos Nanakos | chunks, size=None):
|
62 | f3525003 | Chrysostomos Nanakos | """Write given chunks to the given buffered file object.
|
63 | f3525003 | Chrysostomos Nanakos | Writes never span across chunk boundaries.
|
64 | f3525003 | Chrysostomos Nanakos | If size is given stop after or pad until size bytes have been written.
|
65 | f3525003 | Chrysostomos Nanakos | """
|
66 | f3525003 | Chrysostomos Nanakos | padding = 0
|
67 | f3525003 | Chrysostomos Nanakos | cursize = chunksize * offset |
68 | f3525003 | Chrysostomos Nanakos | archipelagoobject.seek(cursize) |
69 | f3525003 | Chrysostomos Nanakos | for chunk in chunks: |
70 | f3525003 | Chrysostomos Nanakos | if padding:
|
71 | f3525003 | Chrysostomos Nanakos | archipelagoobject.sync_write(buffer(zeros(chunksize), 0, padding)) |
72 | f3525003 | Chrysostomos Nanakos | if size is not None and cursize + chunksize >= size: |
73 | f3525003 | Chrysostomos Nanakos | chunk = chunk[:chunksize - (cursize - size)] |
74 | f3525003 | Chrysostomos Nanakos | archipelagoobject.sync_write(chunk) |
75 | f3525003 | Chrysostomos Nanakos | cursize += len(chunk)
|
76 | f3525003 | Chrysostomos Nanakos | break
|
77 | f3525003 | Chrysostomos Nanakos | archipelagoobject.sync_write(chunk) |
78 | f3525003 | Chrysostomos Nanakos | padding = chunksize - len(chunk)
|
79 | f3525003 | Chrysostomos Nanakos | |
80 | f3525003 | Chrysostomos Nanakos | padding = size - cursize if size is not None else 0 |
81 | f3525003 | Chrysostomos Nanakos | if padding <= 0: |
82 | f3525003 | Chrysostomos Nanakos | return
|
83 | f3525003 | Chrysostomos Nanakos | |
84 | f3525003 | Chrysostomos Nanakos | q, r = divmod(padding, chunksize)
|
85 | f3525003 | Chrysostomos Nanakos | for x in xrange(q): |
86 | f3525003 | Chrysostomos Nanakos | archipelagoobject.sync_write(zeros(chunksize)) |
87 | f3525003 | Chrysostomos Nanakos | archipelagoobject.sync_write(buffer(zeros(chunksize), 0, r)) |
88 | f3525003 | Chrysostomos Nanakos | |
89 | f3525003 | Chrysostomos Nanakos | |
90 | f3525003 | Chrysostomos Nanakos | def file_sync_read_chunks(archipelagoobject, chunksize, nr, offset=0): |
91 | f3525003 | Chrysostomos Nanakos | """Read and yield groups of chunks from a buffered file object at offset.
|
92 | f3525003 | Chrysostomos Nanakos | Reads never span accros chunksize boundaries.
|
93 | f3525003 | Chrysostomos Nanakos | """
|
94 | f3525003 | Chrysostomos Nanakos | archipelagoobject.seek(offset * chunksize) |
95 | f3525003 | Chrysostomos Nanakos | while nr:
|
96 | f3525003 | Chrysostomos Nanakos | remains = chunksize |
97 | f3525003 | Chrysostomos Nanakos | chunk = ''
|
98 | f3525003 | Chrysostomos Nanakos | while 1: |
99 | f3525003 | Chrysostomos Nanakos | s = archipelagoobject.sync_read(remains) |
100 | f3525003 | Chrysostomos Nanakos | if not s: |
101 | f3525003 | Chrysostomos Nanakos | if chunk:
|
102 | f3525003 | Chrysostomos Nanakos | yield chunk
|
103 | f3525003 | Chrysostomos Nanakos | return
|
104 | f3525003 | Chrysostomos Nanakos | chunk += s |
105 | f3525003 | Chrysostomos Nanakos | remains -= len(s)
|
106 | f3525003 | Chrysostomos Nanakos | if remains <= 0: |
107 | f3525003 | Chrysostomos Nanakos | break
|
108 | f3525003 | Chrysostomos Nanakos | yield chunk
|
109 | f3525003 | Chrysostomos Nanakos | nr -= 1
|
110 | f3525003 | Chrysostomos Nanakos | |
111 | f3525003 | Chrysostomos Nanakos | |
112 | f3525003 | Chrysostomos Nanakos | class ArchipelagoObject(object): |
113 | f3525003 | Chrysostomos Nanakos | __slots__ = ("name", "ioctx_pool", "dst_port", "create", "offset") |
114 | f3525003 | Chrysostomos Nanakos | |
115 | f3525003 | Chrysostomos Nanakos | def __init__(self, name, ioctx_pool, dst_port=None, create=0): |
116 | f3525003 | Chrysostomos Nanakos | self.name = name
|
117 | f3525003 | Chrysostomos Nanakos | self.ioctx_pool = ioctx_pool
|
118 | f3525003 | Chrysostomos Nanakos | self.create = create
|
119 | f3525003 | Chrysostomos Nanakos | self.dst_port = dst_port
|
120 | f3525003 | Chrysostomos Nanakos | self.offset = 0 |
121 | f3525003 | Chrysostomos Nanakos | |
122 | f3525003 | Chrysostomos Nanakos | def __enter__(self): |
123 | f3525003 | Chrysostomos Nanakos | return self |
124 | f3525003 | Chrysostomos Nanakos | |
125 | f3525003 | Chrysostomos Nanakos | def __exit__(self, exc, arg, trace): |
126 | f3525003 | Chrysostomos Nanakos | return False |
127 | f3525003 | Chrysostomos Nanakos | |
128 | f3525003 | Chrysostomos Nanakos | def seek(self, offset, whence=SEEK_SET): |
129 | f3525003 | Chrysostomos Nanakos | if whence == SEEK_CUR:
|
130 | f3525003 | Chrysostomos Nanakos | offset += self.offset
|
131 | f3525003 | Chrysostomos Nanakos | self.offset = offset
|
132 | f3525003 | Chrysostomos Nanakos | return offset
|
133 | f3525003 | Chrysostomos Nanakos | |
134 | f3525003 | Chrysostomos Nanakos | def tell(self): |
135 | f3525003 | Chrysostomos Nanakos | return self.offset |
136 | f3525003 | Chrysostomos Nanakos | |
137 | f3525003 | Chrysostomos Nanakos | def truncate(self, size): |
138 | f3525003 | Chrysostomos Nanakos | raise NotImplementedError("File truncation is not implemented yet \ |
139 | f3525003 | Chrysostomos Nanakos | in archipelago")
|
140 | f3525003 | Chrysostomos Nanakos | |
141 | f3525003 | Chrysostomos Nanakos | def sync_write(self, data): |
142 | b5636704 | Chrysostomos Nanakos | ioctx = self.ioctx_pool.pool_get()
|
143 | f3525003 | Chrysostomos Nanakos | req = Request.get_write_request(ioctx, self.dst_port, self.name, |
144 | f3525003 | Chrysostomos Nanakos | data=data, offset=self.offset,
|
145 | f3525003 | Chrysostomos Nanakos | datalen=len(data))
|
146 | f3525003 | Chrysostomos Nanakos | req.submit() |
147 | f3525003 | Chrysostomos Nanakos | req.wait() |
148 | f3525003 | Chrysostomos Nanakos | ret = req.success() |
149 | f3525003 | Chrysostomos Nanakos | req.put() |
150 | f3525003 | Chrysostomos Nanakos | self.ioctx_pool.pool_put(ioctx)
|
151 | f3525003 | Chrysostomos Nanakos | if ret:
|
152 | f3525003 | Chrysostomos Nanakos | self.offset += len(data) |
153 | f3525003 | Chrysostomos Nanakos | else:
|
154 | f3525003 | Chrysostomos Nanakos | raise IOError("archipelago: Write request error") |
155 | f3525003 | Chrysostomos Nanakos | |
156 | f3525003 | Chrysostomos Nanakos | def sync_write_chunks(self, chunksize, offset, chunks, size=None): |
157 | b5636704 | Chrysostomos Nanakos | return file_sync_write_chunks(self, chunksize, offset, chunks, size) |
158 | f3525003 | Chrysostomos Nanakos | |
159 | f3525003 | Chrysostomos Nanakos | def sync_read(self, size): |
160 | f3525003 | Chrysostomos Nanakos | read = Request.get_read_request |
161 | f3525003 | Chrysostomos Nanakos | data = ''
|
162 | f3525003 | Chrysostomos Nanakos | datalen = 0
|
163 | f3525003 | Chrysostomos Nanakos | dsize = size |
164 | f3525003 | Chrysostomos Nanakos | while 1: |
165 | b5636704 | Chrysostomos Nanakos | ioctx = self.ioctx_pool.pool_get()
|
166 | f3525003 | Chrysostomos Nanakos | req = read(ioctx, self.dst_port,
|
167 | f8e0f0ed | Chrysostomos Nanakos | self.name, size=dsize - datalen, offset=self.offset) |
168 | f3525003 | Chrysostomos Nanakos | req.submit() |
169 | f3525003 | Chrysostomos Nanakos | req.wait() |
170 | f3525003 | Chrysostomos Nanakos | ret = req.success() |
171 | f3525003 | Chrysostomos Nanakos | if ret:
|
172 | f8e0f0ed | Chrysostomos Nanakos | s = string_at(req.get_data(), dsize - datalen) |
173 | f3525003 | Chrysostomos Nanakos | else:
|
174 | f3525003 | Chrysostomos Nanakos | s = None
|
175 | f3525003 | Chrysostomos Nanakos | req.put() |
176 | f3525003 | Chrysostomos Nanakos | self.ioctx_pool.pool_put(ioctx)
|
177 | f3525003 | Chrysostomos Nanakos | if not s: |
178 | f3525003 | Chrysostomos Nanakos | break
|
179 | f3525003 | Chrysostomos Nanakos | data += s |
180 | f3525003 | Chrysostomos Nanakos | datalen += len(s)
|
181 | f3525003 | Chrysostomos Nanakos | self.offset += len(s) |
182 | f3525003 | Chrysostomos Nanakos | if datalen >= size:
|
183 | f3525003 | Chrysostomos Nanakos | break
|
184 | f3525003 | Chrysostomos Nanakos | return data
|
185 | f3525003 | Chrysostomos Nanakos | |
186 | f3525003 | Chrysostomos Nanakos | def sync_read_chunks(self, chunksize, nr, offset=0): |
187 | f3525003 | Chrysostomos Nanakos | return file_sync_read_chunks(self, chunksize, nr, offset) |