Revision 63f9eb8e snf-cyclades-app/synnefo/logic/reconciliation.py
b/snf-cyclades-app/synnefo/logic/reconciliation.py | ||
---|---|---|
68 | 68 |
import logging |
69 | 69 |
import itertools |
70 | 70 |
import bitarray |
71 |
from datetime import datetime, timedelta
|
|
71 |
from datetime import datetime |
|
72 | 72 |
|
73 | 73 |
from django.db import transaction |
74 | 74 |
from synnefo.db.models import (Backend, VirtualMachine, Flavor, |
... | ... | |
76 | 76 |
BackendNetwork) |
77 | 77 |
from synnefo.db.pools import IPPool |
78 | 78 |
from synnefo.logic import utils, backend as backend_mod |
79 |
from synnefo.logic.rapi import GanetiApiError |
|
80 | 79 |
|
81 | 80 |
logger = logging.getLogger() |
82 | 81 |
logging.basicConfig() |
... | ... | |
86 | 85 |
except AttributeError: |
87 | 86 |
CHECK_INTERVAL = 60 |
88 | 87 |
|
88 |
GANETI_JOB_ERROR = "error" |
|
89 |
GANETI_JOBS_PENDING = ["queued", "waiting", "running", "canceling"] |
|
90 |
GANETI_JOBS_FINALIZED = ["success", "error", "canceled"] |
|
91 |
|
|
89 | 92 |
|
90 | 93 |
class BackendReconciler(object): |
91 | 94 |
def __init__(self, backend, logger, options=None): |
... | ... | |
114 | 117 |
self.gnt_servers_keys = set(self.gnt_servers.keys()) |
115 | 118 |
log.debug("Got servers info from Ganeti backend.") |
116 | 119 |
|
120 |
self.gnt_jobs = get_ganeti_jobs(backend) |
|
121 |
log.debug("Got jobs from Ganeti backend") |
|
122 |
|
|
117 | 123 |
self.event_time = datetime.now() |
118 | 124 |
|
119 | 125 |
self.stale_servers = self.reconcile_stale_servers() |
... | ... | |
122 | 128 |
self.close() |
123 | 129 |
|
124 | 130 |
def get_build_status(self, db_server): |
125 |
job = db_server.backendjobid |
|
126 |
if job is None: |
|
127 |
created = db_server.created |
|
128 |
# Job has not yet been enqueued. |
|
129 |
if self.event_time < created + timedelta(seconds=60): |
|
131 |
job_id = db_server.backendjobid |
|
132 |
if job_id in self.gnt_jobs: |
|
133 |
gnt_job_status = self.gnt_jobs[job_id]["status"] |
|
134 |
if gnt_job_status == GANETI_JOB_ERROR: |
|
135 |
return "ERROR" |
|
136 |
elif gnt_job_status not in GANETI_JOBS_FINALIZED: |
|
130 | 137 |
return "RUNNING" |
131 | 138 |
else: |
132 |
return "ERROR"
|
|
139 |
return "FINALIZED"
|
|
133 | 140 |
else: |
134 |
updated = db_server.backendtime |
|
135 |
if self.event_time >= updated + timedelta(seconds=60): |
|
136 |
try: |
|
137 |
job_info = self.client.GetJobStatus(job_id=job) |
|
138 |
finalized = ["success", "error", "cancelled"] |
|
139 |
if job_info["status"] == "error": |
|
140 |
return "ERROR" |
|
141 |
elif job_info["status"] not in finalized: |
|
142 |
return "RUNNING" |
|
143 |
else: |
|
144 |
return "FINALIZED" |
|
145 |
except GanetiApiError: |
|
146 |
return "ERROR" |
|
147 |
else: |
|
148 |
self.log.debug("Pending build for server '%s'", db_server.id) |
|
149 |
return "RUNNING" |
|
141 |
return "ERROR" |
|
150 | 142 |
|
151 | 143 |
def reconcile_stale_servers(self): |
152 | 144 |
# Detect stale servers |
... | ... | |
218 | 210 |
gnt_server) |
219 | 211 |
self.reconcile_unsynced_nics(server_id, db_server, gnt_server) |
220 | 212 |
self.reconcile_unsynced_disks(server_id, db_server, gnt_server) |
213 |
if db_server.task is not None: |
|
214 |
self.reconcile_pending_task(server_id, db_server) |
|
221 | 215 |
|
222 | 216 |
def reconcile_building_server(self, db_server): |
223 | 217 |
self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.", |
... | ... | |
309 | 303 |
def reconcile_unsynced_disks(self, server_id, db_server, gnt_server): |
310 | 304 |
pass |
311 | 305 |
|
306 |
def reconcile_pending_task(self, server_id, db_server): |
|
307 |
job_id = db_server.task_job_id |
|
308 |
pending_task = False |
|
309 |
if job_id not in self.gnt_jobs: |
|
310 |
pending_task = True |
|
311 |
else: |
|
312 |
gnt_job_status = self.gnt_job[job_id]["status"] |
|
313 |
if gnt_job_status in GANETI_JOBS_FINALIZED: |
|
314 |
pending_task = True |
|
315 |
|
|
316 |
if pending_task: |
|
317 |
self.log.info("Found server '%s' with pending task: '%s'", |
|
318 |
server_id, db_server.task) |
|
319 |
if self.options["fixed_pending_tasks"]: |
|
320 |
db_server.task = None |
|
321 |
db_server.task_job_id = None |
|
322 |
db_server.save() |
|
323 |
self.log.info("Cleared pending task for server '%s", server_id) |
|
324 |
|
|
312 | 325 |
|
313 | 326 |
def format_db_nic(nic): |
314 | 327 |
return "Index: %s IP: %s Network: %s MAC: %s Firewall: %s" % (nic.index, |
... | ... | |
435 | 448 |
return nics |
436 | 449 |
|
437 | 450 |
|
451 |
def get_ganeti_jobs(backend): |
|
452 |
gnt_jobs = backend_mod.get_jobs(backend) |
|
453 |
return dict([(int(j["id"]), j) for j in gnt_jobs]) |
|
454 |
|
|
455 |
|
|
438 | 456 |
def disks_from_instance(i): |
439 | 457 |
return dict([(index, {"size": size}) |
440 | 458 |
for index, size in enumerate(i["disk.sizes"])]) |
Also available in: Unified diff