Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / archipelagomapper.py @ 3759eddb

History | View | Annotate | Download (6.2 kB)

1 f3525003 Chrysostomos Nanakos
# Copyright 2013 GRNET S.A. All rights reserved.
2 f3525003 Chrysostomos Nanakos
#
3 f3525003 Chrysostomos Nanakos
# Redistribution and use in source and binary forms, with or
4 f3525003 Chrysostomos Nanakos
# without modification, are permitted provided that the following
5 f3525003 Chrysostomos Nanakos
# conditions are met:
6 f3525003 Chrysostomos Nanakos
#
7 f3525003 Chrysostomos Nanakos
#   1. Redistributions of source code must retain the above
8 f3525003 Chrysostomos Nanakos
#      copyright notice, this list of conditions and the following
9 f3525003 Chrysostomos Nanakos
#      disclaimer.
10 f3525003 Chrysostomos Nanakos
#
11 f3525003 Chrysostomos Nanakos
#   2. Redistributions in binary form must reproduce the above
12 f3525003 Chrysostomos Nanakos
#      copyright notice, this list of conditions and the following
13 f3525003 Chrysostomos Nanakos
#      disclaimer in the documentation and/or other materials
14 f3525003 Chrysostomos Nanakos
#      provided with the distribution.
15 f3525003 Chrysostomos Nanakos
#
16 f3525003 Chrysostomos Nanakos
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 f3525003 Chrysostomos Nanakos
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 f3525003 Chrysostomos Nanakos
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 f3525003 Chrysostomos Nanakos
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 f3525003 Chrysostomos Nanakos
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 f3525003 Chrysostomos Nanakos
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 f3525003 Chrysostomos Nanakos
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 f3525003 Chrysostomos Nanakos
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 f3525003 Chrysostomos Nanakos
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 f3525003 Chrysostomos Nanakos
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 f3525003 Chrysostomos Nanakos
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 f3525003 Chrysostomos Nanakos
# POSSIBILITY OF SUCH DAMAGE.
28 f3525003 Chrysostomos Nanakos
#
29 f3525003 Chrysostomos Nanakos
# The views and conclusions contained in the software and
30 f3525003 Chrysostomos Nanakos
# documentation are those of the authors and should not be
31 f3525003 Chrysostomos Nanakos
# interpreted as representing official policies, either expressed
32 f3525003 Chrysostomos Nanakos
# or implied, of GRNET S.A.
33 f3525003 Chrysostomos Nanakos
34 f3525003 Chrysostomos Nanakos
from binascii import hexlify
35 b5636704 Chrysostomos Nanakos
import os
36 b5636704 Chrysostomos Nanakos
import re
37 f3525003 Chrysostomos Nanakos
import ctypes
38 3759eddb Filippos Giannakos
import logging
39 f3525003 Chrysostomos Nanakos
40 f3525003 Chrysostomos Nanakos
from context_archipelago import ArchipelagoObject
41 f3525003 Chrysostomos Nanakos
from archipelago.common import (
42 f3525003 Chrysostomos Nanakos
    Request,
43 f3525003 Chrysostomos Nanakos
    xseg_reply_info,
44 f3525003 Chrysostomos Nanakos
    xseg_reply_map,
45 f3525003 Chrysostomos Nanakos
    xseg_reply_map_scatterlist,
46 f3525003 Chrysostomos Nanakos
    string_at,
47 f3525003 Chrysostomos Nanakos
    )
48 f3525003 Chrysostomos Nanakos
49 f3525003 Chrysostomos Nanakos
from pithos.workers import (
50 f3525003 Chrysostomos Nanakos
    glue,
51 f3525003 Chrysostomos Nanakos
    monkey,
52 f3525003 Chrysostomos Nanakos
    )
53 f3525003 Chrysostomos Nanakos
54 f3525003 Chrysostomos Nanakos
monkey.patch_Request()
55 f3525003 Chrysostomos Nanakos
56 3759eddb Filippos Giannakos
logger = logging.getLogger(__name__)
57 f3525003 Chrysostomos Nanakos
58 f3525003 Chrysostomos Nanakos
class ArchipelagoMapper(object):
59 f3525003 Chrysostomos Nanakos
    """Mapper.
60 f3525003 Chrysostomos Nanakos
       Required constructor parameters: namelen.
61 f3525003 Chrysostomos Nanakos
    """
62 f3525003 Chrysostomos Nanakos
63 f3525003 Chrysostomos Nanakos
    namelen = None
64 f3525003 Chrysostomos Nanakos
65 f3525003 Chrysostomos Nanakos
    def __init__(self, **params):
66 f3525003 Chrysostomos Nanakos
        self.params = params
67 f3525003 Chrysostomos Nanakos
        self.namelen = params['namelen']
68 f3525003 Chrysostomos Nanakos
        cfg = {}
69 f9093bea Chrysostomos Nanakos
        bcfg = open(glue.WorkerGlue.ArchipelagoConfFile).read()
70 f3525003 Chrysostomos Nanakos
        cfg['blockerm'] = re.search('\'blockerm_port\'\s*:\s*\d+',
71 b5636704 Chrysostomos Nanakos
                                    bcfg).group(0).split(':')[1]
72 f3525003 Chrysostomos Nanakos
        cfg['mapperd'] = re.search('\'mapper_port\'\s*:\s*\d+',
73 b5636704 Chrysostomos Nanakos
                                   bcfg).group(0).split(':')[1]
74 f9093bea Chrysostomos Nanakos
        self.ioctx_pool = glue.WorkerGlue().ioctx_pool
75 f3525003 Chrysostomos Nanakos
        self.dst_port = int(cfg['blockerm'])
76 f3525003 Chrysostomos Nanakos
        self.mapperd_port = int(cfg['mapperd'])
77 f3525003 Chrysostomos Nanakos
78 f3525003 Chrysostomos Nanakos
    def _get_rear_map(self, maphash, create=0):
79 f3525003 Chrysostomos Nanakos
        name = hexlify(maphash)
80 f3525003 Chrysostomos Nanakos
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)
81 f3525003 Chrysostomos Nanakos
82 f3525003 Chrysostomos Nanakos
    def _check_rear_map(self, maphash):
83 f3525003 Chrysostomos Nanakos
        name = hexlify(maphash)
84 f3525003 Chrysostomos Nanakos
        ioctx = self.ioctx_pool.pool_get()
85 f3525003 Chrysostomos Nanakos
        req = Request.get_info_request(ioctx, self.dst_port, name)
86 f3525003 Chrysostomos Nanakos
        req.submit()
87 f3525003 Chrysostomos Nanakos
        req.wait()
88 f3525003 Chrysostomos Nanakos
        ret = req.success()
89 f3525003 Chrysostomos Nanakos
        req.put()
90 f3525003 Chrysostomos Nanakos
        self.ioctx_pool.pool_put(ioctx)
91 f3525003 Chrysostomos Nanakos
        if ret:
92 f3525003 Chrysostomos Nanakos
            return True
93 f3525003 Chrysostomos Nanakos
        else:
94 f3525003 Chrysostomos Nanakos
            return False
95 f3525003 Chrysostomos Nanakos
96 f3525003 Chrysostomos Nanakos
    def map_retr(self, maphash, blkoff=0, nr=100000000000000):
97 f3525003 Chrysostomos Nanakos
        """Return as a list, part of the hashes map of an object
98 f3525003 Chrysostomos Nanakos
           at the given block offset.
99 f3525003 Chrysostomos Nanakos
           By default, return the whole hashes map.
100 f3525003 Chrysostomos Nanakos
        """
101 f3525003 Chrysostomos Nanakos
        namelen = self.namelen
102 f3525003 Chrysostomos Nanakos
        hashes = ()
103 f3525003 Chrysostomos Nanakos
        ioctx = self.ioctx_pool.pool_get()
104 f3525003 Chrysostomos Nanakos
        req = Request.get_info_request(ioctx, self.dst_port,
105 f3525003 Chrysostomos Nanakos
                                       hexlify(maphash))
106 f3525003 Chrysostomos Nanakos
        req.submit()
107 f3525003 Chrysostomos Nanakos
        req.wait()
108 f3525003 Chrysostomos Nanakos
        ret = req.success()
109 f3525003 Chrysostomos Nanakos
        if ret:
110 f3525003 Chrysostomos Nanakos
            info = req.get_data(_type=xseg_reply_info)
111 f3525003 Chrysostomos Nanakos
            size = int(info.contents.size)
112 f3525003 Chrysostomos Nanakos
            req.put()
113 f3525003 Chrysostomos Nanakos
        else:
114 f3525003 Chrysostomos Nanakos
            req.put()
115 f3525003 Chrysostomos Nanakos
            self.ioctx_pool.pool_put(ioctx)
116 b5636704 Chrysostomos Nanakos
            raise RuntimeError("Hashmap '%s' doesn't exists" %
117 b5636704 Chrysostomos Nanakos
                               hexlify(maphash))
118 f3525003 Chrysostomos Nanakos
        req = Request.get_read_request(ioctx, self.dst_port,
119 b5636704 Chrysostomos Nanakos
                                       hexlify(maphash), size=size)
120 f3525003 Chrysostomos Nanakos
        req.submit()
121 f3525003 Chrysostomos Nanakos
        req.wait()
122 f3525003 Chrysostomos Nanakos
        ret = req.success()
123 f3525003 Chrysostomos Nanakos
        if ret:
124 b5636704 Chrysostomos Nanakos
            data = string_at(req.get_data(), size)
125 f3525003 Chrysostomos Nanakos
            req.put()
126 f3525003 Chrysostomos Nanakos
            self.ioctx_pool.pool_put(ioctx)
127 b5636704 Chrysostomos Nanakos
            for idx in xrange(0, len(data), namelen):
128 f8e0f0ed Chrysostomos Nanakos
                hashes = hashes + (data[idx:idx + namelen],)
129 f3525003 Chrysostomos Nanakos
            hashes = list(hashes)
130 f3525003 Chrysostomos Nanakos
        else:
131 f3525003 Chrysostomos Nanakos
            req.put()
132 f3525003 Chrysostomos Nanakos
            self.ioctx_pool.pool_put(ioctx)
133 b5636704 Chrysostomos Nanakos
            raise RuntimeError("Hashmap '%s' doesn't exists" %
134 b5636704 Chrysostomos Nanakos
                               hexlify(maphash))
135 f3525003 Chrysostomos Nanakos
        return hashes
136 f3525003 Chrysostomos Nanakos
137 f3525003 Chrysostomos Nanakos
    def map_retr_archipelago(self, maphash, size):
138 f3525003 Chrysostomos Nanakos
        """Retrieve Archipelago mapfile"""
139 9762de48 Filippos Giannakos
        hashes = []
140 f3525003 Chrysostomos Nanakos
        ioctx = self.ioctx_pool.pool_get()
141 f3525003 Chrysostomos Nanakos
        maphash = maphash.split("archip:")[1]
142 f3525003 Chrysostomos Nanakos
        req = Request.get_mapr_request(ioctx, self.mapperd_port, maphash,
143 f3525003 Chrysostomos Nanakos
                                       offset=0, size=size)
144 f3525003 Chrysostomos Nanakos
        req.submit()
145 f3525003 Chrysostomos Nanakos
        req.wait()
146 f3525003 Chrysostomos Nanakos
        ret = req.success()
147 f3525003 Chrysostomos Nanakos
        if ret:
148 f3525003 Chrysostomos Nanakos
            data = req.get_data(xseg_reply_map)
149 f3525003 Chrysostomos Nanakos
            Segsarray = xseg_reply_map_scatterlist * data.contents.cnt
150 f3525003 Chrysostomos Nanakos
            segs = Segsarray.from_address(ctypes.addressof(data.contents.segs))
151 9762de48 Filippos Giannakos
            hashes = [string_at(segs[idx].target, segs[idx].targetlen)
152 9762de48 Filippos Giannakos
                    for idx in xrange(len(segs))]
153 f3525003 Chrysostomos Nanakos
            req.put()
154 f3525003 Chrysostomos Nanakos
        else:
155 f3525003 Chrysostomos Nanakos
            req.put()
156 f3525003 Chrysostomos Nanakos
            self.ioctx_pool.pool_put(ioctx)
157 f3525003 Chrysostomos Nanakos
            raise Exception("Could not retrieve Archipelago mapfile.")
158 3759eddb Filippos Giannakos
        req = Request.get_close_request(ioctx, self.mapperd_port, maphash);
159 3759eddb Filippos Giannakos
        req.submit()
160 3759eddb Filippos Giannakos
        req.wait()
161 3759eddb Filippos Giannakos
        ret = req.success();
162 3759eddb Filippos Giannakos
        if ret is False:
163 3759eddb Filippos Giannakos
            logger.warning("Could not close map %s" % maphash)
164 3759eddb Filippos Giannakos
            pass
165 3759eddb Filippos Giannakos
        req.put();
166 f3525003 Chrysostomos Nanakos
        self.ioctx_pool.pool_put(ioctx)
167 9762de48 Filippos Giannakos
        return hashes
168 f3525003 Chrysostomos Nanakos
169 f3525003 Chrysostomos Nanakos
    def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
170 f3525003 Chrysostomos Nanakos
        """Store hashes in the given hashes map."""
171 f3525003 Chrysostomos Nanakos
        namelen = self.namelen
172 f3525003 Chrysostomos Nanakos
        if self._check_rear_map(maphash):
173 f3525003 Chrysostomos Nanakos
            return
174 f3525003 Chrysostomos Nanakos
        with self._get_rear_map(maphash, 1) as rmap:
175 f3525003 Chrysostomos Nanakos
            rmap.sync_write_chunks(namelen, blkoff, hashes, None)