Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / archipelagomapper.py @ 9762de48

History | View | Annotate | Download (5.9 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from binascii import hexlify
35
import os
36
import re
37
import ctypes
38

    
39
from context_archipelago import ArchipelagoObject
40
from archipelago.common import (
41
    Request,
42
    xseg_reply_info,
43
    xseg_reply_map,
44
    xseg_reply_map_scatterlist,
45
    string_at,
46
    )
47

    
48
from pithos.workers import (
49
    glue,
50
    monkey,
51
    )
52

    
53
monkey.patch_Request()
54

    
55
from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF
56

    
57

    
58
class ArchipelagoMapper(object):
59
    """Mapper.
60
       Required constructor parameters: namelen.
61
    """
62

    
63
    namelen = None
64

    
65
    def __init__(self, **params):
66
        self.params = params
67
        self.namelen = params['namelen']
68
        ioctx_pool = glue.WorkerGlue().ioctx_pool
69
        cfg = {}
70
        bcfg = open(BACKEND_ARCHIPELAGO_CONF).read()
71
        cfg['blockerm'] = re.search('\'blockerm_port\'\s*:\s*\d+',
72
                                    bcfg).group(0).split(':')[1]
73
        cfg['mapperd'] = re.search('\'mapper_port\'\s*:\s*\d+',
74
                                   bcfg).group(0).split(':')[1]
75
        self.ioctx_pool = ioctx_pool
76
        self.dst_port = int(cfg['blockerm'])
77
        self.mapperd_port = int(cfg['mapperd'])
78

    
79
    def _get_rear_map(self, maphash, create=0):
80
        name = hexlify(maphash)
81
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)
82

    
83
    def _check_rear_map(self, maphash):
84
        name = hexlify(maphash)
85
        ioctx = self.ioctx_pool.pool_get()
86
        req = Request.get_info_request(ioctx, self.dst_port, name)
87
        req.submit()
88
        req.wait()
89
        ret = req.success()
90
        req.put()
91
        self.ioctx_pool.pool_put(ioctx)
92
        if ret:
93
            return True
94
        else:
95
            return False
96

    
97
    def map_retr(self, maphash, blkoff=0, nr=100000000000000):
98
        """Return as a list, part of the hashes map of an object
99
           at the given block offset.
100
           By default, return the whole hashes map.
101
        """
102
        namelen = self.namelen
103
        hashes = ()
104
        ioctx = self.ioctx_pool.pool_get()
105
        req = Request.get_info_request(ioctx, self.dst_port,
106
                                       hexlify(maphash))
107
        req.submit()
108
        req.wait()
109
        ret = req.success()
110
        if ret:
111
            info = req.get_data(_type=xseg_reply_info)
112
            size = int(info.contents.size)
113
            req.put()
114
        else:
115
            req.put()
116
            self.ioctx_pool.pool_put(ioctx)
117
            raise RuntimeError("Hashmap '%s' doesn't exists" %
118
                               hexlify(maphash))
119
        req = Request.get_read_request(ioctx, self.dst_port,
120
                                       hexlify(maphash), size=size)
121
        req.submit()
122
        req.wait()
123
        ret = req.success()
124
        if ret:
125
            data = string_at(req.get_data(), size)
126
            req.put()
127
            self.ioctx_pool.pool_put(ioctx)
128
            for idx in xrange(0, len(data), namelen):
129
                hashes = hashes + (data[idx:idx+namelen],)
130
            hashes = list(hashes)
131
        else:
132
            req.put()
133
            self.ioctx_pool.pool_put(ioctx)
134
            raise RuntimeError("Hashmap '%s' doesn't exists" %
135
                               hexlify(maphash))
136
        return hashes
137

    
138
    def map_retr_archipelago(self, maphash, size):
139
        """Retrieve Archipelago mapfile"""
140
        hashes = []
141
        ioctx = self.ioctx_pool.pool_get()
142
        maphash = maphash.split("archip:")[1]
143
        req = Request.get_mapr_request(ioctx, self.mapperd_port, maphash,
144
                                       offset=0, size=size)
145
        req.submit()
146
        req.wait()
147
        ret = req.success()
148
        if ret:
149
            data = req.get_data(xseg_reply_map)
150
            Segsarray = xseg_reply_map_scatterlist * data.contents.cnt
151
            segs = Segsarray.from_address(ctypes.addressof(data.contents.segs))
152
            hashes = [string_at(segs[idx].target, segs[idx].targetlen)
153
                    for idx in xrange(len(segs))]
154
            req.put()
155
        else:
156
            req.put()
157
            self.ioctx_pool.pool_put(ioctx)
158
            raise Exception("Could not retrieve Archipelago mapfile.")
159
        self.ioctx_pool.pool_put(ioctx)
160
        return hashes
161

    
162
    def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
163
        """Store hashes in the given hashes map."""
164
        namelen = self.namelen
165
        if self._check_rear_map(maphash):
166
            return
167
        with self._get_rear_map(maphash, 1) as rmap:
168
            rmap.sync_write_chunks(namelen, blkoff, hashes, None)