install.rst: note about the default parameters
[ganeti-local] / daemons / ganeti-watcher
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erronously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import sys
32 import time
33 import logging
34 import errno
35 from optparse import OptionParser
36
37 from ganeti import utils
38 from ganeti import constants
39 from ganeti import serializer
40 from ganeti import errors
41 from ganeti import opcodes
42 from ganeti import cli
43 from ganeti import luxi
44
45
46 MAXTRIES = 5
47 BAD_STATES = ['ERROR_down']
48 HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
49 NOTICE = 'NOTICE'
50 ERROR = 'ERROR'
51 KEY_RESTART_COUNT = "restart_count"
52 KEY_RESTART_WHEN = "restart_when"
53 KEY_BOOT_ID = "bootid"
54
55
56 # Global client object
57 client = None
58
59
60 class NotMasterError(errors.GenericError):
61   """Exception raised when this host is not the master."""
62
63
64 def Indent(s, prefix='| '):
65   """Indent a piece of text with a given prefix before each line.
66
67   @param s: the string to indent
68   @param prefix: the string to prepend each line
69
70   """
71   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
72
73
74 def ShouldPause():
75   """Check whether we should pause.
76
77   """
78   return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
79
80
81 def EnsureDaemon(name):
82   """Check for and start daemon if not alive.
83
84   """
85   result = utils.RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
86   if result.failed:
87     logging.error("Can't start daemon '%s', failure %s, output: %s",
88                   name, result.fail_reason, result.output)
89     return False
90
91   return True
92
93
94 class WatcherState(object):
95   """Interface to a state file recording restart attempts.
96
97   """
98   def __init__(self):
99     """Open, lock, read and parse the file.
100
101     Raises exception on lock contention.
102
103     """
104     # The two-step dance below is necessary to allow both opening existing
105     # file read/write and creating if not existing.  Vanilla open will truncate
106     # an existing file -or- allow creating if not existing.
107     fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
108     self.statefile = os.fdopen(fd, 'w+')
109
110     utils.LockFile(self.statefile.fileno())
111
112     try:
113       state_data = self.statefile.read()
114       if not state_data:
115         self._data = {}
116       else:
117         self._data = serializer.Load(state_data)
118     except Exception, msg:
119       # Ignore errors while loading the file and treat it as empty
120       self._data = {}
121       logging.warning(("Invalid state file. Using defaults."
122                        " Error message: %s"), msg)
123
124     if "instance" not in self._data:
125       self._data["instance"] = {}
126     if "node" not in self._data:
127       self._data["node"] = {}
128
129     self._orig_data = serializer.Dump(self._data)
130
131   def Save(self):
132     """Save state to file, then unlock and close it.
133
134     """
135     assert self.statefile
136
137     serialized_form = serializer.Dump(self._data)
138     if self._orig_data == serialized_form:
139       logging.debug("Data didn't change, just touching status file")
140       os.utime(constants.WATCHER_STATEFILE, None)
141       return
142
143     # We need to make sure the file is locked before renaming it, otherwise
144     # starting ganeti-watcher again at the same time will create a conflict.
145     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
146                          data=serialized_form,
147                          prewrite=utils.LockFile, close=False)
148     self.statefile = os.fdopen(fd, 'w+')
149
150   def Close(self):
151     """Unlock configuration file and close it.
152
153     """
154     assert self.statefile
155
156     # Files are automatically unlocked when closing them
157     self.statefile.close()
158     self.statefile = None
159
160   def GetNodeBootID(self, name):
161     """Returns the last boot ID of a node or None.
162
163     """
164     ndata = self._data["node"]
165
166     if name in ndata and KEY_BOOT_ID in ndata[name]:
167       return ndata[name][KEY_BOOT_ID]
168     return None
169
170   def SetNodeBootID(self, name, bootid):
171     """Sets the boot ID of a node.
172
173     """
174     assert bootid
175
176     ndata = self._data["node"]
177
178     if name not in ndata:
179       ndata[name] = {}
180
181     ndata[name][KEY_BOOT_ID] = bootid
182
183   def NumberOfRestartAttempts(self, instance):
184     """Returns number of previous restart attempts.
185
186     @type instance: L{Instance}
187     @param instance: the instance to look up
188
189     """
190     idata = self._data["instance"]
191
192     if instance.name in idata:
193       return idata[instance.name][KEY_RESTART_COUNT]
194
195     return 0
196
197   def RecordRestartAttempt(self, instance):
198     """Record a restart attempt.
199
200     @type instance: L{Instance}
201     @param instance: the instance being restarted
202
203     """
204     idata = self._data["instance"]
205
206     if instance.name not in idata:
207       inst = idata[instance.name] = {}
208     else:
209       inst = idata[instance.name]
210
211     inst[KEY_RESTART_WHEN] = time.time()
212     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
213
214   def RemoveInstance(self, instance):
215     """Update state to reflect that a machine is running.
216
217     This method removes the record for a named instance (as we only
218     track down instances).
219
220     @type instance: L{Instance}
221     @param instance: the instance to remove from books
222
223     """
224     idata = self._data["instance"]
225
226     if instance.name in idata:
227       del idata[instance.name]
228
229
230 class Instance(object):
231   """Abstraction for a Virtual Machine instance.
232
233   """
234   def __init__(self, name, state, autostart):
235     self.name = name
236     self.state = state
237     self.autostart = autostart
238
239   def Restart(self):
240     """Encapsulates the start of an instance.
241
242     """
243     op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
244     cli.SubmitOpCode(op, cl=client)
245
246   def ActivateDisks(self):
247     """Encapsulates the activation of all disks of an instance.
248
249     """
250     op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
251     cli.SubmitOpCode(op, cl=client)
252
253
254 def GetClusterData():
255   """Get a list of instances on this cluster.
256
257   """
258   op1_fields = ["name", "status", "admin_state", "snodes"]
259   op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
260                                  use_locking=True)
261   op2_fields = ["name", "bootid", "offline"]
262   op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
263                              use_locking=True)
264
265   job_id = client.SubmitJob([op1, op2])
266
267   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
268
269   logging.debug("Got data from cluster, writing instance status file")
270
271   result = all_results[0]
272   smap = {}
273
274   instances = {}
275
276   # write the upfile
277   up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
278   utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
279
280   for fields in result:
281     (name, status, autostart, snodes) = fields
282
283     # update the secondary node map
284     for node in snodes:
285       if node not in smap:
286         smap[node] = []
287       smap[node].append(name)
288
289     instances[name] = Instance(name, status, autostart)
290
291   nodes =  dict([(name, (bootid, offline))
292                  for name, bootid, offline in all_results[1]])
293
294   client.ArchiveJob(job_id)
295
296   return instances, nodes, smap
297
298
299 class Watcher(object):
300   """Encapsulate the logic for restarting erronously halted virtual machines.
301
302   The calling program should periodically instantiate me and call Run().
303   This will traverse the list of instances, and make up to MAXTRIES attempts
304   to restart machines that are down.
305
306   """
307   def __init__(self, opts, notepad):
308     self.notepad = notepad
309     master = client.QueryConfigValues(["master_node"])[0]
310     if master != utils.HostInfo().name:
311       raise NotMasterError("This is not the master node")
312     # first archive old jobs
313     self.ArchiveJobs(opts.job_age)
314     # and only then submit new ones
315     self.instances, self.bootids, self.smap = GetClusterData()
316     self.started_instances = set()
317     self.opts = opts
318
319   def Run(self):
320     """Watcher run sequence.
321
322     """
323     notepad = self.notepad
324     self.CheckInstances(notepad)
325     self.CheckDisks(notepad)
326     self.VerifyDisks()
327
328   @staticmethod
329   def ArchiveJobs(age):
330     """Archive old jobs.
331
332     """
333     arch_count, left_count = client.AutoArchiveJobs(age)
334     logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
335
336   def CheckDisks(self, notepad):
337     """Check all nodes for restarted ones.
338
339     """
340     check_nodes = []
341     for name, (new_id, offline) in self.bootids.iteritems():
342       old = notepad.GetNodeBootID(name)
343       if new_id is None:
344         # Bad node, not returning a boot id
345         if not offline:
346           logging.debug("Node %s missing boot id, skipping secondary checks",
347                         name)
348         continue
349       if old != new_id:
350         # Node's boot ID has changed, proably through a reboot.
351         check_nodes.append(name)
352
353     if check_nodes:
354       # Activate disks for all instances with any of the checked nodes as a
355       # secondary node.
356       for node in check_nodes:
357         if node not in self.smap:
358           continue
359         for instance_name in self.smap[node]:
360           instance = self.instances[instance_name]
361           if not instance.autostart:
362             logging.info(("Skipping disk activation for non-autostart"
363                           " instance %s"), instance.name)
364             continue
365           if instance.name in self.started_instances:
366             # we already tried to start the instance, which should have
367             # activated its drives (if they can be at all)
368             continue
369           try:
370             logging.info("Activating disks for instance %s", instance.name)
371             instance.ActivateDisks()
372           except Exception:
373             logging.exception("Error while activating disks for instance %s",
374                               instance.name)
375
376       # Keep changed boot IDs
377       for name in check_nodes:
378         notepad.SetNodeBootID(name, self.bootids[name][0])
379
380   def CheckInstances(self, notepad):
381     """Make a pass over the list of instances, restarting downed ones.
382
383     """
384     for instance in self.instances.values():
385       if instance.state in BAD_STATES:
386         n = notepad.NumberOfRestartAttempts(instance)
387
388         if n > MAXTRIES:
389           # stay quiet.
390           continue
391         elif n < MAXTRIES:
392           last = " (Attempt #%d)" % (n + 1)
393         else:
394           notepad.RecordRestartAttempt(instance)
395           logging.error("Could not restart %s after %d attempts, giving up",
396                         instance.name, MAXTRIES)
397           continue
398         try:
399           logging.info("Restarting %s%s",
400                         instance.name, last)
401           instance.Restart()
402           self.started_instances.add(instance.name)
403         except Exception:
404           logging.exception("Error while restarting instance %s",
405                             instance.name)
406
407         notepad.RecordRestartAttempt(instance)
408       elif instance.state in HELPLESS_STATES:
409         if notepad.NumberOfRestartAttempts(instance):
410           notepad.RemoveInstance(instance)
411       else:
412         if notepad.NumberOfRestartAttempts(instance):
413           notepad.RemoveInstance(instance)
414           logging.info("Restart of %s succeeded", instance.name)
415
416   @staticmethod
417   def VerifyDisks():
418     """Run gnt-cluster verify-disks.
419
420     """
421     op = opcodes.OpVerifyDisks()
422     job_id = client.SubmitJob([op])
423     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
424     client.ArchiveJob(job_id)
425     if not isinstance(result, (tuple, list)):
426       logging.error("Can't get a valid result from verify-disks")
427       return
428     offline_disk_instances = result[2]
429     if not offline_disk_instances:
430       # nothing to do
431       return
432     logging.debug("Will activate disks for instances %s",
433                   utils.CommaJoin(offline_disk_instances))
434     # we submit only one job, and wait for it. not optimal, but spams
435     # less the job queue
436     job = [opcodes.OpActivateInstanceDisks(instance_name=name)
437            for name in offline_disk_instances]
438     job_id = cli.SendJob(job, cl=client)
439
440     cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
441
442
443 def ParseOptions():
444   """Parse the command line options.
445
446   @return: (options, args) as from OptionParser.parse_args()
447
448   """
449   parser = OptionParser(description="Ganeti cluster watcher",
450                         usage="%prog [-d]",
451                         version="%%prog (ganeti) %s" %
452                         constants.RELEASE_VERSION)
453
454   parser.add_option(cli.DEBUG_OPT)
455   parser.add_option("-A", "--job-age", dest="job_age",
456                     help="Autoarchive jobs older than this age (default"
457                     " 6 hours)", default=6*3600)
458   options, args = parser.parse_args()
459   options.job_age = cli.ParseTimespec(options.job_age)
460   return options, args
461
462
463 def main():
464   """Main function.
465
466   """
467   global client
468
469   options, args = ParseOptions()
470
471   utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
472                      stderr_logging=options.debug)
473
474   if ShouldPause():
475     logging.debug("Pause has been set, exiting")
476     sys.exit(constants.EXIT_SUCCESS)
477
478   update_file = False
479   try:
480     # on master or not, try to start the node dameon
481     EnsureDaemon(constants.NODED)
482
483     notepad = WatcherState()
484     try:
485       try:
486         client = cli.GetClient()
487       except errors.OpPrereqError:
488         # this is, from cli.GetClient, a not-master case
489         logging.debug("Not on master, exiting")
490         update_file = True
491         sys.exit(constants.EXIT_SUCCESS)
492       except luxi.NoMasterError, err:
493         logging.warning("Master seems to be down (%s), trying to restart",
494                         str(err))
495         if not EnsureDaemon(constants.MASTERD):
496           logging.critical("Can't start the master, exiting")
497           sys.exit(constants.EXIT_FAILURE)
498         # else retry the connection
499         client = cli.GetClient()
500
501       # we are on master now
502       EnsureDaemon(constants.RAPI)
503
504       try:
505         watcher = Watcher(options, notepad)
506       except errors.ConfigurationError:
507         # Just exit if there's no configuration
508         update_file = True
509         sys.exit(constants.EXIT_SUCCESS)
510
511       watcher.Run()
512       update_file = True
513
514     finally:
515       if update_file:
516         notepad.Save()
517       else:
518         logging.debug("Not updating status file due to failure")
519   except SystemExit:
520     raise
521   except NotMasterError:
522     logging.debug("Not master, exiting")
523     sys.exit(constants.EXIT_NOTMASTER)
524   except errors.ResolverError, err:
525     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
526     sys.exit(constants.EXIT_NODESETUP_ERROR)
527   except errors.JobQueueFull:
528     logging.error("Job queue is full, can't query cluster state")
529   except errors.JobQueueDrainError:
530     logging.error("Job queue is drained, can't maintain cluster state")
531   except Exception, err:
532     logging.error(str(err), exc_info=True)
533     sys.exit(constants.EXIT_FAILURE)
534
535
536 if __name__ == '__main__':
537   main()