 32 |  32 | import sys
 33 |  33 | import time
 34 |  34 | import logging
    |  35 | import operator
 35 |  36 | from optparse import OptionParser
 36 |  37 |
 37 |  38 | from ganeti import utils
... | ... |
 43 |  44 | from ganeti import luxi
 44 |  45 | from ganeti import rapi
 45 |  46 | from ganeti import netutils
    |  47 | from ganeti import qlang
    |  48 | from ganeti import objects
    |  49 | from ganeti import ssconf
    |  50 | from ganeti import ht
 46 |  51 |
 47 |  52 | import ganeti.rapi.client # pylint: disable-msg=W0611
 48 |  53 |
... | ... |
 51 |  56 |
 52 |  57 |
 53 |  58 | MAXTRIES = 5
 54 |     |
 55 |     |
 56 |     | # Global LUXI client object
 57 |     | client = None
 58 |  59 | BAD_STATES = frozenset([
 59 |  60 |   constants.INSTST_ERRORDOWN,
 60 |  61 |   ])
... | ... |
 65 |  66 | NOTICE = "NOTICE"
 66 |  67 | ERROR = "ERROR"
 67 |  68 |
    |  69 | #: Number of seconds to wait between starting child processes for node groups
    |  70 | CHILD_PROCESS_DELAY = 1.0
    |  71 |
 68 |  72 |
 69 |  73 | class NotMasterError(errors.GenericError):
 70 |  74 |   """Exception raised when this host is not the master."""
... | ... |
129 | 133 |     self.autostart = autostart
130 | 134 |     self.snodes = snodes
131 | 135 |
132 |     |   def Restart(self):
    | 136 |   def Restart(self, cl):
133 | 137 |     """Encapsulates the start of an instance.
134 | 138 |
135 | 139 |     """
136 | 140 |     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
137 |     |     cli.SubmitOpCode(op, cl=client)
    | 141 |     cli.SubmitOpCode(op, cl=cl)
138 | 142 |
139 |     |   def ActivateDisks(self):
    | 143 |   def ActivateDisks(self, cl):
140 | 144 |     """Encapsulates the activation of all disks of an instance.
141 | 145 |
142 | 146 |     """
143 | 147 |     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
144 |     |     cli.SubmitOpCode(op, cl=client)
    | 148 |     cli.SubmitOpCode(op, cl=cl)
145 | 149 |
146 | 150 |
147 |     | def GetClusterData():
148 |     |   """Get a list of instances on this cluster.
    | 151 | class Node:
    | 152 |   """Data container representing cluster node.
149 | 153 |
150 | 154 |   """
151 |     |   op1_fields = ["name", "status", "admin_state", "snodes"]
152 |     |   op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
153 |     |                                 use_locking=True)
154 |     |   op2_fields = ["name", "bootid", "offline"]
155 |     |   op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
156 |     |                             use_locking=True)
157 |     |
158 |     |   job_id = client.SubmitJob([op1, op2])
    | 155 |   def __init__(self, name, bootid, offline, secondaries):
    | 156 |     """Initializes this class.
159 | 157 |
160 |     |   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
    | 158 |     """
    | 159 |     self.name = name
    | 160 |     self.bootid = bootid
    | 161 |     self.offline = offline
    | 162 |     self.secondaries = secondaries
161 | 163 |
162 |     |   logging.debug("Got data from cluster, writing instance status file")
163 | 164 |
164 |     |   result = all_results[0]
165 |     |   smap = {}
    | 165 | def _CheckInstances(cl, notepad, instances):
    | 166 |   """Make a pass over the list of instances, restarting downed ones.
166 | 167 |
167 |     |   instances = {}
    | 168 |   """
    | 169 |   notepad.MaintainInstanceList(instances.keys())
168 | 170 |
169 |     |   _UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE)
    | 171 |   started = set()
170 | 172 |
171 |     |   for fields in result:
172 |     |     (name, status, autostart, snodes) = fields
    | 173 |   for inst in instances.values():
    | 174 |     if inst.status in BAD_STATES:
    | 175 |       n = notepad.NumberOfRestartAttempts(inst.name)
173 | 176 |
174 |     |     # update the secondary node map
175 |     |     for node in snodes:
176 |     |       if node not in smap:
177 |     |         smap[node] = []
178 |     |       smap[node].append(name)
    | 177 |       if n > MAXTRIES:
    | 178 |         logging.warning("Not restarting instance '%s', retries exhausted",
    | 179 |                         inst.name)
    | 180 |         continue
179 | 181 |
180 |     |     instances[name] = Instance(name, status, autostart, snodes)
    | 182 |       if n == MAXTRIES:
    | 183 |         notepad.RecordRestartAttempt(inst.name)
    | 184 |         logging.error("Could not restart instance '%s' after %s attempts,"
    | 185 |                       " giving up", inst.name, MAXTRIES)
    | 186 |         continue
181 | 187 |
182 |     |   nodes = dict([(name, (bootid, offline))
183 |     |                 for name, bootid, offline in all_results[1]])
    | 188 |       try:
    | 189 |         logging.info("Restarting instance '%s' (attempt #%s)",
    | 190 |                      inst.name, n + 1)
    | 191 |         inst.Restart(cl)
    | 192 |       except Exception: # pylint: disable-msg=W0703
    | 193 |         logging.exception("Error while restarting instance '%s'", inst.name)
    | 194 |       else:
    | 195 |         started.add(inst.name)
184 | 196 |
185 |     |   client.ArchiveJob(job_id)
    | 197 |       notepad.RecordRestartAttempt(inst.name)
186 | 198 |
187 |     |   return instances, nodes, smap
    | 199 |     else:
    | 200 |       if notepad.NumberOfRestartAttempts(inst.name):
    | 201 |         notepad.RemoveInstance(inst.name)
    | 202 |         if inst.status not in HELPLESS_STATES:
    | 203 |           logging.info("Restart of instance '%s' succeeded", inst.name)
188 | 204 |
    | 205 |   return started
189 | 206 |
190 |     | class Watcher(object):
191 |     |   """Encapsulate the logic for restarting erroneously halted virtual machines.
192 | 207 |
193 |     |   The calling program should periodically instantiate me and call Run().
194 |     |   This will traverse the list of instances, and make up to MAXTRIES attempts
195 |     |   to restart machines that are down.
    | 208 | def _CheckDisks(cl, notepad, nodes, instances, started):
    | 209 |   """Check all nodes for restarted ones.
196 | 210 |
197 | 211 |   """
198 |     |   def __init__(self, opts, notepad):
199 |     |     self.notepad = notepad
200 |     |     master = client.QueryConfigValues(["master_node"])[0]
201 |     |     if master != netutils.Hostname.GetSysName():
202 |     |       raise NotMasterError("This is not the master node")
203 |     |     # first archive old jobs
204 |     |     self.ArchiveJobs(opts.job_age)
205 |     |     # and only then submit new ones
206 |     |     self.instances, self.bootids, self.smap = GetClusterData()
207 |     |     self.started_instances = set()
208 |     |     self.opts = opts
209 |     |
210 |     |   def Run(self):
211 |     |     """Watcher run sequence.
212 |     |
213 |     |     """
214 |     |     notepad = self.notepad
215 |     |     self.CheckInstances(notepad)
216 |     |     self.CheckDisks(notepad)
217 |     |     self.VerifyDisks()
218 |     |
219 |     |   @staticmethod
220 |     |   def ArchiveJobs(age):
221 |     |     """Archive old jobs.
222 |     |
223 |     |     """
224 |     |     arch_count, left_count = client.AutoArchiveJobs(age)
225 |     |     logging.debug("Archived %s jobs, left %s", arch_count, left_count)
226 |     |
227 |     |   def CheckDisks(self, notepad):
228 |     |     """Check all nodes for restarted ones.
229 |     |
230 |     |     """
231 |     |     check_nodes = []
232 |     |     for name, (new_id, offline) in self.bootids.iteritems():
233 |     |       old = notepad.GetNodeBootID(name)
234 |     |       if new_id is None:
235 |     |         # Bad node, not returning a boot id
236 |     |         if not offline:
237 |     |           logging.debug("Node %s missing boot id, skipping secondary checks",
238 |     |                         name)
239 |     |         continue
240 |     |       if old != new_id:
241 |     |         # Node's boot ID has changed, proably through a reboot.
242 |     |         check_nodes.append(name)
243 |     |
244 |     |     if check_nodes:
245 |     |       # Activate disks for all instances with any of the checked nodes as a
246 |     |       # secondary node.
247 |     |       for node in check_nodes:
248 |     |         if node not in self.smap:
    | 212 |   check_nodes = []
    | 213 |
    | 214 |   for node in nodes.values():
    | 215 |     old = notepad.GetNodeBootID(node.name)
    | 216 |     if not node.bootid:
    | 217 |       # Bad node, not returning a boot id
    | 218 |       if not node.offline:
    | 219 |         logging.debug("Node '%s' missing boot ID, skipping secondary checks",
    | 220 |                       node.name)
    | 221 |       continue
    | 222 |
    | 223 |     if old != node.bootid:
    | 224 |       # Node's boot ID has changed, probably through a reboot
    | 225 |       check_nodes.append(node)
    | 226 |
    | 227 |   if check_nodes:
    | 228 |     # Activate disks for all instances with any of the checked nodes as a
    | 229 |     # secondary node.
    | 230 |     for node in check_nodes:
    | 231 |       for instance_name in node.secondaries:
    | 232 |         try:
    | 233 |           inst = instances[instance_name]
    | 234 |         except KeyError:
    | 235 |           logging.info("Can't find instance '%s', maybe it was ignored",
    | 236 |                        instance_name)
249 | 237 |           continue
250 |     |         for instance_name in self.smap[node]:
251 |     |           instance = self.instances[instance_name]
252 |     |           if not instance.autostart:
253 |     |             logging.info(("Skipping disk activation for non-autostart"
254 |     |                           " instance %s"), instance.name)
255 |     |             continue
256 |     |           if instance.name in self.started_instances:
257 |     |             # we already tried to start the instance, which should have
258 |     |             # activated its drives (if they can be at all)
259 |     |             logging.debug("Skipping disk activation for instance %s, as"
260 |     |                           " it was already started", instance.name)
261 |     |             continue
262 |     |           try:
263 |     |             logging.info("Activating disks for instance %s", instance.name)
264 |     |             instance.ActivateDisks()
265 |     |           except Exception: # pylint: disable-msg=W0703
266 |     |             logging.exception("Error while activating disks for instance %s",
267 |     |                               instance.name)
268 |     |
269 |     |       # Keep changed boot IDs
270 |     |       for name in check_nodes:
271 |     |         notepad.SetNodeBootID(name, self.bootids[name][0])
272 |     |
273 |     |   def CheckInstances(self, notepad):
274 |     |     """Make a pass over the list of instances, restarting downed ones.
275 |     |
276 |     |     """
277 |     |     notepad.MaintainInstanceList(self.instances.keys())
278 |     |
279 |     |     for instance in self.instances.values():
280 |     |       if instance.status in BAD_STATES:
281 |     |         n = notepad.NumberOfRestartAttempts(instance.name)
282 | 238 |
283 |     |         if n > MAXTRIES:
284 |     |           logging.warning("Not restarting instance %s, retries exhausted",
285 |     |                           instance.name)
    | 239 |         if not inst.autostart:
    | 240 |           logging.info("Skipping disk activation for non-autostart"
    | 241 |                        " instance '%s'", inst.name)
286 | 242 |           continue
287 |     |         elif n < MAXTRIES:
288 |     |           last = " (Attempt #%d)" % (n + 1)
289 |     |         else:
290 |     |           notepad.RecordRestartAttempt(instance.name)
291 |     |           logging.error("Could not restart %s after %d attempts, giving up",
292 |     |                         instance.name, MAXTRIES)
    | 243 |
    | 244 |         if inst.name in started:
    | 245 |           # we already tried to start the instance, which should have
    | 246 |           # activated its drives (if they can be at all)
    | 247 |           logging.debug("Skipping disk activation for instance '%s' as"
    | 248 |                         " it was already started", inst.name)
293 | 249 |           continue
    | 250 |
294 | 251 |         try:
295 |     |           logging.info("Restarting %s%s", instance.name, last)
296 |     |           instance.Restart()
297 |     |           self.started_instances.add(instance.name)
    | 252 |           logging.info("Activating disks for instance '%s'", inst.name)
    | 253 |           inst.ActivateDisks(cl)
298 | 254 |         except Exception: # pylint: disable-msg=W0703
299 |     |           logging.exception("Error while restarting instance %s",
300 |     |                             instance.name)
    | 255 |           logging.exception("Error while activating disks for instance '%s'",
    | 256 |                             inst.name)
301 | 257 |
302 |     |         notepad.RecordRestartAttempt(instance.name)
303 |     |       elif instance.status in HELPLESS_STATES:
304 |     |         if notepad.NumberOfRestartAttempts(instance.name):
305 |     |           notepad.RemoveInstance(instance.name)
306 |     |       else:
307 |     |         if notepad.NumberOfRestartAttempts(instance.name):
308 |     |           notepad.RemoveInstance(instance.name)
309 |     |           logging.info("Restart of %s succeeded", instance.name)
    | 258 |   # Keep changed boot IDs
    | 259 |   for node in check_nodes:
    | 260 |     notepad.SetNodeBootID(node.name, node.bootid)
310 | 261 |
311 |     |   def _CheckForOfflineNodes(self, instance):
312 |     |     """Checks if given instances has any secondary in offline status.
313 | 262 |
314 |     |     @param instance: The instance object
315 |     |     @return: True if any of the secondary is offline, False otherwise
316 |     |
317 |     |     """
318 |     |     bootids = []
319 |     |     for node in instance.snodes:
320 |     |       bootids.append(self.bootids[node])
321 |     |
322 |     |     return compat.any(offline for (_, offline) in bootids)
323 |     |
324 |     |   def VerifyDisks(self):
325 |     |     """Run gnt-cluster verify-disks.
326 |     |
327 |     |     """
328 |     |     job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
329 |     |     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
330 |     |     client.ArchiveJob(job_id)
    | 263 | def _CheckForOfflineNodes(nodes, instance):
    | 264 |   """Checks if given instances has any secondary in offline status.
331 | 265 |
332 |     |     # Keep track of submitted jobs
333 |     |     jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
    | 266 |   @param instance: The instance object
    | 267 |   @return: True if any of the secondary is offline, False otherwise
334 | 268 |
335 |     |     archive_jobs = set()
336 |     |     for (status, job_id) in result[constants.JOB_IDS_KEY]:
337 |     |       jex.AddJobId(None, status, job_id)
338 |     |       if status:
339 |     |         archive_jobs.add(job_id)
    | 269 |   """
    | 270 |   return compat.any(nodes[node_name].offline for node_name in instance.snodes)
340 | 271 |
341 |     |     offline_disk_instances = set()
342 | 272 |
343 |     |     for (status, result) in jex.GetResults():
344 |     |       if not status:
345 |     |         logging.error("Verify-disks job failed: %s", result)
346 |     |         continue
    | 273 | def _VerifyDisks(cl, uuid, nodes, instances):
    | 274 |   """Run a per-group "gnt-cluster verify-disks".
347 | 275 |
348 |     |       ((_, instances, _), ) = result
    | 276 |   """
    | 277 |   job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
    | 278 |   ((_, offline_disk_instances, _), ) = \
    | 279 |     cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    | 280 |   cl.ArchiveJob(job_id)
349 | 281 |
350 |     |       offline_disk_instances.update(instances)
    | 282 |   if not offline_disk_instances:
    | 283 |     # nothing to do
    | 284 |     logging.debug("Verify-disks reported no offline disks, nothing to do")
    | 285 |     return
351 | 286 |
352 |     |     for job_id in archive_jobs:
353 |     |       client.ArchiveJob(job_id)
    | 287 |   logging.debug("Will activate disks for instance(s) %s",
    | 288 |                 utils.CommaJoin(offline_disk_instances))
354 | 289 |
355 |     |     if not offline_disk_instances:
356 |     |       # nothing to do
357 |     |       logging.debug("verify-disks reported no offline disks, nothing to do")
358 |     |       return
    | 290 |   # We submit only one job, and wait for it. Not optimal, but this puts less
    | 291 |   # load on the job queue.
    | 292 |   job = []
    | 293 |   for name in offline_disk_instances:
    | 294 |     try:
    | 295 |       inst = instances[name]
    | 296 |     except KeyError:
    | 297 |       logging.info("Can't find instance '%s', maybe it was ignored", name)
    | 298 |       continue
359 | 299 |
360 |     |     logging.debug("Will activate disks for instance(s) %s",
361 |     |                   utils.CommaJoin(offline_disk_instances))
    | 300 |     if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
    | 301 |       logging.info("Skipping instance '%s' because it is in a helpless state or"
    | 302 |                    " has offline secondaries", name)
    | 303 |       continue
362 | 304 |
363 |     |     # we submit only one job, and wait for it. not optimal, but spams
364 |     |     # less the job queue
365 |     |     job = []
366 |     |     for name in offline_disk_instances:
367 |     |       instance = self.instances[name]
368 |     |       if (instance.status in HELPLESS_STATES or
369 |     |           self._CheckForOfflineNodes(instance)):
370 |     |         logging.info("Skip instance %s because it is in helpless state or has"
371 |     |                      " one offline secondary", name)
372 |     |         continue
373 |     |       job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
    | 305 |     job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
374 | 306 |
375 |     |     if job:
376 |     |       job_id = cli.SendJob(job, cl=client)
    | 307 |   if job:
    | 308 |     job_id = cli.SendJob(job, cl=cl)
377 | 309 |
378 |     |       try:
379 |     |         cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
380 |     |       except Exception: # pylint: disable-msg=W0703
381 |     |         logging.exception("Error while activating disks")
    | 310 |     try:
    | 311 |       cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    | 312 |     except Exception: # pylint: disable-msg=W0703
    | 313 |       logging.exception("Error while activating disks")
382 | 314 |
383 | 315 |
384 | 316 | def IsRapiResponding(hostname):
... | ... |
421 | 353 |                         constants.RELEASE_VERSION)
422 | 354 |
423 | 355 |   parser.add_option(cli.DEBUG_OPT)
    | 356 |   parser.add_option(cli.NODEGROUP_OPT)
424 | 357 |   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
425 | 358 |                     help="Autoarchive jobs older than this age (default"
426 | 359 |                     " 6 hours)")
427 | 360 |   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
428 | 361 |                     action="store_true", help="Ignore cluster pause setting")
    | 362 |   parser.add_option("--wait-children", dest="wait_children", default=False,
    | 363 |                     action="store_true", help="Wait for child processes")
429 | 364 |   options, args = parser.parse_args()
430 | 365 |   options.job_age = cli.ParseTimespec(options.job_age)
431 | 366 |
... | ... |
454 | 389 |                                for (name, status) in result))
455 | 390 |
456 | 391 |
    | 392 | def GetLuxiClient(try_restart):
    | 393 |   """Tries to connect to the master daemon.
    | 394 |
    | 395 |   @type try_restart: bool
    | 396 |   @param try_restart: Whether to attempt to restart the master daemon
    | 397 |
    | 398 |   """
    | 399 |   try:
    | 400 |     return cli.GetClient()
    | 401 |   except errors.OpPrereqError, err:
    | 402 |     # this is, from cli.GetClient, a not-master case
    | 403 |     raise NotMasterError("Not on master node (%s)" % err)
    | 404 |
    | 405 |   except luxi.NoMasterError, err:
    | 406 |     if not try_restart:
    | 407 |       raise
    | 408 |
    | 409 |     logging.warning("Master daemon seems to be down (%s), trying to restart",
    | 410 |                     err)
    | 411 |
    | 412 |     if not utils.EnsureDaemon(constants.MASTERD):
    | 413 |       raise errors.GenericError("Can't start the master daemon")
    | 414 |
    | 415 |     # Retry the connection
    | 416 |     return cli.GetClient()
    | 417 |
    | 418 |
    | 419 | def _StartGroupChildren(cl, wait):
    | 420 |   """Starts a new instance of the watcher for every node group.
    | 421 |
    | 422 |   """
    | 423 |   assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
    | 424 |                         for arg in sys.argv)
    | 425 |
    | 426 |   result = cl.QueryGroups([], ["name", "uuid"], False)
    | 427 |
    | 428 |   children = []
    | 429 |
    | 430 |   for (idx, (name, uuid)) in enumerate(result):
    | 431 |     args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
    | 432 |
    | 433 |     if idx > 0:
    | 434 |       # Let's not kill the system
    | 435 |       time.sleep(CHILD_PROCESS_DELAY)
    | 436 |
    | 437 |     logging.debug("Spawning child for group '%s' (%s), arguments %s",
    | 438 |                   name, uuid, args)
    | 439 |
    | 440 |     try:
    | 441 |       # TODO: Should utils.StartDaemon be used instead?
    | 442 |       pid = os.spawnv(os.P_NOWAIT, args[0], args)
    | 443 |     except Exception: # pylint: disable-msg=W0703
    | 444 |       logging.exception("Failed to start child for group '%s' (%s)",
    | 445 |                         name, uuid)
    | 446 |     else:
    | 447 |       logging.debug("Started with PID %s", pid)
    | 448 |       children.append(pid)
    | 449 |
    | 450 |   if wait:
    | 451 |     for pid in children:
    | 452 |       logging.debug("Waiting for child PID %s", pid)
    | 453 |       try:
    | 454 |         result = utils.RetryOnSignal(os.waitpid, pid, 0)
    | 455 |       except EnvironmentError, err:
    | 456 |         result = str(err)
    | 457 |
    | 458 |       logging.debug("Child PID %s exited with status %s", pid, result)
    | 459 |
    | 460 |
    | 461 | def _ArchiveJobs(cl, age):
    | 462 |   """Archives old jobs.
    | 463 |
    | 464 |   """
    | 465 |   (arch_count, left_count) = cl.AutoArchiveJobs(age)
    | 466 |   logging.debug("Archived %s jobs, left %s", arch_count, left_count)
    | 467 |
    | 468 |
    | 469 | def _CheckMaster(cl):
    | 470 |   """Ensures current host is master node.
    | 471 |
    | 472 |   """
    | 473 |   (master, ) = cl.QueryConfigValues(["master_node"])
    | 474 |   if master != netutils.Hostname.GetSysName():
    | 475 |     raise NotMasterError("This is not the master node")
    | 476 |
    | 477 |
457 | 478 | @rapi.client.UsesRapiClient
    | 479 | def _GlobalWatcher(opts):
    | 480 |   """Main function for global watcher.
    | 481 |
    | 482 |   At the end child processes are spawned for every node group.
    | 483 |
    | 484 |   """
    | 485 |   StartNodeDaemons()
    | 486 |   RunWatcherHooks()
    | 487 |
    | 488 |   # Run node maintenance in all cases, even if master, so that old masters can
    | 489 |   # be properly cleaned up
    | 490 |   if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
    | 491 |     nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602
    | 492 |
    | 493 |   try:
    | 494 |     client = GetLuxiClient(True)
    | 495 |   except NotMasterError:
    | 496 |     # Don't proceed on non-master nodes
    | 497 |     return constants.EXIT_SUCCESS
    | 498 |
    | 499 |   # we are on master now
    | 500 |   utils.EnsureDaemon(constants.RAPI)
    | 501 |
    | 502 |   # If RAPI isn't responding to queries, try one restart
    | 503 |   logging.debug("Attempting to talk to remote API on %s",
    | 504 |                 constants.IP4_ADDRESS_LOCALHOST)
    | 505 |   if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    | 506 |     logging.warning("Couldn't get answer from remote API, restaring daemon")
    | 507 |     utils.StopDaemon(constants.RAPI)
    | 508 |     utils.EnsureDaemon(constants.RAPI)
    | 509 |     logging.debug("Second attempt to talk to remote API")
    | 510 |     if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    | 511 |       logging.fatal("RAPI is not responding")
    | 512 |   logging.debug("Successfully talked to remote API")
    | 513 |
    | 514 |   _CheckMaster(client)
    | 515 |   _ArchiveJobs(client, opts.job_age)
    | 516 |   _UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE)
    | 517 |
    | 518 |   # Spawn child processes for all node groups
    | 519 |   _StartGroupChildren(client, opts.wait_children)
    | 520 |
    | 521 |   return constants.EXIT_SUCCESS
    | 522 |
    | 523 |
    | 524 | def _GetGroupData(cl, uuid):
    | 525 |   """Retrieves instances and nodes per node group.
    | 526 |
    | 527 |   """
    | 528 |   # TODO: Implement locking
    | 529 |   job = [
    | 530 |     # Get all primary instances in group
    | 531 |     opcodes.OpQuery(what=constants.QR_INSTANCE,
    | 532 |                     fields=["name", "status", "admin_state", "snodes",
    | 533 |                             "pnode.group.uuid", "snodes.group.uuid"],
    | 534 |                     filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
    | 535 |
    | 536 |     # Get all nodes in group
    | 537 |     opcodes.OpQuery(what=constants.QR_NODE,
    | 538 |                     fields=["name", "bootid", "offline"],
    | 539 |                     filter=[qlang.OP_EQUAL, "group.uuid", uuid]),
    | 540 |     ]
    | 541 |
    | 542 |   job_id = cl.SubmitJob(job)
    | 543 |   results = map(objects.QueryResponse.FromDict,
    | 544 |                 cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
    | 545 |   cl.ArchiveJob(job_id)
    | 546 |
    | 547 |   results_data = map(operator.attrgetter("data"), results)
    | 548 |
    | 549 |   # Ensure results are tuples with two values
    | 550 |   assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
    | 551 |
    | 552 |   # Extract values ignoring result status
    | 553 |   (raw_instances, raw_nodes) = [[map(compat.snd, values)
    | 554 |                                  for values in res]
    | 555 |                                 for res in results_data]
    | 556 |
    | 557 |   secondaries = {}
    | 558 |   instances = []
    | 559 |
    | 560 |   # Load all instances
    | 561 |   for (name, status, autostart, snodes, pnode_group_uuid,
    | 562 |        snodes_group_uuid) in raw_instances:
    | 563 |     if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
    | 564 |       logging.error("Ignoring split instance '%s', primary group %s, secondary"
    | 565 |                     " groups %s", name, pnode_group_uuid,
    | 566 |                     utils.CommaJoin(snodes_group_uuid))
    | 567 |     else:
    | 568 |       instances.append(Instance(name, status, autostart, snodes))
    | 569 |
    | 570 |       for node in snodes:
    | 571 |         secondaries.setdefault(node, set()).add(name)
    | 572 |
    | 573 |   # Load all nodes
    | 574 |   nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
    | 575 |            for (name, bootid, offline) in raw_nodes]
    | 576 |
    | 577 |   return (dict((node.name, node) for node in nodes),
    | 578 |           dict((inst.name, inst) for inst in instances))
    | 579 |
    | 580 |
    | 581 | def _KnownGroup(uuid):
    | 582 |   """Checks if a group UUID is known by ssconf.
    | 583 |
    | 584 |   """
    | 585 |   groups = ssconf.SimpleStore().GetNodegroupList()
    | 586 |
    | 587 |   return compat.any(line.strip() and line.split()[0] == uuid
    | 588 |                     for line in groups)
    | 589 |
    | 590 |
    | 591 | def _GroupWatcher(opts):
    | 592 |   """Main function for per-group watcher process.
    | 593 |
    | 594 |   """
    | 595 |   group_uuid = opts.nodegroup.lower()
    | 596 |
    | 597 |   if not utils.UUID_RE.match(group_uuid):
    | 598 |     raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
    | 599 |                               " got '%s'" %
    | 600 |                               (cli.NODEGROUP_OPT_NAME, group_uuid))
    | 601 |
    | 602 |   logging.info("Watcher for node group '%s'", group_uuid)
    | 603 |
    | 604 |   # Check if node group is known
    | 605 |   if not _KnownGroup(group_uuid):
    | 606 |     raise errors.GenericError("Node group '%s' is not known by ssconf" %
    | 607 |                               group_uuid)
    | 608 |
    | 609 |   state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
    | 610 |
    | 611 |   logging.debug("Using state file %s", state_path)
    | 612 |
    | 613 |   # Global watcher
    | 614 |   statefile = state.OpenStateFile(state_path) # pylint: disable-msg=E0602
    | 615 |   if not statefile:
    | 616 |     return constants.EXIT_FAILURE
    | 617 |
    | 618 |   notepad = state.WatcherState(statefile) # pylint: disable-msg=E0602
    | 619 |   try:
    | 620 |     # Connect to master daemon
    | 621 |     client = GetLuxiClient(False)
    | 622 |
    | 623 |     _CheckMaster(client)
    | 624 |
    | 625 |     (nodes, instances) = _GetGroupData(client, group_uuid)
    | 626 |
    | 627 |     started = _CheckInstances(client, notepad, instances)
    | 628 |     _CheckDisks(client, notepad, nodes, instances, started)
    | 629 |     _VerifyDisks(client, group_uuid, nodes, instances)
    | 630 |   except Exception, err:
    | 631 |     logging.info("Not updating status file due to failure: %s", err)
    | 632 |     raise
    | 633 |   else:
    | 634 |     # Save changes for next run
    | 635 |     notepad.Save(state_path)
    | 636 |
    | 637 |   return constants.EXIT_SUCCESS
    | 638 |
    | 639 |
458 | 640 | def Main():
459 | 641 |   """Main function.
460 | 642 |
461 | 643 |   """
462 |     |   global client # pylint: disable-msg=W0603
463 |     |
464 | 644 |   (options, _) = ParseOptions()
465 | 645 |
466 | 646 |   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
... | ... |
470 | 650 |     logging.debug("Pause has been set, exiting")
471 | 651 |     return constants.EXIT_SUCCESS
472 | 652 |
473 |     |   statefile = \
474 |     |     state.OpenStateFile(constants.WATCHER_STATEFILE)
475 |     |   if not statefile:
476 |     |     return constants.EXIT_FAILURE
477 |     |
478 |     |   update_file = False
    | 653 |   # Try to acquire global watcher lock in shared mode
    | 654 |   lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
479 | 655 |   try:
480 |     |     StartNodeDaemons()
481 |     |     RunWatcherHooks()
482 |     |     # run node maintenance in all cases, even if master, so that old
483 |     |     # masters can be properly cleaned up too
484 |     |     if nodemaint.NodeMaintenance.ShouldRun():
485 |     |       nodemaint.NodeMaintenance().Exec()
486 |     |
487 |     |     notepad = state.WatcherState(statefile)
488 |     |     try:
489 |     |       try:
490 |     |         client = cli.GetClient()
491 |     |       except errors.OpPrereqError:
492 |     |         # this is, from cli.GetClient, a not-master case
493 |     |         logging.debug("Not on master, exiting")
494 |     |         update_file = True
495 |     |         return constants.EXIT_SUCCESS
496 |     |       except luxi.NoMasterError, err:
497 |     |         logging.warning("Master seems to be down (%s), trying to restart",
498 |     |                         str(err))
499 |     |         if not utils.EnsureDaemon(constants.MASTERD):
500 |     |           logging.critical("Can't start the master, exiting")
501 |     |           return constants.EXIT_FAILURE
502 |     |         # else retry the connection
503 |     |         client = cli.GetClient()
504 |     |
505 |     |       # we are on master now
506 |     |       utils.EnsureDaemon(constants.RAPI)
507 |     |
508 |     |       # If RAPI isn't responding to queries, try one restart.
509 |     |       logging.debug("Attempting to talk with RAPI.")
510 |     |       if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
511 |     |         logging.warning("Couldn't get answer from Ganeti RAPI daemon."
512 |     |                         " Restarting Ganeti RAPI.")
513 |     |         utils.StopDaemon(constants.RAPI)
514 |     |         utils.EnsureDaemon(constants.RAPI)
515 |     |         logging.debug("Second attempt to talk with RAPI")
516 |     |         if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
517 |     |           logging.fatal("RAPI is not responding. Please investigate.")
518 |     |       logging.debug("Successfully talked to RAPI.")
    | 656 |     lock.Shared(blocking=False)
    | 657 |   except (EnvironmentError, errors.LockError), err:
    | 658 |     logging.error("Can't acquire lock on %s: %s",
    | 659 |                   constants.WATCHER_LOCK_FILE, err)
    | 660 |     return constants.EXIT_SUCCESS
519 | 661 |
520 |     |       try:
521 |     |         watcher = Watcher(options, notepad)
522 |     |       except errors.ConfigurationError:
523 |     |         # Just exit if there's no configuration
524 |     |         update_file = True
525 |     |         return constants.EXIT_SUCCESS
526 |     |
527 |     |       watcher.Run()
528 |     |       update_file = True
529 |     |
530 |     |     finally:
531 |     |       if update_file:
532 |     |         notepad.Save(constants.WATCHER_STATEFILE)
533 |     |       else:
534 |     |         logging.debug("Not updating status file due to failure")
535 |     |   except SystemExit:
    | 662 |   if options.nodegroup is None:
    | 663 |     fn = _GlobalWatcher
    | 664 |   else:
    | 665 |     # Per-nodegroup watcher
    | 666 |     fn = _GroupWatcher
    | 667 |
    | 668 |   try:
    | 669 |     return fn(options)
    | 670 |   except (SystemExit, KeyboardInterrupt):
536 | 671 |     raise
537 | 672 |   except NotMasterError:
538 | 673 |     logging.debug("Not master, exiting")