Revision eb83c485

b/snf-pithos-app/pithos/api/management/commands/reconcile-resources-pithos.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
1
# Copyright 2012, 2013 GRNET S.A. All rights reserved.
2 2
#
3 3
# Redistribution and use in source and binary forms, with or
4 4
# without modification, are permitted provided that the following
......
31 31
# interpreted as representing official policies, either expressed
32 32
# or implied, of GRNET S.A.
33 33

  
34
from django.core.management.base import NoArgsCommand
34
from datetime import datetime
35
from django.core.management.base import NoArgsCommand, CommandError
35 36

  
36 37
from optparse import make_option
37 38

  
38 39
from pithos.api.util import get_backend
39
from pithos.api.resources import resources
40
from pithos.backends.modular import DEFAULT_SOURCE
41 40

  
42 41
from snf_django.management import utils
43 42

  
44 43
from astakosclient.errors import QuotaLimit, NotFound
44
from snf_django.utils import reconcile
45 45

  
46 46
backend = get_backend()
47
RESOURCES = ['pithos.diskspace']
47 48

  
48 49

  
49 50
class Command(NoArgsCommand):
......
57 58
        make_option("--userid", dest="userid",
58 59
                    default=None,
59 60
                    help="Reconcile resources only for this user"),
61
        make_option("--project",
62
                    help="Reconcile resources only for this project"),
60 63
        make_option("--fix", dest="fix",
61 64
                    default=False,
62 65
                    action="store_true",
......
69 72
    )
70 73

  
71 74
    def handle_noargs(self, **options):
75
        write = self.stdout.write
72 76
        try:
73 77
            backend.pre_exec()
74 78
            userid = options['userid']
79
            project = options['project']
75 80

  
76 81
            # Get holding from Pithos DB
77
            db_usage = backend.node.node_account_usage(userid)
82
            db_usage = backend.node.node_account_usage(userid, project)
83
            db_project_usage = backend.node.node_project_usage(project)
78 84

  
79 85
            users = set(db_usage.keys())
80 86
            if userid and userid not in users:
81 87
                if backend._lookup_account(userid) is None:
82
                    self.stdout.write("User '%s' does not exist in DB!\n" %
83
                                      userid)
88
                    write("User '%s' does not exist in DB!\n" % userid)
84 89
                    return
85 90

  
86 91
            # Get holding from Quotaholder
87 92
            try:
88 93
                qh_result = backend.astakosclient.service_get_quotas(userid)
89 94
            except NotFound:
90
                self.stdout.write(
91
                    "User '%s' does not exist in Quotaholder!\n" % userid)
95
                write("User '%s' does not exist in Quotaholder!\n" % userid)
92 96
                return
93 97

  
94
            users.update(qh_result.keys())
95

  
96
            pending_exists = False
97
            unknown_user_exists = False
98
            unsynced = []
99
            for uuid in users:
100
                db_value = db_usage.get(uuid, 0)
101
                try:
102
                    qh_all = qh_result[uuid]
103
                except KeyError:
104
                    self.stdout.write(
105
                        "User '%s' does not exist in Quotaholder!\n" % uuid)
106
                    unknown_user_exists = True
107
                    continue
108
                else:
109
                    qh = qh_all.get(DEFAULT_SOURCE, {})
110
                    for resource in [r['name'] for r in resources]:
111
                        try:
112
                            qh_resource = qh[resource]
113
                        except KeyError:
114
                            self.stdout.write(
115
                                "Resource '%s' does not exist in Quotaholder "
116
                                "for user '%s'!\n" % (resource, uuid))
117
                            continue
118

  
119
                        if qh_resource['pending']:
120
                            self.stdout.write(
121
                                "Pending commission. "
122
                                "User '%s', resource '%s'.\n" %
123
                                (uuid, resource))
124
                            pending_exists = True
125
                            continue
126

  
127
                        qh_value = qh_resource['usage']
128

  
129
                        if db_value != qh_value:
130
                            data = (uuid, resource, db_value, qh_value)
131
                            unsynced.append(data)
132

  
98
            try:
99
                qh_project_result = \
100
                    backend.astakosclient.service_get_project_quotas(project)
101
            except NotFound:
102
                write("Project '%s' does not exist in Quotaholder!\n" %
103
                      project)
104

  
105
            unsynced_users, users_pending, users_unknown =\
106
                reconcile.check_users(self.stderr, RESOURCES,
107
                                      db_usage, qh_result)
108

  
109
            unsynced_projects, projects_pending, projects_unknown =\
110
            reconcile.check_projects(self.stderr, RESOURCES,
111
                                     db_project_usage, qh_project_result)
112
            pending_exists = users_pending or projects_pending
113
            unknown_exists = users_unknown or projects_unknown
114

  
115
            headers = ("Type", "Holder", "Source", "Resource",
116
                       "Database", "Quotaholder")
117
            unsynced = unsynced_users + unsynced_projects
133 118
            if unsynced:
134
                headers = ("User", "Resource", "Database", "Quotaholder")
135 119
                utils.pprint_table(self.stdout, unsynced, headers)
136
                if options['fix']:
137
                    request = {}
138
                    request['force'] = options['force']
139
                    request['auto_accept'] = True
140
                    request['name'] = "RECONCILE"
141
                    request['provisions'] = map(create_provision, unsynced)
120
                if options["fix"]:
121
                    force = options["force"]
122
                    name = ("client: reconcile-resources-pithos, time: %s"
123
                            % datetime.now())
124
                    user_provisions = reconcile.create_user_provisions(
125
                        unsynced_users)
126
                    project_provisions = reconcile.create_project_provisions(
127
                        unsynced_projects)
142 128
                    try:
143
                        backend.astakosclient.issue_commission(request)
129
                        backend.astakosclient.issue_commission_generic(
130
                            user_provisions, project_provisions, name=name,
131
                            force=force, auto_accept=True)
144 132
                    except QuotaLimit:
145
                        self.stdout.write(
146
                            "Reconciling failed because a limit has been "
147
                            "reached. Use --force to ignore the check.\n")
133
                        write("Reconciling failed because a limit has been "
134
                              "reached. Use --force to ignore the check.\n")
148 135
                        return
149
                    self.stdout.write("Fixed unsynced resources\n")
136
                    write("Fixed unsynced resources\n")
150 137

  
151 138
            if pending_exists:
152
                self.stdout.write(
153
                    "Found pending commissions. Run 'snf-manage"
154
                    " reconcile-commissions-pithos'\n")
155
            elif not (unsynced or unknown_user_exists):
156
                self.stdout.write("Everything in sync.\n")
139
                write("Found pending commissions. Run 'snf-manage"
140
                      " reconcile-commissions-pithos'\n")
141
            elif not (unsynced or unknown_exists):
142
                write("Everything in sync.\n")
157 143
        except BaseException as e:
158 144
            backend.post_exec(False)
159
            self.stdout.write(str(e) + "\n")
145
            raise CommandError(e)
160 146
        else:
161 147
            backend.post_exec(True)
162 148
        finally:
163 149
            backend.close()
164

  
165

  
166
def create_provision(provision_info):
167
    user, resource, db_value, qh_value = provision_info
168
    return {"holder": user,
169
            "source": DEFAULT_SOURCE,
170
            "resource": resource,
171
            "quantity": int(db_value - qh_value)}
b/snf-pithos-backend/pithos/backends/lib/sqlalchemy/node.py
1
# Copyright 2011-2012 GRNET S.A. All rights reserved.
1
# Copyright 2011, 2012, 2013 GRNET S.A. All rights reserved.
2 2
#
3 3
# Redistribution and use in source and binary forms, with or
4 4
# without modification, are permitted provided that the following
......
34 34
from time import time
35 35
from operator import itemgetter
36 36
from itertools import groupby
37
from collections import defaultdict
37 38

  
38 39
from sqlalchemy import (Table, Integer, BigInteger, DECIMAL, Boolean,
39 40
                        Column, String, MetaData, ForeignKey)
......
45 46

  
46 47
from pithos.backends.filter import parse_filters
47 48

  
48

  
49
DEFAULT_DISKSPACE_RESOURCE = 'pithos.diskspace'
49 50
ROOTNODE = 0
50 51

  
51 52
(SERIAL, NODE, HASH, SIZE, TYPE, SOURCE, MTIME, MUSER, UUID, CHECKSUM,
......
499 500
        r.close()
500 501
        return dict(rows)
501 502

  
502
    def node_account_usage(self, account=None, cluster=0):
503
        """Return usage for a specific account.
503
    def node_account_usage(self, account=None, project=None, cluster=0):
504
        """Return a dict of dicts with the project usage for a specific account.
504 505

  
505 506
        Keyword arguments:
506
        account -- (default None: list usage for all the accounts)
507
        account -- (default None: list usage for all accounts)
508
        project -- (default None: list usage for all projects)
507 509
        cluster -- list current, history or deleted usage (default 0: normal)
508 510
        """
509 511

  
......
511 513
        n2 = self.nodes.alias('n2')
512 514
        n3 = self.nodes.alias('n3')
513 515

  
514
        s = select([n3.c.path, func.sum(self.versions.c.size)])
516
        s = select([n3.c.path, self.policy.c.value,
517
                    func.sum(self.versions.c.size)])
518
        s = s.where(self.policy.c.key == 'project')
519
        s = s.where(self.policy.c.node == n2.c.node)
515 520
        s = s.where(n1.c.node == self.versions.c.node)
516 521
        s = s.where(self.versions.c.cluster == cluster)
517 522
        s = s.where(n1.c.parent == n2.c.node)
518 523
        s = s.where(n2.c.parent == n3.c.node)
519 524
        s = s.where(n3.c.parent == 0)
520 525
        s = s.where(n3.c.node != 0)
526
        s = s.group_by(n3.c.path, self.policy.c.value)
521 527
        if account:
522 528
            s = s.where(n3.c.path == account)
523
        s = s.group_by(n3.c.path)
529
        if project:
530
            s = s.where(self.policy.c.value == project)
524 531
        r = self.conn.execute(s)
525
        usage = r.fetchall()
532
        rows = r.fetchall()
526 533
        r.close()
527
        return dict(usage)
534
        d = defaultdict(dict)
535
        for account, project, usage in rows:
536
            d[account][project][DEFAULT_DISKSPACE_RESOURCE] = usage
537
        return d
538

  
539
    def node_project_usage(self, project=None, cluster=0):
540
        """Return a dict of dicts with the project usage for a specific account.
541

  
542
        Keyword arguments:
543
        project -- (default None: list usage for all projects)
544
        cluster -- list current, history or deleted usage (default 0: normal)
545
        """
546

  
547
        n1 = self.nodes.alias('n1')
548
        n2 = self.nodes.alias('n2')
549
        n3 = self.nodes.alias('n3')
550

  
551
        s = select([self.policy.c.value,
552
                    func.sum(self.versions.c.size)])
553
        s = s.where(self.policy.c.key == 'project')
554
        s = s.where(self.policy.c.node == n2.c.node)
555
        s = s.where(n1.c.node == self.versions.c.node)
556
        s = s.where(self.versions.c.cluster == cluster)
557
        s = s.where(n1.c.parent == n2.c.node)
558
        s = s.where(n2.c.parent == n3.c.node)
559
        # s = s.where(n3.c.parent == 0)
560
        # s = s.where(n3.c.node != 0)
561
        s = s.group_by(self.policy.c.value)
562
        if project:
563
            s = s.where(self.policy.c.value == project)
564
        r = self.conn.execute(s)
565
        rows = r.fetchall()
566
        r.close()
567
        d = defaultdict(dict)
568
        for project, usage in rows:
569
            d[project][DEFAULT_DISKSPACE_RESOURCE] = usage
570
        return d
528 571

  
529 572
    def policy_get(self, node):
530 573
        s = select([self.policy.c.key, self.policy.c.value],

Also available in: Unified diff