Remove last use of utils.RunCmd from the watcher
[ganeti-local] / daemons / ganeti-watcher
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erronously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import sys
32 import time
33 import fcntl
34 import errno
35 import logging
36 from optparse import OptionParser
37
38 from ganeti import utils
39 from ganeti import constants
40 from ganeti import serializer
41 from ganeti import ssconf
42 from ganeti import errors
43 from ganeti import opcodes
44 from ganeti import logger
45 from ganeti import cli
46
47
48 MAXTRIES = 5
49 BAD_STATES = ['stopped']
50 HELPLESS_STATES = ['(node down)']
51 NOTICE = 'NOTICE'
52 ERROR = 'ERROR'
53 KEY_RESTART_COUNT = "restart_count"
54 KEY_RESTART_WHEN = "restart_when"
55 KEY_BOOT_ID = "bootid"
56
57
58 # Global client object
59 client = None
60
61
62 class NotMasterError(errors.GenericError):
63   """Exception raised when this host is not the master."""
64
65
66 def Indent(s, prefix='| '):
67   """Indent a piece of text with a given prefix before each line.
68
69   Args:
70     s: The string to indent
71     prefix: The string to prepend each line.
72
73   """
74   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
75
76
77 class WatcherState(object):
78   """Interface to a state file recording restart attempts.
79
80   """
81   def __init__(self):
82     """Open, lock, read and parse the file.
83
84     Raises exception on lock contention.
85
86     """
87     # The two-step dance below is necessary to allow both opening existing
88     # file read/write and creating if not existing.  Vanilla open will truncate
89     # an existing file -or- allow creating if not existing.
90     fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
91     self.statefile = os.fdopen(fd, 'w+')
92
93     utils.LockFile(self.statefile.fileno())
94
95     try:
96       self._data = serializer.Load(self.statefile.read())
97     except Exception, msg:
98       # Ignore errors while loading the file and treat it as empty
99       self._data = {}
100       logging.warning(("Empty or invalid state file. Using defaults."
101                        " Error message: %s"), msg)
102
103     if "instance" not in self._data:
104       self._data["instance"] = {}
105     if "node" not in self._data:
106       self._data["node"] = {}
107
108     self._orig_data = serializer.Dump(self._data)
109
110   def Save(self):
111     """Save state to file, then unlock and close it.
112
113     """
114     assert self.statefile
115
116     serialized_form = serializer.Dump(self._data)
117     if self._orig_data == serialized_form:
118       logging.debug("Data didn't change, just touching status file")
119       os.utime(constants.WATCHER_STATEFILE, None)
120       return
121
122     # We need to make sure the file is locked before renaming it, otherwise
123     # starting ganeti-watcher again at the same time will create a conflict.
124     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
125                          data=serialized_form,
126                          prewrite=utils.LockFile, close=False)
127     self.statefile = os.fdopen(fd, 'w+')
128
129   def Close(self):
130     """Unlock configuration file and close it.
131
132     """
133     assert self.statefile
134
135     # Files are automatically unlocked when closing them
136     self.statefile.close()
137     self.statefile = None
138
139   def GetNodeBootID(self, name):
140     """Returns the last boot ID of a node or None.
141
142     """
143     ndata = self._data["node"]
144
145     if name in ndata and KEY_BOOT_ID in ndata[name]:
146       return ndata[name][KEY_BOOT_ID]
147     return None
148
149   def SetNodeBootID(self, name, bootid):
150     """Sets the boot ID of a node.
151
152     """
153     assert bootid
154
155     ndata = self._data["node"]
156
157     if name not in ndata:
158       ndata[name] = {}
159
160     ndata[name][KEY_BOOT_ID] = bootid
161
162   def NumberOfRestartAttempts(self, instance):
163     """Returns number of previous restart attempts.
164
165     Args:
166       instance - the instance to look up.
167
168     """
169     idata = self._data["instance"]
170
171     if instance.name in idata:
172       return idata[instance.name][KEY_RESTART_COUNT]
173
174     return 0
175
176   def RecordRestartAttempt(self, instance):
177     """Record a restart attempt.
178
179     Args:
180       instance - the instance being restarted
181
182     """
183     idata = self._data["instance"]
184
185     if instance.name not in idata:
186       inst = idata[instance.name] = {}
187     else:
188       inst = idata[instance.name]
189
190     inst[KEY_RESTART_WHEN] = time.time()
191     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
192
193   def RemoveInstance(self, instance):
194     """Update state to reflect that a machine is running, i.e. remove record.
195
196     Args:
197       instance - the instance to remove from books
198
199     This method removes the record for a named instance.
200
201     """
202     idata = self._data["instance"]
203
204     if instance.name in idata:
205       del idata[instance.name]
206
207
208 class Instance(object):
209   """Abstraction for a Virtual Machine instance.
210
211   Methods:
212     Restart(): issue a command to restart the represented machine.
213
214   """
215   def __init__(self, name, state, autostart):
216     self.name = name
217     self.state = state
218     self.autostart = autostart
219
220   def Restart(self):
221     """Encapsulates the start of an instance.
222
223     """
224     op = opcodes.OpStartupInstance(instance_name=self.name,
225                                    force=False,
226                                    extra_args=None)
227     cli.SubmitOpCode(op, cl=client)
228
229   def ActivateDisks(self):
230     """Encapsulates the activation of all disks of an instance.
231
232     """
233     op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
234     cli.SubmitOpCode(op, cl=client)
235
236
237 def GetInstanceList(with_secondaries=None):
238   """Get a list of instances on this cluster.
239
240   """
241   fields = ["name", "oper_state", "admin_state"]
242
243   if with_secondaries is not None:
244     fields.append("snodes")
245
246   result = client.QueryInstances([], fields)
247
248   instances = []
249   for fields in result:
250     if with_secondaries is not None:
251       (name, status, autostart, snodes) = fields
252
253       if not snodes:
254         continue
255
256       for node in with_secondaries:
257         if node in snodes:
258           break
259       else:
260         continue
261
262     else:
263       (name, status, autostart) = fields
264
265     instances.append(Instance(name, status, autostart))
266
267   return instances
268
269
270 def GetNodeBootIDs():
271   """Get a dict mapping nodes to boot IDs.
272
273   """
274   result = client.QueryNodes([], ["name", "bootid"])
275   return dict([(name, bootid) for name, bootid in result])
276
277
278 class Watcher(object):
279   """Encapsulate the logic for restarting erronously halted virtual machines.
280
281   The calling program should periodically instantiate me and call Run().
282   This will traverse the list of instances, and make up to MAXTRIES attempts
283   to restart machines that are down.
284
285   """
286   def __init__(self):
287     sstore = ssconf.SimpleStore()
288     master = sstore.GetMasterNode()
289     if master != utils.HostInfo().name:
290       raise NotMasterError("This is not the master node")
291     self.instances = GetInstanceList()
292     self.bootids = GetNodeBootIDs()
293     self.started_instances = set()
294
295   def Run(self):
296     notepad = WatcherState()
297     try:
298       self.CheckInstances(notepad)
299       self.CheckDisks(notepad)
300       self.VerifyDisks()
301     finally:
302       notepad.Save()
303
304   def CheckDisks(self, notepad):
305     """Check all nodes for restarted ones.
306
307     """
308     check_nodes = []
309     for name, new_id in self.bootids.iteritems():
310       old = notepad.GetNodeBootID(name)
311       if old != new_id:
312         # Node's boot ID has changed, proably through a reboot.
313         check_nodes.append(name)
314
315     if check_nodes:
316       # Activate disks for all instances with any of the checked nodes as a
317       # secondary node.
318       for instance in GetInstanceList(with_secondaries=check_nodes):
319         if not instance.autostart:
320           logging.info(("Skipping disk activation for non-autostart"
321                         " instance %s"), instance.name)
322           continue
323         if instance.name in self.started_instances:
324           # we already tried to start the instance, which should have
325           # activated its drives (if they can be at all)
326           continue
327         try:
328           logging.info("Activating disks for instance %s", instance.name)
329           instance.ActivateDisks()
330         except Exception, err:
331           logging.error(str(err), exc_info=True)
332
333       # Keep changed boot IDs
334       for name in check_nodes:
335         notepad.SetNodeBootID(name, self.bootids[name])
336
337   def CheckInstances(self, notepad):
338     """Make a pass over the list of instances, restarting downed ones.
339
340     """
341     for instance in self.instances:
342       # Don't care about manually stopped instances
343       if not instance.autostart:
344         continue
345
346       if instance.state in BAD_STATES:
347         n = notepad.NumberOfRestartAttempts(instance)
348
349         if n > MAXTRIES:
350           # stay quiet.
351           continue
352         elif n < MAXTRIES:
353           last = " (Attempt #%d)" % (n + 1)
354         else:
355           notepad.RecordRestartAttempt(instance)
356           logging.error("Could not restart %s after %d attempts, giving up",
357                         instance.name, MAXTRIES)
358           continue
359         try:
360           logging.info("Restarting %s%s",
361                         instance.name, last)
362           instance.Restart()
363           self.started_instances.add(instance.name)
364         except Exception, err:
365           logging.error(str(err), exc_info=True)
366
367         notepad.RecordRestartAttempt(instance)
368       elif instance.state in HELPLESS_STATES:
369         if notepad.NumberOfRestartAttempts(instance):
370           notepad.RemoveInstance(instance)
371       else:
372         if notepad.NumberOfRestartAttempts(instance):
373           notepad.RemoveInstance(instance)
374           logging.info("Restart of %s succeeded", instance.name)
375
376   def VerifyDisks(self):
377     """Run gnt-cluster verify-disks.
378
379     """
380     op = opcodes.OpVerifyDisks()
381     result = cli.SubmitOpCode(op, cl=client)
382     if not isinstance(result, (tuple, list)):
383       logging.error("Can't get a valid result from verify-disks")
384       return
385     offline_disk_instances = result[2]
386     if not offline_disk_instances:
387       # nothing to do
388       return
389     logging.debug("Will activate disks for instances %s",
390                   ", ".join(offline_disk_instances))
391     # we submit only one job, and wait for it. not optimal, but spams
392     # less the job queue
393     job = [opcodes.OpActivateInstanceDisks(instance_name=name)
394            for name in offline_disk_instances]
395     job_id = cli.SendJob(job, cl=client)
396
397     cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
398
399
400 def ParseOptions():
401   """Parse the command line options.
402
403   Returns:
404     (options, args) as from OptionParser.parse_args()
405
406   """
407   parser = OptionParser(description="Ganeti cluster watcher",
408                         usage="%prog [-d]",
409                         version="%%prog (ganeti) %s" %
410                         constants.RELEASE_VERSION)
411
412   parser.add_option("-d", "--debug", dest="debug",
413                     help="Write all messages to stderr",
414                     default=False, action="store_true")
415   options, args = parser.parse_args()
416   return options, args
417
418
419 def main():
420   """Main function.
421
422   """
423   global client
424
425   options, args = ParseOptions()
426
427   logger.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
428                       stderr_logging=options.debug)
429
430   try:
431     client = cli.GetClient()
432
433     try:
434       watcher = Watcher()
435     except errors.ConfigurationError:
436       # Just exit if there's no configuration
437       sys.exit(constants.EXIT_SUCCESS)
438
439     watcher.Run()
440   except SystemExit:
441     raise
442   except NotMasterError:
443     logging.debug("Not master, exiting")
444     sys.exit(constants.EXIT_NOTMASTER)
445   except errors.ResolverError, err:
446     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
447     sys.exit(constants.EXIT_NODESETUP_ERROR)
448   except Exception, err:
449     logging.error(str(err), exc_info=True)
450     sys.exit(constants.EXIT_FAILURE)
451
452
453 if __name__ == '__main__':
454   main()