Introduce an RPC call for OS parameters validation
[ganeti-local] / daemons / ganeti-watcher
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 # pylint: disable-msg=C0103,W0142
31
32 # C0103: Invalid name ganeti-watcher
33
34 import os
35 import sys
36 import time
37 import logging
38 from optparse import OptionParser
39
40 from ganeti import utils
41 from ganeti import constants
42 from ganeti import serializer
43 from ganeti import errors
44 from ganeti import opcodes
45 from ganeti import cli
46 from ganeti import luxi
47 from ganeti import ssconf
48 from ganeti import bdev
49 from ganeti import hypervisor
50 from ganeti import rapi
51 from ganeti.confd import client as confd_client
52
53 import ganeti.rapi.client # pylint: disable-msg=W0611
54
55
56 MAXTRIES = 5
57 BAD_STATES = ['ERROR_down']
58 HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
59 NOTICE = 'NOTICE'
60 ERROR = 'ERROR'
61 KEY_RESTART_COUNT = "restart_count"
62 KEY_RESTART_WHEN = "restart_when"
63 KEY_BOOT_ID = "bootid"
64
65
66 # Global client object
67 client = None
68
69
70 class NotMasterError(errors.GenericError):
71   """Exception raised when this host is not the master."""
72
73
74 def ShouldPause():
75   """Check whether we should pause.
76
77   """
78   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
79
80
81 def StartNodeDaemons():
82   """Start all the daemons that should be running on all nodes.
83
84   """
85   # on master or not, try to start the node daemon
86   utils.EnsureDaemon(constants.NODED)
87   # start confd as well. On non candidates it will be in disabled mode.
88   utils.EnsureDaemon(constants.CONFD)
89
90
91 def RunWatcherHooks():
92   """Run the watcher hooks.
93
94   """
95   hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
96                              constants.HOOKS_NAME_WATCHER)
97   if not os.path.isdir(hooks_dir):
98     return
99
100   try:
101     results = utils.RunParts(hooks_dir)
102   except Exception, msg: # pylint: disable-msg=W0703
103     logging.critical("RunParts %s failed: %s", hooks_dir, msg)
104
105   for (relname, status, runresult) in results:
106     if status == constants.RUNPARTS_SKIP:
107       logging.debug("Watcher hook %s: skipped", relname)
108     elif status == constants.RUNPARTS_ERR:
109       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
110     elif status == constants.RUNPARTS_RUN:
111       if runresult.failed:
112         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
113                         relname, runresult.exit_code, runresult.output)
114       else:
115         logging.debug("Watcher hook %s: success (output: %s)", relname,
116                       runresult.output)
117
118
119 class NodeMaintenance(object):
120   """Talks to confd daemons and possible shutdown instances/drbd devices.
121
122   """
123   def __init__(self):
124     self.store_cb = confd_client.StoreResultCallback()
125     self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
126     self.confd_client = confd_client.GetConfdClient(self.filter_cb)
127
128   @staticmethod
129   def ShouldRun():
130     """Checks whether node maintenance should run.
131
132     """
133     try:
134       return ssconf.SimpleStore().GetMaintainNodeHealth()
135     except errors.ConfigurationError, err:
136       logging.error("Configuration error, not activating node maintenance: %s",
137                     err)
138       return False
139
140   @staticmethod
141   def GetRunningInstances():
142     """Compute list of hypervisor/running instances.
143
144     """
145     hyp_list = ssconf.SimpleStore().GetHypervisorList()
146     results = []
147     for hv_name in hyp_list:
148       try:
149         hv = hypervisor.GetHypervisor(hv_name)
150         ilist = hv.ListInstances()
151         results.extend([(iname, hv_name) for iname in ilist])
152       except: # pylint: disable-msg=W0702
153         logging.error("Error while listing instances for hypervisor %s",
154                       hv_name, exc_info=True)
155     return results
156
157   @staticmethod
158   def GetUsedDRBDs():
159     """Get list of used DRBD minors.
160
161     """
162     return bdev.DRBD8.GetUsedDevs().keys()
163
164   @classmethod
165   def DoMaintenance(cls, role):
166     """Maintain the instance list.
167
168     """
169     if role == constants.CONFD_NODE_ROLE_OFFLINE:
170       inst_running = cls.GetRunningInstances()
171       cls.ShutdownInstances(inst_running)
172       drbd_running = cls.GetUsedDRBDs()
173       cls.ShutdownDRBD(drbd_running)
174     else:
175       logging.debug("Not doing anything for role %s", role)
176
177   @staticmethod
178   def ShutdownInstances(inst_running):
179     """Shutdown running instances.
180
181     """
182     names_running = set([i[0] for i in inst_running])
183     if names_running:
184       logging.info("Following instances should not be running,"
185                    " shutting them down: %s", utils.CommaJoin(names_running))
186       # this dictionary will collapse duplicate instance names (only
187       # xen pvm/vhm) into a single key, which is fine
188       i2h = dict(inst_running)
189       for name in names_running:
190         hv_name = i2h[name]
191         hv = hypervisor.GetHypervisor(hv_name)
192         hv.StopInstance(None, force=True, name=name)
193
194   @staticmethod
195   def ShutdownDRBD(drbd_running):
196     """Shutdown active DRBD devices.
197
198     """
199     if drbd_running:
200       logging.info("Following DRBD minors should not be active,"
201                    " shutting them down: %s", utils.CommaJoin(drbd_running))
202       for minor in drbd_running:
203         # pylint: disable-msg=W0212
204         # using the private method as is, pending enhancements to the DRBD
205         # interface
206         bdev.DRBD8._ShutdownAll(minor)
207
208   def Exec(self):
209     """Check node status versus cluster desired state.
210
211     """
212     my_name = utils.HostInfo().name
213     req = confd_client.ConfdClientRequest(type=
214                                           constants.CONFD_REQ_NODE_ROLE_BYNAME,
215                                           query=my_name)
216     self.confd_client.SendRequest(req, async=False, coverage=-1)
217     timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
218     if not timed_out:
219       # should have a valid response
220       status, result = self.store_cb.GetResponse(req.rsalt)
221       assert status, "Missing result but received replies"
222       if not self.filter_cb.consistent[req.rsalt]:
223         logging.warning("Inconsistent replies, not doing anything")
224         return
225       self.DoMaintenance(result.server_reply.answer)
226     else:
227       logging.warning("Confd query timed out, cannot do maintenance actions")
228
229
230 class WatcherState(object):
231   """Interface to a state file recording restart attempts.
232
233   """
234   def __init__(self, statefile):
235     """Open, lock, read and parse the file.
236
237     @type statefile: file
238     @param statefile: State file object
239
240     """
241     self.statefile = statefile
242
243     try:
244       state_data = self.statefile.read()
245       if not state_data:
246         self._data = {}
247       else:
248         self._data = serializer.Load(state_data)
249     except Exception, msg: # pylint: disable-msg=W0703
250       # Ignore errors while loading the file and treat it as empty
251       self._data = {}
252       logging.warning(("Invalid state file. Using defaults."
253                        " Error message: %s"), msg)
254
255     if "instance" not in self._data:
256       self._data["instance"] = {}
257     if "node" not in self._data:
258       self._data["node"] = {}
259
260     self._orig_data = serializer.Dump(self._data)
261
262   def Save(self):
263     """Save state to file, then unlock and close it.
264
265     """
266     assert self.statefile
267
268     serialized_form = serializer.Dump(self._data)
269     if self._orig_data == serialized_form:
270       logging.debug("Data didn't change, just touching status file")
271       os.utime(constants.WATCHER_STATEFILE, None)
272       return
273
274     # We need to make sure the file is locked before renaming it, otherwise
275     # starting ganeti-watcher again at the same time will create a conflict.
276     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
277                          data=serialized_form,
278                          prewrite=utils.LockFile, close=False)
279     self.statefile = os.fdopen(fd, 'w+')
280
281   def Close(self):
282     """Unlock configuration file and close it.
283
284     """
285     assert self.statefile
286
287     # Files are automatically unlocked when closing them
288     self.statefile.close()
289     self.statefile = None
290
291   def GetNodeBootID(self, name):
292     """Returns the last boot ID of a node or None.
293
294     """
295     ndata = self._data["node"]
296
297     if name in ndata and KEY_BOOT_ID in ndata[name]:
298       return ndata[name][KEY_BOOT_ID]
299     return None
300
301   def SetNodeBootID(self, name, bootid):
302     """Sets the boot ID of a node.
303
304     """
305     assert bootid
306
307     ndata = self._data["node"]
308
309     if name not in ndata:
310       ndata[name] = {}
311
312     ndata[name][KEY_BOOT_ID] = bootid
313
314   def NumberOfRestartAttempts(self, instance):
315     """Returns number of previous restart attempts.
316
317     @type instance: L{Instance}
318     @param instance: the instance to look up
319
320     """
321     idata = self._data["instance"]
322
323     if instance.name in idata:
324       return idata[instance.name][KEY_RESTART_COUNT]
325
326     return 0
327
328   def RecordRestartAttempt(self, instance):
329     """Record a restart attempt.
330
331     @type instance: L{Instance}
332     @param instance: the instance being restarted
333
334     """
335     idata = self._data["instance"]
336
337     if instance.name not in idata:
338       inst = idata[instance.name] = {}
339     else:
340       inst = idata[instance.name]
341
342     inst[KEY_RESTART_WHEN] = time.time()
343     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
344
345   def RemoveInstance(self, instance):
346     """Update state to reflect that a machine is running.
347
348     This method removes the record for a named instance (as we only
349     track down instances).
350
351     @type instance: L{Instance}
352     @param instance: the instance to remove from books
353
354     """
355     idata = self._data["instance"]
356
357     if instance.name in idata:
358       del idata[instance.name]
359
360
361 class Instance(object):
362   """Abstraction for a Virtual Machine instance.
363
364   """
365   def __init__(self, name, state, autostart):
366     self.name = name
367     self.state = state
368     self.autostart = autostart
369
370   def Restart(self):
371     """Encapsulates the start of an instance.
372
373     """
374     op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
375     cli.SubmitOpCode(op, cl=client)
376
377   def ActivateDisks(self):
378     """Encapsulates the activation of all disks of an instance.
379
380     """
381     op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
382     cli.SubmitOpCode(op, cl=client)
383
384
385 def GetClusterData():
386   """Get a list of instances on this cluster.
387
388   """
389   op1_fields = ["name", "status", "admin_state", "snodes"]
390   op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
391                                  use_locking=True)
392   op2_fields = ["name", "bootid", "offline"]
393   op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
394                              use_locking=True)
395
396   job_id = client.SubmitJob([op1, op2])
397
398   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
399
400   logging.debug("Got data from cluster, writing instance status file")
401
402   result = all_results[0]
403   smap = {}
404
405   instances = {}
406
407   # write the upfile
408   up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
409   utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
410
411   for fields in result:
412     (name, status, autostart, snodes) = fields
413
414     # update the secondary node map
415     for node in snodes:
416       if node not in smap:
417         smap[node] = []
418       smap[node].append(name)
419
420     instances[name] = Instance(name, status, autostart)
421
422   nodes =  dict([(name, (bootid, offline))
423                  for name, bootid, offline in all_results[1]])
424
425   client.ArchiveJob(job_id)
426
427   return instances, nodes, smap
428
429
430 class Watcher(object):
431   """Encapsulate the logic for restarting erroneously halted virtual machines.
432
433   The calling program should periodically instantiate me and call Run().
434   This will traverse the list of instances, and make up to MAXTRIES attempts
435   to restart machines that are down.
436
437   """
438   def __init__(self, opts, notepad):
439     self.notepad = notepad
440     master = client.QueryConfigValues(["master_node"])[0]
441     if master != utils.HostInfo().name:
442       raise NotMasterError("This is not the master node")
443     # first archive old jobs
444     self.ArchiveJobs(opts.job_age)
445     # and only then submit new ones
446     self.instances, self.bootids, self.smap = GetClusterData()
447     self.started_instances = set()
448     self.opts = opts
449
450   def Run(self):
451     """Watcher run sequence.
452
453     """
454     notepad = self.notepad
455     self.CheckInstances(notepad)
456     self.CheckDisks(notepad)
457     self.VerifyDisks()
458
459   @staticmethod
460   def ArchiveJobs(age):
461     """Archive old jobs.
462
463     """
464     arch_count, left_count = client.AutoArchiveJobs(age)
465     logging.debug("Archived %s jobs, left %s", arch_count, left_count)
466
467   def CheckDisks(self, notepad):
468     """Check all nodes for restarted ones.
469
470     """
471     check_nodes = []
472     for name, (new_id, offline) in self.bootids.iteritems():
473       old = notepad.GetNodeBootID(name)
474       if new_id is None:
475         # Bad node, not returning a boot id
476         if not offline:
477           logging.debug("Node %s missing boot id, skipping secondary checks",
478                         name)
479         continue
480       if old != new_id:
481         # Node's boot ID has changed, proably through a reboot.
482         check_nodes.append(name)
483
484     if check_nodes:
485       # Activate disks for all instances with any of the checked nodes as a
486       # secondary node.
487       for node in check_nodes:
488         if node not in self.smap:
489           continue
490         for instance_name in self.smap[node]:
491           instance = self.instances[instance_name]
492           if not instance.autostart:
493             logging.info(("Skipping disk activation for non-autostart"
494                           " instance %s"), instance.name)
495             continue
496           if instance.name in self.started_instances:
497             # we already tried to start the instance, which should have
498             # activated its drives (if they can be at all)
499             continue
500           try:
501             logging.info("Activating disks for instance %s", instance.name)
502             instance.ActivateDisks()
503           except Exception: # pylint: disable-msg=W0703
504             logging.exception("Error while activating disks for instance %s",
505                               instance.name)
506
507       # Keep changed boot IDs
508       for name in check_nodes:
509         notepad.SetNodeBootID(name, self.bootids[name][0])
510
511   def CheckInstances(self, notepad):
512     """Make a pass over the list of instances, restarting downed ones.
513
514     """
515     for instance in self.instances.values():
516       if instance.state in BAD_STATES:
517         n = notepad.NumberOfRestartAttempts(instance)
518
519         if n > MAXTRIES:
520           # stay quiet.
521           continue
522         elif n < MAXTRIES:
523           last = " (Attempt #%d)" % (n + 1)
524         else:
525           notepad.RecordRestartAttempt(instance)
526           logging.error("Could not restart %s after %d attempts, giving up",
527                         instance.name, MAXTRIES)
528           continue
529         try:
530           logging.info("Restarting %s%s",
531                         instance.name, last)
532           instance.Restart()
533           self.started_instances.add(instance.name)
534         except Exception: # pylint: disable-msg=W0703
535           logging.exception("Error while restarting instance %s",
536                             instance.name)
537
538         notepad.RecordRestartAttempt(instance)
539       elif instance.state in HELPLESS_STATES:
540         if notepad.NumberOfRestartAttempts(instance):
541           notepad.RemoveInstance(instance)
542       else:
543         if notepad.NumberOfRestartAttempts(instance):
544           notepad.RemoveInstance(instance)
545           logging.info("Restart of %s succeeded", instance.name)
546
547   @staticmethod
548   def VerifyDisks():
549     """Run gnt-cluster verify-disks.
550
551     """
552     op = opcodes.OpVerifyDisks()
553     job_id = client.SubmitJob([op])
554     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
555     client.ArchiveJob(job_id)
556     if not isinstance(result, (tuple, list)):
557       logging.error("Can't get a valid result from verify-disks")
558       return
559     offline_disk_instances = result[2]
560     if not offline_disk_instances:
561       # nothing to do
562       return
563     logging.debug("Will activate disks for instances %s",
564                   utils.CommaJoin(offline_disk_instances))
565     # we submit only one job, and wait for it. not optimal, but spams
566     # less the job queue
567     job = [opcodes.OpActivateInstanceDisks(instance_name=name)
568            for name in offline_disk_instances]
569     job_id = cli.SendJob(job, cl=client)
570
571     try:
572       cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
573     except Exception: # pylint: disable-msg=W0703
574       logging.exception("Error while activating disks")
575
576
577 def OpenStateFile(path):
578   """Opens the state file and acquires a lock on it.
579
580   @type path: string
581   @param path: Path to state file
582
583   """
584   # The two-step dance below is necessary to allow both opening existing
585   # file read/write and creating if not existing. Vanilla open will truncate
586   # an existing file -or- allow creating if not existing.
587   statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
588
589   # Try to acquire lock on state file. If this fails, another watcher instance
590   # might already be running or another program is temporarily blocking the
591   # watcher from running.
592   try:
593     utils.LockFile(statefile_fd)
594   except errors.LockError, err:
595     logging.error("Can't acquire lock on state file %s: %s", path, err)
596     return None
597
598   return os.fdopen(statefile_fd, "w+")
599
600
601 def IsRapiResponding(hostname):
602   """Connects to RAPI port and does a simple test.
603
604   Connects to RAPI port of hostname and does a simple test. At this time, the
605   test is GetVersion.
606
607   @type hostname: string
608   @param hostname: hostname of the node to connect to.
609   @rtype: bool
610   @return: Whether RAPI is working properly
611
612   """
613   ssl_config = rapi.client.CertAuthorityVerify(constants.RAPI_CERT_FILE)
614   rapi_client = \
615     rapi.client.GanetiRapiClient(hostname,
616                                  config_ssl_verification=ssl_config)
617   try:
618     master_version = rapi_client.GetVersion()
619   except rapi.client.CertificateError, err:
620     logging.warning("RAPI Error: CertificateError (%s)", err)
621     return False
622   except rapi.client.GanetiApiError, err:
623     logging.warning("RAPI Error: GanetiApiError (%s)", err)
624     return False
625   logging.debug("RAPI Result: master_version is %s", master_version)
626   return master_version == constants.RAPI_VERSION
627
628
629 def ParseOptions():
630   """Parse the command line options.
631
632   @return: (options, args) as from OptionParser.parse_args()
633
634   """
635   parser = OptionParser(description="Ganeti cluster watcher",
636                         usage="%prog [-d]",
637                         version="%%prog (ganeti) %s" %
638                         constants.RELEASE_VERSION)
639
640   parser.add_option(cli.DEBUG_OPT)
641   parser.add_option("-A", "--job-age", dest="job_age",
642                     help="Autoarchive jobs older than this age (default"
643                     " 6 hours)", default=6*3600)
644   options, args = parser.parse_args()
645   options.job_age = cli.ParseTimespec(options.job_age)
646   return options, args
647
648
649 def main():
650   """Main function.
651
652   """
653   global client # pylint: disable-msg=W0603
654
655   options, args = ParseOptions()
656
657   if args: # watcher doesn't take any arguments
658     print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0])
659     sys.exit(constants.EXIT_FAILURE)
660
661   utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
662                      stderr_logging=options.debug)
663
664   if ShouldPause():
665     logging.debug("Pause has been set, exiting")
666     sys.exit(constants.EXIT_SUCCESS)
667
668   statefile = OpenStateFile(constants.WATCHER_STATEFILE)
669   if not statefile:
670     sys.exit(constants.EXIT_FAILURE)
671
672   update_file = False
673   try:
674     StartNodeDaemons()
675     RunWatcherHooks()
676     # run node maintenance in all cases, even if master, so that old
677     # masters can be properly cleaned up too
678     if NodeMaintenance.ShouldRun():
679       NodeMaintenance().Exec()
680
681     notepad = WatcherState(statefile)
682     try:
683       try:
684         client = cli.GetClient()
685       except errors.OpPrereqError:
686         # this is, from cli.GetClient, a not-master case
687         logging.debug("Not on master, exiting")
688         update_file = True
689         sys.exit(constants.EXIT_SUCCESS)
690       except luxi.NoMasterError, err:
691         logging.warning("Master seems to be down (%s), trying to restart",
692                         str(err))
693         if not utils.EnsureDaemon(constants.MASTERD):
694           logging.critical("Can't start the master, exiting")
695           sys.exit(constants.EXIT_FAILURE)
696         # else retry the connection
697         client = cli.GetClient()
698
699       # we are on master now
700       utils.EnsureDaemon(constants.RAPI)
701
702       # If RAPI isn't responding to queries, try one restart.
703       logging.debug("Attempting to talk with RAPI.")
704       if not IsRapiResponding(constants.LOCALHOST_IP_ADDRESS):
705         logging.warning("Couldn't get answer from Ganeti RAPI daemon."
706                         " Restarting Ganeti RAPI.")
707         utils.StopDaemon(constants.RAPI)
708         utils.EnsureDaemon(constants.RAPI)
709         logging.debug("Second attempt to talk with RAPI")
710         if not IsRapiResponding(constants.LOCALHOST_IP_ADDRESS):
711           logging.fatal("RAPI is not responding. Please investigate.")
712       logging.debug("Successfully talked to RAPI.")
713
714       try:
715         watcher = Watcher(options, notepad)
716       except errors.ConfigurationError:
717         # Just exit if there's no configuration
718         update_file = True
719         sys.exit(constants.EXIT_SUCCESS)
720
721       watcher.Run()
722       update_file = True
723
724     finally:
725       if update_file:
726         notepad.Save()
727       else:
728         logging.debug("Not updating status file due to failure")
729   except SystemExit:
730     raise
731   except NotMasterError:
732     logging.debug("Not master, exiting")
733     sys.exit(constants.EXIT_NOTMASTER)
734   except errors.ResolverError, err:
735     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
736     sys.exit(constants.EXIT_NODESETUP_ERROR)
737   except errors.JobQueueFull:
738     logging.error("Job queue is full, can't query cluster state")
739   except errors.JobQueueDrainError:
740     logging.error("Job queue is drained, can't maintain cluster state")
741   except Exception, err:
742     logging.exception(str(err))
743     sys.exit(constants.EXIT_FAILURE)
744
745
746 if __name__ == '__main__':
747   main()