12bafcec9b6e45d8890a1c7437f406ceae463e0d
[ganeti-local] / daemons / ganeti-watcher
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erronously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import sys
32 import time
33 import logging
34 from optparse import OptionParser
35
36 from ganeti import utils
37 from ganeti import constants
38 from ganeti import serializer
39 from ganeti import errors
40 from ganeti import opcodes
41 from ganeti import cli
42
43
44 MAXTRIES = 5
45 BAD_STATES = ['ERROR_down']
46 HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
47 NOTICE = 'NOTICE'
48 ERROR = 'ERROR'
49 KEY_RESTART_COUNT = "restart_count"
50 KEY_RESTART_WHEN = "restart_when"
51 KEY_BOOT_ID = "bootid"
52
53
54 # Global client object
55 client = None
56
57
58 class NotMasterError(errors.GenericError):
59   """Exception raised when this host is not the master."""
60
61
62 def Indent(s, prefix='| '):
63   """Indent a piece of text with a given prefix before each line.
64
65   @param s: the string to indent
66   @param prefix: the string to prepend each line
67
68   """
69   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
70
71
72 class WatcherState(object):
73   """Interface to a state file recording restart attempts.
74
75   """
76   def __init__(self):
77     """Open, lock, read and parse the file.
78
79     Raises exception on lock contention.
80
81     """
82     # The two-step dance below is necessary to allow both opening existing
83     # file read/write and creating if not existing.  Vanilla open will truncate
84     # an existing file -or- allow creating if not existing.
85     fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
86     self.statefile = os.fdopen(fd, 'w+')
87
88     utils.LockFile(self.statefile.fileno())
89
90     try:
91       self._data = serializer.Load(self.statefile.read())
92     except Exception, msg:
93       # Ignore errors while loading the file and treat it as empty
94       self._data = {}
95       logging.warning(("Empty or invalid state file. Using defaults."
96                        " Error message: %s"), msg)
97
98     if "instance" not in self._data:
99       self._data["instance"] = {}
100     if "node" not in self._data:
101       self._data["node"] = {}
102
103     self._orig_data = serializer.Dump(self._data)
104
105   def Save(self):
106     """Save state to file, then unlock and close it.
107
108     """
109     assert self.statefile
110
111     serialized_form = serializer.Dump(self._data)
112     if self._orig_data == serialized_form:
113       logging.debug("Data didn't change, just touching status file")
114       os.utime(constants.WATCHER_STATEFILE, None)
115       return
116
117     # We need to make sure the file is locked before renaming it, otherwise
118     # starting ganeti-watcher again at the same time will create a conflict.
119     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
120                          data=serialized_form,
121                          prewrite=utils.LockFile, close=False)
122     self.statefile = os.fdopen(fd, 'w+')
123
124   def Close(self):
125     """Unlock configuration file and close it.
126
127     """
128     assert self.statefile
129
130     # Files are automatically unlocked when closing them
131     self.statefile.close()
132     self.statefile = None
133
134   def GetNodeBootID(self, name):
135     """Returns the last boot ID of a node or None.
136
137     """
138     ndata = self._data["node"]
139
140     if name in ndata and KEY_BOOT_ID in ndata[name]:
141       return ndata[name][KEY_BOOT_ID]
142     return None
143
144   def SetNodeBootID(self, name, bootid):
145     """Sets the boot ID of a node.
146
147     """
148     assert bootid
149
150     ndata = self._data["node"]
151
152     if name not in ndata:
153       ndata[name] = {}
154
155     ndata[name][KEY_BOOT_ID] = bootid
156
157   def NumberOfRestartAttempts(self, instance):
158     """Returns number of previous restart attempts.
159
160     @type instance: L{Instance}
161     @param instance: the instance to look up
162
163     """
164     idata = self._data["instance"]
165
166     if instance.name in idata:
167       return idata[instance.name][KEY_RESTART_COUNT]
168
169     return 0
170
171   def RecordRestartAttempt(self, instance):
172     """Record a restart attempt.
173
174     @type instance: L{Instance}
175     @param instance: the instance being restarted
176
177     """
178     idata = self._data["instance"]
179
180     if instance.name not in idata:
181       inst = idata[instance.name] = {}
182     else:
183       inst = idata[instance.name]
184
185     inst[KEY_RESTART_WHEN] = time.time()
186     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
187
188   def RemoveInstance(self, instance):
189     """Update state to reflect that a machine is running.
190
191     This method removes the record for a named instance (as we only
192     track down instances).
193
194     @type instance: L{Instance}
195     @param instance: the instance to remove from books
196
197     """
198     idata = self._data["instance"]
199
200     if instance.name in idata:
201       del idata[instance.name]
202
203
204 class Instance(object):
205   """Abstraction for a Virtual Machine instance.
206
207   """
208   def __init__(self, name, state, autostart):
209     self.name = name
210     self.state = state
211     self.autostart = autostart
212
213   def Restart(self):
214     """Encapsulates the start of an instance.
215
216     """
217     op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
218     cli.SubmitOpCode(op, cl=client)
219
220   def ActivateDisks(self):
221     """Encapsulates the activation of all disks of an instance.
222
223     """
224     op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
225     cli.SubmitOpCode(op, cl=client)
226
227
228 def GetInstanceList(with_secondaries=None):
229   """Get a list of instances on this cluster.
230
231   """
232   fields = ["name", "status", "admin_state"]
233
234   if with_secondaries is not None:
235     fields.append("snodes")
236
237   result = client.QueryInstances([], fields, True)
238
239   instances = []
240   for fields in result:
241     if with_secondaries is not None:
242       (name, status, autostart, snodes) = fields
243
244       if not snodes:
245         continue
246
247       for node in with_secondaries:
248         if node in snodes:
249           break
250       else:
251         continue
252
253     else:
254       (name, status, autostart) = fields
255
256     instances.append(Instance(name, status, autostart))
257
258   return instances
259
260
261 def GetNodeBootIDs():
262   """Get a dict mapping nodes to boot IDs.
263
264   """
265   result = client.QueryNodes([], ["name", "bootid", "offline"], True)
266   return dict([(name, (bootid, offline)) for name, bootid, offline in result])
267
268
269 class Watcher(object):
270   """Encapsulate the logic for restarting erronously halted virtual machines.
271
272   The calling program should periodically instantiate me and call Run().
273   This will traverse the list of instances, and make up to MAXTRIES attempts
274   to restart machines that are down.
275
276   """
277   def __init__(self, opts):
278     master = client.QueryConfigValues(["master_node"])[0]
279     if master != utils.HostInfo().name:
280       raise NotMasterError("This is not the master node")
281     self.instances = GetInstanceList()
282     self.bootids = GetNodeBootIDs()
283     self.started_instances = set()
284     self.opts = opts
285
286   def Run(self):
287     notepad = WatcherState()
288     try:
289       self.ArchiveJobs(self.opts.job_age)
290       self.CheckInstances(notepad)
291       self.CheckDisks(notepad)
292       self.VerifyDisks()
293     finally:
294       notepad.Save()
295
296   def ArchiveJobs(self, age):
297     """Archive old jobs.
298
299     """
300     arch_count, left_count = client.AutoArchiveJobs(age)
301     logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
302
303   def CheckDisks(self, notepad):
304     """Check all nodes for restarted ones.
305
306     """
307     check_nodes = []
308     for name, (new_id, offline) in self.bootids.iteritems():
309       old = notepad.GetNodeBootID(name)
310       if new_id is None:
311         # Bad node, not returning a boot id
312         if not offline:
313           logging.debug("Node %s missing boot id, skipping secondary checks",
314                         name)
315         continue
316       if old != new_id:
317         # Node's boot ID has changed, proably through a reboot.
318         check_nodes.append(name)
319
320     if check_nodes:
321       # Activate disks for all instances with any of the checked nodes as a
322       # secondary node.
323       for instance in GetInstanceList(with_secondaries=check_nodes):
324         if not instance.autostart:
325           logging.info(("Skipping disk activation for non-autostart"
326                         " instance %s"), instance.name)
327           continue
328         if instance.name in self.started_instances:
329           # we already tried to start the instance, which should have
330           # activated its drives (if they can be at all)
331           continue
332         try:
333           logging.info("Activating disks for instance %s", instance.name)
334           instance.ActivateDisks()
335         except Exception:
336           logging.exception("Error while activating disks for instance %s",
337                             instance.name)
338
339       # Keep changed boot IDs
340       for name in check_nodes:
341         notepad.SetNodeBootID(name, self.bootids[name][0])
342
343   def CheckInstances(self, notepad):
344     """Make a pass over the list of instances, restarting downed ones.
345
346     """
347     for instance in self.instances:
348       if instance.state in BAD_STATES:
349         n = notepad.NumberOfRestartAttempts(instance)
350
351         if n > MAXTRIES:
352           # stay quiet.
353           continue
354         elif n < MAXTRIES:
355           last = " (Attempt #%d)" % (n + 1)
356         else:
357           notepad.RecordRestartAttempt(instance)
358           logging.error("Could not restart %s after %d attempts, giving up",
359                         instance.name, MAXTRIES)
360           continue
361         try:
362           logging.info("Restarting %s%s",
363                         instance.name, last)
364           instance.Restart()
365           self.started_instances.add(instance.name)
366         except Exception:
367           logging.exception("Error while restarting instance %s",
368                             instance.name)
369
370         notepad.RecordRestartAttempt(instance)
371       elif instance.state in HELPLESS_STATES:
372         if notepad.NumberOfRestartAttempts(instance):
373           notepad.RemoveInstance(instance)
374       else:
375         if notepad.NumberOfRestartAttempts(instance):
376           notepad.RemoveInstance(instance)
377           logging.info("Restart of %s succeeded", instance.name)
378
379   @staticmethod
380   def VerifyDisks():
381     """Run gnt-cluster verify-disks.
382
383     """
384     op = opcodes.OpVerifyDisks()
385     result = cli.SubmitOpCode(op, cl=client)
386     if not isinstance(result, (tuple, list)):
387       logging.error("Can't get a valid result from verify-disks")
388       return
389     offline_disk_instances = result[2]
390     if not offline_disk_instances:
391       # nothing to do
392       return
393     logging.debug("Will activate disks for instances %s",
394                   ", ".join(offline_disk_instances))
395     # we submit only one job, and wait for it. not optimal, but spams
396     # less the job queue
397     job = [opcodes.OpActivateInstanceDisks(instance_name=name)
398            for name in offline_disk_instances]
399     job_id = cli.SendJob(job, cl=client)
400
401     cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
402
403
404 def ParseOptions():
405   """Parse the command line options.
406
407   @return: (options, args) as from OptionParser.parse_args()
408
409   """
410   parser = OptionParser(description="Ganeti cluster watcher",
411                         usage="%prog [-d]",
412                         version="%%prog (ganeti) %s" %
413                         constants.RELEASE_VERSION)
414
415   parser.add_option("-d", "--debug", dest="debug",
416                     help="Write all messages to stderr",
417                     default=False, action="store_true")
418   parser.add_option("-A", "--job-age", dest="job_age",
419                     help="Autoarchive jobs older than this age (default"
420                     " 6 hours)", default=6*3600)
421   options, args = parser.parse_args()
422   options.job_age = cli.ParseTimespec(options.job_age)
423   return options, args
424
425
426 def main():
427   """Main function.
428
429   """
430   global client
431
432   options, args = ParseOptions()
433
434   utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
435                      stderr_logging=options.debug)
436
437   try:
438     client = cli.GetClient()
439
440     try:
441       watcher = Watcher(options)
442     except errors.ConfigurationError:
443       # Just exit if there's no configuration
444       sys.exit(constants.EXIT_SUCCESS)
445
446     watcher.Run()
447   except SystemExit:
448     raise
449   except NotMasterError:
450     logging.debug("Not master, exiting")
451     sys.exit(constants.EXIT_NOTMASTER)
452   except errors.ResolverError, err:
453     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
454     sys.exit(constants.EXIT_NODESETUP_ERROR)
455   except Exception, err:
456     logging.error(str(err), exc_info=True)
457     sys.exit(constants.EXIT_FAILURE)
458
459
460 if __name__ == '__main__':
461   main()