Revision 876d7486

b/snf-pithos-app/conf/20-snf-pithos-app-settings.conf
@@ -60,3 +60,9 @@
 #
 #Archipelago Configuration File
 #PITHOS_BACKEND_ARCHIPELAGO_CONF = '/etc/archipelago/archipelago.conf'
+#
+# Archipelagp xseg pool size
+#PITHOS_BACKEND_XSEG_POOL_SIZE = 8
+#
+# The maximum interval (in seconds) for consequent backend object map checks
+#PITHOS_BACKEND_MAP_CHECK_INTERVAL = 300

b/snf-pithos-app/pithos/api/functions.py
@@ -945,6 +945,8 @@
             raise faults.ItemNotFound('Object does not exist')
         except VersionNotExists:
             raise faults.ItemNotFound('Version does not exist')
+        except IllegalOperationError, e:
+            raise faults.Forbidden(str(e))
     else:
         try:
             s, h = request.backend.get_object_hashmap(
@@ -958,6 +960,8 @@
             raise faults.ItemNotFound('Object does not exist')
         except VersionNotExists:
             raise faults.ItemNotFound('Version does not exist')
+        except IllegalOperationError, e:
+            raise faults.Forbidden(str(e))
 
     # Reply with the hashmap.
     if hashmap_reply:
@@ -1371,6 +1375,8 @@
         raise faults.Forbidden('Not allowed')
     except ItemNotExists:
         raise faults.ItemNotFound('Object does not exist')
+    except IllegalOperationError, e:
+        raise faults.Forbidden(str(e))
 
     offset, length, total = ranges
     if offset is None:
@@ -1396,6 +1402,8 @@
             raise faults.Forbidden('Not allowed')
         except ItemNotExists:
             raise faults.ItemNotFound('Source object does not exist')
+        except IllegalOperationError, e:
+            raise faults.Forbidden(str(e))
 
         if length is None:
             length = src_size
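
Note: all four functions.py hunks add the same translation, so the backend's new IllegalOperationError surfaces to clients as an HTTP 403. A rough, self-contained sketch of that pattern (the classes below are stand-ins for the real fault and backend exception types, not the actual imports):

    class Forbidden(Exception):
        """Stand-in for the API's 403 fault class."""

    class IllegalOperationError(Exception):
        """Stand-in for the backend exception used in this revision."""

    def translate_backend_errors(call):
        # Run a backend call and convert IllegalOperationError into the
        # API-level Forbidden fault, as the hunks above do in each view.
        try:
            return call()
        except IllegalOperationError as e:
            raise Forbidden(str(e))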

b/snf-pithos-app/pithos/api/settings.py
@@ -138,11 +138,6 @@
     settings, 'PITHOS_BACKEND_BLOCK_PATH', '/tmp/pithos-data/')
 BACKEND_BLOCK_UMASK = getattr(settings, 'PITHOS_BACKEND_BLOCK_UMASK', 0o022)
 
-# Archipelago Configuration File
-BACKEND_ARCHIPELAGO_CONF = getattr(
-        settings, 'PITHOS_BACKEND_ARCHIPELAGO_CONF',
-        '/etc/archipelago/archipelago.conf')
-
 # Queue for billing.
 BACKEND_QUEUE_MODULE = getattr(settings, 'PITHOS_BACKEND_QUEUE_MODULE', None)
 # Example: 'pithos.backends.lib.rabbitmq'
@@ -209,3 +204,15 @@
 # Set domain to restrict requests of pithos object contents serve endpoint or
 # None for no domain restriction
 UNSAFE_DOMAIN = getattr(settings, 'PITHOS_UNSAFE_DOMAIN', None)
+
+# Archipelago Configuration File
+BACKEND_ARCHIPELAGO_CONF = getattr(settings, 'PITHOS_BACKEND_ARCHIPELAGO_CONF',
+                                   '/etc/archipelago/archipelago.conf')
+
+# Archipelagp xseg pool size
+BACKEND_XSEG_POOL_SIZE = getattr(settings, 'PITHOS_BACKEND_XSEG_POOL_SIZE', 8)
+
+# The maximum interval (in seconds) for consequent backend object map checks
+BACKEND_MAP_CHECK_INTERVAL = getattr(settings,
                                      'PITHOS_BACKEND_MAP_CHECK_INTERVAL',
                                      300)
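
Note: the three new settings follow the module's usual getattr pattern, so a value set in the Django settings (for instance via the conf snippet above) overrides the hard-coded default. A trivial, self-contained illustration with a dummy settings object:

    class _DummySettings(object):
        PITHOS_BACKEND_MAP_CHECK_INTERVAL = 60   # deployment override

    settings = _DummySettings()
    BACKEND_MAP_CHECK_INTERVAL = getattr(settings,
                                         'PITHOS_BACKEND_MAP_CHECK_INTERVAL',
                                         300)
    assert BACKEND_MAP_CHECK_INTERVAL == 60      # falls back to 300 if unset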

b/snf-pithos-app/pithos/api/util.py
@@ -63,6 +63,9 @@
                                  BACKEND_VERSIONING, BACKEND_FREE_VERSIONING,
                                  BACKEND_POOL_ENABLED, BACKEND_POOL_SIZE,
                                  BACKEND_BLOCK_SIZE, BACKEND_HASH_ALGORITHM,
+                                 BACKEND_ARCHIPELAGO_CONF,
+                                 BACKEND_XSEG_POOL_SIZE,
+                                 BACKEND_MAP_CHECK_INTERVAL,
                                  RADOS_STORAGE, RADOS_POOL_BLOCKS,
                                  RADOS_POOL_MAPS, TRANSLATE_UUIDS,
                                  PUBLIC_URL_SECURITY, PUBLIC_URL_ALPHABET,
@@ -231,6 +234,10 @@
     response.override_serialization = True
     response['Content-Type'] = meta.get('type', 'application/octet-stream')
     response['Last-Modified'] = http_date(int(meta['modified']))
+    response['Map-Exists'] = meta['available']
+    response['Map-Checked-At'] = (
+        http_date(int(meta['map_check_timestamp'])) if
+        meta['map_check_timestamp'] is not None else '')
     if not restricted:
         response['X-Object-Hash'] = meta['hash']
         response['X-Object-UUID'] = meta['uuid']
@@ -1030,7 +1037,10 @@
     public_url_alphabet=PUBLIC_URL_ALPHABET,
     account_quota_policy=BACKEND_ACCOUNT_QUOTA,
     container_quota_policy=BACKEND_CONTAINER_QUOTA,
-    container_versioning_policy=BACKEND_VERSIONING)
+    container_versioning_policy=BACKEND_VERSIONING,
+    archipelago_conf_file=BACKEND_ARCHIPELAGO_CONF,
+    xseg_pool_size=BACKEND_XSEG_POOL_SIZE,
+    map_check_interval=BACKEND_MAP_CHECK_INTERVAL)
 
 _pithos_backend_pool = PithosBackendPool(size=BACKEND_POOL_SIZE,
                                          **BACKEND_KWARGS)
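
Note: the new Map-Exists and Map-Checked-At response headers expose the availability state stored in the object meta. A self-contained sketch of how they are derived (formatdate stands in for Django's http_date, and map_headers is an illustrative helper, not part of the patch):

    from email.utils import formatdate   # stand-in for django.utils.http.http_date

    def map_headers(meta):
        headers = {'Map-Exists': meta['available']}
        ts = meta['map_check_timestamp']
        headers['Map-Checked-At'] = (
            formatdate(int(ts), usegmt=True) if ts is not None else '')
        return headers

    # An Archipelago object whose map has never been verified:
    print(map_headers({'available': False, 'map_check_timestamp': None}))
    # {'Map-Exists': False, 'Map-Checked-At': ''}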

b/snf-pithos-backend/pithos/backends/lib/sqlalchemy/__init__.py
@@ -33,7 +33,8 @@
 
 from dbwrapper import DBWrapper
 from node import (Node, ROOTNODE, SERIAL, NODE, HASH, SIZE, TYPE, MTIME, MUSER,
-                  UUID, CHECKSUM, CLUSTER, MATCH_PREFIX, MATCH_EXACT)
+                  UUID, CHECKSUM, CLUSTER, MATCH_PREFIX, MATCH_EXACT,
+                  AVAILABLE, MAP_CHECK_TIMESTAMP)
 from permissions import Permissions, READ, WRITE
 from config import Config
 from quotaholder_serials import QuotaholderSerial
@@ -41,5 +42,6 @@
 __all__ = ["DBWrapper",
            "Node", "ROOTNODE", "NODE", "SERIAL", "HASH", "SIZE", "TYPE",
            "MTIME", "MUSER", "UUID", "CHECKSUM", "CLUSTER", "MATCH_PREFIX",
-           "MATCH_EXACT", "Permissions", "READ", "WRITE", "Config",
+           "MATCH_EXACT", "AVAILABLE", "MAP_CHECK_TIMESTAMP",
+           "Permissions", "READ", "WRITE", "Config",
            "QuotaholderSerial"]

b/snf-pithos-backend/pithos/backends/lib/sqlalchemy/node.py
@@ -49,7 +49,7 @@
 ROOTNODE = 0
 
 (SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
- CLUSTER) = range(11)
+ CLUSTER, AVAILABLE, MAP_CHECK_TIMESTAMP) = range(13)
 
 (MATCH_PREFIX, MATCH_EXACT) = range(2)
 
@@ -103,6 +103,8 @@
     'uuid': 8,
     'checksum': 9,
     'cluster': 10,
+    'available':11,
+    'map_check_timestamp':12
 }
 
 
@@ -164,6 +166,9 @@
     columns.append(Column('uuid', String(64), nullable=False, default=''))
     columns.append(Column('checksum', String(256), nullable=False, default=''))
     columns.append(Column('cluster', Integer, nullable=False, default=0))
+    columns.append(Column('available', Boolean, nullable=False, default=True))
+    columns.append(Column('map_check_timestamp', DECIMAL(precision=16,
+                                                         scale=6)))
     versions = Table('versions', metadata, *columns, mysql_engine='InnoDB')
     Index('idx_versions_node_mtime', versions.c.node, versions.c.mtime)
     Index('idx_versions_node_uuid', versions.c.uuid)
@@ -282,7 +287,7 @@
         """Return the properties of all versions at node.
           If keys is empty, return all properties in the order
           (serial, node, hash, size, type, source, mtime, muser, uuid,
-            checksum, cluster).
+            checksum, cluster, available, map_check_timestamp).
         """
 
         s = select([self.versions.c.serial,
@@ -295,7 +300,10 @@
                     self.versions.c.muser,
                     self.versions.c.uuid,
                     self.versions.c.checksum,
-                    self.versions.c.cluster], self.versions.c.node == node)
+                    self.versions.c.cluster,
+                    self.versions.c.available,
+                    self.versions.c.map_check_timestamp],
+                   self.versions.c.node == node)
         s = s.order_by(self.versions.c.serial)
         r = self.conn.execute(s)
         rows = r.fetchall()
@@ -716,7 +724,8 @@
 
     def version_create(self, node, hash, size, type, source, muser, uuid,
                        checksum, cluster=0,
-                       update_statistics_ancestors_depth=None):
+                       update_statistics_ancestors_depth=None,
+                       available=True):
         """Create a new version from the given properties.
           Return the (serial, mtime) of the new version.
        """
@@ -725,7 +734,7 @@
         s = self.versions.insert().values(
             node=node, hash=hash, size=size, type=type, source=source,
             mtime=mtime, muser=muser, uuid=uuid, checksum=checksum,
-            cluster=cluster)
+            cluster=cluster, available=available)
         serial = self.conn.execute(s).inserted_primary_key[0]
         self.statistics_update_ancestors(node, 1, size, mtime, cluster,
                                          update_statistics_ancestors_depth)
@@ -738,7 +747,7 @@
         """Lookup the current version of the given node.
          Return a list with its properties:
          (serial, node, hash, size, type, source, mtime,
-            muser, uuid, checksum, cluster)
+            muser, uuid, checksum, cluster, available, map_check_timestamp)
          or None if the current version is not found in the given cluster.
        """
 
@@ -749,7 +758,8 @@
             s = select([v.c.serial, v.c.node, v.c.hash,
                         v.c.size, v.c.type, v.c.source,
                         v.c.mtime, v.c.muser, v.c.uuid,
-                        v.c.checksum, v.c.cluster])
+                        v.c.checksum, v.c.cluster,
+                        v.c.available, v.c.map_check_timestamp])
         if before != inf:
             c = select([func.max(self.versions.c.serial)],
                        self.versions.c.node == node)
@@ -771,7 +781,7 @@
         """Lookup the current versions of the given nodes.
          Return a list with their properties:
          (serial, node, hash, size, type, source, mtime, muser, uuid,
-            checksum, cluster).
+            checksum, cluster, available, map_check_timestamp).
        """
         if not nodes:
             return ()
@@ -783,7 +793,8 @@
             s = select([v.c.serial, v.c.node, v.c.hash,
                         v.c.size, v.c.type, v.c.source,
                         v.c.mtime, v.c.muser, v.c.uuid,
-                        v.c.checksum, v.c.cluster])
+                        v.c.checksum, v.c.cluster,
+                        v.c.available, v.c.map_check_timestamp])
         if before != inf:
             c = select([func.max(self.versions.c.serial)],
                        self.versions.c.node.in_(nodes))
@@ -810,14 +821,16 @@
           the version specified by serial and the keys, in the order given.
          If keys is empty, return all properties in the order
          (serial, node, hash, size, type, source, mtime, muser, uuid,
-            checksum, cluster).
+            checksum, cluster, available, map_check_timestamp).
        """
 
         v = self.versions.alias()
         s = select([v.c.serial, v.c.node, v.c.hash,
                     v.c.size, v.c.type, v.c.source,
                     v.c.mtime, v.c.muser, v.c.uuid,
-                    v.c.checksum, v.c.cluster], v.c.serial == serial)
+                    v.c.checksum, v.c.cluster,
+                    v.c.available, v.c.map_check_timestamp],
+                   v.c.serial == serial)
         if node is not None:
             s = s.where(v.c.node == node)
         rp = self.conn.execute(s)
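
Note: besides the wider select lists, the schema itself gains two columns on the versions table, a non-nullable available boolean defaulting to True and a nullable map_check_timestamp decimal. A standalone sketch of just those columns on a toy table (not the real Pithos schema; a migration for existing databases is not shown in this excerpt):

    from sqlalchemy import (MetaData, Table, Column, Integer, Boolean,
                            DECIMAL, create_engine)

    metadata = MetaData()
    versions = Table(
        'versions', metadata,
        Column('serial', Integer, primary_key=True),
        Column('available', Boolean, nullable=False, default=True),
        Column('map_check_timestamp', DECIMAL(precision=16, scale=6)))

    metadata.create_all(create_engine('sqlite://'))   # toy in-memory database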

b/snf-pithos-backend/pithos/backends/modular.py
@@ -40,6 +40,7 @@
 from collections import defaultdict
 from functools import wraps, partial
 from traceback import format_exc
+from time import time
 
 from pithos.workers import glue
 from archipelago.common import Segment, Xseg_ctx
@@ -130,6 +131,8 @@
 DEFAULT_SOURCE = 'system'
 DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
 
+DEFAULT_MAP_CHECK_INTERVAL = 300  # set to 300 secs
+
 logger = logging.getLogger(__name__)
 
 
@@ -238,7 +241,8 @@
                  container_quota_policy=None,
                  container_versioning_policy=None,
                  archipelago_conf_file=None,
-                 xseg_pool_size=8):
+                 xseg_pool_size=8,
+                 map_check_interval=None):
         db_module = db_module or DEFAULT_DB_MODULE
         db_connection = db_connection or DEFAULT_DB_CONNECTION
         block_module = block_module or DEFAULT_BLOCK_MODULE
@@ -255,6 +259,8 @@
             or DEFAULT_CONTAINER_VERSIONING
         archipelago_conf_file = archipelago_conf_file \
             or DEFAULT_ARCHIPELAGO_CONF_FILE
+        map_check_interval = map_check_interval \
+            or DEFAULT_MAP_CHECK_INTERVAL
 
         self.default_account_policy = {'quota': account_quota_policy}
         self.default_container_policy = {
@@ -272,6 +278,7 @@
         self.hash_algorithm = hash_algorithm
         self.block_size = block_size
         self.free_versioning = free_versioning
+        self.map_check_interval = map_check_interval
 
         def load_module(m):
             __import__(m)
@@ -288,7 +295,8 @@
         self.node = self.db_module.Node(**params)
         for x in ['ROOTNODE', 'SERIAL', 'NODE', 'HASH', 'SIZE', 'TYPE',
                   'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER',
-                  'MATCH_PREFIX', 'MATCH_EXACT']:
+                  'MATCH_PREFIX', 'MATCH_EXACT',
+                  'AVAILABLE', 'MAP_CHECK_TIMESTAMP']:
             setattr(self, x, getattr(self.db_module, x))
 
         self.ALLOWED = ['read', 'write']
@@ -962,7 +970,9 @@
                      'modified': modified,
                      'modified_by': props[self.MUSER],
                      'uuid': props[self.UUID],
-                     'checksum': props[self.CHECKSUM]})
+                     'checksum': props[self.CHECKSUM],
+                     'available': props[self.AVAILABLE],
+                     'map_check_timestamp': props[self.MAP_CHECK_TIMESTAMP]})
         return meta
 
     @debug_method
@@ -1082,6 +1092,36 @@
             self.permissions.public_set(
                 path, self.public_url_security, self.public_url_alphabet)
 
+    def _update_available(self, props):
+        """Checks if the object map exists and updates the database"""
+
+        if not props[self.AVAILABLE]:
+            if props[self.MAP_CHECK_TIMESTAMP]:
+                elapsed_time = time() - float(props[self.MAP_CHECK_TIMESTAMP])
+                if elapsed_time < self.map_check_interval:
+                    raise NotAllowedError(
+                        'Consequent map checks are limited: retry later.')
+        try:
+            hashmap = self.store.map_get_archipelago(props[self.HASH],
+                                                     props[self.SIZE])
+        except:  # map does not exist
+            # Raising an exception results in db transaction rollback
+            # However we have to force the update of the database
+            self.wrapper.rollback()  # rollback existing transaction
+            self.wrapper.execute()  # start new transaction
+            self.node.version_put_property(props[self.SERIAL],
+                                           'map_check_timestamp', time())
+            self.wrapper.commit()  # commit transaction
+            self.wrapper.execute()  # start new transaction
+            raise IllegalOperationError(
+                'Unable to retrieve Archipelago Volume hashmap.')
+        else:  # map exists
+            self.node.version_put_property(props[self.SERIAL],
+                                           'available', True)
+            self.node.version_put_property(props[self.SERIAL],
+                                           'map_check_timestamp', time())
+            return hashmap
+
     @debug_method
     @backend_method
     def get_object_hashmap(self, user, account, container, name, version=None):
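
Note: _update_available is the core of the change. An object whose Archipelago map could not be found keeps available=False, and the map_check_timestamp stamp throttles how often the backend retries the lookup. The throttle itself reduces to a small, self-contained rule (the names below are placeholders):

    from time import time

    def may_check_map(available, map_check_timestamp, map_check_interval=300):
        """May the backend (re)check the Archipelago map right now?"""
        if available:
            return True                       # map already known to exist
        if not map_check_timestamp:
            return True                       # never checked before
        elapsed = time() - float(map_check_timestamp)
        return elapsed >= map_check_interval  # otherwise: retry later

    print(may_check_map(False, time() - 10))   # checked 10 s ago -> False
    print(may_check_map(False, time() - 600))  # checked 10 min ago -> True
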
@@ -1093,8 +1133,7 @@
         if props[self.HASH] is None:
             return 0, ()
         if props[self.HASH].startswith('archip:'):
-            hashmap = self.store.map_get_archipelago(props[self.HASH],
-                                                     props[self.SIZE])
+            hashmap = self._update_available(props)
             return props[self.SIZE], [x for x in hashmap]
         else:
             hashmap = self.store.map_get(self._unhexlify_hash(
@@ -1104,7 +1143,8 @@
     def _update_object_hash(self, user, account, container, name, size, type,
                             hash, checksum, domain, meta, replace_meta,
                             permissions, src_node=None, src_version_id=None,
-                            is_copy=False, report_size_change=True):
+                            is_copy=False, report_size_change=True,
+                            available=True):
         if permissions is not None and user != account:
             raise NotAllowedError
         self._can_write_object(user, account, container, name)
@@ -1121,7 +1161,7 @@
         pre_version_id, dest_version_id = self._put_version_duplicate(
             user, node, src_node=src_node, size=size, type=type, hash=hash,
             checksum=checksum, is_copy=is_copy,
-            update_statistics_ancestors_depth=1)
+            update_statistics_ancestors_depth=1, available=available)
 
         # Handle meta.
         if src_version_id is None:
@@ -1223,7 +1263,7 @@
             self.lock_container_path = False
         dest_version_id = self._update_object_hash(
             user, account, container, name, size, type, mapfile, checksum,
-            domain, meta, replace_meta, permissions)
+            domain, meta, replace_meta, permissions, available=False)
         return self.node.version_get_properties(dest_version_id,
                                                 keys=('uuid',))[0]
 
@@ -1651,7 +1691,8 @@
     def _put_version_duplicate(self, user, node, src_node=None, size=None,
                                type=None, hash=None, checksum=None,
                                cluster=CLUSTER_NORMAL, is_copy=False,
-                               update_statistics_ancestors_depth=None):
+                               update_statistics_ancestors_depth=None,
+                               available=True):
         """Create a new version of the node."""
 
         props = self.node.version_lookup(
@@ -1692,7 +1733,8 @@
 
         dest_version_id, mtime = self.node.version_create(
             node, hash, size, type, src_version_id, user, uuid, checksum,
-            cluster, update_statistics_ancestors_depth)
+            cluster, update_statistics_ancestors_depth,
+            available=available)
 
         self.node.attribute_unset_is_latest(node, dest_version_id)
 
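
Note: the remaining modular.py hunks thread a new available keyword (default True) from _update_object_hash through _put_version_duplicate down to version_create; only the mapfile registration path above passes available=False, so such versions start out unavailable until _update_available confirms the map. A condensed sketch of that threading (placeholder signatures, not the real backend methods):

    def version_create(available=True, **props):
        return dict(props, available=available)

    def _put_version_duplicate(available=True, **props):
        return version_create(available=available, **props)

    def _update_object_hash(available=True, **props):
        return _put_version_duplicate(available=available, **props)

    # Registration of an Archipelago mapfile passes available=False:
    print(_update_object_hash(available=False, hash='archip:mapfile'))
    # {'hash': 'archip:mapfile', 'available': False}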

b/snf-pithos-backend/pithos/backends/util.py
@@ -53,7 +53,10 @@
                  public_url_alphabet=None,
                  account_quota_policy=None,
                  container_quota_policy=None,
-                 container_versioning_policy=None):
+                 container_versioning_policy=None,
+                 archipelago_conf_file=None,
+                 xseg_pool_size=8,
+                 map_check_interval=None):
         super(PithosBackendPool, self).__init__(size=size)
         self.db_module = db_module
         self.db_connection = db_connection
@@ -75,6 +78,9 @@
         self.account_quota_policy = account_quota_policy
         self.container_quota_policy = container_quota_policy
         self.container_versioning_policy = container_versioning_policy
+        self.archipelago_conf_file = archipelago_conf_file
+        self.xseg_pool_size = xseg_pool_size
+        self.map_check_interval = map_check_interval
 
     def _pool_create(self):
         backend = connect_backend(
@@ -97,7 +103,10 @@
             public_url_alphabet=self.public_url_alphabet,
             account_quota_policy=self.account_quota_policy,
             container_quota_policy=self.container_quota_policy,
-            container_versioning_policy=self.container_versioning_policy)
+            container_versioning_policy=self.container_versioning_policy,
+            archipelago_conf_file=self.archipelago_conf_file,
+            xseg_pool_size=self.xseg_pool_size,
+            map_check_interval=self.map_check_interval)
 
         backend._real_close = backend.close
         backend.close = instancemethod(_pooled_backend_close, backend,
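
Note: PithosBackendPool now accepts and forwards the same three keywords that api/util.py adds to BACKEND_KWARGS, so pooled backends pick up the Archipelago configuration, the xseg pool size and the map-check interval as well. Illustrative construction only (the connection string is a placeholder and the other pool arguments are omitted):

    from pithos.backends.util import PithosBackendPool

    _pool = PithosBackendPool(
        size=8,
        db_connection='sqlite:////tmp/pithos.db',              # placeholder
        archipelago_conf_file='/etc/archipelago/archipelago.conf',
        xseg_pool_size=8,
        map_check_interval=300)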
