Implement lockless query operations
[ganeti-local] / daemons / ganeti-watcher
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erronously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import sys
32 import time
33 import logging
34 from optparse import OptionParser
35
36 from ganeti import utils
37 from ganeti import constants
38 from ganeti import serializer
39 from ganeti import errors
40 from ganeti import opcodes
41 from ganeti import cli
42
43
44 MAXTRIES = 5
45 BAD_STATES = ['ERROR_down']
46 HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
47 NOTICE = 'NOTICE'
48 ERROR = 'ERROR'
49 KEY_RESTART_COUNT = "restart_count"
50 KEY_RESTART_WHEN = "restart_when"
51 KEY_BOOT_ID = "bootid"
52
53
54 # Global client object
55 client = None
56
57
58 class NotMasterError(errors.GenericError):
59   """Exception raised when this host is not the master."""
60
61
62 def Indent(s, prefix='| '):
63   """Indent a piece of text with a given prefix before each line.
64
65   @param s: the string to indent
66   @param prefix: the string to prepend each line
67
68   """
69   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
70
71
72 class WatcherState(object):
73   """Interface to a state file recording restart attempts.
74
75   """
76   def __init__(self):
77     """Open, lock, read and parse the file.
78
79     Raises exception on lock contention.
80
81     """
82     # The two-step dance below is necessary to allow both opening existing
83     # file read/write and creating if not existing.  Vanilla open will truncate
84     # an existing file -or- allow creating if not existing.
85     fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
86     self.statefile = os.fdopen(fd, 'w+')
87
88     utils.LockFile(self.statefile.fileno())
89
90     try:
91       self._data = serializer.Load(self.statefile.read())
92     except Exception, msg:
93       # Ignore errors while loading the file and treat it as empty
94       self._data = {}
95       logging.warning(("Empty or invalid state file. Using defaults."
96                        " Error message: %s"), msg)
97
98     if "instance" not in self._data:
99       self._data["instance"] = {}
100     if "node" not in self._data:
101       self._data["node"] = {}
102
103     self._orig_data = serializer.Dump(self._data)
104
105   def Save(self):
106     """Save state to file, then unlock and close it.
107
108     """
109     assert self.statefile
110
111     serialized_form = serializer.Dump(self._data)
112     if self._orig_data == serialized_form:
113       logging.debug("Data didn't change, just touching status file")
114       os.utime(constants.WATCHER_STATEFILE, None)
115       return
116
117     # We need to make sure the file is locked before renaming it, otherwise
118     # starting ganeti-watcher again at the same time will create a conflict.
119     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
120                          data=serialized_form,
121                          prewrite=utils.LockFile, close=False)
122     self.statefile = os.fdopen(fd, 'w+')
123
124   def Close(self):
125     """Unlock configuration file and close it.
126
127     """
128     assert self.statefile
129
130     # Files are automatically unlocked when closing them
131     self.statefile.close()
132     self.statefile = None
133
134   def GetNodeBootID(self, name):
135     """Returns the last boot ID of a node or None.
136
137     """
138     ndata = self._data["node"]
139
140     if name in ndata and KEY_BOOT_ID in ndata[name]:
141       return ndata[name][KEY_BOOT_ID]
142     return None
143
144   def SetNodeBootID(self, name, bootid):
145     """Sets the boot ID of a node.
146
147     """
148     assert bootid
149
150     ndata = self._data["node"]
151
152     if name not in ndata:
153       ndata[name] = {}
154
155     ndata[name][KEY_BOOT_ID] = bootid
156
157   def NumberOfRestartAttempts(self, instance):
158     """Returns number of previous restart attempts.
159
160     @type instance: L{Instance}
161     @param instance: the instance to look up
162
163     """
164     idata = self._data["instance"]
165
166     if instance.name in idata:
167       return idata[instance.name][KEY_RESTART_COUNT]
168
169     return 0
170
171   def RecordRestartAttempt(self, instance):
172     """Record a restart attempt.
173
174     @type instance: L{Instance}
175     @param instance: the instance being restarted
176
177     """
178     idata = self._data["instance"]
179
180     if instance.name not in idata:
181       inst = idata[instance.name] = {}
182     else:
183       inst = idata[instance.name]
184
185     inst[KEY_RESTART_WHEN] = time.time()
186     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
187
188   def RemoveInstance(self, instance):
189     """Update state to reflect that a machine is running.
190
191     This method removes the record for a named instance (as we only
192     track down instances).
193
194     @type instance: L{Instance}
195     @param instance: the instance to remove from books
196
197     """
198     idata = self._data["instance"]
199
200     if instance.name in idata:
201       del idata[instance.name]
202
203
204 class Instance(object):
205   """Abstraction for a Virtual Machine instance.
206
207   """
208   def __init__(self, name, state, autostart):
209     self.name = name
210     self.state = state
211     self.autostart = autostart
212
213   def Restart(self):
214     """Encapsulates the start of an instance.
215
216     """
217     op = opcodes.OpStartupInstance(instance_name=self.name,
218                                    force=False,
219                                    extra_args=None)
220     cli.SubmitOpCode(op, cl=client)
221
222   def ActivateDisks(self):
223     """Encapsulates the activation of all disks of an instance.
224
225     """
226     op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
227     cli.SubmitOpCode(op, cl=client)
228
229
230 def GetInstanceList(with_secondaries=None):
231   """Get a list of instances on this cluster.
232
233   """
234   fields = ["name", "status", "admin_state"]
235
236   if with_secondaries is not None:
237     fields.append("snodes")
238
239   result = client.QueryInstances([], fields, True)
240
241   instances = []
242   for fields in result:
243     if with_secondaries is not None:
244       (name, status, autostart, snodes) = fields
245
246       if not snodes:
247         continue
248
249       for node in with_secondaries:
250         if node in snodes:
251           break
252       else:
253         continue
254
255     else:
256       (name, status, autostart) = fields
257
258     instances.append(Instance(name, status, autostart))
259
260   return instances
261
262
263 def GetNodeBootIDs():
264   """Get a dict mapping nodes to boot IDs.
265
266   """
267   result = client.QueryNodes([], ["name", "bootid", "offline"], True)
268   return dict([(name, (bootid, offline)) for name, bootid, offline in result])
269
270
271 class Watcher(object):
272   """Encapsulate the logic for restarting erronously halted virtual machines.
273
274   The calling program should periodically instantiate me and call Run().
275   This will traverse the list of instances, and make up to MAXTRIES attempts
276   to restart machines that are down.
277
278   """
279   def __init__(self):
280     master = client.QueryConfigValues(["master_node"])[0]
281     if master != utils.HostInfo().name:
282       raise NotMasterError("This is not the master node")
283     self.instances = GetInstanceList()
284     self.bootids = GetNodeBootIDs()
285     self.started_instances = set()
286
287   def Run(self):
288     notepad = WatcherState()
289     try:
290       self.CheckInstances(notepad)
291       self.CheckDisks(notepad)
292       self.VerifyDisks()
293     finally:
294       notepad.Save()
295
296   def CheckDisks(self, notepad):
297     """Check all nodes for restarted ones.
298
299     """
300     check_nodes = []
301     for name, (new_id, offline) in self.bootids.iteritems():
302       old = notepad.GetNodeBootID(name)
303       if new_id is None:
304         # Bad node, not returning a boot id
305         if not offline:
306           logging.debug("Node %s missing boot id, skipping secondary checks",
307                         name)
308         continue
309       if old != new_id:
310         # Node's boot ID has changed, proably through a reboot.
311         check_nodes.append(name)
312
313     if check_nodes:
314       # Activate disks for all instances with any of the checked nodes as a
315       # secondary node.
316       for instance in GetInstanceList(with_secondaries=check_nodes):
317         if not instance.autostart:
318           logging.info(("Skipping disk activation for non-autostart"
319                         " instance %s"), instance.name)
320           continue
321         if instance.name in self.started_instances:
322           # we already tried to start the instance, which should have
323           # activated its drives (if they can be at all)
324           continue
325         try:
326           logging.info("Activating disks for instance %s", instance.name)
327           instance.ActivateDisks()
328         except Exception:
329           logging.exception("Error while activating disks for instance %s",
330                             instance.name)
331
332       # Keep changed boot IDs
333       for name in check_nodes:
334         notepad.SetNodeBootID(name, self.bootids[name])
335
336   def CheckInstances(self, notepad):
337     """Make a pass over the list of instances, restarting downed ones.
338
339     """
340     for instance in self.instances:
341       if instance.state in BAD_STATES:
342         n = notepad.NumberOfRestartAttempts(instance)
343
344         if n > MAXTRIES:
345           # stay quiet.
346           continue
347         elif n < MAXTRIES:
348           last = " (Attempt #%d)" % (n + 1)
349         else:
350           notepad.RecordRestartAttempt(instance)
351           logging.error("Could not restart %s after %d attempts, giving up",
352                         instance.name, MAXTRIES)
353           continue
354         try:
355           logging.info("Restarting %s%s",
356                         instance.name, last)
357           instance.Restart()
358           self.started_instances.add(instance.name)
359         except Exception:
360           logging.exception("Error while restarting instance %s",
361                             instance.name)
362
363         notepad.RecordRestartAttempt(instance)
364       elif instance.state in HELPLESS_STATES:
365         if notepad.NumberOfRestartAttempts(instance):
366           notepad.RemoveInstance(instance)
367       else:
368         if notepad.NumberOfRestartAttempts(instance):
369           notepad.RemoveInstance(instance)
370           logging.info("Restart of %s succeeded", instance.name)
371
372   @staticmethod
373   def VerifyDisks():
374     """Run gnt-cluster verify-disks.
375
376     """
377     op = opcodes.OpVerifyDisks()
378     result = cli.SubmitOpCode(op, cl=client)
379     if not isinstance(result, (tuple, list)):
380       logging.error("Can't get a valid result from verify-disks")
381       return
382     offline_disk_instances = result[2]
383     if not offline_disk_instances:
384       # nothing to do
385       return
386     logging.debug("Will activate disks for instances %s",
387                   ", ".join(offline_disk_instances))
388     # we submit only one job, and wait for it. not optimal, but spams
389     # less the job queue
390     job = [opcodes.OpActivateInstanceDisks(instance_name=name)
391            for name in offline_disk_instances]
392     job_id = cli.SendJob(job, cl=client)
393
394     cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
395
396
397 def ParseOptions():
398   """Parse the command line options.
399
400   @return: (options, args) as from OptionParser.parse_args()
401
402   """
403   parser = OptionParser(description="Ganeti cluster watcher",
404                         usage="%prog [-d]",
405                         version="%%prog (ganeti) %s" %
406                         constants.RELEASE_VERSION)
407
408   parser.add_option("-d", "--debug", dest="debug",
409                     help="Write all messages to stderr",
410                     default=False, action="store_true")
411   options, args = parser.parse_args()
412   return options, args
413
414
415 def main():
416   """Main function.
417
418   """
419   global client
420
421   options, args = ParseOptions()
422
423   utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
424                      stderr_logging=options.debug)
425
426   try:
427     client = cli.GetClient()
428
429     try:
430       watcher = Watcher()
431     except errors.ConfigurationError:
432       # Just exit if there's no configuration
433       sys.exit(constants.EXIT_SUCCESS)
434
435     watcher.Run()
436   except SystemExit:
437     raise
438   except NotMasterError:
439     logging.debug("Not master, exiting")
440     sys.exit(constants.EXIT_NOTMASTER)
441   except errors.ResolverError, err:
442     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
443     sys.exit(constants.EXIT_NODESETUP_ERROR)
444   except Exception, err:
445     logging.error(str(err), exc_info=True)
446     sys.exit(constants.EXIT_FAILURE)
447
448
449 if __name__ == '__main__':
450   main()