Revision 47462eda snf-pithos-backend/pithos/backends/modular.py

b/snf-pithos-backend/pithos/backends/modular.py
1 1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2
# 
2
#
3 3
# Redistribution and use in source and binary forms, with or
4 4
# without modification, are permitted provided that the following
5 5
# conditions are met:
6
# 
6
#
7 7
#   1. Redistributions of source code must retain the above
8 8
#      copyright notice, this list of conditions and the following
9 9
#      disclaimer.
10
# 
10
#
11 11
#   2. Redistributions in binary form must reproduce the above
12 12
#      copyright notice, this list of conditions and the following
13 13
#      disclaimer in the documentation and/or other materials
14 14
#      provided with the distribution.
15
# 
15
#
16 16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
......
25 25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 27
# POSSIBILITY OF SUCH DAMAGE.
28
# 
28
#
29 29
# The views and conclusions contained in the software and
30 30
# documentation are those of the authors and should not be
31 31
# interpreted as representing official policies, either expressed
......
78 78
DEFAULT_BLOCK_UMASK = 0o022
79 79
#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq'
80 80
#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos'
81
DEFAULT_BLOCK_PARAMS = { 'mappool': None, 'blockpool': None }
81 82

  
82 83
QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s'
83 84
QUEUE_CLIENT_ID = 'pithos'
......
118 119

  
119 120
class ModularBackend(BaseBackend):
120 121
    """A modular backend.
121
    
122

  
122 123
    Uses modules for SQL functions and storage.
123 124
    """
124
    
125

  
125 126
    def __init__(self, db_module=None, db_connection=None,
126 127
                 block_module=None, block_path=None, block_umask=None,
127
                 queue_module=None, queue_connection=None):
128
                 queue_module=None, queue_connection=None,
129
                 block_params=None):
128 130
        db_module = db_module or DEFAULT_DB_MODULE
129 131
        db_connection = db_connection or DEFAULT_DB_CONNECTION
130 132
        block_module = block_module or DEFAULT_BLOCK_MODULE
131 133
        block_path = block_path or DEFAULT_BLOCK_PATH
132 134
        block_umask = block_umask or DEFAULT_BLOCK_UMASK
135
        block_params = block_params or DEFAULT_BLOCK_PARAMS
133 136
        #queue_module = queue_module or DEFAULT_QUEUE_MODULE
134 137
        #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION
135
        
138

  
136 139
        self.hash_algorithm = 'sha256'
137 140
        self.block_size = 4 * 1024 * 1024 # 4MB
138
        
141

  
139 142
        self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING}
140
        
143

  
141 144
        def load_module(m):
142 145
            __import__(m)
143 146
            return sys.modules[m]
144
        
147

  
145 148
        self.db_module = load_module(db_module)
146 149
        self.wrapper = self.db_module.DBWrapper(db_connection)
147 150
        params = {'wrapper': self.wrapper}
......
151 154
        self.node = self.db_module.Node(**params)
152 155
        for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']:
153 156
            setattr(self, x, getattr(self.db_module, x))
154
        
157

  
155 158
        self.block_module = load_module(block_module)
159
        self.block_params = block_params
156 160
        params = {'path': block_path,
157 161
                  'block_size': self.block_size,
158 162
                  'hash_algorithm': self.hash_algorithm,
159 163
                  'umask': block_umask}
164
        params.update(self.block_params)
160 165
        self.store = self.block_module.Store(**params)
161 166

  
162 167
        if queue_module and queue_connection:
......
168 173
            class NoQueue:
169 174
                def send(self, *args):
170 175
                    pass
171
                
176

  
172 177
                def close(self):
173 178
                    pass
174
            
179

  
175 180
            self.queue = NoQueue()
176
    
181

  
177 182
    def close(self):
178 183
        self.wrapper.close()
179 184
        self.queue.close()
180
    
185

  
181 186
    @backend_method
182 187
    def list_accounts(self, user, marker=None, limit=10000):
183 188
        """Return a list of accounts the user can access."""
184
        
189

  
185 190
        logger.debug("list_accounts: %s %s %s", user, marker, limit)
186 191
        allowed = self._allowed_accounts(user)
187 192
        start, limit = self._list_limits(allowed, marker, limit)
188 193
        return allowed[start:start + limit]
189
    
194

  
190 195
    @backend_method
191 196
    def get_account_meta(self, user, account, domain, until=None, include_user_defined=True):
192 197
        """Return a dictionary with the account metadata for the domain."""
193
        
198

  
194 199
        logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until)
195 200
        path, node = self._lookup_account(account, user == account)
196 201
        if user != account:
......
209 214
        else:
210 215
            modified = self._get_statistics(node)[2] # Overall last modification.
211 216
            modified = max(modified, mtime)
212
        
217

  
213 218
        if user != account:
214 219
            meta = {'name': account}
215 220
        else:
......
221 226
            meta.update({'name': account, 'count': count, 'bytes': bytes})
222 227
        meta.update({'modified': modified})
223 228
        return meta
224
    
229

  
225 230
    @backend_method
226 231
    def update_account_meta(self, user, account, domain, meta, replace=False):
227 232
        """Update the metadata associated with the account for the domain."""
228
        
233

  
229 234
        logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace)
230 235
        if user != account:
231 236
            raise NotAllowedError
232 237
        path, node = self._lookup_account(account, True)
233 238
        self._put_metadata(user, node, domain, meta, replace)
234
    
239

  
235 240
    @backend_method
236 241
    def get_account_groups(self, user, account):
237 242
        """Return a dictionary with the user groups defined for this account."""
238
        
243

  
239 244
        logger.debug("get_account_groups: %s %s", user, account)
240 245
        if user != account:
241 246
            if account not in self._allowed_accounts(user):
......
243 248
            return {}
244 249
        self._lookup_account(account, True)
245 250
        return self.permissions.group_dict(account)
246
    
251

  
247 252
    @backend_method
248 253
    def update_account_groups(self, user, account, groups, replace=False):
249 254
        """Update the groups associated with the account."""
250
        
255

  
251 256
        logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace)
252 257
        if user != account:
253 258
            raise NotAllowedError
......
260 265
                self.permissions.group_delete(account, k)
261 266
            if v:
262 267
                self.permissions.group_addmany(account, k, v)
263
    
268

  
264 269
    @backend_method
265 270
    def get_account_policy(self, user, account):
266 271
        """Return a dictionary with the account policy."""
267
        
272

  
268 273
        logger.debug("get_account_policy: %s %s", user, account)
269 274
        if user != account:
270 275
            if account not in self._allowed_accounts(user):
......
272 277
            return {}
273 278
        path, node = self._lookup_account(account, True)
274 279
        return self._get_policy(node)
275
    
280

  
276 281
    @backend_method
277 282
    def update_account_policy(self, user, account, policy, replace=False):
278 283
        """Update the policy associated with the account."""
279
        
284

  
280 285
        logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace)
281 286
        if user != account:
282 287
            raise NotAllowedError
283 288
        path, node = self._lookup_account(account, True)
284 289
        self._check_policy(policy)
285 290
        self._put_policy(node, policy, replace)
286
    
291

  
287 292
    @backend_method
288 293
    def put_account(self, user, account, policy={}):
289 294
        """Create a new account with the given name."""
290
        
295

  
291 296
        logger.debug("put_account: %s %s %s", user, account, policy)
292 297
        if user != account:
293 298
            raise NotAllowedError
......
298 303
            self._check_policy(policy)
299 304
        node = self._put_path(user, self.ROOTNODE, account)
300 305
        self._put_policy(node, policy, True)
301
    
306

  
302 307
    @backend_method
303 308
    def delete_account(self, user, account):
304 309
        """Delete the account with the given name."""
305
        
310

  
306 311
        logger.debug("delete_account: %s %s", user, account)
307 312
        if user != account:
308 313
            raise NotAllowedError
......
312 317
        if not self.node.node_remove(node):
313 318
            raise AccountNotEmpty('Account is not empty')
314 319
        self.permissions.group_destroy(account)
315
    
320

  
316 321
    @backend_method
317 322
    def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False):
318 323
        """Return a list of containers existing under an account."""
319
        
324

  
320 325
        logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public)
321 326
        if user != account:
322 327
            if until or account not in self._allowed_accounts(user):
......
337 342
        containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)]
338 343
        start, limit = self._list_limits([x[0] for x in containers], marker, limit)
339 344
        return containers[start:start + limit]
340
    
345

  
341 346
    @backend_method
342 347
    def list_container_meta(self, user, account, container, domain, until=None):
343 348
        """Return a list with all the container's object meta keys for the domain."""
344
        
349

  
345 350
        logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until)
346 351
        allowed = []
347 352
        if user != account:
......
354 359
        before = until if until is not None else inf
355 360
        allowed = self._get_formatted_paths(allowed)
356 361
        return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
357
    
362

  
358 363
    @backend_method
359 364
    def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True):
360 365
        """Return a dictionary with the container metadata for the domain."""
361
        
366

  
362 367
        logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until)
363 368
        if user != account:
364 369
            if until or container not in self._allowed_containers(user, account):
......
373 378
        else:
374 379
            modified = self._get_statistics(node)[2] # Overall last modification.
375 380
            modified = max(modified, mtime)
376
        
381

  
377 382
        if user != account:
378 383
            meta = {'name': container}
379 384
        else:
......
385 390
            meta.update({'name': container, 'count': count, 'bytes': bytes})
386 391
        meta.update({'modified': modified})
387 392
        return meta
388
    
393

  
389 394
    @backend_method
390 395
    def update_container_meta(self, user, account, container, domain, meta, replace=False):
391 396
        """Update the metadata associated with the container for the domain."""
392
        
397

  
393 398
        logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace)
394 399
        if user != account:
395 400
            raise NotAllowedError
......
399 404
            versioning = self._get_policy(node)['versioning']
400 405
            if versioning != 'auto':
401 406
                self.node.version_remove(src_version_id)
402
    
407

  
403 408
    @backend_method
404 409
    def get_container_policy(self, user, account, container):
405 410
        """Return a dictionary with the container policy."""
406
        
411

  
407 412
        logger.debug("get_container_policy: %s %s %s", user, account, container)
408 413
        if user != account:
409 414
            if container not in self._allowed_containers(user, account):
......
411 416
            return {}
412 417
        path, node = self._lookup_container(account, container)
413 418
        return self._get_policy(node)
414
    
419

  
415 420
    @backend_method
416 421
    def update_container_policy(self, user, account, container, policy, replace=False):
417 422
        """Update the policy associated with the container."""
418
        
423

  
419 424
        logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace)
420 425
        if user != account:
421 426
            raise NotAllowedError
422 427
        path, node = self._lookup_container(account, container)
423 428
        self._check_policy(policy)
424 429
        self._put_policy(node, policy, replace)
425
    
430

  
426 431
    @backend_method
427 432
    def put_container(self, user, account, container, policy={}):
428 433
        """Create a new container with the given name."""
429
        
434

  
430 435
        logger.debug("put_container: %s %s %s %s", user, account, container, policy)
431 436
        if user != account:
432 437
            raise NotAllowedError
......
441 446
        path = '/'.join((account, container))
442 447
        node = self._put_path(user, self._lookup_account(account, True)[1], path)
443 448
        self._put_policy(node, policy, True)
444
    
449

  
445 450
    @backend_method
446 451
    def delete_container(self, user, account, container, until=None, prefix='', delimiter=None):
447 452
        """Delete/purge the container with the given name."""
448
        
453

  
449 454
        logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter)
450 455
        if user != account:
451 456
            raise NotAllowedError
452 457
        path, node = self._lookup_container(account, container)
453
        
458

  
454 459
        if until is not None:
455 460
            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
456 461
            for h in hashes:
......
458 463
            self.node.node_purge_children(node, until, CLUSTER_DELETED)
459 464
            self._report_size_change(user, account, -size, {'action': 'container purge', 'path':path})
460 465
            return
461
        
466

  
462 467
        if not delimiter:
463 468
            if self._get_statistics(node)[0] > 0:
464 469
                raise ContainerNotEmpty('Container is not empty')
......
482 487
                self._report_object_change(user, account, path, details={'action': 'object delete'})
483 488
                paths.append(path)
484 489
            self.permissions.access_clear_bulk(paths)
485
    
490

  
486 491
    def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public):
487 492
        if user != account and until:
488 493
            raise NotAllowedError
......
494 499
                path, node = self._lookup_container(account, container)
495 500
                shared = self._get_formatted_paths(shared)
496 501
                objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props))
497
            
502

  
498 503
            # get public
499 504
            objects |= set(self._list_public_object_properties(user, account, container, prefix, all_props))
500 505
            objects = list(objects)
501
            
506

  
502 507
            objects.sort(key=lambda x: x[0])
503 508
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
504 509
            return objects[start:start + limit]
......
506 511
            objects = self._list_public_object_properties(user, account, container, prefix, all_props)
507 512
            start, limit = self._list_limits([x[0] for x in objects], marker, limit)
508 513
            return objects[start:start + limit]
509
        
514

  
510 515
        allowed = self._list_object_permissions(user, account, container, prefix, shared, public)
511 516
        if shared and not allowed:
512 517
            return []
......
515 520
        objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props)
516 521
        start, limit = self._list_limits([x[0] for x in objects], marker, limit)
517 522
        return objects[start:start + limit]
518
    
523

  
519 524
    def _list_public_object_properties(self, user, account, container, prefix, all_props):
520 525
        public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True)
521 526
        paths, nodes = self._lookup_objects(public)
......
525 530
        props = self.node.version_lookup_bulk(nodes, all_props=all_props)
526 531
        objects = [(path,) + props for path, props in zip(paths, props)]
527 532
        return objects
528
        
533

  
529 534
    def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public):
530 535
        objects = []
531 536
        while True:
......
536 541
            if not l or len(l) < limit:
537 542
                break
538 543
        return objects
539
    
544

  
540 545
    def _list_object_permissions(self, user, account, container, prefix, shared, public):
541 546
        allowed = []
542 547
        path = '/'.join((account, container, prefix)).rstrip('/')
......
554 559
            if not allowed:
555 560
                return []
556 561
        return allowed
557
    
562

  
558 563
    @backend_method
559 564
    def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
560 565
        """Return a list of object (name, version_id) tuples existing under a container."""
561
        
566

  
562 567
        logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
563 568
        return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public)
564
    
569

  
565 570
    @backend_method
566 571
    def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False):
567 572
        """Return a list of object metadata dicts existing under a container."""
568
        
573

  
569 574
        logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public)
570 575
        props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public)
571 576
        objects = []
......
584 589
                                'uuid': p[self.UUID + 1],
585 590
                                'checksum': p[self.CHECKSUM + 1]})
586 591
        return objects
587
    
592

  
588 593
    @backend_method
589 594
    def list_object_permissions(self, user, account, container, prefix=''):
590 595
        """Return a list of paths that enforce permissions under a container."""
591
        
596

  
592 597
        logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix)
593 598
        return self._list_object_permissions(user, account, container, prefix, True, False)
594
    
599

  
595 600
    @backend_method
596 601
    def list_object_public(self, user, account, container, prefix=''):
597 602
        """Return a dict mapping paths to public ids for objects that are public under a container."""
598
        
603

  
599 604
        logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix)
600 605
        public = {}
601 606
        for path, p in self.permissions.public_list('/'.join((account, container, prefix))):
602 607
            public[path] = p + ULTIMATE_ANSWER
603 608
        return public
604
    
609

  
605 610
    @backend_method
606 611
    def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True):
607 612
        """Return a dictionary with the object metadata for the domain."""
608
        
613

  
609 614
        logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version)
610 615
        self._can_read(user, account, container, name)
611 616
        path, node = self._lookup_object(account, container, name)
......
620 625
                if del_props is None:
621 626
                    raise ItemNotExists('Object does not exist')
622 627
                modified = del_props[self.MTIME]
623
        
628

  
624 629
        meta = {}
625 630
        if include_user_defined:
626 631
            meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
......
635 640
                     'uuid': props[self.UUID],
636 641
                     'checksum': props[self.CHECKSUM]})
637 642
        return meta
638
    
643

  
639 644
    @backend_method
640 645
    def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
641 646
        """Update the metadata associated with the object for the domain and return the new version."""
642
        
647

  
643 648
        logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace)
644 649
        self._can_write(user, account, container, name)
645 650
        path, node = self._lookup_object(account, container, name)
646 651
        src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
647 652
        self._apply_versioning(account, container, src_version_id)
648 653
        return dest_version_id
649
    
654

  
650 655
    @backend_method
651 656
    def get_object_permissions(self, user, account, container, name):
652 657
        """Return the action allowed on the object, the path
653 658
        from which the object gets its permissions from,
654 659
        along with a dictionary containing the permissions."""
655
        
660

  
656 661
        logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name)
657 662
        allowed = 'write'
658 663
        permissions_path = self._get_permissions_path(account, container, name)
......
665 670
                raise NotAllowedError
666 671
        self._lookup_object(account, container, name)
667 672
        return (allowed, permissions_path, self.permissions.access_get(permissions_path))
668
    
673

  
669 674
    @backend_method
670 675
    def update_object_permissions(self, user, account, container, name, permissions):
671 676
        """Update the permissions associated with the object."""
672
        
677

  
673 678
        logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions)
674 679
        if user != account:
675 680
            raise NotAllowedError
......
677 682
        self._check_permissions(path, permissions)
678 683
        self.permissions.access_set(path, permissions)
679 684
        self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
680
    
685

  
681 686
    @backend_method
682 687
    def get_object_public(self, user, account, container, name):
683 688
        """Return the public id of the object if applicable."""
684
        
689

  
685 690
        logger.debug("get_object_public: %s %s %s %s", user, account, container, name)
686 691
        self._can_read(user, account, container, name)
687 692
        path = self._lookup_object(account, container, name)[0]
......
689 694
        if p is not None:
690 695
            p += ULTIMATE_ANSWER
691 696
        return p
692
    
697

  
693 698
    @backend_method
694 699
    def update_object_public(self, user, account, container, name, public):
695 700
        """Update the public status of the object."""
696
        
701

  
697 702
        logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public)
698 703
        self._can_write(user, account, container, name)
699 704
        path = self._lookup_object(account, container, name)[0]
......
701 706
            self.permissions.public_unset(path)
702 707
        else:
703 708
            self.permissions.public_set(path)
704
    
709

  
705 710
    @backend_method
706 711
    def get_object_hashmap(self, user, account, container, name, version=None):
707 712
        """Return the object's size and a list with partial hashes."""
708
        
713

  
709 714
        logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version)
710 715
        self._can_read(user, account, container, name)
711 716
        path, node = self._lookup_object(account, container, name)
712 717
        props = self._get_version(node, version)
713 718
        hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
714 719
        return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
715
    
720

  
716 721
    def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False):
717 722
        if permissions is not None and user != account:
718 723
            raise NotAllowedError
......
720 725
        if permissions is not None:
721 726
            path = '/'.join((account, container, name))
722 727
            self._check_permissions(path, permissions)
723
        
728

  
724 729
        account_path, account_node = self._lookup_account(account, True)
725 730
        container_path, container_node = self._lookup_container(account, container)
726 731
        path, node = self._put_object_node(container_path, container_node, name)
727 732
        pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy)
728
        
733

  
729 734
        # Handle meta.
730 735
        if src_version_id is None:
731 736
            src_version_id = pre_version_id
732 737
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
733
        
738

  
734 739
        # Check quota.
735 740
        del_size = self._apply_versioning(account, container, pre_version_id)
736 741
        size_delta = size - del_size
......
742 747
                # This must be executed in a transaction, so the version is never created if it fails.
743 748
                raise QuotaError
744 749
        self._report_size_change(user, account, size_delta, {'action': 'object update', 'path':path})
745
        
750

  
746 751
        if permissions is not None:
747 752
            self.permissions.access_set(path, permissions)
748 753
            self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)})
749
        
754

  
750 755
        self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'})
751 756
        return dest_version_id
752
    
757

  
753 758
    @backend_method
754 759
    def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None):
755 760
        """Create/update an object with the specified size and partial hashes."""
756
        
761

  
757 762
        logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum)
758 763
        if size == 0: # No such thing as an empty hashmap.
759 764
            hashmap = [self.put_block('')]
......
764 769
            ie = IndexError()
765 770
            ie.data = [binascii.hexlify(x) for x in missing]
766 771
            raise ie
767
        
772

  
768 773
        hash = map.hash()
769 774
        dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions)
770 775
        self.store.map_put(hash, map)
771 776
        return dest_version_id
772
    
777

  
773 778
    @backend_method
774 779
    def update_object_checksum(self, user, account, container, name, version, checksum):
775 780
        """Update an object's checksum."""
776
        
781

  
777 782
        logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum)
778 783
        # Update objects with greater version and same hashmap and size (fix metadata updates).
779 784
        self._can_write(user, account, container, name)
......
783 788
        for x in versions:
784 789
            if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]:
785 790
                self.node.version_put_property(x[self.SERIAL], 'checksum', checksum)
786
    
791

  
787 792
    def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None):
788 793
        dest_version_ids = []
789 794
        self._can_read(user, src_account, src_container, src_name)
......
797 802
        dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy))
798 803
        if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
799 804
        	self._delete_object(user, src_account, src_container, src_name)
800
        
805

  
801 806
        if delimiter:
802 807
            prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name
803 808
            src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
......
806 811
            nodes = [elem[2] for elem in src_names]
807 812
            # TODO: Will do another fetch of the properties in duplicate version...
808 813
            props = self._get_versions(nodes) # Check to see if source exists.
809
            
814

  
810 815
            for prop, path, node in zip(props, paths, nodes):
811 816
                src_version_id = prop[self.SERIAL]
812 817
                hash = prop[self.HASH]
......
818 823
                if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
819 824
                	self._delete_object(user, src_account, src_container, path)
820 825
        return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids
821
    
826

  
822 827
    @backend_method
823 828
    def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None):
824 829
        """Copy an object's data and metadata."""
825
        
830

  
826 831
        logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter)
827 832
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter)
828 833
        return dest_version_id
829
    
834

  
830 835
    @backend_method
831 836
    def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None):
832 837
        """Move an object's data and metadata."""
833
        
838

  
834 839
        logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter)
835 840
        if user != src_account:
836 841
            raise NotAllowedError
837 842
        dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter)
838 843
        return dest_version_id
839
    
844

  
840 845
    def _delete_object(self, user, account, container, name, until=None, delimiter=None):
841 846
        if user != account:
842 847
            raise NotAllowedError
843
        
848

  
844 849
        if until is not None:
845 850
            path = '/'.join((account, container, name))
846 851
            node = self.node.node_lookup(path)
......
863 868
                self.permissions.access_clear(path)
864 869
            self._report_size_change(user, account, -size, {'action': 'object purge', 'path':path})
865 870
            return
866
        
871

  
867 872
        path, node = self._lookup_object(account, container, name)
868 873
        src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
869 874
        del_size = self._apply_versioning(account, container, src_version_id)
......
871 876
            self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path})
872 877
        self._report_object_change(user, account, path, details={'action': 'object delete'})
873 878
        self.permissions.access_clear(path)
874
        
879

  
875 880
        if delimiter:
876 881
            prefix = name + delimiter if not name.endswith(delimiter) else name
877 882
            src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
......
886 891
                self._report_object_change(user, account, path, details={'action': 'object delete'})
887 892
                paths.append(path)
888 893
            self.permissions.access_clear_bulk(paths)
889
    
894

  
890 895
    @backend_method
891 896
    def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None):
892 897
        """Delete/purge an object."""
893
        
898

  
894 899
        logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter)
895 900
        self._delete_object(user, account, container, name, until, delimiter)
896
    
901

  
897 902
    @backend_method
898 903
    def list_versions(self, user, account, container, name):
899 904
        """Return a list of all (version, version_timestamp) tuples for an object."""
900
        
905

  
901 906
        logger.debug("list_versions: %s %s %s %s", user, account, container, name)
902 907
        self._can_read(user, account, container, name)
903 908
        path, node = self._lookup_object(account, container, name)
904 909
        versions = self.node.node_get_versions(node)
905 910
        return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
906
    
911

  
907 912
    @backend_method
908 913
    def get_uuid(self, user, uuid):
909 914
        """Return the (account, container, name) for the UUID given."""
910
        
915

  
911 916
        logger.debug("get_uuid: %s %s", user, uuid)
912 917
        info = self.node.latest_uuid(uuid)
913 918
        if info is None:
......
916 921
        account, container, name = path.split('/', 2)
917 922
        self._can_read(user, account, container, name)
918 923
        return (account, container, name)
919
    
924

  
920 925
    @backend_method
921 926
    def get_public(self, user, public):
922 927
        """Return the (account, container, name) for the public id given."""
923
        
928

  
924 929
        logger.debug("get_public: %s %s", user, public)
925 930
        if public is None or public < ULTIMATE_ANSWER:
926 931
            raise NameError
......
930 935
        account, container, name = path.split('/', 2)
931 936
        self._can_read(user, account, container, name)
932 937
        return (account, container, name)
933
    
938

  
934 939
    @backend_method(autocommit=0)
935 940
    def get_block(self, hash):
936 941
        """Return a block's data."""
937
        
942

  
938 943
        logger.debug("get_block: %s", hash)
939 944
        block = self.store.block_get(binascii.unhexlify(hash))
940 945
        if not block:
941 946
            raise ItemNotExists('Block does not exist')
942 947
        return block
943
    
948

  
944 949
    @backend_method(autocommit=0)
945 950
    def put_block(self, data):
946 951
        """Store a block and return the hash."""
947
        
952

  
948 953
        logger.debug("put_block: %s", len(data))
949 954
        return binascii.hexlify(self.store.block_put(data))
950
    
955

  
951 956
    @backend_method(autocommit=0)
952 957
    def update_block(self, hash, data, offset=0):
953 958
        """Update a known block and return the hash."""
954
        
959

  
955 960
        logger.debug("update_block: %s %s %s", hash, len(data), offset)
956 961
        if offset == 0 and len(data) == self.block_size:
957 962
            return self.put_block(data)
958 963
        h = self.store.block_update(binascii.unhexlify(hash), offset, data)
959 964
        return binascii.hexlify(h)
960
    
965

  
961 966
    # Path functions.
962
    
967

  
963 968
    def _generate_uuid(self):
964 969
        return str(uuidlib.uuid4())
965
    
970

  
966 971
    def _put_object_node(self, path, parent, name):
967 972
        path = '/'.join((path, name))
968 973
        node = self.node.node_lookup(path)
969 974
        if node is None:
970 975
            node = self.node.node_create(parent, path)
971 976
        return path, node
972
    
977

  
973 978
    def _put_path(self, user, parent, path):
974 979
        node = self.node.node_create(parent, path)
975 980
        self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL)
976 981
        return node
977
    
982

  
978 983
    def _lookup_account(self, account, create=True):
979 984
        node = self.node.node_lookup(account)
980 985
        if node is None and create:
981 986
            node = self._put_path(account, self.ROOTNODE, account) # User is account.
982 987
        return account, node
983
    
988

  
984 989
    def _lookup_container(self, account, container):
985 990
        path = '/'.join((account, container))
986 991
        node = self.node.node_lookup(path)
987 992
        if node is None:
988 993
            raise ItemNotExists('Container does not exist')
989 994
        return path, node
990
    
995

  
991 996
    def _lookup_object(self, account, container, name):
992 997
        path = '/'.join((account, container, name))
993 998
        node = self.node.node_lookup(path)
994 999
        if node is None:
995 1000
            raise ItemNotExists('Object does not exist')
996 1001
        return path, node
997
    
1002

  
998 1003
    def _lookup_objects(self, paths):
999 1004
        nodes = self.node.node_lookup_bulk(paths)
1000 1005
        return paths, nodes
1001
    
1006

  
1002 1007
    def _get_properties(self, node, until=None):
1003 1008
        """Return properties until the timestamp given."""
1004
        
1009

  
1005 1010
        before = until if until is not None else inf
1006 1011
        props = self.node.version_lookup(node, before, CLUSTER_NORMAL)
1007 1012
        if props is None and until is not None:
......
1009 1014
        if props is None:
1010 1015
            raise ItemNotExists('Path does not exist')
1011 1016
        return props
1012
    
1017

  
1013 1018
    def _get_statistics(self, node, until=None):
1014 1019
        """Return count, sum of size and latest timestamp of everything under node."""
1015
        
1020

  
1016 1021
        if until is None:
1017 1022
            stats = self.node.statistics_get(node, CLUSTER_NORMAL)
1018 1023
        else:
......
1020 1025
        if stats is None:
1021 1026
            stats = (0, 0, 0)
1022 1027
        return stats
1023
    
1028

  
1024 1029
    def _get_version(self, node, version=None):
1025 1030
        if version is None:
1026 1031
            props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
......
1038 1043

  
1039 1044
    def _get_versions(self, nodes):
1040 1045
        return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL)
1041
    
1046

  
1042 1047
    def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False):
1043 1048
        """Create a new version of the node."""
1044
        
1049

  
1045 1050
        props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL)
1046 1051
        if props is not None:
1047 1052
            src_version_id = props[self.SERIAL]
......
1063 1068
        if checksum is None:
1064 1069
            checksum = src_checksum
1065 1070
        uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID]
1066
        
1071

  
1067 1072
        if src_node is None:
1068 1073
            pre_version_id = src_version_id
1069 1074
        else:
......
1073 1078
                pre_version_id = props[self.SERIAL]
1074 1079
        if pre_version_id is not None:
1075 1080
            self.node.version_recluster(pre_version_id, CLUSTER_HISTORY)
1076
        
1081

  
1077 1082
        dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster)
1078 1083
        return pre_version_id, dest_version_id
1079
    
1084

  
1080 1085
    def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
1081 1086
        if src_version_id is not None:
1082 1087
            self.node.attribute_copy(src_version_id, dest_version_id)
......
1086 1091
        else:
1087 1092
            self.node.attribute_del(dest_version_id, domain)
1088 1093
            self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
1089
    
1094

  
1090 1095
    def _put_metadata(self, user, node, domain, meta, replace=False):
1091 1096
        """Create a new version and store metadata."""
1092
        
1097

  
1093 1098
        src_version_id, dest_version_id = self._put_version_duplicate(user, node)
1094 1099
        self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
1095 1100
        return src_version_id, dest_version_id
1096
    
1101

  
1097 1102
    def _list_limits(self, listing, marker, limit):
1098 1103
        start = 0
1099 1104
        if marker:
......
1104 1109
        if not limit or limit > 10000:
1105 1110
            limit = 10000
1106 1111
        return start, limit
1107
    
1112

  
1108 1113
    def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False):
1109 1114
        cont_prefix = path + '/'
1110 1115
        prefix = cont_prefix + prefix
......
1112 1117
        before = until if until is not None else inf
1113 1118
        filterq = keys if domain else []
1114 1119
        sizeq = size_range
1115
        
1120

  
1116 1121
        objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props)
1117 1122
        objects.extend([(p, None) for p in prefixes] if virtual else [])
1118 1123
        objects.sort(key=lambda x: x[0])
1119 1124
        objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects]
1120 1125
        return objects
1121
        
1126

  
1122 1127
    # Reporting functions.
1123
    
1128

  
1124 1129
    def _report_size_change(self, user, account, size, details={}):
1125 1130
        account_node = self._lookup_account(account, True)[1]
1126 1131
        total = self._get_statistics(account_node)[1]
1127 1132
        details.update({'user': user, 'total': total})
1128 1133
        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
1129 1134
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details))
1130
    
1135

  
1131 1136
    def _report_object_change(self, user, account, path, details={}):
1132 1137
        details.update({'user': user})
1133 1138
        logger.debug("_report_object_change: %s %s %s %s", user, account, path, details)
1134 1139
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details))
1135
    
1140

  
1136 1141
    def _report_sharing_change(self, user, account, path, details={}):
1137 1142
        logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details)
1138 1143
        details.update({'user': user})
1139 1144
        self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details))
1140
    
1145

  
1141 1146
    # Policy functions.
1142
    
1147

  
1143 1148
    def _check_policy(self, policy):
1144 1149
        for k in policy.keys():
1145 1150
            if policy[k] == '':
......
1154 1159
                    raise ValueError
1155 1160
            else:
1156 1161
                raise ValueError
1157
    
1162

  
1158 1163
    def _put_policy(self, node, policy, replace):
1159 1164
        if replace:
1160 1165
            for k, v in self.default_policy.iteritems():
1161 1166
                if k not in policy:
1162 1167
                    policy[k] = v
1163 1168
        self.node.policy_set(node, policy)
1164
    
1169

  
1165 1170
    def _get_policy(self, node):
1166 1171
        policy = self.default_policy.copy()
1167 1172
        policy.update(self.node.policy_get(node))
1168 1173
        return policy
1169
    
1174

  
1170 1175
    def _apply_versioning(self, account, container, version_id):
1171 1176
        """Delete the provided version if such is the policy.
1172 1177
           Return size of object removed.
1173 1178
        """
1174
        
1179

  
1175 1180
        if version_id is None:
1176 1181
            return 0
1177 1182
        path, node = self._lookup_container(account, container)
......
1181 1186
            self.store.map_delete(hash)
1182 1187
            return size
1183 1188
        return 0
1184
    
1189

  
1185 1190
    # Access control functions.
1186
    
1191

  
1187 1192
    def _check_groups(self, groups):
1188 1193
        # raise ValueError('Bad characters in groups')
1189 1194
        pass
1190
    
1195

  
1191 1196
    def _check_permissions(self, path, permissions):
1192 1197
        # raise ValueError('Bad characters in permissions')
1193 1198
        pass
1194
    
1199

  
1195 1200
    def _get_formatted_paths(self, paths):
1196 1201
        formatted = []
1197 1202
        for p in paths:
......
1204 1209
                    formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX))
1205 1210
                formatted.append((p, self.MATCH_EXACT))
1206 1211
        return formatted
1207
    
1212

  
1208 1213
    def _get_permissions_path(self, account, container, name):
1209 1214
        path = '/'.join((account, container, name))
1210 1215
        permission_paths = self.permissions.access_inherit(path)
......
1223 1228
                    if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'):
1224 1229
                        return p
1225 1230
        return None
1226
    
1231

  
1227 1232
    def _can_read(self, user, account, container, name):
1228 1233
        if user == account:
1229 1234
            return True
......
1235 1240
            raise NotAllowedError
1236 1241
        if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user):
1237 1242
            raise NotAllowedError
1238
    
1243

  
1239 1244
    def _can_write(self, user, account, container, name):
1240 1245
        if user == account:
1241 1246
            return True
......
1245 1250
            raise NotAllowedError
1246 1251
        if not self.permissions.access_check(path, self.WRITE, user):
1247 1252
            raise NotAllowedError
1248
    
1253

  
1249 1254
    def _allowed_accounts(self, user):
1250 1255
        allow = set()
1251 1256
        for path in self.permissions.access_list_paths(user):
1252 1257
            allow.add(path.split('/', 1)[0])
1253 1258
        return sorted(allow)
1254
    
1259

  
1255 1260
    def _allowed_containers(self, user, account):
1256 1261
        allow = set()
1257 1262
        for path in self.permissions.access_list_paths(user, account):

Also available in: Unified diff