Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / archipelagomapper.py @ 32293ec0

History | View | Annotate | Download (5.8 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from binascii import hexlify
35
import os, re
36
import ctypes
37

    
38
from context_archipelago import ArchipelagoObject
39
from archipelago.common import (
40
    Request,
41
    xseg_reply_info,
42
    xseg_reply_map,
43
    xseg_reply_map_scatterlist,
44
    string_at,
45
    )
46

    
47
from pithos.workers import (
48
    glue,
49
    monkey,
50
    )
51

    
52
monkey.patch_Request()
53

    
54
from pithos.api.settings import BACKEND_ARCHIPELAGO_CONF
55

    
56

    
57
class ArchipelagoMapper(object):
58
    """Mapper.
59
       Required constructor parameters: namelen.
60
    """
61

    
62
    namelen = None
63

    
64
    def __init__(self, **params):
65
        self.params = params
66
        self.namelen = params['namelen']
67
        ioctx_pool = glue.WorkerGlue().ioctx_pool
68
        cfg = {}
69
        bcfg = open(BACKEND_ARCHIPELAGO_CONF).read()
70
        cfg['blockerm'] = re.search('\'blockerm_port\'\s*:\s*\d+',
71
                                        bcfg).group(0).split(':')[1]
72
        cfg['mapperd'] = re.search('\'mapper_port\'\s*:\s*\d+',
73
                                        bcfg).group(0).split(':')[1]
74
        self.ioctx_pool = ioctx_pool
75
        self.dst_port = int(cfg['blockerm'])
76
        self.mapperd_port = int(cfg['mapperd'])
77

    
78
    def _get_rear_map(self, maphash, create=0):
79
        name = hexlify(maphash)
80
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)
81

    
82
    def _check_rear_map(self, maphash):
83
        name = hexlify(maphash)
84
        ioctx = self.ioctx_pool.pool_get()
85
        req = Request.get_info_request(ioctx, self.dst_port, name)
86
        req.submit()
87
        req.wait()
88
        ret = req.success()
89
        req.put()
90
        self.ioctx_pool.pool_put(ioctx)
91
        if ret:
92
            return True
93
        else:
94
            return False
95

    
96
    def map_retr(self, maphash, blkoff=0, nr=100000000000000):
97
        """Return as a list, part of the hashes map of an object
98
           at the given block offset.
99
           By default, return the whole hashes map.
100
        """
101
        namelen = self.namelen
102
        hashes = ()
103
        ioctx = self.ioctx_pool.pool_get()
104
        req = Request.get_info_request(ioctx, self.dst_port,
105
                                       hexlify(maphash))
106
        req.submit()
107
        req.wait()
108
        ret = req.success()
109
        if ret:
110
            info = req.get_data(_type=xseg_reply_info)
111
            size = int(info.contents.size)
112
            req.put()
113
        else:
114
            req.put()
115
            self.ioctx_pool.pool_put(ioctx)
116
            raise RuntimeError("Hashmap '%s' doesn't exists" % hexlify(maphash))
117
        req = Request.get_read_request(ioctx, self.dst_port,
118
                                       hexlify(maphash), size = size)
119
        req.submit()
120
        req.wait()
121
        ret = req.success()
122
        if ret:
123
            data = string_at(req.get_data(),size)
124
            req.put()
125
            self.ioctx_pool.pool_put(ioctx)
126
            for idx in xrange(0,len(data),namelen):
127
                hashes = hashes + (data[idx:idx+namelen],)
128
            hashes = list(hashes)
129
        else:
130
            req.put()
131
            self.ioctx_pool.pool_put(ioctx)
132
            raise RuntimeError("Hashmap '%s' doesn't exists" % hexlify(maphash))
133
        return hashes
134

    
135
    def map_retr_archipelago(self, maphash, size):
136
        """Retrieve Archipelago mapfile"""
137
        ioctx = self.ioctx_pool.pool_get()
138
        maphash = maphash.split("archip:")[1]
139
        req = Request.get_mapr_request(ioctx, self.mapperd_port, maphash,
140
                                       offset=0, size=size)
141
        req.submit()
142
        req.wait()
143
        ret = req.success()
144
        if ret:
145
            data = req.get_data(xseg_reply_map)
146
            Segsarray = xseg_reply_map_scatterlist * data.contents.cnt
147
            segs = Segsarray.from_address(ctypes.addressof(data.contents.segs))
148
            req.put()
149
        else:
150
            req.put()
151
            self.ioctx_pool.pool_put(ioctx)
152
            raise Exception("Could not retrieve Archipelago mapfile.")
153
        self.ioctx_pool.pool_put(ioctx)
154
        return [string_at(segs[idx].target, segs[idx].targetlen) for idx in xrange(len(segs))]
155

    
156
    def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
157
        """Store hashes in the given hashes map."""
158
        namelen = self.namelen
159
        if self._check_rear_map(maphash):
160
            return
161
        with self._get_rear_map(maphash, 1) as rmap:
162
            rmap.sync_write_chunks(namelen, blkoff, hashes, None)