watcher.state: Use strings, not objects
[ganeti-local] / lib / watcher / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 from optparse import OptionParser
36
37 from ganeti import utils
38 from ganeti import constants
39 from ganeti import compat
40 from ganeti import errors
41 from ganeti import opcodes
42 from ganeti import cli
43 from ganeti import luxi
44 from ganeti import rapi
45 from ganeti import netutils
46
47 import ganeti.rapi.client # pylint: disable-msg=W0611
48
49 from ganeti.watcher import nodemaint
50 from ganeti.watcher import state
51
52
53 MAXTRIES = 5
54
55
56 # Global LUXI client object
57 client = None
58 BAD_STATES = frozenset([
59   constants.INSTST_ERRORDOWN,
60   ])
61 HELPLESS_STATES = frozenset([
62   constants.INSTST_NODEDOWN,
63   constants.INSTST_NODEOFFLINE,
64   ])
65 NOTICE = "NOTICE"
66 ERROR = "ERROR"
67
68
69 class NotMasterError(errors.GenericError):
70   """Exception raised when this host is not the master."""
71
72
73 def ShouldPause():
74   """Check whether we should pause.
75
76   """
77   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
78
79
80 def StartNodeDaemons():
81   """Start all the daemons that should be running on all nodes.
82
83   """
84   # on master or not, try to start the node daemon
85   utils.EnsureDaemon(constants.NODED)
86   # start confd as well. On non candidates it will be in disabled mode.
87   utils.EnsureDaemon(constants.CONFD)
88
89
90 def RunWatcherHooks():
91   """Run the watcher hooks.
92
93   """
94   hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
95                              constants.HOOKS_NAME_WATCHER)
96   if not os.path.isdir(hooks_dir):
97     return
98
99   try:
100     results = utils.RunParts(hooks_dir)
101   except Exception: # pylint: disable-msg=W0703
102     logging.exception("RunParts %s failed: %s", hooks_dir)
103     return
104
105   for (relname, status, runresult) in results:
106     if status == constants.RUNPARTS_SKIP:
107       logging.debug("Watcher hook %s: skipped", relname)
108     elif status == constants.RUNPARTS_ERR:
109       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
110     elif status == constants.RUNPARTS_RUN:
111       if runresult.failed:
112         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
113                         relname, runresult.exit_code, runresult.output)
114       else:
115         logging.debug("Watcher hook %s: success (output: %s)", relname,
116                       runresult.output)
117     else:
118       raise errors.ProgrammerError("Unknown status %s returned by RunParts",
119                                    status)
120
121
122 class Instance(object):
123   """Abstraction for a Virtual Machine instance.
124
125   """
126   def __init__(self, name, status, autostart, snodes):
127     self.name = name
128     self.status = status
129     self.autostart = autostart
130     self.snodes = snodes
131
132   def Restart(self):
133     """Encapsulates the start of an instance.
134
135     """
136     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
137     cli.SubmitOpCode(op, cl=client)
138
139   def ActivateDisks(self):
140     """Encapsulates the activation of all disks of an instance.
141
142     """
143     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
144     cli.SubmitOpCode(op, cl=client)
145
146
147 def GetClusterData():
148   """Get a list of instances on this cluster.
149
150   """
151   op1_fields = ["name", "status", "admin_state", "snodes"]
152   op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
153                                 use_locking=True)
154   op2_fields = ["name", "bootid", "offline"]
155   op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
156                             use_locking=True)
157
158   job_id = client.SubmitJob([op1, op2])
159
160   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
161
162   logging.debug("Got data from cluster, writing instance status file")
163
164   result = all_results[0]
165   smap = {}
166
167   instances = {}
168
169   # write the instance status file
170   up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
171   utils.WriteFile(file_name=constants.INSTANCE_STATUS_FILE, data=up_data)
172
173   for fields in result:
174     (name, status, autostart, snodes) = fields
175
176     # update the secondary node map
177     for node in snodes:
178       if node not in smap:
179         smap[node] = []
180       smap[node].append(name)
181
182     instances[name] = Instance(name, status, autostart, snodes)
183
184   nodes =  dict([(name, (bootid, offline))
185                  for name, bootid, offline in all_results[1]])
186
187   client.ArchiveJob(job_id)
188
189   return instances, nodes, smap
190
191
192 class Watcher(object):
193   """Encapsulate the logic for restarting erroneously halted virtual machines.
194
195   The calling program should periodically instantiate me and call Run().
196   This will traverse the list of instances, and make up to MAXTRIES attempts
197   to restart machines that are down.
198
199   """
200   def __init__(self, opts, notepad):
201     self.notepad = notepad
202     master = client.QueryConfigValues(["master_node"])[0]
203     if master != netutils.Hostname.GetSysName():
204       raise NotMasterError("This is not the master node")
205     # first archive old jobs
206     self.ArchiveJobs(opts.job_age)
207     # and only then submit new ones
208     self.instances, self.bootids, self.smap = GetClusterData()
209     self.started_instances = set()
210     self.opts = opts
211
212   def Run(self):
213     """Watcher run sequence.
214
215     """
216     notepad = self.notepad
217     self.CheckInstances(notepad)
218     self.CheckDisks(notepad)
219     self.VerifyDisks()
220
221   @staticmethod
222   def ArchiveJobs(age):
223     """Archive old jobs.
224
225     """
226     arch_count, left_count = client.AutoArchiveJobs(age)
227     logging.debug("Archived %s jobs, left %s", arch_count, left_count)
228
229   def CheckDisks(self, notepad):
230     """Check all nodes for restarted ones.
231
232     """
233     check_nodes = []
234     for name, (new_id, offline) in self.bootids.iteritems():
235       old = notepad.GetNodeBootID(name)
236       if new_id is None:
237         # Bad node, not returning a boot id
238         if not offline:
239           logging.debug("Node %s missing boot id, skipping secondary checks",
240                         name)
241         continue
242       if old != new_id:
243         # Node's boot ID has changed, proably through a reboot.
244         check_nodes.append(name)
245
246     if check_nodes:
247       # Activate disks for all instances with any of the checked nodes as a
248       # secondary node.
249       for node in check_nodes:
250         if node not in self.smap:
251           continue
252         for instance_name in self.smap[node]:
253           instance = self.instances[instance_name]
254           if not instance.autostart:
255             logging.info(("Skipping disk activation for non-autostart"
256                           " instance %s"), instance.name)
257             continue
258           if instance.name in self.started_instances:
259             # we already tried to start the instance, which should have
260             # activated its drives (if they can be at all)
261             logging.debug("Skipping disk activation for instance %s, as"
262                           " it was already started", instance.name)
263             continue
264           try:
265             logging.info("Activating disks for instance %s", instance.name)
266             instance.ActivateDisks()
267           except Exception: # pylint: disable-msg=W0703
268             logging.exception("Error while activating disks for instance %s",
269                               instance.name)
270
271       # Keep changed boot IDs
272       for name in check_nodes:
273         notepad.SetNodeBootID(name, self.bootids[name][0])
274
275   def CheckInstances(self, notepad):
276     """Make a pass over the list of instances, restarting downed ones.
277
278     """
279     notepad.MaintainInstanceList(self.instances.keys())
280
281     for instance in self.instances.values():
282       if instance.status in BAD_STATES:
283         n = notepad.NumberOfRestartAttempts(instance.name)
284
285         if n > MAXTRIES:
286           logging.warning("Not restarting instance %s, retries exhausted",
287                           instance.name)
288           continue
289         elif n < MAXTRIES:
290           last = " (Attempt #%d)" % (n + 1)
291         else:
292           notepad.RecordRestartAttempt(instance.name)
293           logging.error("Could not restart %s after %d attempts, giving up",
294                         instance.name, MAXTRIES)
295           continue
296         try:
297           logging.info("Restarting %s%s", instance.name, last)
298           instance.Restart()
299           self.started_instances.add(instance.name)
300         except Exception: # pylint: disable-msg=W0703
301           logging.exception("Error while restarting instance %s",
302                             instance.name)
303
304         notepad.RecordRestartAttempt(instance.name)
305       elif instance.status in HELPLESS_STATES:
306         if notepad.NumberOfRestartAttempts(instance.name):
307           notepad.RemoveInstance(instance.name)
308       else:
309         if notepad.NumberOfRestartAttempts(instance.name):
310           notepad.RemoveInstance(instance.name)
311           logging.info("Restart of %s succeeded", instance.name)
312
313   def _CheckForOfflineNodes(self, instance):
314     """Checks if given instances has any secondary in offline status.
315
316     @param instance: The instance object
317     @return: True if any of the secondary is offline, False otherwise
318
319     """
320     bootids = []
321     for node in instance.snodes:
322       bootids.append(self.bootids[node])
323
324     return compat.any(offline for (_, offline) in bootids)
325
326   def VerifyDisks(self):
327     """Run gnt-cluster verify-disks.
328
329     """
330     job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
331     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
332     client.ArchiveJob(job_id)
333
334     # Keep track of submitted jobs
335     jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
336
337     archive_jobs = set()
338     for (status, job_id) in result[constants.JOB_IDS_KEY]:
339       jex.AddJobId(None, status, job_id)
340       if status:
341         archive_jobs.add(job_id)
342
343     offline_disk_instances = set()
344
345     for (status, result) in jex.GetResults():
346       if not status:
347         logging.error("Verify-disks job failed: %s", result)
348         continue
349
350       ((_, instances, _), ) = result
351
352       offline_disk_instances.update(instances)
353
354     for job_id in archive_jobs:
355       client.ArchiveJob(job_id)
356
357     if not offline_disk_instances:
358       # nothing to do
359       logging.debug("verify-disks reported no offline disks, nothing to do")
360       return
361
362     logging.debug("Will activate disks for instance(s) %s",
363                   utils.CommaJoin(offline_disk_instances))
364
365     # we submit only one job, and wait for it. not optimal, but spams
366     # less the job queue
367     job = []
368     for name in offline_disk_instances:
369       instance = self.instances[name]
370       if (instance.status in HELPLESS_STATES or
371           self._CheckForOfflineNodes(instance)):
372         logging.info("Skip instance %s because it is in helpless state or has"
373                      " one offline secondary", name)
374         continue
375       job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
376
377     if job:
378       job_id = cli.SendJob(job, cl=client)
379
380       try:
381         cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
382       except Exception: # pylint: disable-msg=W0703
383         logging.exception("Error while activating disks")
384
385
386 def IsRapiResponding(hostname):
387   """Connects to RAPI port and does a simple test.
388
389   Connects to RAPI port of hostname and does a simple test. At this time, the
390   test is GetVersion.
391
392   @type hostname: string
393   @param hostname: hostname of the node to connect to.
394   @rtype: bool
395   @return: Whether RAPI is working properly
396
397   """
398   curl_config = rapi.client.GenericCurlConfig()
399   rapi_client = rapi.client.GanetiRapiClient(hostname,
400                                              curl_config_fn=curl_config)
401   try:
402     master_version = rapi_client.GetVersion()
403   except rapi.client.CertificateError, err:
404     logging.warning("RAPI Error: CertificateError (%s)", err)
405     return False
406   except rapi.client.GanetiApiError, err:
407     logging.warning("RAPI Error: GanetiApiError (%s)", err)
408     return False
409   logging.debug("RAPI Result: master_version is %s", master_version)
410   return master_version == constants.RAPI_VERSION
411
412
413 def ParseOptions():
414   """Parse the command line options.
415
416   @return: (options, args) as from OptionParser.parse_args()
417
418   """
419   parser = OptionParser(description="Ganeti cluster watcher",
420                         usage="%prog [-d]",
421                         version="%%prog (ganeti) %s" %
422                         constants.RELEASE_VERSION)
423
424   parser.add_option(cli.DEBUG_OPT)
425   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
426                     help="Autoarchive jobs older than this age (default"
427                           " 6 hours)")
428   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
429                     action="store_true", help="Ignore cluster pause setting")
430   options, args = parser.parse_args()
431   options.job_age = cli.ParseTimespec(options.job_age)
432
433   if args:
434     parser.error("No arguments expected")
435
436   return (options, args)
437
438
439 @rapi.client.UsesRapiClient
440 def Main():
441   """Main function.
442
443   """
444   global client # pylint: disable-msg=W0603
445
446   (options, _) = ParseOptions()
447
448   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
449                      debug=options.debug, stderr_logging=options.debug)
450
451   if ShouldPause() and not options.ignore_pause:
452     logging.debug("Pause has been set, exiting")
453     return constants.EXIT_SUCCESS
454
455   statefile = \
456     state.OpenStateFile(constants.WATCHER_STATEFILE)
457   if not statefile:
458     return constants.EXIT_FAILURE
459
460   update_file = False
461   try:
462     StartNodeDaemons()
463     RunWatcherHooks()
464     # run node maintenance in all cases, even if master, so that old
465     # masters can be properly cleaned up too
466     if nodemaint.NodeMaintenance.ShouldRun():
467       nodemaint.NodeMaintenance().Exec()
468
469     notepad = state.WatcherState(statefile)
470     try:
471       try:
472         client = cli.GetClient()
473       except errors.OpPrereqError:
474         # this is, from cli.GetClient, a not-master case
475         logging.debug("Not on master, exiting")
476         update_file = True
477         return constants.EXIT_SUCCESS
478       except luxi.NoMasterError, err:
479         logging.warning("Master seems to be down (%s), trying to restart",
480                         str(err))
481         if not utils.EnsureDaemon(constants.MASTERD):
482           logging.critical("Can't start the master, exiting")
483           return constants.EXIT_FAILURE
484         # else retry the connection
485         client = cli.GetClient()
486
487       # we are on master now
488       utils.EnsureDaemon(constants.RAPI)
489
490       # If RAPI isn't responding to queries, try one restart.
491       logging.debug("Attempting to talk with RAPI.")
492       if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
493         logging.warning("Couldn't get answer from Ganeti RAPI daemon."
494                         " Restarting Ganeti RAPI.")
495         utils.StopDaemon(constants.RAPI)
496         utils.EnsureDaemon(constants.RAPI)
497         logging.debug("Second attempt to talk with RAPI")
498         if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
499           logging.fatal("RAPI is not responding. Please investigate.")
500       logging.debug("Successfully talked to RAPI.")
501
502       try:
503         watcher = Watcher(options, notepad)
504       except errors.ConfigurationError:
505         # Just exit if there's no configuration
506         update_file = True
507         return constants.EXIT_SUCCESS
508
509       watcher.Run()
510       update_file = True
511
512     finally:
513       if update_file:
514         notepad.Save(constants.WATCHER_STATEFILE)
515       else:
516         logging.debug("Not updating status file due to failure")
517   except SystemExit:
518     raise
519   except NotMasterError:
520     logging.debug("Not master, exiting")
521     return constants.EXIT_NOTMASTER
522   except errors.ResolverError, err:
523     logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
524     return constants.EXIT_NODESETUP_ERROR
525   except errors.JobQueueFull:
526     logging.error("Job queue is full, can't query cluster state")
527   except errors.JobQueueDrainError:
528     logging.error("Job queue is drained, can't maintain cluster state")
529   except Exception, err:
530     logging.exception(str(err))
531     return constants.EXIT_FAILURE
532
533   return constants.EXIT_SUCCESS