watcher: Split state class into separate module
[ganeti-local] / lib / watcher / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 from optparse import OptionParser
36
37 from ganeti import utils
38 from ganeti import constants
39 from ganeti import compat
40 from ganeti import errors
41 from ganeti import opcodes
42 from ganeti import cli
43 from ganeti import luxi
44 from ganeti import rapi
45 from ganeti import netutils
46
47 import ganeti.rapi.client # pylint: disable-msg=W0611
48
49 from ganeti.watcher import nodemaint
50 from ganeti.watcher import state
51
52
53 MAXTRIES = 5
54 BAD_STATES = [constants.INSTST_ERRORDOWN]
55 HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
56 NOTICE = 'NOTICE'
57 ERROR = 'ERROR'
58
59
60 # Global LUXI client object
61 client = None
62
63
64 class NotMasterError(errors.GenericError):
65   """Exception raised when this host is not the master."""
66
67
68 def ShouldPause():
69   """Check whether we should pause.
70
71   """
72   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
73
74
75 def StartNodeDaemons():
76   """Start all the daemons that should be running on all nodes.
77
78   """
79   # on master or not, try to start the node daemon
80   utils.EnsureDaemon(constants.NODED)
81   # start confd as well. On non candidates it will be in disabled mode.
82   utils.EnsureDaemon(constants.CONFD)
83
84
85 def RunWatcherHooks():
86   """Run the watcher hooks.
87
88   """
89   hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
90                              constants.HOOKS_NAME_WATCHER)
91   if not os.path.isdir(hooks_dir):
92     return
93
94   try:
95     results = utils.RunParts(hooks_dir)
96   except Exception: # pylint: disable-msg=W0703
97     logging.exception("RunParts %s failed: %s", hooks_dir)
98     return
99
100   for (relname, status, runresult) in results:
101     if status == constants.RUNPARTS_SKIP:
102       logging.debug("Watcher hook %s: skipped", relname)
103     elif status == constants.RUNPARTS_ERR:
104       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
105     elif status == constants.RUNPARTS_RUN:
106       if runresult.failed:
107         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
108                         relname, runresult.exit_code, runresult.output)
109       else:
110         logging.debug("Watcher hook %s: success (output: %s)", relname,
111                       runresult.output)
112
113
114 class Instance(object):
115   """Abstraction for a Virtual Machine instance.
116
117   """
118   def __init__(self, name, status, autostart, snodes):
119     self.name = name
120     self.status = status
121     self.autostart = autostart
122     self.snodes = snodes
123
124   def Restart(self):
125     """Encapsulates the start of an instance.
126
127     """
128     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
129     cli.SubmitOpCode(op, cl=client)
130
131   def ActivateDisks(self):
132     """Encapsulates the activation of all disks of an instance.
133
134     """
135     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
136     cli.SubmitOpCode(op, cl=client)
137
138
139 def GetClusterData():
140   """Get a list of instances on this cluster.
141
142   """
143   op1_fields = ["name", "status", "admin_state", "snodes"]
144   op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
145                                 use_locking=True)
146   op2_fields = ["name", "bootid", "offline"]
147   op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
148                             use_locking=True)
149
150   job_id = client.SubmitJob([op1, op2])
151
152   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
153
154   logging.debug("Got data from cluster, writing instance status file")
155
156   result = all_results[0]
157   smap = {}
158
159   instances = {}
160
161   # write the instance status file
162   up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
163   utils.WriteFile(file_name=constants.INSTANCE_STATUS_FILE, data=up_data)
164
165   for fields in result:
166     (name, status, autostart, snodes) = fields
167
168     # update the secondary node map
169     for node in snodes:
170       if node not in smap:
171         smap[node] = []
172       smap[node].append(name)
173
174     instances[name] = Instance(name, status, autostart, snodes)
175
176   nodes =  dict([(name, (bootid, offline))
177                  for name, bootid, offline in all_results[1]])
178
179   client.ArchiveJob(job_id)
180
181   return instances, nodes, smap
182
183
184 class Watcher(object):
185   """Encapsulate the logic for restarting erroneously halted virtual machines.
186
187   The calling program should periodically instantiate me and call Run().
188   This will traverse the list of instances, and make up to MAXTRIES attempts
189   to restart machines that are down.
190
191   """
192   def __init__(self, opts, notepad):
193     self.notepad = notepad
194     master = client.QueryConfigValues(["master_node"])[0]
195     if master != netutils.Hostname.GetSysName():
196       raise NotMasterError("This is not the master node")
197     # first archive old jobs
198     self.ArchiveJobs(opts.job_age)
199     # and only then submit new ones
200     self.instances, self.bootids, self.smap = GetClusterData()
201     self.started_instances = set()
202     self.opts = opts
203
204   def Run(self):
205     """Watcher run sequence.
206
207     """
208     notepad = self.notepad
209     self.CheckInstances(notepad)
210     self.CheckDisks(notepad)
211     self.VerifyDisks()
212
213   @staticmethod
214   def ArchiveJobs(age):
215     """Archive old jobs.
216
217     """
218     arch_count, left_count = client.AutoArchiveJobs(age)
219     logging.debug("Archived %s jobs, left %s", arch_count, left_count)
220
221   def CheckDisks(self, notepad):
222     """Check all nodes for restarted ones.
223
224     """
225     check_nodes = []
226     for name, (new_id, offline) in self.bootids.iteritems():
227       old = notepad.GetNodeBootID(name)
228       if new_id is None:
229         # Bad node, not returning a boot id
230         if not offline:
231           logging.debug("Node %s missing boot id, skipping secondary checks",
232                         name)
233         continue
234       if old != new_id:
235         # Node's boot ID has changed, proably through a reboot.
236         check_nodes.append(name)
237
238     if check_nodes:
239       # Activate disks for all instances with any of the checked nodes as a
240       # secondary node.
241       for node in check_nodes:
242         if node not in self.smap:
243           continue
244         for instance_name in self.smap[node]:
245           instance = self.instances[instance_name]
246           if not instance.autostart:
247             logging.info(("Skipping disk activation for non-autostart"
248                           " instance %s"), instance.name)
249             continue
250           if instance.name in self.started_instances:
251             # we already tried to start the instance, which should have
252             # activated its drives (if they can be at all)
253             logging.debug("Skipping disk activation for instance %s, as"
254                           " it was already started", instance.name)
255             continue
256           try:
257             logging.info("Activating disks for instance %s", instance.name)
258             instance.ActivateDisks()
259           except Exception: # pylint: disable-msg=W0703
260             logging.exception("Error while activating disks for instance %s",
261                               instance.name)
262
263       # Keep changed boot IDs
264       for name in check_nodes:
265         notepad.SetNodeBootID(name, self.bootids[name][0])
266
267   def CheckInstances(self, notepad):
268     """Make a pass over the list of instances, restarting downed ones.
269
270     """
271     notepad.MaintainInstanceList(self.instances.keys())
272
273     for instance in self.instances.values():
274       if instance.status in BAD_STATES:
275         n = notepad.NumberOfRestartAttempts(instance)
276
277         if n > MAXTRIES:
278           logging.warning("Not restarting instance %s, retries exhausted",
279                           instance.name)
280           continue
281         elif n < MAXTRIES:
282           last = " (Attempt #%d)" % (n + 1)
283         else:
284           notepad.RecordRestartAttempt(instance)
285           logging.error("Could not restart %s after %d attempts, giving up",
286                         instance.name, MAXTRIES)
287           continue
288         try:
289           logging.info("Restarting %s%s", instance.name, last)
290           instance.Restart()
291           self.started_instances.add(instance.name)
292         except Exception: # pylint: disable-msg=W0703
293           logging.exception("Error while restarting instance %s",
294                             instance.name)
295
296         notepad.RecordRestartAttempt(instance)
297       elif instance.status in HELPLESS_STATES:
298         if notepad.NumberOfRestartAttempts(instance):
299           notepad.RemoveInstance(instance)
300       else:
301         if notepad.NumberOfRestartAttempts(instance):
302           notepad.RemoveInstance(instance)
303           logging.info("Restart of %s succeeded", instance.name)
304
305   def _CheckForOfflineNodes(self, instance):
306     """Checks if given instances has any secondary in offline status.
307
308     @param instance: The instance object
309     @return: True if any of the secondary is offline, False otherwise
310
311     """
312     bootids = []
313     for node in instance.snodes:
314       bootids.append(self.bootids[node])
315
316     return compat.any(offline for (_, offline) in bootids)
317
318   def VerifyDisks(self):
319     """Run gnt-cluster verify-disks.
320
321     """
322     job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
323     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
324     client.ArchiveJob(job_id)
325
326     # Keep track of submitted jobs
327     jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
328
329     archive_jobs = set()
330     for (status, job_id) in result[constants.JOB_IDS_KEY]:
331       jex.AddJobId(None, status, job_id)
332       if status:
333         archive_jobs.add(job_id)
334
335     offline_disk_instances = set()
336
337     for (status, result) in jex.GetResults():
338       if not status:
339         logging.error("Verify-disks job failed: %s", result)
340         continue
341
342       ((_, instances, _), ) = result
343
344       offline_disk_instances.update(instances)
345
346     for job_id in archive_jobs:
347       client.ArchiveJob(job_id)
348
349     if not offline_disk_instances:
350       # nothing to do
351       logging.debug("verify-disks reported no offline disks, nothing to do")
352       return
353
354     logging.debug("Will activate disks for instance(s) %s",
355                   utils.CommaJoin(offline_disk_instances))
356
357     # we submit only one job, and wait for it. not optimal, but spams
358     # less the job queue
359     job = []
360     for name in offline_disk_instances:
361       instance = self.instances[name]
362       if (instance.status in HELPLESS_STATES or
363           self._CheckForOfflineNodes(instance)):
364         logging.info("Skip instance %s because it is in helpless state or has"
365                      " one offline secondary", name)
366         continue
367       job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
368
369     if job:
370       job_id = cli.SendJob(job, cl=client)
371
372       try:
373         cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
374       except Exception: # pylint: disable-msg=W0703
375         logging.exception("Error while activating disks")
376
377
378 def IsRapiResponding(hostname):
379   """Connects to RAPI port and does a simple test.
380
381   Connects to RAPI port of hostname and does a simple test. At this time, the
382   test is GetVersion.
383
384   @type hostname: string
385   @param hostname: hostname of the node to connect to.
386   @rtype: bool
387   @return: Whether RAPI is working properly
388
389   """
390   curl_config = rapi.client.GenericCurlConfig()
391   rapi_client = rapi.client.GanetiRapiClient(hostname,
392                                              curl_config_fn=curl_config)
393   try:
394     master_version = rapi_client.GetVersion()
395   except rapi.client.CertificateError, err:
396     logging.warning("RAPI Error: CertificateError (%s)", err)
397     return False
398   except rapi.client.GanetiApiError, err:
399     logging.warning("RAPI Error: GanetiApiError (%s)", err)
400     return False
401   logging.debug("RAPI Result: master_version is %s", master_version)
402   return master_version == constants.RAPI_VERSION
403
404
405 def ParseOptions():
406   """Parse the command line options.
407
408   @return: (options, args) as from OptionParser.parse_args()
409
410   """
411   parser = OptionParser(description="Ganeti cluster watcher",
412                         usage="%prog [-d]",
413                         version="%%prog (ganeti) %s" %
414                         constants.RELEASE_VERSION)
415
416   parser.add_option(cli.DEBUG_OPT)
417   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
418                     help="Autoarchive jobs older than this age (default"
419                           " 6 hours)")
420   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
421                     action="store_true", help="Ignore cluster pause setting")
422   options, args = parser.parse_args()
423   options.job_age = cli.ParseTimespec(options.job_age)
424
425   if args:
426     parser.error("No arguments expected")
427
428   return (options, args)
429
430
431 @rapi.client.UsesRapiClient
432 def Main():
433   """Main function.
434
435   """
436   global client # pylint: disable-msg=W0603
437
438   (options, _) = ParseOptions()
439
440   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
441                      debug=options.debug, stderr_logging=options.debug)
442
443   if ShouldPause() and not options.ignore_pause:
444     logging.debug("Pause has been set, exiting")
445     return constants.EXIT_SUCCESS
446
447   statefile = \
448     state.OpenStateFile(constants.WATCHER_STATEFILE)
449   if not statefile:
450     return constants.EXIT_FAILURE
451
452   update_file = False
453   try:
454     StartNodeDaemons()
455     RunWatcherHooks()
456     # run node maintenance in all cases, even if master, so that old
457     # masters can be properly cleaned up too
458     if nodemaint.NodeMaintenance.ShouldRun():
459       nodemaint.NodeMaintenance().Exec()
460
461     notepad = state.WatcherState(statefile)
462     try:
463       try:
464         client = cli.GetClient()
465       except errors.OpPrereqError:
466         # this is, from cli.GetClient, a not-master case
467         logging.debug("Not on master, exiting")
468         update_file = True
469         return constants.EXIT_SUCCESS
470       except luxi.NoMasterError, err:
471         logging.warning("Master seems to be down (%s), trying to restart",
472                         str(err))
473         if not utils.EnsureDaemon(constants.MASTERD):
474           logging.critical("Can't start the master, exiting")
475           return constants.EXIT_FAILURE
476         # else retry the connection
477         client = cli.GetClient()
478
479       # we are on master now
480       utils.EnsureDaemon(constants.RAPI)
481
482       # If RAPI isn't responding to queries, try one restart.
483       logging.debug("Attempting to talk with RAPI.")
484       if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
485         logging.warning("Couldn't get answer from Ganeti RAPI daemon."
486                         " Restarting Ganeti RAPI.")
487         utils.StopDaemon(constants.RAPI)
488         utils.EnsureDaemon(constants.RAPI)
489         logging.debug("Second attempt to talk with RAPI")
490         if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
491           logging.fatal("RAPI is not responding. Please investigate.")
492       logging.debug("Successfully talked to RAPI.")
493
494       try:
495         watcher = Watcher(options, notepad)
496       except errors.ConfigurationError:
497         # Just exit if there's no configuration
498         update_file = True
499         return constants.EXIT_SUCCESS
500
501       watcher.Run()
502       update_file = True
503
504     finally:
505       if update_file:
506         notepad.Save()
507       else:
508         logging.debug("Not updating status file due to failure")
509   except SystemExit:
510     raise
511   except NotMasterError:
512     logging.debug("Not master, exiting")
513     return constants.EXIT_NOTMASTER
514   except errors.ResolverError, err:
515     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
516     return constants.EXIT_NODESETUP_ERROR
517   except errors.JobQueueFull:
518     logging.error("Job queue is full, can't query cluster state")
519   except errors.JobQueueDrainError:
520     logging.error("Job queue is drained, can't maintain cluster state")
521   except Exception, err:
522     logging.exception(str(err))
523     return constants.EXIT_FAILURE
524
525   return constants.EXIT_SUCCESS