confd: avoid spamming the logfile
[ganeti-local] / daemons / ganeti-watcher
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erronously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import sys
32 import time
33 import logging
34 import errno
35 from optparse import OptionParser
36
37 from ganeti import utils
38 from ganeti import constants
39 from ganeti import serializer
40 from ganeti import errors
41 from ganeti import opcodes
42 from ganeti import cli
43 from ganeti import luxi
44
45
46 MAXTRIES = 5
47 BAD_STATES = ['ERROR_down']
48 HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
49 NOTICE = 'NOTICE'
50 ERROR = 'ERROR'
51 KEY_RESTART_COUNT = "restart_count"
52 KEY_RESTART_WHEN = "restart_when"
53 KEY_BOOT_ID = "bootid"
54
55
56 # Global client object
57 client = None
58
59
60 class NotMasterError(errors.GenericError):
61   """Exception raised when this host is not the master."""
62
63
64 def Indent(s, prefix='| '):
65   """Indent a piece of text with a given prefix before each line.
66
67   @param s: the string to indent
68   @param prefix: the string to prepend each line
69
70   """
71   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
72
73
74 def ShouldPause():
75   """Check whether we should pause.
76
77   """
78   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
79
80
81 def StartMaster():
82   """Try to start the master daemon.
83
84   """
85   result = utils.RunCmd(['ganeti-masterd'])
86   if result.failed:
87     logging.error("Can't start the master daemon: output '%s'", result.output)
88   return not result.failed
89
90
91 def EnsureDaemon(daemon):
92   """Check for and start daemon if not alive.
93
94   """
95   pidfile = utils.DaemonPidFileName(daemon)
96   pid = utils.ReadPidFile(pidfile)
97   if pid == 0 or not utils.IsProcessAlive(pid): # no file or dead pid
98     logging.debug("Daemon '%s' not alive, trying to restart", daemon)
99     result = utils.RunCmd([daemon])
100     if not result:
101       logging.error("Can't start daemon '%s', failure %s, output: %s",
102                     daemon, result.fail_reason, result.output)
103
104
105 class WatcherState(object):
106   """Interface to a state file recording restart attempts.
107
108   """
109   def __init__(self):
110     """Open, lock, read and parse the file.
111
112     Raises exception on lock contention.
113
114     """
115     # The two-step dance below is necessary to allow both opening existing
116     # file read/write and creating if not existing.  Vanilla open will truncate
117     # an existing file -or- allow creating if not existing.
118     fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
119     self.statefile = os.fdopen(fd, 'w+')
120
121     utils.LockFile(self.statefile.fileno())
122
123     try:
124       state_data = self.statefile.read()
125       if not state_data:
126         self._data = {}
127       else:
128         self._data = serializer.Load(state_data)
129     except Exception, msg:
130       # Ignore errors while loading the file and treat it as empty
131       self._data = {}
132       logging.warning(("Invalid state file. Using defaults."
133                        " Error message: %s"), msg)
134
135     if "instance" not in self._data:
136       self._data["instance"] = {}
137     if "node" not in self._data:
138       self._data["node"] = {}
139
140     self._orig_data = serializer.Dump(self._data)
141
142   def Save(self):
143     """Save state to file, then unlock and close it.
144
145     """
146     assert self.statefile
147
148     serialized_form = serializer.Dump(self._data)
149     if self._orig_data == serialized_form:
150       logging.debug("Data didn't change, just touching status file")
151       os.utime(constants.WATCHER_STATEFILE, None)
152       return
153
154     # We need to make sure the file is locked before renaming it, otherwise
155     # starting ganeti-watcher again at the same time will create a conflict.
156     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
157                          data=serialized_form,
158                          prewrite=utils.LockFile, close=False)
159     self.statefile = os.fdopen(fd, 'w+')
160
161   def Close(self):
162     """Unlock configuration file and close it.
163
164     """
165     assert self.statefile
166
167     # Files are automatically unlocked when closing them
168     self.statefile.close()
169     self.statefile = None
170
171   def GetNodeBootID(self, name):
172     """Returns the last boot ID of a node or None.
173
174     """
175     ndata = self._data["node"]
176
177     if name in ndata and KEY_BOOT_ID in ndata[name]:
178       return ndata[name][KEY_BOOT_ID]
179     return None
180
181   def SetNodeBootID(self, name, bootid):
182     """Sets the boot ID of a node.
183
184     """
185     assert bootid
186
187     ndata = self._data["node"]
188
189     if name not in ndata:
190       ndata[name] = {}
191
192     ndata[name][KEY_BOOT_ID] = bootid
193
194   def NumberOfRestartAttempts(self, instance):
195     """Returns number of previous restart attempts.
196
197     @type instance: L{Instance}
198     @param instance: the instance to look up
199
200     """
201     idata = self._data["instance"]
202
203     if instance.name in idata:
204       return idata[instance.name][KEY_RESTART_COUNT]
205
206     return 0
207
208   def RecordRestartAttempt(self, instance):
209     """Record a restart attempt.
210
211     @type instance: L{Instance}
212     @param instance: the instance being restarted
213
214     """
215     idata = self._data["instance"]
216
217     if instance.name not in idata:
218       inst = idata[instance.name] = {}
219     else:
220       inst = idata[instance.name]
221
222     inst[KEY_RESTART_WHEN] = time.time()
223     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
224
225   def RemoveInstance(self, instance):
226     """Update state to reflect that a machine is running.
227
228     This method removes the record for a named instance (as we only
229     track down instances).
230
231     @type instance: L{Instance}
232     @param instance: the instance to remove from books
233
234     """
235     idata = self._data["instance"]
236
237     if instance.name in idata:
238       del idata[instance.name]
239
240
241 class Instance(object):
242   """Abstraction for a Virtual Machine instance.
243
244   """
245   def __init__(self, name, state, autostart):
246     self.name = name
247     self.state = state
248     self.autostart = autostart
249
250   def Restart(self):
251     """Encapsulates the start of an instance.
252
253     """
254     op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
255     cli.SubmitOpCode(op, cl=client)
256
257   def ActivateDisks(self):
258     """Encapsulates the activation of all disks of an instance.
259
260     """
261     op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
262     cli.SubmitOpCode(op, cl=client)
263
264
265 def GetClusterData():
266   """Get a list of instances on this cluster.
267
268   """
269   op1_fields = ["name", "status", "admin_state", "snodes"]
270   op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
271                                  use_locking=True)
272   op2_fields = ["name", "bootid", "offline"]
273   op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
274                              use_locking=True)
275
276   job_id = client.SubmitJob([op1, op2])
277
278   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
279
280   logging.debug("Got data from cluster, writing instance status file")
281
282   result = all_results[0]
283   smap = {}
284
285   instances = {}
286
287   # write the upfile
288   up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
289   utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
290
291   for fields in result:
292     (name, status, autostart, snodes) = fields
293
294     # update the secondary node map
295     for node in snodes:
296       if node not in smap:
297         smap[node] = []
298       smap[node].append(name)
299
300     instances[name] = Instance(name, status, autostart)
301
302   nodes =  dict([(name, (bootid, offline))
303                  for name, bootid, offline in all_results[1]])
304
305   client.ArchiveJob(job_id)
306
307   return instances, nodes, smap
308
309
310 class Watcher(object):
311   """Encapsulate the logic for restarting erronously halted virtual machines.
312
313   The calling program should periodically instantiate me and call Run().
314   This will traverse the list of instances, and make up to MAXTRIES attempts
315   to restart machines that are down.
316
317   """
318   def __init__(self, opts, notepad):
319     self.notepad = notepad
320     master = client.QueryConfigValues(["master_node"])[0]
321     if master != utils.HostInfo().name:
322       raise NotMasterError("This is not the master node")
323     # first archive old jobs
324     self.ArchiveJobs(opts.job_age)
325     # and only then submit new ones
326     self.instances, self.bootids, self.smap = GetClusterData()
327     self.started_instances = set()
328     self.opts = opts
329
330   def Run(self):
331     """Watcher run sequence.
332
333     """
334     notepad = self.notepad
335     self.CheckInstances(notepad)
336     self.CheckDisks(notepad)
337     self.VerifyDisks()
338
339   @staticmethod
340   def ArchiveJobs(age):
341     """Archive old jobs.
342
343     """
344     arch_count, left_count = client.AutoArchiveJobs(age)
345     logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
346
347   def CheckDisks(self, notepad):
348     """Check all nodes for restarted ones.
349
350     """
351     check_nodes = []
352     for name, (new_id, offline) in self.bootids.iteritems():
353       old = notepad.GetNodeBootID(name)
354       if new_id is None:
355         # Bad node, not returning a boot id
356         if not offline:
357           logging.debug("Node %s missing boot id, skipping secondary checks",
358                         name)
359         continue
360       if old != new_id:
361         # Node's boot ID has changed, proably through a reboot.
362         check_nodes.append(name)
363
364     if check_nodes:
365       # Activate disks for all instances with any of the checked nodes as a
366       # secondary node.
367       for node in check_nodes:
368         if node not in self.smap:
369           continue
370         for instance_name in self.smap[node]:
371           instance = self.instances[instance_name]
372           if not instance.autostart:
373             logging.info(("Skipping disk activation for non-autostart"
374                           " instance %s"), instance.name)
375             continue
376           if instance.name in self.started_instances:
377             # we already tried to start the instance, which should have
378             # activated its drives (if they can be at all)
379             continue
380           try:
381             logging.info("Activating disks for instance %s", instance.name)
382             instance.ActivateDisks()
383           except Exception:
384             logging.exception("Error while activating disks for instance %s",
385                               instance.name)
386
387       # Keep changed boot IDs
388       for name in check_nodes:
389         notepad.SetNodeBootID(name, self.bootids[name][0])
390
391   def CheckInstances(self, notepad):
392     """Make a pass over the list of instances, restarting downed ones.
393
394     """
395     for instance in self.instances.values():
396       if instance.state in BAD_STATES:
397         n = notepad.NumberOfRestartAttempts(instance)
398
399         if n > MAXTRIES:
400           # stay quiet.
401           continue
402         elif n < MAXTRIES:
403           last = " (Attempt #%d)" % (n + 1)
404         else:
405           notepad.RecordRestartAttempt(instance)
406           logging.error("Could not restart %s after %d attempts, giving up",
407                         instance.name, MAXTRIES)
408           continue
409         try:
410           logging.info("Restarting %s%s",
411                         instance.name, last)
412           instance.Restart()
413           self.started_instances.add(instance.name)
414         except Exception:
415           logging.exception("Error while restarting instance %s",
416                             instance.name)
417
418         notepad.RecordRestartAttempt(instance)
419       elif instance.state in HELPLESS_STATES:
420         if notepad.NumberOfRestartAttempts(instance):
421           notepad.RemoveInstance(instance)
422       else:
423         if notepad.NumberOfRestartAttempts(instance):
424           notepad.RemoveInstance(instance)
425           logging.info("Restart of %s succeeded", instance.name)
426
427   @staticmethod
428   def VerifyDisks():
429     """Run gnt-cluster verify-disks.
430
431     """
432     op = opcodes.OpVerifyDisks()
433     job_id = client.SubmitJob([op])
434     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
435     client.ArchiveJob(job_id)
436     if not isinstance(result, (tuple, list)):
437       logging.error("Can't get a valid result from verify-disks")
438       return
439     offline_disk_instances = result[2]
440     if not offline_disk_instances:
441       # nothing to do
442       return
443     logging.debug("Will activate disks for instances %s",
444                   ", ".join(offline_disk_instances))
445     # we submit only one job, and wait for it. not optimal, but spams
446     # less the job queue
447     job = [opcodes.OpActivateInstanceDisks(instance_name=name)
448            for name in offline_disk_instances]
449     job_id = cli.SendJob(job, cl=client)
450
451     cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
452
453
454 def ParseOptions():
455   """Parse the command line options.
456
457   @return: (options, args) as from OptionParser.parse_args()
458
459   """
460   parser = OptionParser(description="Ganeti cluster watcher",
461                         usage="%prog [-d]",
462                         version="%%prog (ganeti) %s" %
463                         constants.RELEASE_VERSION)
464
465   parser.add_option("-d", "--debug", dest="debug",
466                     help="Write all messages to stderr",
467                     default=False, action="store_true")
468   parser.add_option("-A", "--job-age", dest="job_age",
469                     help="Autoarchive jobs older than this age (default"
470                     " 6 hours)", default=6*3600)
471   options, args = parser.parse_args()
472   options.job_age = cli.ParseTimespec(options.job_age)
473   return options, args
474
475
476 def main():
477   """Main function.
478
479   """
480   global client
481
482   options, args = ParseOptions()
483
484   utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
485                      stderr_logging=options.debug)
486
487   if ShouldPause():
488     logging.debug("Pause has been set, exiting")
489     sys.exit(constants.EXIT_SUCCESS)
490
491   update_file = False
492   try:
493     # on master or not, try to start the node dameon
494     EnsureDaemon(constants.NODED)
495
496     notepad = WatcherState()
497     try:
498       try:
499         client = cli.GetClient()
500       except errors.OpPrereqError:
501         # this is, from cli.GetClient, a not-master case
502         logging.debug("Not on master, exiting")
503         update_file = True
504         sys.exit(constants.EXIT_SUCCESS)
505       except luxi.NoMasterError, err:
506         logging.warning("Master seems to be down (%s), trying to restart",
507                         str(err))
508         if not StartMaster():
509           logging.critical("Can't start the master, exiting")
510           sys.exit(constants.EXIT_FAILURE)
511         # else retry the connection
512         client = cli.GetClient()
513
514       # we are on master now
515       EnsureDaemon(constants.RAPI)
516
517       try:
518         watcher = Watcher(options, notepad)
519       except errors.ConfigurationError:
520         # Just exit if there's no configuration
521         update_file = True
522         sys.exit(constants.EXIT_SUCCESS)
523
524       watcher.Run()
525       update_file = True
526
527     finally:
528       if update_file:
529         notepad.Save()
530       else:
531         logging.debug("Not updating status file due to failure")
532   except SystemExit:
533     raise
534   except NotMasterError:
535     logging.debug("Not master, exiting")
536     sys.exit(constants.EXIT_NOTMASTER)
537   except errors.ResolverError, err:
538     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
539     sys.exit(constants.EXIT_NODESETUP_ERROR)
540   except errors.JobQueueFull:
541     logging.error("Job queue is full, can't query cluster state")
542   except errors.JobQueueDrainError:
543     logging.error("Job queue is drained, can't maintain cluster state")
544   except Exception, err:
545     logging.error(str(err), exc_info=True)
546     sys.exit(constants.EXIT_FAILURE)
547
548
549 if __name__ == '__main__':
550   main()