Revision 16e0b9c9 lib/watcher/__init__.py
--- a/lib/watcher/__init__.py
+++ b/lib/watcher/__init__.py
@@ -32 +32 @@
 import sys
 import time
 import logging
+import operator
 from optparse import OptionParser
 
 from ganeti import utils
@@ -43 +44 @@
 from ganeti import luxi
 from ganeti import rapi
 from ganeti import netutils
+from ganeti import qlang
+from ganeti import objects
+from ganeti import ssconf
+from ganeti import ht
 
 import ganeti.rapi.client # pylint: disable-msg=W0611
 
@@ -51 +56 @@
 
 
 MAXTRIES = 5
-
-
-# Global LUXI client object
-client = None
 BAD_STATES = frozenset([
   constants.INSTST_ERRORDOWN,
   ])
@@ -65 +66 @@
 NOTICE = "NOTICE"
 ERROR = "ERROR"
 
+#: Number of seconds to wait between starting child processes for node groups
+CHILD_PROCESS_DELAY = 1.0
+
 
 class NotMasterError(errors.GenericError):
   """Exception raised when this host is not the master."""
@@ -129 +133 @@
     self.autostart = autostart
     self.snodes = snodes
 
-  def Restart(self):
+  def Restart(self, cl):
     """Encapsulates the start of an instance.
 
     """
     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
-    cli.SubmitOpCode(op, cl=client)
+    cli.SubmitOpCode(op, cl=cl)
 
-  def ActivateDisks(self):
+  def ActivateDisks(self, cl):
     """Encapsulates the activation of all disks of an instance.
 
     """
     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
-    cli.SubmitOpCode(op, cl=client)
+    cli.SubmitOpCode(op, cl=cl)
 
 
-def GetClusterData():
-  """Get a list of instances on this cluster.
+class Node:
+  """Data container representing cluster node.
 
   """
-  op1_fields = ["name", "status", "admin_state", "snodes"]
-  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
-                                use_locking=True)
-  op2_fields = ["name", "bootid", "offline"]
-  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
-                            use_locking=True)
-
-  job_id = client.SubmitJob([op1, op2])
+  def __init__(self, name, bootid, offline, secondaries):
+    """Initializes this class.
 
-  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
+    """
+    self.name = name
+    self.bootid = bootid
+    self.offline = offline
+    self.secondaries = secondaries
 
-  logging.debug("Got data from cluster, writing instance status file")
 
-  result = all_results[0]
-  smap = {}
+def _CheckInstances(cl, notepad, instances):
+  """Make a pass over the list of instances, restarting downed ones.
 
-  instances = {}
+  """
+  notepad.MaintainInstanceList(instances.keys())
 
-  _UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE)
+  started = set()
 
-  for fields in result:
-    (name, status, autostart, snodes) = fields
+  for inst in instances.values():
+    if inst.status in BAD_STATES:
+      n = notepad.NumberOfRestartAttempts(inst.name)
 
-    # update the secondary node map
-    for node in snodes:
-      if node not in smap:
-        smap[node] = []
-      smap[node].append(name)
+      if n > MAXTRIES:
+        logging.warning("Not restarting instance '%s', retries exhausted",
+                        inst.name)
+        continue
 
-    instances[name] = Instance(name, status, autostart, snodes)
+      if n == MAXTRIES:
+        notepad.RecordRestartAttempt(inst.name)
+        logging.error("Could not restart instance '%s' after %s attempts,"
+                      " giving up", inst.name, MAXTRIES)
+        continue
 
-  nodes = dict([(name, (bootid, offline))
-                for name, bootid, offline in all_results[1]])
+      try:
+        logging.info("Restarting instance '%s' (attempt #%s)",
+                     inst.name, n + 1)
+        inst.Restart(cl)
+      except Exception: # pylint: disable-msg=W0703
+        logging.exception("Error while restarting instance '%s'", inst.name)
+      else:
+        started.add(inst.name)
 
-  client.ArchiveJob(job_id)
+      notepad.RecordRestartAttempt(inst.name)
 
-  return instances, nodes, smap
+    else:
+      if notepad.NumberOfRestartAttempts(inst.name):
+        notepad.RemoveInstance(inst.name)
+        if inst.status not in HELPLESS_STATES:
+          logging.info("Restart of instance '%s' succeeded", inst.name)
 
+  return started
 
-class Watcher(object):
-  """Encapsulate the logic for restarting erroneously halted virtual machines.
 
-  The calling program should periodically instantiate me and call Run().
-  This will traverse the list of instances, and make up to MAXTRIES attempts
-  to restart machines that are down.
+def _CheckDisks(cl, notepad, nodes, instances, started):
+  """Check all nodes for restarted ones.
 
   """
-  def __init__(self, opts, notepad):
-    self.notepad = notepad
-    master = client.QueryConfigValues(["master_node"])[0]
-    if master != netutils.Hostname.GetSysName():
-      raise NotMasterError("This is not the master node")
-    # first archive old jobs
-    self.ArchiveJobs(opts.job_age)
-    # and only then submit new ones
-    self.instances, self.bootids, self.smap = GetClusterData()
-    self.started_instances = set()
-    self.opts = opts
-
-  def Run(self):
-    """Watcher run sequence.
-
-    """
-    notepad = self.notepad
-    self.CheckInstances(notepad)
-    self.CheckDisks(notepad)
-    self.VerifyDisks()
-
-  @staticmethod
-  def ArchiveJobs(age):
-    """Archive old jobs.
-
-    """
-    arch_count, left_count = client.AutoArchiveJobs(age)
-    logging.debug("Archived %s jobs, left %s", arch_count, left_count)
-
-  def CheckDisks(self, notepad):
-    """Check all nodes for restarted ones.
-
-    """
-    check_nodes = []
-    for name, (new_id, offline) in self.bootids.iteritems():
-      old = notepad.GetNodeBootID(name)
-      if new_id is None:
-        # Bad node, not returning a boot id
-        if not offline:
-          logging.debug("Node %s missing boot id, skipping secondary checks",
-                        name)
-        continue
-      if old != new_id:
-        # Node's boot ID has changed, proably through a reboot.
-        check_nodes.append(name)
-
-    if check_nodes:
-      # Activate disks for all instances with any of the checked nodes as a
-      # secondary node.
-      for node in check_nodes:
-        if node not in self.smap:
+  check_nodes = []
+
+  for node in nodes.values():
+    old = notepad.GetNodeBootID(node.name)
+    if not node.bootid:
+      # Bad node, not returning a boot id
+      if not node.offline:
+        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
+                      node.name)
+      continue
+
+    if old != node.bootid:
+      # Node's boot ID has changed, probably through a reboot
+      check_nodes.append(node)
+
+  if check_nodes:
+    # Activate disks for all instances with any of the checked nodes as a
+    # secondary node.
+    for node in check_nodes:
+      for instance_name in node.secondaries:
+        try:
+          inst = instances[instance_name]
+        except KeyError:
+          logging.info("Can't find instance '%s', maybe it was ignored",
+                       instance_name)
           continue
-        for instance_name in self.smap[node]:
-          instance = self.instances[instance_name]
-          if not instance.autostart:
-            logging.info(("Skipping disk activation for non-autostart"
-                          " instance %s"), instance.name)
-            continue
-          if instance.name in self.started_instances:
-            # we already tried to start the instance, which should have
-            # activated its drives (if they can be at all)
-            logging.debug("Skipping disk activation for instance %s, as"
-                          " it was already started", instance.name)
-            continue
-          try:
-            logging.info("Activating disks for instance %s", instance.name)
-            instance.ActivateDisks()
-          except Exception: # pylint: disable-msg=W0703
-            logging.exception("Error while activating disks for instance %s",
-                              instance.name)
-
-    # Keep changed boot IDs
-    for name in check_nodes:
-      notepad.SetNodeBootID(name, self.bootids[name][0])
-
-  def CheckInstances(self, notepad):
-    """Make a pass over the list of instances, restarting downed ones.
-
-    """
-    notepad.MaintainInstanceList(self.instances.keys())
-
-    for instance in self.instances.values():
-      if instance.status in BAD_STATES:
-        n = notepad.NumberOfRestartAttempts(instance.name)
 
-        if n > MAXTRIES:
-          logging.warning("Not restarting instance %s, retries exhausted",
-                          instance.name)
+        if not inst.autostart:
+          logging.info("Skipping disk activation for non-autostart"
+                       " instance '%s'", inst.name)
          continue
-        elif n < MAXTRIES:
-          last = " (Attempt #%d)" % (n + 1)
-        else:
-          notepad.RecordRestartAttempt(instance.name)
-          logging.error("Could not restart %s after %d attempts, giving up",
-                        instance.name, MAXTRIES)
+
+        if inst.name in started:
+          # we already tried to start the instance, which should have
+          # activated its drives (if they can be at all)
+          logging.debug("Skipping disk activation for instance '%s' as"
+                        " it was already started", inst.name)
          continue
+
        try:
-          logging.info("Restarting %s%s", instance.name, last)
-          instance.Restart()
-          self.started_instances.add(instance.name)
+          logging.info("Activating disks for instance '%s'", inst.name)
+          inst.ActivateDisks(cl)
        except Exception: # pylint: disable-msg=W0703
-          logging.exception("Error while restarting instance %s",
-                            instance.name)
+          logging.exception("Error while activating disks for instance '%s'",
+                            inst.name)
 
-        notepad.RecordRestartAttempt(instance.name)
-      elif instance.status in HELPLESS_STATES:
-        if notepad.NumberOfRestartAttempts(instance.name):
-          notepad.RemoveInstance(instance.name)
-      else:
-        if notepad.NumberOfRestartAttempts(instance.name):
-          notepad.RemoveInstance(instance.name)
-          logging.info("Restart of %s succeeded", instance.name)
+  # Keep changed boot IDs
+  for node in check_nodes:
+    notepad.SetNodeBootID(node.name, node.bootid)
 
-  def _CheckForOfflineNodes(self, instance):
-    """Checks if given instances has any secondary in offline status.
 
-    @param instance: The instance object
-    @return: True if any of the secondary is offline, False otherwise
-
-    """
-    bootids = []
-    for node in instance.snodes:
-      bootids.append(self.bootids[node])
-
-    return compat.any(offline for (_, offline) in bootids)
-
-  def VerifyDisks(self):
-    """Run gnt-cluster verify-disks.
-
-    """
-    job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
-    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
-    client.ArchiveJob(job_id)
+def _CheckForOfflineNodes(nodes, instance):
+  """Checks if given instances has any secondary in offline status.
 
-    # Keep track of submitted jobs
-    jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
+  @param instance: The instance object
+  @return: True if any of the secondary is offline, False otherwise
 
-    archive_jobs = set()
-    for (status, job_id) in result[constants.JOB_IDS_KEY]:
-      jex.AddJobId(None, status, job_id)
-      if status:
-        archive_jobs.add(job_id)
+  """
+  return compat.any(nodes[node_name].offline for node_name in instance.snodes)
 
-    offline_disk_instances = set()
 
-    for (status, result) in jex.GetResults():
-      if not status:
-        logging.error("Verify-disks job failed: %s", result)
-        continue
+def _VerifyDisks(cl, uuid, nodes, instances):
+  """Run a per-group "gnt-cluster verify-disks".
 
-      ((_, instances, _), ) = result
+  """
+  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
+  ((_, offline_disk_instances, _), ) = \
+    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
+  cl.ArchiveJob(job_id)
 
-      offline_disk_instances.update(instances)
+  if not offline_disk_instances:
+    # nothing to do
+    logging.debug("Verify-disks reported no offline disks, nothing to do")
+    return
 
-    for job_id in archive_jobs:
-      client.ArchiveJob(job_id)
+  logging.debug("Will activate disks for instance(s) %s",
+                utils.CommaJoin(offline_disk_instances))
 
-    if not offline_disk_instances:
-      # nothing to do
-      logging.debug("verify-disks reported no offline disks, nothing to do")
-      return
+  # We submit only one job, and wait for it. Not optimal, but this puts less
+  # load on the job queue.
+  job = []
+  for name in offline_disk_instances:
+    try:
+      inst = instances[name]
+    except KeyError:
+      logging.info("Can't find instance '%s', maybe it was ignored", name)
+      continue
 
-    logging.debug("Will activate disks for instance(s) %s",
-                  utils.CommaJoin(offline_disk_instances))
+    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
+      logging.info("Skipping instance '%s' because it is in a helpless state or"
+                   " has offline secondaries", name)
+      continue
 
-    # we submit only one job, and wait for it. not optimal, but spams
-    # less the job queue
-    job = []
-    for name in offline_disk_instances:
-      instance = self.instances[name]
-      if (instance.status in HELPLESS_STATES or
-          self._CheckForOfflineNodes(instance)):
-        logging.info("Skip instance %s because it is in helpless state or has"
-                     " one offline secondary", name)
-        continue
-      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
+    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
 
-    if job:
-      job_id = cli.SendJob(job, cl=client)
+  if job:
+    job_id = cli.SendJob(job, cl=cl)
 
-      try:
-        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
-      except Exception: # pylint: disable-msg=W0703
-        logging.exception("Error while activating disks")
+    try:
+      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
+    except Exception: # pylint: disable-msg=W0703
+      logging.exception("Error while activating disks")
 
 
 def IsRapiResponding(hostname):
@@ -421 +353 @@
                         constants.RELEASE_VERSION)
 
   parser.add_option(cli.DEBUG_OPT)
+  parser.add_option(cli.NODEGROUP_OPT)
   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                     help="Autoarchive jobs older than this age (default"
                     " 6 hours)")
   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                     action="store_true", help="Ignore cluster pause setting")
+  parser.add_option("--wait-children", dest="wait_children", default=False,
+                    action="store_true", help="Wait for child processes")
   options, args = parser.parse_args()
   options.job_age = cli.ParseTimespec(options.job_age)
 
@@ -454 +389 @@
                                for (name, status) in result))
 
 
+def GetLuxiClient(try_restart):
+  """Tries to connect to the master daemon.
+
+  @type try_restart: bool
+  @param try_restart: Whether to attempt to restart the master daemon
+
+  """
+  try:
+    return cli.GetClient()
+  except errors.OpPrereqError, err:
+    # this is, from cli.GetClient, a not-master case
+    raise NotMasterError("Not on master node (%s)" % err)
+
+  except luxi.NoMasterError, err:
+    if not try_restart:
+      raise
+
+    logging.warning("Master daemon seems to be down (%s), trying to restart",
+                    err)
+
+    if not utils.EnsureDaemon(constants.MASTERD):
+      raise errors.GenericError("Can't start the master daemon")
+
+    # Retry the connection
+    return cli.GetClient()
+
+
+def _StartGroupChildren(cl, wait):
+  """Starts a new instance of the watcher for every node group.
+
+  """
+  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
+                        for arg in sys.argv)
+
+  result = cl.QueryGroups([], ["name", "uuid"], False)
+
+  children = []
+
+  for (idx, (name, uuid)) in enumerate(result):
+    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
+
+    if idx > 0:
+      # Let's not kill the system
+      time.sleep(CHILD_PROCESS_DELAY)
+
+    logging.debug("Spawning child for group '%s' (%s), arguments %s",
+                  name, uuid, args)
+
+    try:
+      # TODO: Should utils.StartDaemon be used instead?
+      pid = os.spawnv(os.P_NOWAIT, args[0], args)
+    except Exception: # pylint: disable-msg=W0703
+      logging.exception("Failed to start child for group '%s' (%s)",
+                        name, uuid)
+    else:
+      logging.debug("Started with PID %s", pid)
+      children.append(pid)
+
+  if wait:
+    for pid in children:
+      logging.debug("Waiting for child PID %s", pid)
+      try:
+        result = utils.RetryOnSignal(os.waitpid, pid, 0)
+      except EnvironmentError, err:
+        result = str(err)
+
+      logging.debug("Child PID %s exited with status %s", pid, result)
+
+
+def _ArchiveJobs(cl, age):
+  """Archives old jobs.
+
+  """
+  (arch_count, left_count) = cl.AutoArchiveJobs(age)
+  logging.debug("Archived %s jobs, left %s", arch_count, left_count)
+
+
+def _CheckMaster(cl):
+  """Ensures current host is master node.
+
+  """
+  (master, ) = cl.QueryConfigValues(["master_node"])
+  if master != netutils.Hostname.GetSysName():
+    raise NotMasterError("This is not the master node")
+
+
 @rapi.client.UsesRapiClient
+def _GlobalWatcher(opts):
+  """Main function for global watcher.
+
+  At the end child processes are spawned for every node group.
+
+  """
+  StartNodeDaemons()
+  RunWatcherHooks()
+
+  # Run node maintenance in all cases, even if master, so that old masters can
+  # be properly cleaned up
+  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
+    nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602
+
+  try:
+    client = GetLuxiClient(True)
+  except NotMasterError:
+    # Don't proceed on non-master nodes
+    return constants.EXIT_SUCCESS
+
+  # we are on master now
+  utils.EnsureDaemon(constants.RAPI)
+
+  # If RAPI isn't responding to queries, try one restart
+  logging.debug("Attempting to talk to remote API on %s",
+                constants.IP4_ADDRESS_LOCALHOST)
+  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+    logging.warning("Couldn't get answer from remote API, restaring daemon")
+    utils.StopDaemon(constants.RAPI)
+    utils.EnsureDaemon(constants.RAPI)
+    logging.debug("Second attempt to talk to remote API")
+    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+      logging.fatal("RAPI is not responding")
+  logging.debug("Successfully talked to remote API")
+
+  _CheckMaster(client)
+  _ArchiveJobs(client, opts.job_age)
+  _UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE)
+
+  # Spawn child processes for all node groups
+  _StartGroupChildren(client, opts.wait_children)
+
+  return constants.EXIT_SUCCESS
+
+
+def _GetGroupData(cl, uuid):
+  """Retrieves instances and nodes per node group.
+
+  """
+  # TODO: Implement locking
+  job = [
+    # Get all primary instances in group
+    opcodes.OpQuery(what=constants.QR_INSTANCE,
+                    fields=["name", "status", "admin_state", "snodes",
+                            "pnode.group.uuid", "snodes.group.uuid"],
+                    filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
+
+    # Get all nodes in group
+    opcodes.OpQuery(what=constants.QR_NODE,
+                    fields=["name", "bootid", "offline"],
+                    filter=[qlang.OP_EQUAL, "group.uuid", uuid]),
+    ]
+
+  job_id = cl.SubmitJob(job)
+  results = map(objects.QueryResponse.FromDict,
+                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
+  cl.ArchiveJob(job_id)
+
+  results_data = map(operator.attrgetter("data"), results)
+
+  # Ensure results are tuples with two values
+  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
+
+  # Extract values ignoring result status
+  (raw_instances, raw_nodes) = [[map(compat.snd, values)
+                                 for values in res]
+                                for res in results_data]
+
+  secondaries = {}
+  instances = []
+
+  # Load all instances
+  for (name, status, autostart, snodes, pnode_group_uuid,
+       snodes_group_uuid) in raw_instances:
+    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
+      logging.error("Ignoring split instance '%s', primary group %s, secondary"
+                    " groups %s", name, pnode_group_uuid,
+                    utils.CommaJoin(snodes_group_uuid))
+    else:
+      instances.append(Instance(name, status, autostart, snodes))
+
+      for node in snodes:
+        secondaries.setdefault(node, set()).add(name)
+
+  # Load all nodes
+  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
+           for (name, bootid, offline) in raw_nodes]
+
+  return (dict((node.name, node) for node in nodes),
+          dict((inst.name, inst) for inst in instances))
+
+
+def _KnownGroup(uuid):
+  """Checks if a group UUID is known by ssconf.
+
+  """
+  groups = ssconf.SimpleStore().GetNodegroupList()
+
+  return compat.any(line.strip() and line.split()[0] == uuid
+                    for line in groups)
+
+
+def _GroupWatcher(opts):
+  """Main function for per-group watcher process.
+
+  """
+  group_uuid = opts.nodegroup.lower()
+
+  if not utils.UUID_RE.match(group_uuid):
+    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
+                              " got '%s'" %
+                              (cli.NODEGROUP_OPT_NAME, group_uuid))
+
+  logging.info("Watcher for node group '%s'", group_uuid)
+
+  # Check if node group is known
+  if not _KnownGroup(group_uuid):
+    raise errors.GenericError("Node group '%s' is not known by ssconf" %
+                              group_uuid)
+
+  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
+
+  logging.debug("Using state file %s", state_path)
+
+  # Global watcher
+  statefile = state.OpenStateFile(state_path) # pylint: disable-msg=E0602
+  if not statefile:
+    return constants.EXIT_FAILURE
+
+  notepad = state.WatcherState(statefile) # pylint: disable-msg=E0602
+  try:
+    # Connect to master daemon
+    client = GetLuxiClient(False)
+
+    _CheckMaster(client)
+
+    (nodes, instances) = _GetGroupData(client, group_uuid)
+
+    started = _CheckInstances(client, notepad, instances)
+    _CheckDisks(client, notepad, nodes, instances, started)
+    _VerifyDisks(client, group_uuid, nodes, instances)
+  except Exception, err:
+    logging.info("Not updating status file due to failure: %s", err)
+    raise
+  else:
+    # Save changes for next run
+    notepad.Save(state_path)
+
+  return constants.EXIT_SUCCESS
+
+
 def Main():
   """Main function.
 
   """
-  global client # pylint: disable-msg=W0603
-
   (options, _) = ParseOptions()
 
   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
@@ -470 +650 @@
     logging.debug("Pause has been set, exiting")
     return constants.EXIT_SUCCESS
 
-  statefile = \
-    state.OpenStateFile(constants.WATCHER_STATEFILE)
-  if not statefile:
-    return constants.EXIT_FAILURE
-
-  update_file = False
+  # Try to acquire global watcher lock in shared mode
+  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
   try:
-    StartNodeDaemons()
-    RunWatcherHooks()
-    # run node maintenance in all cases, even if master, so that old
-    # masters can be properly cleaned up too
-    if nodemaint.NodeMaintenance.ShouldRun():
-      nodemaint.NodeMaintenance().Exec()
-
-    notepad = state.WatcherState(statefile)
-    try:
-      try:
-        client = cli.GetClient()
-      except errors.OpPrereqError:
-        # this is, from cli.GetClient, a not-master case
-        logging.debug("Not on master, exiting")
-        update_file = True
-        return constants.EXIT_SUCCESS
-      except luxi.NoMasterError, err:
-        logging.warning("Master seems to be down (%s), trying to restart",
-                        str(err))
-        if not utils.EnsureDaemon(constants.MASTERD):
-          logging.critical("Can't start the master, exiting")
-          return constants.EXIT_FAILURE
-        # else retry the connection
-        client = cli.GetClient()
-
-      # we are on master now
-      utils.EnsureDaemon(constants.RAPI)
-
-      # If RAPI isn't responding to queries, try one restart.
-      logging.debug("Attempting to talk with RAPI.")
-      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
-        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
-                        " Restarting Ganeti RAPI.")
-        utils.StopDaemon(constants.RAPI)
-        utils.EnsureDaemon(constants.RAPI)
-        logging.debug("Second attempt to talk with RAPI")
-        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
-          logging.fatal("RAPI is not responding. Please investigate.")
-        logging.debug("Successfully talked to RAPI.")
+    lock.Shared(blocking=False)
+  except (EnvironmentError, errors.LockError), err:
+    logging.error("Can't acquire lock on %s: %s",
+                  constants.WATCHER_LOCK_FILE, err)
+    return constants.EXIT_SUCCESS
 
-    try:
-      watcher = Watcher(options, notepad)
-    except errors.ConfigurationError:
-      # Just exit if there's no configuration
-      update_file = True
-      return constants.EXIT_SUCCESS
-
-    watcher.Run()
-    update_file = True
-
-    finally:
-      if update_file:
-        notepad.Save(constants.WATCHER_STATEFILE)
-      else:
-        logging.debug("Not updating status file due to failure")
-  except SystemExit:
+  if options.nodegroup is None:
+    fn = _GlobalWatcher
+  else:
+    # Per-nodegroup watcher
+    fn = _GroupWatcher
+
+  try:
+    return fn(options)
+  except (SystemExit, KeyboardInterrupt):
     raise
   except NotMasterError:
     logging.debug("Not master, exiting")