Revision d0b67cbc

b/snf-pithos-app/conf/20-snf-pithos-app-settings.conf
 #
 #Archipelago Configuration File
 #PITHOS_BACKEND_ARCHIPELAGO_CONF = '/etc/archipelago/archipelago.conf'
+#
+# Archipelago xseg pool size
+#PITHOS_BACKEND_XSEG_POOL_SIZE = 8
+#
+# The maximum interval (in seconds) for consequent backend object map checks
+#PITHOS_BACKEND_MAP_CHECK_INTERVAL = 300
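Deployers who want different values can uncomment and override these settings in the pithos-app configuration file (typically installed under /etc/synnefo/). A minimal sketch; the values below are illustrative assumptions, not recommendations from this revision:

# 20-snf-pithos-app-settings.conf (excerpt, illustrative values)
# Allow a new map check for a still-pending object at most every 10 minutes.
PITHOS_BACKEND_MAP_CHECK_INTERVAL = 600
# Larger xseg port pool, e.g. for hosts running many gunicorn workers.
PITHOS_BACKEND_XSEG_POOL_SIZE = 16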
b/snf-pithos-app/pithos/api/functions.py
             raise faults.ItemNotFound('Object does not exist')
         except VersionNotExists:
             raise faults.ItemNotFound('Version does not exist')
+        except IllegalOperationError, e:
+            raise faults.Forbidden(str(e))
     else:
         try:
             s, h = request.backend.get_object_hashmap(
......
             raise faults.ItemNotFound('Object does not exist')
         except VersionNotExists:
             raise faults.ItemNotFound('Version does not exist')
+        except IllegalOperationError, e:
+            raise faults.Forbidden(str(e))
 
     # Reply with the hashmap.
     if hashmap_reply:
......
         raise faults.Forbidden('Not allowed')
     except ItemNotExists:
         raise faults.ItemNotFound('Object does not exist')
+    except IllegalOperationError, e:
+        raise faults.Forbidden(str(e))
 
     offset, length, total = ranges
     if offset is None:
......
             raise faults.Forbidden('Not allowed')
         except ItemNotExists:
             raise faults.ItemNotFound('Source object does not exist')
+        except IllegalOperationError, e:
+            raise faults.Forbidden(str(e))
 
         if length is None:
             length = src_size
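With these handlers in place, the backend's IllegalOperationError (raised when an object's Archipelago map cannot be retrieved yet) reaches API clients as 403 Forbidden rather than a generic server error. A rough client-side sketch using python-requests; the endpoint URL, account path and token below are placeholders, not values taken from this revision:

import requests

# Placeholder deployment details.
URL = ('https://pithos.example.org/object-store/v1/'
       'user@example.org/images/disk0')
HEADERS = {'X-Auth-Token': 'example-token'}

r = requests.get(URL, headers=HEADERS)
if r.status_code == 403:
    # Either a plain permission error, or (after this change) an object
    # whose Archipelago hashmap could not be retrieved yet.
    print('Forbidden: %s' % r.text)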
b/snf-pithos-app/pithos/api/settings.py
     settings, 'PITHOS_BACKEND_BLOCK_PATH', '/tmp/pithos-data/')
 BACKEND_BLOCK_UMASK = getattr(settings, 'PITHOS_BACKEND_BLOCK_UMASK', 0o022)
 
-# Archipelago Configuration File
-BACKEND_ARCHIPELAGO_CONF = getattr(
-        settings, 'PITHOS_BACKEND_ARCHIPELAGO_CONF',
-        '/etc/archipelago/archipelago.conf')
-
 # Queue for billing.
 BACKEND_QUEUE_MODULE = getattr(settings, 'PITHOS_BACKEND_QUEUE_MODULE', None)
 # Example: 'pithos.backends.lib.rabbitmq'
......
 # Set domain to restrict requests of pithos object contents serve endpoint or
 # None for no domain restriction
 UNSAFE_DOMAIN = getattr(settings, 'PITHOS_UNSAFE_DOMAIN', None)
+
+# Archipelago Configuration File
+BACKEND_ARCHIPELAGO_CONF = getattr(settings, 'PITHOS_BACKEND_ARCHIPELAGO_CONF',
+                                   '/etc/archipelago/archipelago.conf')
+
+# Archipelago xseg pool size
+BACKEND_XSEG_POOL_SIZE = getattr(settings, 'PITHOS_BACKEND_XSEG_POOL_SIZE', 8)
+
+# The maximum interval (in seconds) for consequent backend object map checks
+BACKEND_MAP_CHECK_INTERVAL = getattr(settings,
+                                     'PITHOS_BACKEND_MAP_CHECK_INTERVAL',
+                                     300)
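All three settings follow the module's usual pattern: read the deployer's override from Django settings if present, otherwise fall back to a hard-coded default. A stripped-down, self-contained sketch of that resolution rule, with a stand-in settings object instead of django.conf.settings:

class _FakeSettings(object):
    """Stand-in for django.conf.settings with one override defined."""
    PITHOS_BACKEND_MAP_CHECK_INTERVAL = 60

settings = _FakeSettings()

BACKEND_MAP_CHECK_INTERVAL = getattr(settings,
                                     'PITHOS_BACKEND_MAP_CHECK_INTERVAL',
                                     300)
BACKEND_XSEG_POOL_SIZE = getattr(settings, 'PITHOS_BACKEND_XSEG_POOL_SIZE', 8)

print(BACKEND_MAP_CHECK_INTERVAL)  # 60 -- the override wins
print(BACKEND_XSEG_POOL_SIZE)      # 8  -- no override, default applies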
b/snf-pithos-app/pithos/api/util.py
                                  BACKEND_VERSIONING, BACKEND_FREE_VERSIONING,
                                  BACKEND_POOL_ENABLED, BACKEND_POOL_SIZE,
                                  BACKEND_BLOCK_SIZE, BACKEND_HASH_ALGORITHM,
+                                 BACKEND_ARCHIPELAGO_CONF,
+                                 BACKEND_XSEG_POOL_SIZE,
+                                 BACKEND_MAP_CHECK_INTERVAL,
                                  RADOS_STORAGE, RADOS_POOL_BLOCKS,
                                  RADOS_POOL_MAPS, TRANSLATE_UUIDS,
                                  PUBLIC_URL_SECURITY, PUBLIC_URL_ALPHABET,
......
     response.override_serialization = True
     response['Content-Type'] = meta.get('type', 'application/octet-stream')
     response['Last-Modified'] = http_date(int(meta['modified']))
+    response['Map-Exists'] = meta['available']
+    response['Map-Checked-At'] = (
+        http_date(int(meta['map_check_timestamp'])) if
+        meta['map_check_timestamp'] is not None else '')
     if not restricted:
         response['X-Object-Hash'] = meta['hash']
         response['X-Object-UUID'] = meta['uuid']
......
     public_url_alphabet=PUBLIC_URL_ALPHABET,
     account_quota_policy=BACKEND_ACCOUNT_QUOTA,
     container_quota_policy=BACKEND_CONTAINER_QUOTA,
-    container_versioning_policy=BACKEND_VERSIONING)
+    container_versioning_policy=BACKEND_VERSIONING,
+    archipelago_conf_file=BACKEND_ARCHIPELAGO_CONF,
+    xseg_pool_size=BACKEND_XSEG_POOL_SIZE,
+    map_check_interval=BACKEND_MAP_CHECK_INTERVAL)
 
 _pithos_backend_pool = PithosBackendPool(size=BACKEND_POOL_SIZE,
                                          **BACKEND_KWARGS)
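The two new response headers let clients see, without downloading any data, whether the Archipelago map behind an object has been verified and when it was last checked. A hedged sketch of reading them with python-requests; host, path and token are placeholders:

import requests

URL = ('https://pithos.example.org/object-store/v1/'
       'user@example.org/images/disk0')
HEADERS = {'X-Auth-Token': 'example-token'}

r = requests.head(URL, headers=HEADERS)
print(r.headers.get('Map-Exists'))      # 'True' once the map has been verified
print(r.headers.get('Map-Checked-At'))  # HTTP date of the last check, or ''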
b/snf-pithos-backend/pithos/backends/lib/sqlalchemy/__init__.py
 
 from dbwrapper import DBWrapper
 from node import (Node, ROOTNODE, SERIAL, NODE, HASH, SIZE, TYPE, MTIME, MUSER,
-                  UUID, CHECKSUM, CLUSTER, MATCH_PREFIX, MATCH_EXACT)
+                  UUID, CHECKSUM, CLUSTER, MATCH_PREFIX, MATCH_EXACT,
+                  AVAILABLE, MAP_CHECK_TIMESTAMP)
 from permissions import Permissions, READ, WRITE
 from config import Config
 from quotaholder_serials import QuotaholderSerial
......
 __all__ = ["DBWrapper",
            "Node", "ROOTNODE", "NODE", "SERIAL", "HASH", "SIZE", "TYPE",
            "MTIME", "MUSER", "UUID", "CHECKSUM", "CLUSTER", "MATCH_PREFIX",
-           "MATCH_EXACT", "Permissions", "READ", "WRITE", "Config",
+           "MATCH_EXACT", "AVAILABLE", "MAP_CHECK_TIMESTAMP",
+           "Permissions", "READ", "WRITE", "Config",
            "QuotaholderSerial"]
b/snf-pithos-backend/pithos/backends/lib/sqlalchemy/node.py
 ROOTNODE = 0
 
 (SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
- CLUSTER) = range(11)
+ CLUSTER, AVAILABLE, MAP_CHECK_TIMESTAMP) = range(13)
 
 (MATCH_PREFIX, MATCH_EXACT) = range(2)
 
......
     'uuid': 8,
     'checksum': 9,
     'cluster': 10,
+    'available':11,
+    'map_check_timestamp':12
 }
 
 
......
     columns.append(Column('uuid', String(64), nullable=False, default=''))
     columns.append(Column('checksum', String(256), nullable=False, default=''))
     columns.append(Column('cluster', Integer, nullable=False, default=0))
+    columns.append(Column('available', Boolean, nullable=False, default=True))
+    columns.append(Column('map_check_timestamp', DECIMAL(precision=16,
+                                                         scale=6)))
     versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
     Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
     Index('idx_versions_node_uuid', versions.c.uuid)
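Existing databases need the two new columns on the versions table; the migration script itself is not part of this hunk. A minimal sketch of an equivalent manual migration with SQLAlchemy, assuming the versions table already exists and using a throwaway SQLite URL purely for illustration:

from sqlalchemy import create_engine

# Placeholder connection string; a real deployment uses its own DB URL.
engine = create_engine('sqlite:////tmp/pithos-backend.db')

with engine.begin() as conn:
    # Mirror the Column() definitions added above.
    conn.execute("ALTER TABLE versions "
                 "ADD COLUMN available BOOLEAN NOT NULL DEFAULT 1")
    conn.execute("ALTER TABLE versions "
                 "ADD COLUMN map_check_timestamp DECIMAL(16, 6)")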
......
         """Return the properties of all versions at node.
            If keys is empty, return all properties in the order
            (serial, node, hash, size, type, source, mtime, muser, uuid,
-            checksum, cluster).
+            checksum, cluster, available, map_check_timestamp).
         """
 
         s = select([self.versions.c.serial,
......
                     self.versions.c.muser,
                     self.versions.c.uuid,
                     self.versions.c.checksum,
-                    self.versions.c.cluster], self.versions.c.node == node)
+                    self.versions.c.cluster,
+                    self.versions.c.available,
+                    self.versions.c.map_check_timestamp],
+                   self.versions.c.node == node)
         s = s.order_by(self.versions.c.serial)
         r = self.conn.execute(s)
         rows = r.fetchall()
......
 
     def version_create(self, node, hash, size, type, source, muser, uuid,
                        checksum, cluster=0,
-                       update_statistics_ancestors_depth=None):
+                       update_statistics_ancestors_depth=None,
+                       available=True):
         """Create a new version from the given properties.
            Return the (serial, mtime) of the new version.
         """
......
         s = self.versions.insert().values(
             node=node, hash=hash, size=size, type=type, source=source,
             mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
-            cluster=cluster)
+            cluster=cluster, available=available)
         serial = self.conn.execute(s).inserted_primary_key[0]
         self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                          update_statistics_ancestors_depth)
......
         """Lookup the current version of the given node.
            Return a list with its properties:
            (serial, node, hash, size, type, source, mtime,
-            muser, uuid, checksum, cluster)
+            muser, uuid, checksum, cluster, available, map_check_timestamp)
            or None if the current version is not found in the given cluster.
         """
 
......
             s = select([v.c.serial, v.c.node, v.c.hash,
                         v.c.size, v.c.type, v.c.source,
                         v.c.mtime, v.c.muser, v.c.uuid,
-                        v.c.checksum, v.c.cluster])
+                        v.c.checksum, v.c.cluster,
+                        v.c.available, v.c.map_check_timestamp])
         if before != inf:
             c = select([func.max(self.versions.c.serial)],
                        self.versions.c.node == node)
......
         """Lookup the current versions of the given nodes.
            Return a list with their properties:
            (serial, node, hash, size, type, source, mtime, muser, uuid,
-            checksum, cluster).
+            checksum, cluster, available, map_check_timestamp).
         """
         if not nodes:
             return ()
......
             s = select([v.c.serial, v.c.node, v.c.hash,
                         v.c.size, v.c.type, v.c.source,
                         v.c.mtime, v.c.muser, v.c.uuid,
-                        v.c.checksum, v.c.cluster])
+                        v.c.checksum, v.c.cluster,
+                        v.c.available, v.c.map_check_timestamp])
         if before != inf:
             c = select([func.max(self.versions.c.serial)],
                        self.versions.c.node.in_(nodes))
......
            the version specified by serial and the keys, in the order given.
            If keys is empty, return all properties in the order
            (serial, node, hash, size, type, source, mtime, muser, uuid,
-            checksum, cluster).
+            checksum, cluster, available, map_check_timestamp).
         """
 
         v = self.versions.alias()
         s = select([v.c.serial, v.c.node, v.c.hash,
                     v.c.size, v.c.type, v.c.source,
                     v.c.mtime, v.c.muser, v.c.uuid,
-                    v.c.checksum, v.c.cluster], v.c.serial == serial)
+                    v.c.checksum, v.c.cluster,
+                    v.c.available, v.c.map_check_timestamp],
+                   v.c.serial == serial)
         if node is not None:
             s = s.where(v.c.node == node)
         rp = self.conn.execute(s)
b/snf-pithos-backend/pithos/backends/modular.py
 
 from functools import wraps, partial
 from traceback import format_exc
+from time import time
 
 from pithos.workers import glue
 from archipelago.common import Segment, Xseg_ctx
......
 DEFAULT_SOURCE = 'system'
 DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
 
+DEFAULT_MAP_CHECK_INTERVAL = 300  # set to 300 secs
+
 logger = logging.getLogger(__name__)
 
 
......
                  container_quota_policy=None,
                  container_versioning_policy=None,
                  archipelago_conf_file=None,
-                 xseg_pool_size=8):
+                 xseg_pool_size=8,
+                 map_check_interval=None):
         db_module = db_module or DEFAULT_DB_MODULE
         db_connection = db_connection or DEFAULT_DB_CONNECTION
         block_module = block_module or DEFAULT_BLOCK_MODULE
......
             or DEFAULT_CONTAINER_VERSIONING
         archipelago_conf_file = archipelago_conf_file \
             or DEFAULT_ARCHIPELAGO_CONF_FILE
+        map_check_interval = map_check_interval \
+            or DEFAULT_MAP_CHECK_INTERVAL
 
         self.default_account_policy = {'quota': account_quota_policy}
         self.default_container_policy = {
......
         self.hash_algorithm = hash_algorithm
         self.block_size = block_size
         self.free_versioning = free_versioning
+        self.map_check_interval = map_check_interval
 
         def load_module(m):
             __import__(m)
......
         self.node = self.db_module.Node(**params)
         for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                   'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
-                  'MATCH_PREFIX', 'MATCH_EXACT']:
+                  'MATCH_PREFIX', 'MATCH_EXACT',
+                  'AVAILABLE', 'MAP_CHECK_TIMESTAMP']:
             setattr(self, x, getattr(self.db_module, x))
 
         self.ALLOWED = ['read', 'write']
......
                      'modified': modified,
                      'modified_by': props[self.MUSER],
                      'uuid': props[self.UUID],
-                     'checksum': props[self.CHECKSUM]})
+                     'checksum': props[self.CHECKSUM],
+                     'available': props[self.AVAILABLE],
+                     'map_check_timestamp': props[self.MAP_CHECK_TIMESTAMP]})
         return meta
 
     @debug_method
......
             self.permissions.public_set(
                 path, self.public_url_security, self.public_url_alphabet)
 
+    def _update_available(self, props):
+        """Checks if the object map exists and updates the database"""
+
+        if not props[self.AVAILABLE]:
+            if props[self.MAP_CHECK_TIMESTAMP]:
+                elapsed_time = time() - float(props[self.MAP_CHECK_TIMESTAMP])
+                if elapsed_time < self.map_check_interval:
+                    raise NotAllowedError(
+                        'Consequent map checks are limited: retry later.')
+        try:
+            hashmap = self.store.map_get_archipelago(props[self.HASH],
+                                                     props[self.SIZE])
+        except:  # map does not exist
+            # Raising an exception results in db transaction rollback
+            # However we have to force the update of the database
+            self.wrapper.rollback()  # rollback existing transaction
+            self.wrapper.execute()  # start new transaction
+            self.node.version_put_property(props[self.SERIAL],
+                                           'map_check_timestamp', time())
+            self.wrapper.commit()  # commit transaction
+            self.wrapper.execute()  # start new transaction
+            raise IllegalOperationError(
+                'Unable to retrieve Archipelago Volume hashmap.')
+        else:  # map exists
+            self.node.version_put_property(props[self.SERIAL],
+                                           'available', True)
+            self.node.version_put_property(props[self.SERIAL],
+                                           'map_check_timestamp', time())
+            return hashmap
+
     @debug_method
     @backend_method
     def get_object_hashmap(self, user, account, container, name, version=None):
......
         if props[self.HASH] is None:
             return 0, ()
         if props[self.HASH].startswith('archip:'):
-            hashmap = self.store.map_get_archipelago(props[self.HASH],
-                                                     props[self.SIZE])
+            hashmap = self._update_available(props)
             return props[self.SIZE], [x for x in hashmap]
         else:
             hashmap = self.store.map_get(self._unhexlify_hash(
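get_object_hashmap now routes 'archip:' hashes through _update_available, whose throttling rule is simple: while a version is still marked unavailable, a failed map lookup stamps map_check_timestamp, and another lookup is only attempted once map_check_interval seconds have passed (otherwise NotAllowedError is raised). A standalone sketch of just that gate, returning a boolean instead of raising, with made-up timestamps so the arithmetic is easy to follow:

from time import time

MAP_CHECK_INTERVAL = 300  # seconds, the shipped default

def may_check_again(available, map_check_timestamp, now=None):
    """Return True if a fresh backend map check is allowed right now."""
    if available:
        return True                  # already verified, nothing to gate
    if map_check_timestamp is None:
        return True                  # never checked before
    now = time() if now is None else now
    return (now - float(map_check_timestamp)) >= MAP_CHECK_INTERVAL

# Last failed check 120 s ago -> still throttled.
print(may_check_again(False, 1000.0, now=1120.0))   # False
# 301 s after the failed check the gate opens again.
print(may_check_again(False, 1000.0, now=1301.0))   # True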
......
     def _update_object_hash(self, user, account, container, name, size, type,
                             hash, checksum, domain, meta, replace_meta,
                             permissions, src_node=None, src_version_id=None,
-                            is_copy=False, report_size_change=True):
+                            is_copy=False, report_size_change=True,
+                            available=True):
         if permissions is not None and user != account:
             raise NotAllowedError
         self._can_write(user, account, container, name)
......
         pre_version_id, dest_version_id = self._put_version_duplicate(
             user, node, src_node=src_node, size=size, type=type, hash=hash,
             checksum=checksum, is_copy=is_copy,
-            update_statistics_ancestors_depth=1)
+            update_statistics_ancestors_depth=1, available=available)
 
         # Handle meta.
         if src_version_id is None:
......
             self.lock_container_path = False
         dest_version_id = self._update_object_hash(
             user, account, container, name, size, type, mapfile, checksum,
-            domain, meta, replace_meta, permissions)
+            domain, meta, replace_meta, permissions, available=False)
         return self.node.version_get_properties(dest_version_id,
                                                 keys=('uuid',))[0]
 
......
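Before the _put_version_duplicate hunk below, it helps to trace how the new available flag travels through the write path in this file; a comment-only sketch, with method names taken from the hunks, bodies elided, and the outer registration method's def line not shown in the hunks above:

# Write path: a freshly registered Archipelago mapfile starts out unverified.
#   <registration method, def not shown above>
#       -> _update_object_hash(..., available=False)
#           -> _put_version_duplicate(..., available=False)
#               -> node.version_create(..., available=False)
#
# Read path: the first get_object_hashmap() on an 'archip:' hash calls
# _update_available(props), which either flips 'available' to True or
# stamps 'map_check_timestamp' and raises IllegalOperationError.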
     def _put_version_duplicate(self, user, node, src_node=None, size=None,
                                type=None, hash=None, checksum=None,
                                cluster=CLUSTER_NORMAL, is_copy=False,
-                               update_statistics_ancestors_depth=None):
+                               update_statistics_ancestors_depth=None,
+                               available=True):
         """Create a new version of the node."""
 
         props = self.node.version_lookup(
......
 
         dest_version_id, mtime = self.node.version_create(
             node, hash, size, type, src_version_id, user, uuid, checksum,
-            cluster, update_statistics_ancestors_depth)
+            cluster, update_statistics_ancestors_depth,
+            available=available)
 
         self.node.attribute_unset_is_latest(node, dest_version_id)
 
b/snf-pithos-backend/pithos/backends/util.py
                  public_url_alphabet=None,
                  account_quota_policy=None,
                  container_quota_policy=None,
-                 container_versioning_policy=None):
+                 container_versioning_policy=None,
+                 archipelago_conf_file=None,
+                 xseg_pool_size=8,
+                 map_check_interval=None):
         super(PithosBackendPool, self).__init__(size=size)
         self.db_module = db_module
         self.db_connection = db_connection
......
         self.account_quota_policy = account_quota_policy
         self.container_quota_policy = container_quota_policy
         self.container_versioning_policy = container_versioning_policy
+        self.archipelago_conf_file = archipelago_conf_file
+        self.xseg_pool_size = xseg_pool_size
+        self.map_check_interval = map_check_interval
 
     def _pool_create(self):
         backend = connect_backend(
......
             public_url_alphabet=self.public_url_alphabet,
             account_quota_policy=self.account_quota_policy,
             container_quota_policy=self.container_quota_policy,
-            container_versioning_policy=self.container_versioning_policy)
+            container_versioning_policy=self.container_versioning_policy,
+            archipelago_conf_file=self.archipelago_conf_file,
+            xseg_pool_size=self.xseg_pool_size,
+            map_check_interval=self.map_check_interval)
 
         backend._real_close = backend.close
         backend.close = instancemethod(_pooled_backend_close, backend,
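Since the pool now forwards the three new keyword arguments to connect_backend, callers that build their own pool can set them explicitly. A minimal sketch under the assumption that the pool exposes the usual pool_get()/patched close() behaviour shown above; the connection string and values are placeholders:

from pithos.backends.util import PithosBackendPool

pool = PithosBackendPool(
    size=8,
    db_module='pithos.backends.lib.sqlalchemy',
    db_connection='sqlite:////tmp/pithos-backend.db',   # placeholder DB URL
    archipelago_conf_file='/etc/archipelago/archipelago.conf',
    xseg_pool_size=8,
    map_check_interval=300)

backend = pool.pool_get()      # borrow a ModularBackend from the pool
try:
    pass                       # ... use the backend ...
finally:
    backend.close()            # patched close() returns it to the pool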
