Revision 9fea53cc

b/README.upgrade
4 4
This file documents the upgrade to newer versions of the Synnefo software.
5 5
For more information, please see README.deploy.
6 6

  
7
v0.5.5 -> v0.5.6
8
RECONCILIATION
9
    * Implemented a new reconciliation management command; please see
10
      ./manage.py reconcile --help for more info.
11
      Recommended to run ./manage.py reconcile --detect-all periodically,
12
      via cron.
13

  
14

  
7 15
v0.5.4 -> v0.5.5
8 16
LOGGING
9 17
    * Changed the default logging settings for the dispatcher to also log
b/logic/management/commands/reconcile.py
27 27
# those of the authors and should not be interpreted as representing official
28 28
# policies, either expressed or implied, of GRNET S.A.
29 29
#
30
# Reconcile VM state - Management Script
31
from synnefo.db.models import VirtualMachine
32
from django.db.models import Q
33
from django.conf import settings
30
"""Reconciliation management command
31

  
32
Management command to reconcile the contents of the Synnefo DB with
33
the state of the Ganeti backend. See docstring on top of logic/reconcile.py
34
for a description of reconciliation rules.
35

  
36
"""
37
import sys
38

  
34 39
from datetime import datetime, timedelta
35 40
from optparse import make_option
41

  
42
from django.conf import settings
43
from django.db.models import Q
36 44
from django.core.management.base import BaseCommand
37 45

  
38
from synnefo.logic import amqp_connection
39
from synnefo.logic.amqp_connection import AMQPError
46
from synnefo.db.models import VirtualMachine
47
from synnefo.logic import reconciliation, backend
48
from synnefo.util.rapi import GanetiRapiClient
40 49

  
41
import json
42
import sys
43 50

  
44 51
class Command(BaseCommand):
45
    # Short backend prefix: the part of BACKEND_PREFIX_ID before the first '-'.
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
    help = 'Reconcile VM status with the backend'

    option_list = BaseCommand.option_list + (
        make_option('--all', action='store_true', dest='all_vms',
                    default=False,
                    help='Run the reconciliation function for all VMs, now'),
        # type='int' makes optparse convert a value given on the command
        # line; without it the value reaches handle() as a string and the
        # batch-size arithmetic there no longer yields a number.
        make_option('--interval', action='store', type='int',
                    dest='interval', default=1,
                    help='Interval in minutes between reconciliations'),
    )
54

  
55
    def handle(self, all_vms=False, interval=1, **options):
        """Queue reconciliation requests over AMQP for a batch of VMs.

        Only VMs that are neither deleted nor suspended are considered.

        all_vms  -- when true, a request is sent for every such VM.
        interval -- minutes between invocations; used to size the batch
                    when all_vms is false.  Accepts an int or a numeric
                    string.

        When all_vms is false, only VMs whose 'updated' timestamp is at
        least settings.RECONCILIATION_MIN minutes old are candidates, and
        at most (active VMs // RECONCILIATION_MIN) * interval of them are
        picked.

        An AMQPError raised while sending is reported on stderr and then
        re-raised, which aborts the remaining sends.
        """
        # optparse delivers --interval as a string unless the option
        # declares a type; with a string the multiplication below would
        # repeat the string instead of computing a batch size.
        interval = int(interval)

        active = VirtualMachine.objects.filter(Q(deleted=False) &
                                               Q(suspended=False))

        if all_vms:
            candidates = active
            batch_size = active.count()
        else:
            cutoff = (datetime.now() -
                      timedelta(minutes=settings.RECONCILIATION_MIN))
            candidates = VirtualMachine.objects.filter(
                Q(deleted=False) &
                Q(suspended=False) &
                Q(updated__lte=cutoff))
            # Floor division keeps the slice bound an integer.
            batch_size = ((active.count() // settings.RECONCILIATION_MIN) *
                          interval)

        vm_ids = [vm.id for vm in candidates[:batch_size]]

        for vmid in vm_ids:
            msg = dict(type="reconcile", vmid=vmid)
            try:
                amqp_connection.send(
                    json.dumps(msg), settings.EXCHANGE_CRON,
                    "reconciliation.%s.%s" % (self.prefix, vmid))
            except AMQPError as e:
                sys.stderr.write(
                    'Error sending reconciliation request: %s\n' % e)
                raise

        sys.stdout.write(
            "All: %d, To update: %d, Triggered update for: %s\n" %
            (active.count(), candidates.count(), vm_ids))
52
    can_import_settings = True

    help = 'Reconcile contents of Synnefo DB with state of Ganeti backend'

    # The management command runs inside an SQL transaction.
    # NOTE(review): Django documents output_transaction as wrapping the
    # command's *output* in BEGIN;/COMMIT; -- confirm this flag really gives
    # the transactional behaviour intended here.
    output_transaction = True

    # Every option is a boolean switch; build them from one
    # (flag, dest, help) table so they stay uniform.
    option_list = BaseCommand.option_list + tuple(
        make_option(flag, action='store_true', dest=dest, default=False,
                    help=text)
        for flag, dest, text in (
            ('--detect-stale', 'detect_stale',
             'Detect stale VM entries in DB'),
            ('--detect-orphans', 'detect_orphans',
             'Detect orphan instances in Ganeti'),
            ('--detect-unsynced', 'detect_unsynced',
             'Detect unsynced operstate between DB and Ganeti'),
            ('--detect-all', 'detect_all',
             'Enable all --detect-* arguments'),
            ('--fix-stale', 'fix_stale',
             'Fix (remove) stale DB entries in DB'),
            ('--fix-orphans', 'fix_orphans',
             'Fix (remove) orphan Ganeti VMs'),
            ('--fix-unsynced', 'fix_unsynced',
             'Fix server operstate in DB, set from Ganeti'),
            ('--fix-all', 'fix_all',
             'Enable all --fix-* arguments'),
        ))
79

  
80
    def _process_args(self, options):
81
        keys_detect = [k for k in options.keys() if k.startswith('detect_')]
82
        keys_fix = [k for k in options.keys() if k.startswith('fix_')]
83

  
84
        if options['detect_all']:
85
            for kd in keys_detect:
86
                options[kd] = True
87
        if options['fix_all']:
88
            for kf in keys_fix:
89
                options[kf] = True
90

  
91
        if not reduce(lambda x, y: x or y,
92
                      map(lambda x: options[x], keys_detect)):
93
            raise Exception("At least one of --detect-* must be specified")
94

  
95
        for kf in keys_fix:
96
            kd = kf.replace('fix_', 'detect_', 1)
97
            if (options[kf] and not options[kd]):
98
                raise Exception("Cannot use --%s without corresponding "
99
                                "--%s argument" % (kf, kd))
100

  
101
    def handle(self, **options):
        """Detect, and optionally fix, mismatches between DB and Ganeti.

        The DB and Ganeti state are each fetched once through the
        reconciliation module.  Every enabled --detect-* check runs first;
        the enabled --fix-* actions run afterwards on what was detected.

        Headings and progress messages are written to stderr; the lists of
        affected IDs are written to stdout.

        options -- parsed command options; 'verbosity' and all detect_* /
                   fix_* keys are read.  Validation is delegated to
                   _process_args(), which may raise.
        """
        verbosity = int(options['verbosity'])
        self._process_args(options)

        def note(text):
            # Diagnostics and headings go to stderr.
            sys.stderr.write(text + "\n")

        def listing(entries):
            # Indented, one-per-line list on stdout.
            sys.stdout.write("    " + "\n    ".join(entries) + "\n")

        D = reconciliation.get_servers_from_db()
        G = reconciliation.get_instances_from_ganeti()

        # Start empty so the fix phase never depends on a detect branch
        # having bound these names.
        stale = orphans = unsynced = ()

        #
        # Detect problems
        #
        if options['detect_stale']:
            stale = reconciliation.stale_servers_in_db(D, G)
            if stale:
                note("Found the following stale server IDs: ")
                listing([str(x) for x in stale])
            elif verbosity >= 2:
                note("Found no stale server IDs in DB.")

        if options['detect_orphans']:
            orphans = reconciliation.orphan_instances_in_ganeti(D, G)
            if orphans:
                note("Found orphan Ganeti instances with IDs: ")
                listing([str(x) for x in orphans])
            elif verbosity >= 2:
                note("Found no orphan Ganeti instances.")

        if options['detect_unsynced']:
            unsynced = reconciliation.unsynced_operstate(D, G)
            if unsynced:
                note("The operstate of the following server"
                     " IDs is out-of-sync:")
                listing(["%d is %s in DB, %s in Ganeti" %
                         (x[0], x[1], ('UP' if x[2] else 'DOWN'))
                         for x in unsynced])
            elif verbosity >= 2:
                note("The operstate of all servers is in sync.")

        #
        # Then fix them
        #
        if options['fix_stale'] and stale:
            note("Simulating successful Ganeti removal for %d "
                 "servers in the DB:" % len(stale))
            for vm in VirtualMachine.objects.filter(pk__in=stale):
                backend.process_op_status(
                    vm=vm, jobid=0,
                    opcode='OP_INSTANCE_REMOVE', status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
            note("    ...done")

        if options['fix_orphans'] and orphans:
            note("Issuing OP_INSTANCE_REMOVE for %d Ganeti instances:" %
                 len(orphans))
            # One client serves all deletions.
            rapi = GanetiRapiClient(*settings.GANETI_CLUSTER_INFO)
            for instance_id in orphans:
                rapi.DeleteInstance(
                    '%s%s' % (settings.BACKEND_PREFIX_ID, str(instance_id)))
            note("    ...done")

        if options['fix_unsynced'] and unsynced:
            note("Setting the state of %d out-of-sync VMs:" % len(unsynced))
            for server_id, db_state, ganeti_up in unsynced:
                vm = VirtualMachine.objects.get(pk=server_id)
                if ganeti_up:
                    opcode = "OP_INSTANCE_REBOOT"
                else:
                    opcode = "OP_INSTANCE_SHUTDOWN"
                backend.process_op_status(
                    vm=vm, jobid=0,
                    opcode=opcode, status='success',
                    logmsg='Reconciliation: simulated Ganeti event')
            note("    ...done")
b/logic/management/commands/reconcile_old.py
1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29
#
30
# Reconcile VM state - Management Script
31
from synnefo.db.models import VirtualMachine
32
from django.db.models import Q
33
from django.conf import settings
34
from datetime import datetime, timedelta
35
from optparse import make_option
36
from django.core.management.base import BaseCommand
37

  
38
from synnefo.logic import amqp_connection
39
from synnefo.logic.amqp_connection import AMQPError
40

  
41
import json
42
import sys
43

  
44
class Command(BaseCommand):
    """Management command that queues VM reconciliation requests over AMQP."""

    # Short backend prefix: the part of BACKEND_PREFIX_ID before the first '-'.
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
    help = 'Reconcile VM status with the backend'

    option_list = BaseCommand.option_list + (
        make_option('--all', action='store_true', dest='all_vms',
                    default=False,
                    help='Run the reconciliation function for all VMs, now'),
        # type='int' makes optparse convert a value given on the command
        # line; without it handle() would receive a string.
        make_option('--interval', action='store', type='int',
                    dest='interval', default=1,
                    help='Interval in minutes between reconciliations'),
    )

    def handle(self, all_vms=False, interval=1, **options):
        """Queue reconciliation requests over AMQP for a batch of VMs.

        Only VMs that are neither deleted nor suspended are considered.

        all_vms  -- when true, a request is sent for every such VM.
        interval -- minutes between invocations; used to size the batch
                    when all_vms is false.  Accepts an int or a numeric
                    string.

        When all_vms is false, only VMs whose 'updated' timestamp is at
        least settings.RECONCILIATION_MIN minutes old are candidates, and
        at most (active VMs // RECONCILIATION_MIN) * interval of them are
        picked.

        An AMQPError raised while sending is reported on stderr and then
        re-raised, which aborts the remaining sends.
        """
        # Also guards callers that pass the interval as a string.
        interval = int(interval)

        active = VirtualMachine.objects.filter(Q(deleted=False) &
                                               Q(suspended=False))

        if all_vms:
            candidates = active
            batch_size = active.count()
        else:
            cutoff = (datetime.now() -
                      timedelta(minutes=settings.RECONCILIATION_MIN))
            candidates = VirtualMachine.objects.filter(
                Q(deleted=False) &
                Q(suspended=False) &
                Q(updated__lte=cutoff))
            # Floor division keeps the slice bound an integer.
            batch_size = ((active.count() // settings.RECONCILIATION_MIN) *
                          interval)

        vm_ids = [vm.id for vm in candidates[:batch_size]]

        for vmid in vm_ids:
            msg = dict(type="reconcile", vmid=vmid)
            try:
                amqp_connection.send(
                    json.dumps(msg), settings.EXCHANGE_CRON,
                    "reconciliation.%s.%s" % (self.prefix, vmid))
            except AMQPError as e:
                sys.stderr.write(
                    'Error sending reconciliation request: %s\n' % e)
                raise

        sys.stdout.write(
            "All: %d, To update: %d, Triggered update for: %s\n" %
            (active.count(), candidates.count(), vm_ids))
b/logic/reconciliation.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37
"""Business logic for reconciliation
38

  
39
Reconcile the contents of the DB with the actual state of the
40
Ganeti backend.
41

  
42
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
43
RULES:
44
    R1. Stale servers in DB:
45
            For any v in D but not in G:
46
            Set deleted=True.
47
    R2. Orphan instances in Ganeti:
48
            For any v in G with deleted=True in D:
49
            Issue OP_INSTANCE_REMOVE.
50
    R3. Unsynced operstate:
51
            For any v whose operating state differs between G and D:
52
            Set the operating state in D based on the state in G.
53
In the code, D, G are Python dicts mapping instance ids to operating state.
54
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
55
For G, the operating state is True if the machine is up, False otherwise.
56

  
57
"""
58

  
59
import sys
60

  
61
from synnefo.logic import log
62
_logger = log.get_logger("reconciliation")
63

  
64
from django.core.management import setup_environ
65
try:
66
    from synnefo import settings
67
except ImportError:
68
    raise Exception("Cannot import settings, make sure PYTHONPATH contains "
69
                    "the parent directory of the Synnefo Django project.")
70
setup_environ(settings)
71

  
72
from synnefo.db.models import VirtualMachine
73
from synnefo.util.rapi import GanetiRapiClient
74

  
75

  
76
def stale_servers_in_db(D, G):
    """Return the set of ids that are keys of D but not keys of G."""
    return set(D).difference(G)
81

  
82

  
83
def orphan_instances_in_ganeti(D, G):
    """Return the set of ids that are keys of G but not keys of D."""
    return set(G).difference(D)
88

  
89

  
90
def unsynced_operstate(D, G):
    """Return the servers whose DB state disagrees with Ganeti.

    Only ids present in both D and G are examined.  A server is in sync
    when Ganeti reports it up and the DB says 'STARTED', or when Ganeti
    reports it down and the DB says 'BUILD', 'ERROR' or 'STOPPED'.

    Returns a set of (id, state in D, state in G) tuples.
    """
    states_when_down = ('BUILD', 'ERROR', 'STOPPED')
    mismatched = set()

    for server_id, db_state in D.items():
        if server_id not in G:
            continue
        ganeti_up = G[server_id]
        if ganeti_up:
            in_sync = db_state == 'STARTED'
        else:
            in_sync = db_state in states_when_down
        if not in_sync:
            mismatched.add((server_id, db_state, ganeti_up))

    return mismatched
101

  
102

  
103
def get_servers_from_db():
    """Return a dict mapping the id of each non-deleted VM to its operstate."""
    active_vms = VirtualMachine.objects.filter(deleted=False)
    return dict((vm.id, vm.operstate) for vm in active_vms)
106

  
107

  
108
def get_instances_from_ganeti():
    """Return a dict mapping Synnefo server ids to their Ganeti oper_state.

    All instances are fetched from Ganeti in one bulk RAPI query.  Only
    instances whose name starts with settings.BACKEND_PREFIX_ID are kept;
    the remainder of the name must be the integer server id.

    Instances with a non-integer suffix, or whose id has already been seen,
    are logged as errors and skipped.
    """
    rapi = GanetiRapiClient(*settings.GANETI_CLUSTER_INFO)
    prefix = settings.BACKEND_PREFIX_ID
    snf_instances = {}

    for instance in rapi.GetInstances(bulk=True):
        name = instance['name']
        if not name.startswith(prefix):
            continue

        # Strip the leading prefix only.  Splitting on the prefix would
        # misread a name that contains the prefix a second time.
        try:
            server_id = int(name[len(prefix):])
        except ValueError:
            _logger.error("Ignoring instance with malformed name %s", name)
            continue

        if server_id in snf_instances:
            _logger.error("Ignoring instance with duplicate Synnefo id %s",
                          name)
            continue

        snf_instances[server_id] = instance['oper_state']

    return snf_instances
131

  
132

  
133
def main():
    """Write the id-to-state mapping of Ganeti instances to stdout."""
    instances = get_instances_from_ganeti()
    sys.stdout.write("%s\n" % (instances,))


if __name__ == "__main__":
    # Send this module's log messages to the console when run as a script.
    log.console_output(_logger)
    exit_status = main()
    sys.exit(exit_status)
b/logic/tests.py
41 41
from synnefo.logic import backend
42 42
from synnefo.logic import credits
43 43
from synnefo.logic import users
44
from synnefo.logic import reconciliation
44 45
from synnefo.logic.utils import get_rsapi_state
45 46

  
46 47

  
......
346 347
        #self.assertRaises(VirtualMachine.IllegalState,
347 348
        #                  backend.process_create_progress, vm, 1)
348 349

  
350

  
351
class ReconciliationTestCase(TestCase):
    """Tests for the helper functions in synnefo.logic.reconciliation."""

    SERVERS = 1000
    fixtures = ['db_test_data']

    def test_get_servers_from_db(self):
        """Test getting a dictionary from each server to its operstate"""
        self.assertEqual(reconciliation.get_servers_from_db(),
                         {30000: 'STARTED', 30001: 'STOPPED', 30002: 'BUILD'})

    def test_stale_servers_in_db(self):
        """Test discovery of stale entries in DB"""

        D = {1: 'STARTED', 2: 'STOPPED', 3: 'STARTED', 4: 'BUILD', 5: 'BUILD'}
        G = {1: True, 3: True}
        # The function returns a set, so compare against a set; a set
        # never equals a list.
        self.assertEqual(reconciliation.stale_servers_in_db(D, G),
                         set([2, 4, 5]))

    def test_orphan_instances_in_ganeti(self):
        """Test discovery of orphan instances in Ganeti, without a DB entry"""

        G = {1: True, 2: False, 3: False, 4: True, 50: True}
        D = {1: True, 3: False}
        self.assertEqual(reconciliation.orphan_instances_in_ganeti(D, G),
                         set([2, 4, 50]))

    def test_unsynced_operstate(self):
        """Test discovery of unsynced operstate between the DB and Ganeti"""

        G = {1: True, 2: False, 3: True, 4: False, 50: True}
        D = {1: 'STARTED', 2: 'STARTED', 3: 'BUILD', 4: 'STARTED', 50: 'BUILD'}
        # set() takes one iterable; passing the tuples as separate
        # arguments raises TypeError.
        self.assertEqual(reconciliation.unsynced_operstate(D, G),
                         set([(2, 'STARTED', False),
                              (3, 'BUILD', True),
                              (4, 'STARTED', False),
                              (50, 'BUILD', True)]))

Also available in: Unified diff