Statistics
| Branch: | Tag: | Revision:

root / snf-pithos-backend / pithos / backends / lib / hashfiler / archipelagomapper.py @ f9093bea

History | View | Annotate | Download (5.8 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
from binascii import hexlify
35
import os
36
import re
37
import ctypes
38

    
39
from context_archipelago import ArchipelagoObject
40
from archipelago.common import (
41
    Request,
42
    xseg_reply_info,
43
    xseg_reply_map,
44
    xseg_reply_map_scatterlist,
45
    string_at,
46
    )
47

    
48
from pithos.workers import (
49
    glue,
50
    monkey,
51
    )
52

    
53
monkey.patch_Request()
54

    
55

    
56
class ArchipelagoMapper(object):
57
    """Mapper.
58
       Required constructor parameters: namelen.
59
    """
60

    
61
    namelen = None
62

    
63
    def __init__(self, **params):
64
        self.params = params
65
        self.namelen = params['namelen']
66
        cfg = {}
67
        bcfg = open(glue.WorkerGlue.ArchipelagoConfFile).read()
68
        cfg['blockerm'] = re.search('\'blockerm_port\'\s*:\s*\d+',
69
                                    bcfg).group(0).split(':')[1]
70
        cfg['mapperd'] = re.search('\'mapper_port\'\s*:\s*\d+',
71
                                   bcfg).group(0).split(':')[1]
72
        self.ioctx_pool = glue.WorkerGlue().ioctx_pool
73
        self.dst_port = int(cfg['blockerm'])
74
        self.mapperd_port = int(cfg['mapperd'])
75

    
76
    def _get_rear_map(self, maphash, create=0):
77
        name = hexlify(maphash)
78
        return ArchipelagoObject(name, self.ioctx_pool, self.dst_port, create)
79

    
80
    def _check_rear_map(self, maphash):
81
        name = hexlify(maphash)
82
        ioctx = self.ioctx_pool.pool_get()
83
        req = Request.get_info_request(ioctx, self.dst_port, name)
84
        req.submit()
85
        req.wait()
86
        ret = req.success()
87
        req.put()
88
        self.ioctx_pool.pool_put(ioctx)
89
        if ret:
90
            return True
91
        else:
92
            return False
93

    
94
    def map_retr(self, maphash, blkoff=0, nr=100000000000000):
95
        """Return as a list, part of the hashes map of an object
96
           at the given block offset.
97
           By default, return the whole hashes map.
98
        """
99
        namelen = self.namelen
100
        hashes = ()
101
        ioctx = self.ioctx_pool.pool_get()
102
        req = Request.get_info_request(ioctx, self.dst_port,
103
                                       hexlify(maphash))
104
        req.submit()
105
        req.wait()
106
        ret = req.success()
107
        if ret:
108
            info = req.get_data(_type=xseg_reply_info)
109
            size = int(info.contents.size)
110
            req.put()
111
        else:
112
            req.put()
113
            self.ioctx_pool.pool_put(ioctx)
114
            raise RuntimeError("Hashmap '%s' doesn't exists" %
115
                               hexlify(maphash))
116
        req = Request.get_read_request(ioctx, self.dst_port,
117
                                       hexlify(maphash), size=size)
118
        req.submit()
119
        req.wait()
120
        ret = req.success()
121
        if ret:
122
            data = string_at(req.get_data(), size)
123
            req.put()
124
            self.ioctx_pool.pool_put(ioctx)
125
            for idx in xrange(0, len(data), namelen):
126
                hashes = hashes + (data[idx:idx+namelen],)
127
            hashes = list(hashes)
128
        else:
129
            req.put()
130
            self.ioctx_pool.pool_put(ioctx)
131
            raise RuntimeError("Hashmap '%s' doesn't exists" %
132
                               hexlify(maphash))
133
        return hashes
134

    
135
    def map_retr_archipelago(self, maphash, size):
136
        """Retrieve Archipelago mapfile"""
137
        hashes = []
138
        ioctx = self.ioctx_pool.pool_get()
139
        maphash = maphash.split("archip:")[1]
140
        req = Request.get_mapr_request(ioctx, self.mapperd_port, maphash,
141
                                       offset=0, size=size)
142
        req.submit()
143
        req.wait()
144
        ret = req.success()
145
        if ret:
146
            data = req.get_data(xseg_reply_map)
147
            Segsarray = xseg_reply_map_scatterlist * data.contents.cnt
148
            segs = Segsarray.from_address(ctypes.addressof(data.contents.segs))
149
            hashes = [string_at(segs[idx].target, segs[idx].targetlen)
150
                    for idx in xrange(len(segs))]
151
            req.put()
152
        else:
153
            req.put()
154
            self.ioctx_pool.pool_put(ioctx)
155
            raise Exception("Could not retrieve Archipelago mapfile.")
156
        self.ioctx_pool.pool_put(ioctx)
157
        return hashes
158

    
159
    def map_stor(self, maphash, hashes=(), blkoff=0, create=1):
160
        """Store hashes in the given hashes map."""
161
        namelen = self.namelen
162
        if self._check_rear_map(maphash):
163
            return
164
        with self._get_rear_map(maphash, 1) as rmap:
165
            rmap.sync_write_chunks(namelen, blkoff, hashes, None)