Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / archipelagomapper.py @ 7be22e8d

History | View | Annotate | Download (6.1 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from binascii import hexlify
35
import os
36
import re
37
import ctypes
38
import ConfigParser
39
import logging
40

    
41
from context_archipelago import ArchipelagoObject
42
from archipelago.common import (
43
    Request,
44
    xseg_reply_info,
45
    xseg_reply_map,
46
    xseg_reply_map_scatterlist,
47
    string_at,
48
    )
49

    
50
from pithos.workers import (
51
    glue,
52
    monkey,
53
    )
54

    
55
monkey.patch_Request()
56

    
57
logger = logging.getLogger(__name__)
58

    
59
class ArchipelagoMapper(object):
60
    """Mapper.
61
       Required constructor parameters: namelen.
62
    """
63

    
64
    namelen = None
65

    
66
    def __init__(self, **params):
67
        self.params = params
68
        self.namelen = params['namelen']
69
        cfg = {}
70
        bcfg = ConfigParser.ConfigParser()
71
        bcfg.readfp(open(glue.WorkerGlue.ArchipelagoConfFile))
72
        cfg['blockerm'] = bcfg.getint('mapperd','blockerm_port')
73
        cfg['mapperd'] = bcfg.getint('vlmcd','mapper_port')
74
        self.ioctx_pool = glue.WorkerGlue().ioctx_pool
75
        self.dst_port = int(cfg['blockerm'])
76
        self.mapperd_port = int(cfg['mapperd'])
77

    
78
    def _get_rear_map(self, maphash, create=0):
79
        name = hexlify(maphash)
80
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)
81

    
82
    def _check_rear_map(self, maphash):
83
        name = hexlify(maphash)
84
        ioctx = self.ioctx_pool.pool_get()
85
        req = Request.get_info_request(ioctx, self.dst_port, name)
86
        req.submit()
87
        req.wait()
88
        ret = req.success()
89
        req.put()
90
        self.ioctx_pool.pool_put(ioctx)
91
        if ret:
92
            return True
93
        else:
94
            return False
95

    
96
    def map_retr(self, maphash, blkoff=0, nr=100000000000000):
97
        """Return as a list, part of the hashes map of an object
98
           at the given block offset.
99
           By default, return the whole hashes map.
100
        """
101
        namelen = self.namelen
102
        hashes = ()
103
        ioctx = self.ioctx_pool.pool_get()
104
        req = Request.get_info_request(ioctx, self.dst_port,
105
                                       hexlify(maphash))
106
        req.submit()
107
        req.wait()
108
        ret = req.success()
109
        if ret:
110
            info = req.get_data(_type=xseg_reply_info)
111
            size = int(info.contents.size)
112
            req.put()
113
        else:
114
            req.put()
115
            self.ioctx_pool.pool_put(ioctx)
116
            raise RuntimeError("Hashmap '%s' doesn't exists" %
117
                               hexlify(maphash))
118
        req = Request.get_read_request(ioctx, self.dst_port,
119
                                       hexlify(maphash), size=size)
120
        req.submit()
121
        req.wait()
122
        ret = req.success()
123
        if ret:
124
            data = string_at(req.get_data(), size)
125
            req.put()
126
            self.ioctx_pool.pool_put(ioctx)
127
            for idx in xrange(0, len(data), namelen):
128
                hashes = hashes + (data[idx:idx + namelen],)
129
            hashes = list(hashes)
130
        else:
131
            req.put()
132
            self.ioctx_pool.pool_put(ioctx)
133
            raise RuntimeError("Hashmap '%s' doesn't exists" %
134
                               hexlify(maphash))
135
        return hashes
136

    
137
    def map_retr_archipelago(self, maphash, size):
138
        """Retrieve Archipelago mapfile"""
139
        hashes = []
140
        ioctx = self.ioctx_pool.pool_get()
141
        maphash = maphash.split("archip:")[1]
142
        req = Request.get_mapr_request(ioctx, self.mapperd_port, maphash,
143
                                       offset=0, size=size)
144
        req.submit()
145
        req.wait()
146
        ret = req.success()
147
        if ret:
148
            data = req.get_data(xseg_reply_map)
149
            Segsarray = xseg_reply_map_scatterlist * data.contents.cnt
150
            segs = Segsarray.from_address(ctypes.addressof(data.contents.segs))
151
            hashes = [string_at(segs[idx].target, segs[idx].targetlen)
152
                    for idx in xrange(len(segs))]
153
            req.put()
154
        else:
155
            req.put()
156
            self.ioctx_pool.pool_put(ioctx)
157
            raise Exception("Could not retrieve Archipelago mapfile.")
158
        req = Request.get_close_request(ioctx, self.mapperd_port, maphash);
159
        req.submit()
160
        req.wait()
161
        ret = req.success();
162
        if ret is False:
163
            logger.warning("Could not close map %s" % maphash)
164
            pass
165
        req.put();
166
        self.ioctx_pool.pool_put(ioctx)
167
        return hashes
168

    
169
    def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
170
        """Store hashes in the given hashes map."""
171
        namelen = self.namelen
172
        if self._check_rear_map(maphash):
173
            return
174
        with self._get_rear_map(maphash, 1) as rmap:
175
            rmap.sync_write_chunks(namelen, blkoff, hashes, None)