Revision 63f9eb8e

b/snf-cyclades-app/synnefo/logic/management/commands/reconcile-servers.py
71 71
        make_option('--fix-unsynced-flavors', action='store_true',
72 72
                    dest='fix_unsynced_flavors', default=False,
73 73
                    help='Fix unsynced flavors between DB and Ganeti'),
74
        make_option('--fix-pending-tasks', action='store_true',
75
                    dest='fix_pending_tasks', default=False,
76
                    help='Fix servers with stale pending tasks.'),
74 77
        make_option('--fix-all', action='store_true', dest='fix_all',
75 78
                    default=False, help='Enable all --fix-* arguments'),
76 79
    )
b/snf-cyclades-app/synnefo/logic/reconciliation.py
68 68
import logging
69 69
import itertools
70 70
import bitarray
71
from datetime import datetime, timedelta
71
from datetime import datetime
72 72

  
73 73
from django.db import transaction
74 74
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
......
76 76
                               BackendNetwork)
77 77
from synnefo.db.pools import IPPool
78 78
from synnefo.logic import utils, backend as backend_mod
79
from synnefo.logic.rapi import GanetiApiError
80 79

  
81 80
logger = logging.getLogger()
82 81
logging.basicConfig()
......
86 85
except AttributeError:
87 86
    CHECK_INTERVAL = 60
88 87

  
88
GANETI_JOB_ERROR = "error"
89
GANETI_JOBS_PENDING = ["queued", "waiting", "running", "canceling"]
90
GANETI_JOBS_FINALIZED = ["success", "error", "canceled"]
91

  
89 92

  
90 93
class BackendReconciler(object):
91 94
    def __init__(self, backend, logger, options=None):
......
114 117
        self.gnt_servers_keys = set(self.gnt_servers.keys())
115 118
        log.debug("Got servers info from Ganeti backend.")
116 119

  
120
        self.gnt_jobs = get_ganeti_jobs(backend)
121
        log.debug("Got jobs from Ganeti backend")
122

  
117 123
        self.event_time = datetime.now()
118 124

  
119 125
        self.stale_servers = self.reconcile_stale_servers()
......
122 128
        self.close()
123 129

  
124 130
    def get_build_status(self, db_server):
125
        job = db_server.backendjobid
126
        if job is None:
127
            created = db_server.created
128
            # Job has not yet been enqueued.
129
            if self.event_time < created + timedelta(seconds=60):
131
        job_id = db_server.backendjobid
132
        if job_id in self.gnt_jobs:
133
            gnt_job_status = self.gnt_jobs[job_id]["status"]
134
            if gnt_job_status == GANETI_JOB_ERROR:
135
                return "ERROR"
136
            elif gnt_job_status not in GANETI_JOBS_FINALIZED:
130 137
                return "RUNNING"
131 138
            else:
132
                return "ERROR"
139
                return "FINALIZED"
133 140
        else:
134
            updated = db_server.backendtime
135
            if self.event_time >= updated + timedelta(seconds=60):
136
                try:
137
                    job_info = self.client.GetJobStatus(job_id=job)
138
                    finalized = ["success", "error", "cancelled"]
139
                    if job_info["status"] == "error":
140
                        return "ERROR"
141
                    elif job_info["status"] not in finalized:
142
                        return "RUNNING"
143
                    else:
144
                        return "FINALIZED"
145
                except GanetiApiError:
146
                    return "ERROR"
147
            else:
148
                self.log.debug("Pending build for server '%s'", db_server.id)
149
                return "RUNNING"
141
            return "ERROR"
150 142

  
151 143
    def reconcile_stale_servers(self):
152 144
        # Detect stale servers
......
218 210
                                           gnt_server)
219 211
            self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
220 212
            self.reconcile_unsynced_disks(server_id, db_server, gnt_server)
213
            if db_server.task is not None:
214
                self.reconcile_pending_task(server_id, db_server)
221 215

  
222 216
    def reconcile_building_server(self, db_server):
223 217
        self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
......
309 303
    def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
310 304
        pass
311 305

  
306
    def reconcile_pending_task(self, server_id, db_server):
307
        job_id = db_server.task_job_id
308
        pending_task = False
309
        if job_id not in self.gnt_jobs:
310
            pending_task = True
311
        else:
312
            gnt_job_status = self.gnt_job[job_id]["status"]
313
            if gnt_job_status in GANETI_JOBS_FINALIZED:
314
                pending_task = True
315

  
316
        if pending_task:
317
            self.log.info("Found server '%s' with pending task: '%s'",
318
                          server_id, db_server.task)
319
            if self.options["fixed_pending_tasks"]:
320
                db_server.task = None
321
                db_server.task_job_id = None
322
                db_server.save()
323
                self.log.info("Cleared pending task for server '%s", server_id)
324

  
312 325

  
313 326
def format_db_nic(nic):
314 327
    return "Index: %s IP: %s Network: %s MAC: %s Firewall: %s" % (nic.index,
......
435 448
    return nics
436 449

  
437 450

  
451
def get_ganeti_jobs(backend):
452
    gnt_jobs = backend_mod.get_jobs(backend)
453
    return dict([(int(j["id"]), j) for j in gnt_jobs])
454

  
455

  
438 456
def disks_from_instance(i):
439 457
    return dict([(index, {"size": size})
440 458
                 for index, size in enumerate(i["disk.sizes"])])
b/snf-cyclades-app/synnefo/logic/tests/reconciliation.py
33 33
from synnefo.db.models import VirtualMachine, Network, BackendNetwork
34 34
from synnefo.db import models_factory as mfactory
35 35
from synnefo.logic import reconciliation
36
from datetime import timedelta
37 36
from mock import patch
38 37
from snf_django.utils.testing import mocked_quotaholder
39 38
from time import time
......
56 55
                                                           logger=log)
57 56

  
58 57
    def test_building_vm(self, mrapi):
59
        mrapi = self.reconciler.client
60
        vm1 = mfactory.VirtualMachineFactory(backend=self.backend,
61
                                             backendjobid=None,
62
                                             operstate="BUILD")
63
        self.reconciler.reconcile()
64
        # Assert not deleted
65
        vm1 = VirtualMachine.objects.get(id=vm1.id)
66
        self.assertFalse(vm1.deleted)
67
        self.assertEqual(vm1.operstate, "BUILD")
68

  
69
        vm1.created = vm1.created - timedelta(seconds=120)
70
        vm1.save()
71
        with mocked_quotaholder():
72
            self.reconciler.reconcile()
73
        vm1 = VirtualMachine.objects.get(id=vm1.id)
74
        self.assertEqual(vm1.operstate, "ERROR")
75

  
76 58
        vm1 = mfactory.VirtualMachineFactory(backend=self.backend,
77 59
                                             backendjobid=1,
78
                                             deleted=False,
79 60
                                             operstate="BUILD")
80
        vm1.backendtime = vm1.created - timedelta(seconds=120)
81
        vm1.backendjobid = 10
82
        vm1.save()
83 61
        for status in ["queued", "waiting", "running"]:
84
            mrapi.GetJobStatus.return_value = {"status": status}
62
            mrapi().GetJobs.return_value = [{"id": "1", "status": status}]
85 63
            with mocked_quotaholder():
86 64
                self.reconciler.reconcile()
87 65
            vm1 = VirtualMachine.objects.get(id=vm1.id)
88 66
            self.assertFalse(vm1.deleted)
89 67
            self.assertEqual(vm1.operstate, "BUILD")
90 68

  
91
        mrapi.GetJobStatus.return_value = {"status": "error"}
69
        mrapi().GetJobs.return_value = [{"id": "1", "status": "error"}]
92 70
        with mocked_quotaholder():
93 71
            self.reconciler.reconcile()
94 72
        vm1 = VirtualMachine.objects.get(id=vm1.id)
95 73
        self.assertFalse(vm1.deleted)
96 74
        self.assertEqual(vm1.operstate, "ERROR")
97 75

  
98
        for status in ["success", "cancelled"]:
76
        for status in ["success", "canceled"]:
77
            vm1.operstate = "BUILD"
99 78
            vm1.deleted = False
100 79
            vm1.save()
101
            mrapi.GetJobStatus.return_value = {"status": status}
80
            mrapi().GetJobs.return_value = [{"id": "1", "status": status}]
102 81
            with mocked_quotaholder():
103 82
                self.reconciler.reconcile()
104 83
            vm1 = VirtualMachine.objects.get(id=vm1.id)
105
            self.assertTrue(vm1.deleted)
106
            self.assertEqual(vm1.operstate, "DESTROYED")
107

  
108
        vm1 = mfactory.VirtualMachineFactory(backend=self.backend,
109
                                             backendjobid=1,
110
                                             operstate="BUILD")
111
        vm1.backendtime = vm1.created - timedelta(seconds=120)
112
        vm1.backendjobid = 10
113
        vm1.save()
114
        cmrapi = self.reconciler.client
115
        cmrapi.GetInstances.return_value = \
116
            [{"name": vm1.backend_vm_id,
117
             "beparams": {"maxmem": 1024,
118
                          "minmem": 1024,
119
                          "vcpus": 4},
120
             "oper_state": False,
121
             "mtime": time(),
122
             "disk.sizes": [],
123
             "nic.ips": [],
124
             "nic.macs": [],
125
             "nic.networks": [],
126
             "tags": []}]
127
        mrapi.GetJobStatus.return_value = {"status": "running"}
128
        with mocked_quotaholder():
129
            self.reconciler.reconcile()
130
        vm1 = VirtualMachine.objects.get(id=vm1.id)
131
        self.assertEqual(vm1.operstate, "BUILD")
132
        mrapi.GetJobStatus.return_value = {"status": "error"}
133
        with mocked_quotaholder():
134
            self.reconciler.reconcile()
135
        vm1 = VirtualMachine.objects.get(id=vm1.id)
136
        self.assertEqual(vm1.operstate, "ERROR")
84
            self.assertFalse(vm1.deleted)
85
            self.assertEqual(vm1.operstate, "ERROR")
137 86

  
138 87
    def test_stale_server(self, mrapi):
139 88
        mrapi.GetInstances = []

Also available in: Unified diff