Breath life in to RAPI for trunk
[ganeti-local] / daemons / ganeti-watcher
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erronously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import sys
32 import time
33 import fcntl
34 import errno
35 import logging
36 from optparse import OptionParser
37
38 from ganeti import utils
39 from ganeti import constants
40 from ganeti import serializer
41 from ganeti import ssconf
42 from ganeti import errors
43 from ganeti import logger
44
45
46 MAXTRIES = 5
47 BAD_STATES = ['stopped']
48 HELPLESS_STATES = ['(node down)']
49 NOTICE = 'NOTICE'
50 ERROR = 'ERROR'
51 KEY_RESTART_COUNT = "restart_count"
52 KEY_RESTART_WHEN = "restart_when"
53 KEY_BOOT_ID = "bootid"
54
55
56 class NotMasterError(errors.GenericError):
57   """Exception raised when this host is not the master."""
58
59
60 def Indent(s, prefix='| '):
61   """Indent a piece of text with a given prefix before each line.
62
63   Args:
64     s: The string to indent
65     prefix: The string to prepend each line.
66
67   """
68   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
69
70
71 def DoCmd(cmd):
72   """Run a shell command.
73
74   Args:
75     cmd: the command to run.
76
77   Raises CommandError with verbose commentary on error.
78
79   """
80   res = utils.RunCmd(cmd)
81
82   if res.failed:
83     msg = ("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" %
84            (repr(cmd),
85             Indent(res.fail_reason),
86             Indent(res.stdout),
87             Indent(res.stderr)))
88     raise errors.CommandError(msg)
89
90   return res
91
92
93 class WatcherState(object):
94   """Interface to a state file recording restart attempts.
95
96   """
97   def __init__(self):
98     """Open, lock, read and parse the file.
99
100     Raises exception on lock contention.
101
102     """
103     # The two-step dance below is necessary to allow both opening existing
104     # file read/write and creating if not existing.  Vanilla open will truncate
105     # an existing file -or- allow creating if not existing.
106     fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
107     self.statefile = os.fdopen(fd, 'w+')
108
109     utils.LockFile(self.statefile.fileno())
110
111     try:
112       self._data = serializer.Load(self.statefile.read())
113     except Exception, msg:
114       # Ignore errors while loading the file and treat it as empty
115       self._data = {}
116       logging.warning(("Empty or invalid state file. Using defaults."
117                        " Error message: %s"), msg)
118
119     if "instance" not in self._data:
120       self._data["instance"] = {}
121     if "node" not in self._data:
122       self._data["node"] = {}
123
124     self._orig_data = serializer.Dump(self._data)
125
126   def Save(self):
127     """Save state to file, then unlock and close it.
128
129     """
130     assert self.statefile
131
132     serialized_form = serializer.Dump(self._data)
133     if self._orig_data == serialized_form:
134       logging.debug("Data didn't change, just touching status file")
135       os.utime(constants.WATCHER_STATEFILE, None)
136       return
137
138     # We need to make sure the file is locked before renaming it, otherwise
139     # starting ganeti-watcher again at the same time will create a conflict.
140     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
141                          data=serialized_form,
142                          prewrite=utils.LockFile, close=False)
143     self.statefile = os.fdopen(fd, 'w+')
144
145   def Close(self):
146     """Unlock configuration file and close it.
147
148     """
149     assert self.statefile
150
151     # Files are automatically unlocked when closing them
152     self.statefile.close()
153     self.statefile = None
154
155   def GetNodeBootID(self, name):
156     """Returns the last boot ID of a node or None.
157
158     """
159     ndata = self._data["node"]
160
161     if name in ndata and KEY_BOOT_ID in ndata[name]:
162       return ndata[name][KEY_BOOT_ID]
163     return None
164
165   def SetNodeBootID(self, name, bootid):
166     """Sets the boot ID of a node.
167
168     """
169     assert bootid
170
171     ndata = self._data["node"]
172
173     if name not in ndata:
174       ndata[name] = {}
175
176     ndata[name][KEY_BOOT_ID] = bootid
177
178   def NumberOfRestartAttempts(self, instance):
179     """Returns number of previous restart attempts.
180
181     Args:
182       instance - the instance to look up.
183
184     """
185     idata = self._data["instance"]
186
187     if instance.name in idata:
188       return idata[instance.name][KEY_RESTART_COUNT]
189
190     return 0
191
192   def RecordRestartAttempt(self, instance):
193     """Record a restart attempt.
194
195     Args:
196       instance - the instance being restarted
197
198     """
199     idata = self._data["instance"]
200
201     if instance.name not in idata:
202       inst = idata[instance.name] = {}
203     else:
204       inst = idata[instance.name]
205
206     inst[KEY_RESTART_WHEN] = time.time()
207     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
208
209   def RemoveInstance(self, instance):
210     """Update state to reflect that a machine is running, i.e. remove record.
211
212     Args:
213       instance - the instance to remove from books
214
215     This method removes the record for a named instance.
216
217     """
218     idata = self._data["instance"]
219
220     if instance.name in idata:
221       del idata[instance.name]
222
223
224 class Instance(object):
225   """Abstraction for a Virtual Machine instance.
226
227   Methods:
228     Restart(): issue a command to restart the represented machine.
229
230   """
231   def __init__(self, name, state, autostart):
232     self.name = name
233     self.state = state
234     self.autostart = autostart
235
236   def Restart(self):
237     """Encapsulates the start of an instance.
238
239     """
240     DoCmd(['gnt-instance', 'startup', '--lock-retries=15', self.name])
241
242   def ActivateDisks(self):
243     """Encapsulates the activation of all disks of an instance.
244
245     """
246     DoCmd(['gnt-instance', 'activate-disks', '--lock-retries=15', self.name])
247
248
249 def _RunListCmd(cmd):
250   """Runs a command and parses its output into lists.
251
252   """
253   for line in DoCmd(cmd).stdout.splitlines():
254     yield line.split(':')
255
256
257 def GetInstanceList(with_secondaries=None):
258   """Get a list of instances on this cluster.
259
260   """
261   cmd = ['gnt-instance', 'list', '--lock-retries=15', '--no-headers',
262          '--separator=:']
263
264   fields = 'name,oper_state,admin_state'
265
266   if with_secondaries is not None:
267     fields += ',snodes'
268
269   cmd.append('-o')
270   cmd.append(fields)
271
272   instances = []
273   for fields in _RunListCmd(cmd):
274     if with_secondaries is not None:
275       (name, status, autostart, snodes) = fields
276
277       if snodes == "-":
278         continue
279
280       for node in with_secondaries:
281         if node in snodes.split(','):
282           break
283       else:
284         continue
285
286     else:
287       (name, status, autostart) = fields
288
289     instances.append(Instance(name, status, autostart != "no"))
290
291   return instances
292
293
294 def GetNodeBootIDs():
295   """Get a dict mapping nodes to boot IDs.
296
297   """
298   cmd = ['gnt-node', 'list', '--lock-retries=15', '--no-headers',
299          '--separator=:', '-o', 'name,bootid']
300
301   ids = {}
302   for fields in _RunListCmd(cmd):
303     (name, bootid) = fields
304     ids[name] = bootid
305
306   return ids
307
308
309 class Watcher(object):
310   """Encapsulate the logic for restarting erronously halted virtual machines.
311
312   The calling program should periodically instantiate me and call Run().
313   This will traverse the list of instances, and make up to MAXTRIES attempts
314   to restart machines that are down.
315
316   """
317   def __init__(self):
318     sstore = ssconf.SimpleStore()
319     master = sstore.GetMasterNode()
320     if master != utils.HostInfo().name:
321       raise NotMasterError("This is not the master node")
322     self.instances = GetInstanceList()
323     self.bootids = GetNodeBootIDs()
324     self.started_instances = set()
325
326   def Run(self):
327     notepad = WatcherState()
328     try:
329       self.CheckInstances(notepad)
330       self.CheckDisks(notepad)
331       self.VerifyDisks()
332     finally:
333       notepad.Save()
334
335   def CheckDisks(self, notepad):
336     """Check all nodes for restarted ones.
337
338     """
339     check_nodes = []
340     for name, new_id in self.bootids.iteritems():
341       old = notepad.GetNodeBootID(name)
342       if old != new_id:
343         # Node's boot ID has changed, proably through a reboot.
344         check_nodes.append(name)
345
346     if check_nodes:
347       # Activate disks for all instances with any of the checked nodes as a
348       # secondary node.
349       for instance in GetInstanceList(with_secondaries=check_nodes):
350         if not instance.autostart:
351           logging.info(("Skipping disk activation for non-autostart"
352                         " instance %s"), instance.name)
353           continue
354         if instance.name in self.started_instances:
355           # we already tried to start the instance, which should have
356           # activated its drives (if they can be at all)
357           continue
358         try:
359           logging.info("Activating disks for instance %s", instance.name)
360           instance.ActivateDisks()
361         except Exception, err:
362           logging.error(str(err), exc_info=True)
363
364       # Keep changed boot IDs
365       for name in check_nodes:
366         notepad.SetNodeBootID(name, self.bootids[name])
367
368   def CheckInstances(self, notepad):
369     """Make a pass over the list of instances, restarting downed ones.
370
371     """
372     for instance in self.instances:
373       # Don't care about manually stopped instances
374       if not instance.autostart:
375         continue
376
377       if instance.state in BAD_STATES:
378         n = notepad.NumberOfRestartAttempts(instance)
379
380         if n > MAXTRIES:
381           # stay quiet.
382           continue
383         elif n < MAXTRIES:
384           last = " (Attempt #%d)" % (n + 1)
385         else:
386           notepad.RecordRestartAttempt(instance)
387           logging.error("Could not restart %s after %d attempts, giving up",
388                         instance.name, MAXTRIES)
389           continue
390         try:
391           logging.info("Restarting %s%s",
392                         instance.name, last)
393           instance.Restart()
394           self.started_instances.add(instance.name)
395         except Exception, err:
396           logging.error(str(err), exc_info=True)
397
398         notepad.RecordRestartAttempt(instance)
399       elif instance.state in HELPLESS_STATES:
400         if notepad.NumberOfRestartAttempts(instance):
401           notepad.RemoveInstance(instance)
402       else:
403         if notepad.NumberOfRestartAttempts(instance):
404           notepad.RemoveInstance(instance)
405           logging.info("Restart of %s succeeded", instance.name)
406
407   def VerifyDisks(self):
408     """Run gnt-cluster verify-disks.
409
410     """
411     result = DoCmd(['gnt-cluster', 'verify-disks', '--lock-retries=15'])
412     if result.output:
413       logging.info(result.output)
414
415
416 def ParseOptions():
417   """Parse the command line options.
418
419   Returns:
420     (options, args) as from OptionParser.parse_args()
421
422   """
423   parser = OptionParser(description="Ganeti cluster watcher",
424                         usage="%prog [-d]",
425                         version="%%prog (ganeti) %s" %
426                         constants.RELEASE_VERSION)
427
428   parser.add_option("-d", "--debug", dest="debug",
429                     help="Write all messages to stderr",
430                     default=False, action="store_true")
431   options, args = parser.parse_args()
432   return options, args
433
434
435 def main():
436   """Main function.
437
438   """
439   options, args = ParseOptions()
440
441   logger.SetupDaemon(constants.LOG_WATCHER, debug=options.debug)
442
443   try:
444     try:
445       watcher = Watcher()
446     except errors.ConfigurationError:
447       # Just exit if there's no configuration
448       sys.exit(constants.EXIT_SUCCESS)
449     watcher.Run()
450   except SystemExit:
451     raise
452   except NotMasterError:
453     logging.debug("Not master, exiting")
454     sys.exit(constants.EXIT_NOTMASTER)
455   except errors.ResolverError, err:
456     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
457     sys.exit(constants.EXIT_NODESETUP_ERROR)
458   except Exception, err:
459     logging.error(str(err), exc_info=True)
460     sys.exit(constants.EXIT_FAILURE)
461
462
463 if __name__ == '__main__':
464   main()