watcher: Split node maintenance into separate module
[ganeti-local] / lib / watcher / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erroneously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import os.path
32 import sys
33 import time
34 import logging
35 from optparse import OptionParser
36
37 from ganeti import utils
38 from ganeti import constants
39 from ganeti import compat
40 from ganeti import serializer
41 from ganeti import errors
42 from ganeti import opcodes
43 from ganeti import cli
44 from ganeti import luxi
45 from ganeti import rapi
46 from ganeti import netutils
47
48 import ganeti.rapi.client # pylint: disable-msg=W0611
49 import ganeti.watcher.nodemaint # pylint: disable-msg=W0611
50
51
52 MAXTRIES = 5
53 # Delete any record that is older than 8 hours; this value is based on
54 # the fact that the current retry counter is 5, and watcher runs every
55 # 5 minutes, so it takes around half an hour to exceed the retry
56 # counter, so 8 hours (16*1/2h) seems like a reasonable reset time
57 RETRY_EXPIRATION = 8 * 3600
58 BAD_STATES = [constants.INSTST_ERRORDOWN]
59 HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
60 NOTICE = 'NOTICE'
61 ERROR = 'ERROR'
62 KEY_RESTART_COUNT = "restart_count"
63 KEY_RESTART_WHEN = "restart_when"
64 KEY_BOOT_ID = "bootid"
65
66
67 # Global LUXI client object
68 client = None
69
70
71 class NotMasterError(errors.GenericError):
72   """Exception raised when this host is not the master."""
73
74
75 def ShouldPause():
76   """Check whether we should pause.
77
78   """
79   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
80
81
82 def StartNodeDaemons():
83   """Start all the daemons that should be running on all nodes.
84
85   """
86   # on master or not, try to start the node daemon
87   utils.EnsureDaemon(constants.NODED)
88   # start confd as well. On non candidates it will be in disabled mode.
89   utils.EnsureDaemon(constants.CONFD)
90
91
92 def RunWatcherHooks():
93   """Run the watcher hooks.
94
95   """
96   hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
97                              constants.HOOKS_NAME_WATCHER)
98   if not os.path.isdir(hooks_dir):
99     return
100
101   try:
102     results = utils.RunParts(hooks_dir)
103   except Exception: # pylint: disable-msg=W0703
104     logging.exception("RunParts %s failed: %s", hooks_dir)
105     return
106
107   for (relname, status, runresult) in results:
108     if status == constants.RUNPARTS_SKIP:
109       logging.debug("Watcher hook %s: skipped", relname)
110     elif status == constants.RUNPARTS_ERR:
111       logging.warning("Watcher hook %s: error (%s)", relname, runresult)
112     elif status == constants.RUNPARTS_RUN:
113       if runresult.failed:
114         logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
115                         relname, runresult.exit_code, runresult.output)
116       else:
117         logging.debug("Watcher hook %s: success (output: %s)", relname,
118                       runresult.output)
119
120
121 class WatcherState(object):
122   """Interface to a state file recording restart attempts.
123
124   """
125   def __init__(self, statefile):
126     """Open, lock, read and parse the file.
127
128     @type statefile: file
129     @param statefile: State file object
130
131     """
132     self.statefile = statefile
133
134     try:
135       state_data = self.statefile.read()
136       if not state_data:
137         self._data = {}
138       else:
139         self._data = serializer.Load(state_data)
140     except Exception, msg: # pylint: disable-msg=W0703
141       # Ignore errors while loading the file and treat it as empty
142       self._data = {}
143       logging.warning(("Invalid state file. Using defaults."
144                        " Error message: %s"), msg)
145
146     if "instance" not in self._data:
147       self._data["instance"] = {}
148     if "node" not in self._data:
149       self._data["node"] = {}
150
151     self._orig_data = serializer.Dump(self._data)
152
153   def Save(self):
154     """Save state to file, then unlock and close it.
155
156     """
157     assert self.statefile
158
159     serialized_form = serializer.Dump(self._data)
160     if self._orig_data == serialized_form:
161       logging.debug("Data didn't change, just touching status file")
162       os.utime(constants.WATCHER_STATEFILE, None)
163       return
164
165     # We need to make sure the file is locked before renaming it, otherwise
166     # starting ganeti-watcher again at the same time will create a conflict.
167     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
168                          data=serialized_form,
169                          prewrite=utils.LockFile, close=False)
170     self.statefile = os.fdopen(fd, 'w+')
171
172   def Close(self):
173     """Unlock configuration file and close it.
174
175     """
176     assert self.statefile
177
178     # Files are automatically unlocked when closing them
179     self.statefile.close()
180     self.statefile = None
181
182   def GetNodeBootID(self, name):
183     """Returns the last boot ID of a node or None.
184
185     """
186     ndata = self._data["node"]
187
188     if name in ndata and KEY_BOOT_ID in ndata[name]:
189       return ndata[name][KEY_BOOT_ID]
190     return None
191
192   def SetNodeBootID(self, name, bootid):
193     """Sets the boot ID of a node.
194
195     """
196     assert bootid
197
198     ndata = self._data["node"]
199
200     if name not in ndata:
201       ndata[name] = {}
202
203     ndata[name][KEY_BOOT_ID] = bootid
204
205   def NumberOfRestartAttempts(self, instance):
206     """Returns number of previous restart attempts.
207
208     @type instance: L{Instance}
209     @param instance: the instance to look up
210
211     """
212     idata = self._data["instance"]
213
214     if instance.name in idata:
215       return idata[instance.name][KEY_RESTART_COUNT]
216
217     return 0
218
219   def MaintainInstanceList(self, instances):
220     """Perform maintenance on the recorded instances.
221
222     @type instances: list of string
223     @param instances: the list of currently existing instances
224
225     """
226     idict = self._data["instance"]
227     # First, delete obsolete instances
228     obsolete_instances = set(idict).difference(instances)
229     for inst in obsolete_instances:
230       logging.debug("Forgetting obsolete instance %s", inst)
231       del idict[inst]
232
233     # Second, delete expired records
234     earliest = time.time() - RETRY_EXPIRATION
235     expired_instances = [i for i in idict
236                          if idict[i][KEY_RESTART_WHEN] < earliest]
237     for inst in expired_instances:
238       logging.debug("Expiring record for instance %s", inst)
239       del idict[inst]
240
241   def RecordRestartAttempt(self, instance):
242     """Record a restart attempt.
243
244     @type instance: L{Instance}
245     @param instance: the instance being restarted
246
247     """
248     idata = self._data["instance"]
249
250     if instance.name not in idata:
251       inst = idata[instance.name] = {}
252     else:
253       inst = idata[instance.name]
254
255     inst[KEY_RESTART_WHEN] = time.time()
256     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
257
258   def RemoveInstance(self, instance):
259     """Update state to reflect that a machine is running.
260
261     This method removes the record for a named instance (as we only
262     track down instances).
263
264     @type instance: L{Instance}
265     @param instance: the instance to remove from books
266
267     """
268     idata = self._data["instance"]
269
270     if instance.name in idata:
271       del idata[instance.name]
272
273
274 class Instance(object):
275   """Abstraction for a Virtual Machine instance.
276
277   """
278   def __init__(self, name, state, autostart, snodes):
279     self.name = name
280     self.state = state
281     self.autostart = autostart
282     self.snodes = snodes
283
284   def Restart(self):
285     """Encapsulates the start of an instance.
286
287     """
288     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
289     cli.SubmitOpCode(op, cl=client)
290
291   def ActivateDisks(self):
292     """Encapsulates the activation of all disks of an instance.
293
294     """
295     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
296     cli.SubmitOpCode(op, cl=client)
297
298
299 def GetClusterData():
300   """Get a list of instances on this cluster.
301
302   """
303   op1_fields = ["name", "status", "admin_state", "snodes"]
304   op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
305                                 use_locking=True)
306   op2_fields = ["name", "bootid", "offline"]
307   op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
308                             use_locking=True)
309
310   job_id = client.SubmitJob([op1, op2])
311
312   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
313
314   logging.debug("Got data from cluster, writing instance status file")
315
316   result = all_results[0]
317   smap = {}
318
319   instances = {}
320
321   # write the upfile
322   up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
323   utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
324
325   for fields in result:
326     (name, status, autostart, snodes) = fields
327
328     # update the secondary node map
329     for node in snodes:
330       if node not in smap:
331         smap[node] = []
332       smap[node].append(name)
333
334     instances[name] = Instance(name, status, autostart, snodes)
335
336   nodes =  dict([(name, (bootid, offline))
337                  for name, bootid, offline in all_results[1]])
338
339   client.ArchiveJob(job_id)
340
341   return instances, nodes, smap
342
343
344 class Watcher(object):
345   """Encapsulate the logic for restarting erroneously halted virtual machines.
346
347   The calling program should periodically instantiate me and call Run().
348   This will traverse the list of instances, and make up to MAXTRIES attempts
349   to restart machines that are down.
350
351   """
352   def __init__(self, opts, notepad):
353     self.notepad = notepad
354     master = client.QueryConfigValues(["master_node"])[0]
355     if master != netutils.Hostname.GetSysName():
356       raise NotMasterError("This is not the master node")
357     # first archive old jobs
358     self.ArchiveJobs(opts.job_age)
359     # and only then submit new ones
360     self.instances, self.bootids, self.smap = GetClusterData()
361     self.started_instances = set()
362     self.opts = opts
363
364   def Run(self):
365     """Watcher run sequence.
366
367     """
368     notepad = self.notepad
369     self.CheckInstances(notepad)
370     self.CheckDisks(notepad)
371     self.VerifyDisks()
372
373   @staticmethod
374   def ArchiveJobs(age):
375     """Archive old jobs.
376
377     """
378     arch_count, left_count = client.AutoArchiveJobs(age)
379     logging.debug("Archived %s jobs, left %s", arch_count, left_count)
380
381   def CheckDisks(self, notepad):
382     """Check all nodes for restarted ones.
383
384     """
385     check_nodes = []
386     for name, (new_id, offline) in self.bootids.iteritems():
387       old = notepad.GetNodeBootID(name)
388       if new_id is None:
389         # Bad node, not returning a boot id
390         if not offline:
391           logging.debug("Node %s missing boot id, skipping secondary checks",
392                         name)
393         continue
394       if old != new_id:
395         # Node's boot ID has changed, proably through a reboot.
396         check_nodes.append(name)
397
398     if check_nodes:
399       # Activate disks for all instances with any of the checked nodes as a
400       # secondary node.
401       for node in check_nodes:
402         if node not in self.smap:
403           continue
404         for instance_name in self.smap[node]:
405           instance = self.instances[instance_name]
406           if not instance.autostart:
407             logging.info(("Skipping disk activation for non-autostart"
408                           " instance %s"), instance.name)
409             continue
410           if instance.name in self.started_instances:
411             # we already tried to start the instance, which should have
412             # activated its drives (if they can be at all)
413             logging.debug("Skipping disk activation for instance %s, as"
414                           " it was already started", instance.name)
415             continue
416           try:
417             logging.info("Activating disks for instance %s", instance.name)
418             instance.ActivateDisks()
419           except Exception: # pylint: disable-msg=W0703
420             logging.exception("Error while activating disks for instance %s",
421                               instance.name)
422
423       # Keep changed boot IDs
424       for name in check_nodes:
425         notepad.SetNodeBootID(name, self.bootids[name][0])
426
427   def CheckInstances(self, notepad):
428     """Make a pass over the list of instances, restarting downed ones.
429
430     """
431     notepad.MaintainInstanceList(self.instances.keys())
432
433     for instance in self.instances.values():
434       if instance.state in BAD_STATES:
435         n = notepad.NumberOfRestartAttempts(instance)
436
437         if n > MAXTRIES:
438           logging.warning("Not restarting instance %s, retries exhausted",
439                           instance.name)
440           continue
441         elif n < MAXTRIES:
442           last = " (Attempt #%d)" % (n + 1)
443         else:
444           notepad.RecordRestartAttempt(instance)
445           logging.error("Could not restart %s after %d attempts, giving up",
446                         instance.name, MAXTRIES)
447           continue
448         try:
449           logging.info("Restarting %s%s", instance.name, last)
450           instance.Restart()
451           self.started_instances.add(instance.name)
452         except Exception: # pylint: disable-msg=W0703
453           logging.exception("Error while restarting instance %s",
454                             instance.name)
455
456         notepad.RecordRestartAttempt(instance)
457       elif instance.state in HELPLESS_STATES:
458         if notepad.NumberOfRestartAttempts(instance):
459           notepad.RemoveInstance(instance)
460       else:
461         if notepad.NumberOfRestartAttempts(instance):
462           notepad.RemoveInstance(instance)
463           logging.info("Restart of %s succeeded", instance.name)
464
465   def _CheckForOfflineNodes(self, instance):
466     """Checks if given instances has any secondary in offline status.
467
468     @param instance: The instance object
469     @return: True if any of the secondary is offline, False otherwise
470
471     """
472     bootids = []
473     for node in instance.snodes:
474       bootids.append(self.bootids[node])
475
476     return compat.any(offline for (_, offline) in bootids)
477
478   def VerifyDisks(self):
479     """Run gnt-cluster verify-disks.
480
481     """
482     job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
483     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
484     client.ArchiveJob(job_id)
485
486     # Keep track of submitted jobs
487     jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
488
489     archive_jobs = set()
490     for (status, job_id) in result[constants.JOB_IDS_KEY]:
491       jex.AddJobId(None, status, job_id)
492       if status:
493         archive_jobs.add(job_id)
494
495     offline_disk_instances = set()
496
497     for (status, result) in jex.GetResults():
498       if not status:
499         logging.error("Verify-disks job failed: %s", result)
500         continue
501
502       ((_, instances, _), ) = result
503
504       offline_disk_instances.update(instances)
505
506     for job_id in archive_jobs:
507       client.ArchiveJob(job_id)
508
509     if not offline_disk_instances:
510       # nothing to do
511       logging.debug("verify-disks reported no offline disks, nothing to do")
512       return
513
514     logging.debug("Will activate disks for instance(s) %s",
515                   utils.CommaJoin(offline_disk_instances))
516
517     # we submit only one job, and wait for it. not optimal, but spams
518     # less the job queue
519     job = []
520     for name in offline_disk_instances:
521       instance = self.instances[name]
522       if (instance.state in HELPLESS_STATES or
523           self._CheckForOfflineNodes(instance)):
524         logging.info("Skip instance %s because it is in helpless state or has"
525                      " one offline secondary", name)
526         continue
527       job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
528
529     if job:
530       job_id = cli.SendJob(job, cl=client)
531
532       try:
533         cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
534       except Exception: # pylint: disable-msg=W0703
535         logging.exception("Error while activating disks")
536
537
538 def OpenStateFile(path):
539   """Opens the state file and acquires a lock on it.
540
541   @type path: string
542   @param path: Path to state file
543
544   """
545   # The two-step dance below is necessary to allow both opening existing
546   # file read/write and creating if not existing. Vanilla open will truncate
547   # an existing file -or- allow creating if not existing.
548   statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
549
550   # Try to acquire lock on state file. If this fails, another watcher instance
551   # might already be running or another program is temporarily blocking the
552   # watcher from running.
553   try:
554     utils.LockFile(statefile_fd)
555   except errors.LockError, err:
556     logging.error("Can't acquire lock on state file %s: %s", path, err)
557     return None
558
559   return os.fdopen(statefile_fd, "w+")
560
561
562 def IsRapiResponding(hostname):
563   """Connects to RAPI port and does a simple test.
564
565   Connects to RAPI port of hostname and does a simple test. At this time, the
566   test is GetVersion.
567
568   @type hostname: string
569   @param hostname: hostname of the node to connect to.
570   @rtype: bool
571   @return: Whether RAPI is working properly
572
573   """
574   curl_config = rapi.client.GenericCurlConfig()
575   rapi_client = rapi.client.GanetiRapiClient(hostname,
576                                              curl_config_fn=curl_config)
577   try:
578     master_version = rapi_client.GetVersion()
579   except rapi.client.CertificateError, err:
580     logging.warning("RAPI Error: CertificateError (%s)", err)
581     return False
582   except rapi.client.GanetiApiError, err:
583     logging.warning("RAPI Error: GanetiApiError (%s)", err)
584     return False
585   logging.debug("RAPI Result: master_version is %s", master_version)
586   return master_version == constants.RAPI_VERSION
587
588
589 def ParseOptions():
590   """Parse the command line options.
591
592   @return: (options, args) as from OptionParser.parse_args()
593
594   """
595   parser = OptionParser(description="Ganeti cluster watcher",
596                         usage="%prog [-d]",
597                         version="%%prog (ganeti) %s" %
598                         constants.RELEASE_VERSION)
599
600   parser.add_option(cli.DEBUG_OPT)
601   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
602                     help="Autoarchive jobs older than this age (default"
603                           " 6 hours)")
604   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
605                     action="store_true", help="Ignore cluster pause setting")
606   options, args = parser.parse_args()
607   options.job_age = cli.ParseTimespec(options.job_age)
608
609   if args:
610     parser.error("No arguments expected")
611
612   return (options, args)
613
614
615 @rapi.client.UsesRapiClient
616 def Main():
617   """Main function.
618
619   """
620   global client # pylint: disable-msg=W0603
621
622   (options, _) = ParseOptions()
623
624   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
625                      debug=options.debug, stderr_logging=options.debug)
626
627   if ShouldPause() and not options.ignore_pause:
628     logging.debug("Pause has been set, exiting")
629     return constants.EXIT_SUCCESS
630
631   statefile = OpenStateFile(constants.WATCHER_STATEFILE)
632   if not statefile:
633     return constants.EXIT_FAILURE
634
635   update_file = False
636   try:
637     StartNodeDaemons()
638     RunWatcherHooks()
639     # run node maintenance in all cases, even if master, so that old
640     # masters can be properly cleaned up too
641     if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
642       nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602
643
644     notepad = WatcherState(statefile)
645     try:
646       try:
647         client = cli.GetClient()
648       except errors.OpPrereqError:
649         # this is, from cli.GetClient, a not-master case
650         logging.debug("Not on master, exiting")
651         update_file = True
652         return constants.EXIT_SUCCESS
653       except luxi.NoMasterError, err:
654         logging.warning("Master seems to be down (%s), trying to restart",
655                         str(err))
656         if not utils.EnsureDaemon(constants.MASTERD):
657           logging.critical("Can't start the master, exiting")
658           return constants.EXIT_FAILURE
659         # else retry the connection
660         client = cli.GetClient()
661
662       # we are on master now
663       utils.EnsureDaemon(constants.RAPI)
664
665       # If RAPI isn't responding to queries, try one restart.
666       logging.debug("Attempting to talk with RAPI.")
667       if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
668         logging.warning("Couldn't get answer from Ganeti RAPI daemon."
669                         " Restarting Ganeti RAPI.")
670         utils.StopDaemon(constants.RAPI)
671         utils.EnsureDaemon(constants.RAPI)
672         logging.debug("Second attempt to talk with RAPI")
673         if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
674           logging.fatal("RAPI is not responding. Please investigate.")
675       logging.debug("Successfully talked to RAPI.")
676
677       try:
678         watcher = Watcher(options, notepad)
679       except errors.ConfigurationError:
680         # Just exit if there's no configuration
681         update_file = True
682         return constants.EXIT_SUCCESS
683
684       watcher.Run()
685       update_file = True
686
687     finally:
688       if update_file:
689         notepad.Save()
690       else:
691         logging.debug("Not updating status file due to failure")
692   except SystemExit:
693     raise
694   except NotMasterError:
695     logging.debug("Not master, exiting")
696     return constants.EXIT_NOTMASTER
697   except errors.ResolverError, err:
698     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
699     return constants.EXIT_NODESETUP_ERROR
700   except errors.JobQueueFull:
701     logging.error("Job queue is full, can't query cluster state")
702   except errors.JobQueueDrainError:
703     logging.error("Job queue is drained, can't maintain cluster state")
704   except Exception, err:
705     logging.exception(str(err))
706     return constants.EXIT_FAILURE
707
708   return constants.EXIT_SUCCESS