Merge branch 'master' into next
[ganeti-local] / daemons / ganeti-watcher
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erronously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import sys
32 import time
33 import logging
34 from optparse import OptionParser
35
36 from ganeti import utils
37 from ganeti import constants
38 from ganeti import serializer
39 from ganeti import errors
40 from ganeti import opcodes
41 from ganeti import cli
42 from ganeti import luxi
43
44
45 MAXTRIES = 5
46 BAD_STATES = ['ERROR_down']
47 HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
48 NOTICE = 'NOTICE'
49 ERROR = 'ERROR'
50 KEY_RESTART_COUNT = "restart_count"
51 KEY_RESTART_WHEN = "restart_when"
52 KEY_BOOT_ID = "bootid"
53
54
55 # Global client object
56 client = None
57
58
59 class NotMasterError(errors.GenericError):
60   """Exception raised when this host is not the master."""
61
62
63 def Indent(s, prefix='| '):
64   """Indent a piece of text with a given prefix before each line.
65
66   @param s: the string to indent
67   @param prefix: the string to prepend each line
68
69   """
70   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
71
72
73 def StartMaster():
74   """Try to start the master daemon.
75
76   """
77   result = utils.RunCmd(['ganeti-masterd'])
78   if result.failed:
79     logging.error("Can't start the master daemon: output '%s'", result.output)
80   return not result.failed
81
82
83 class WatcherState(object):
84   """Interface to a state file recording restart attempts.
85
86   """
87   def __init__(self):
88     """Open, lock, read and parse the file.
89
90     Raises exception on lock contention.
91
92     """
93     # The two-step dance below is necessary to allow both opening existing
94     # file read/write and creating if not existing.  Vanilla open will truncate
95     # an existing file -or- allow creating if not existing.
96     fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
97     self.statefile = os.fdopen(fd, 'w+')
98
99     utils.LockFile(self.statefile.fileno())
100
101     try:
102       state_data = self.statefile.read()
103       if not state_data:
104         self._data = {}
105       else:
106         self._data = serializer.Load(state_data)
107     except Exception, msg:
108       # Ignore errors while loading the file and treat it as empty
109       self._data = {}
110       logging.warning(("Invalid state file. Using defaults."
111                        " Error message: %s"), msg)
112
113     if "instance" not in self._data:
114       self._data["instance"] = {}
115     if "node" not in self._data:
116       self._data["node"] = {}
117
118     self._orig_data = serializer.Dump(self._data)
119
120   def Save(self):
121     """Save state to file, then unlock and close it.
122
123     """
124     assert self.statefile
125
126     serialized_form = serializer.Dump(self._data)
127     if self._orig_data == serialized_form:
128       logging.debug("Data didn't change, just touching status file")
129       os.utime(constants.WATCHER_STATEFILE, None)
130       return
131
132     # We need to make sure the file is locked before renaming it, otherwise
133     # starting ganeti-watcher again at the same time will create a conflict.
134     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
135                          data=serialized_form,
136                          prewrite=utils.LockFile, close=False)
137     self.statefile = os.fdopen(fd, 'w+')
138
139   def Close(self):
140     """Unlock configuration file and close it.
141
142     """
143     assert self.statefile
144
145     # Files are automatically unlocked when closing them
146     self.statefile.close()
147     self.statefile = None
148
149   def GetNodeBootID(self, name):
150     """Returns the last boot ID of a node or None.
151
152     """
153     ndata = self._data["node"]
154
155     if name in ndata and KEY_BOOT_ID in ndata[name]:
156       return ndata[name][KEY_BOOT_ID]
157     return None
158
159   def SetNodeBootID(self, name, bootid):
160     """Sets the boot ID of a node.
161
162     """
163     assert bootid
164
165     ndata = self._data["node"]
166
167     if name not in ndata:
168       ndata[name] = {}
169
170     ndata[name][KEY_BOOT_ID] = bootid
171
172   def NumberOfRestartAttempts(self, instance):
173     """Returns number of previous restart attempts.
174
175     @type instance: L{Instance}
176     @param instance: the instance to look up
177
178     """
179     idata = self._data["instance"]
180
181     if instance.name in idata:
182       return idata[instance.name][KEY_RESTART_COUNT]
183
184     return 0
185
186   def RecordRestartAttempt(self, instance):
187     """Record a restart attempt.
188
189     @type instance: L{Instance}
190     @param instance: the instance being restarted
191
192     """
193     idata = self._data["instance"]
194
195     if instance.name not in idata:
196       inst = idata[instance.name] = {}
197     else:
198       inst = idata[instance.name]
199
200     inst[KEY_RESTART_WHEN] = time.time()
201     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
202
203   def RemoveInstance(self, instance):
204     """Update state to reflect that a machine is running.
205
206     This method removes the record for a named instance (as we only
207     track down instances).
208
209     @type instance: L{Instance}
210     @param instance: the instance to remove from books
211
212     """
213     idata = self._data["instance"]
214
215     if instance.name in idata:
216       del idata[instance.name]
217
218
219 class Instance(object):
220   """Abstraction for a Virtual Machine instance.
221
222   """
223   def __init__(self, name, state, autostart):
224     self.name = name
225     self.state = state
226     self.autostart = autostart
227
228   def Restart(self):
229     """Encapsulates the start of an instance.
230
231     """
232     op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
233     cli.SubmitOpCode(op, cl=client)
234
235   def ActivateDisks(self):
236     """Encapsulates the activation of all disks of an instance.
237
238     """
239     op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
240     cli.SubmitOpCode(op, cl=client)
241
242
243 def GetClusterData():
244   """Get a list of instances on this cluster.
245
246   """
247   op1_fields = ["name", "status", "admin_state", "snodes"]
248   op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
249                                  use_locking=True)
250   op2_fields = ["name", "bootid", "offline"]
251   op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
252                              use_locking=True)
253
254   job_id = client.SubmitJob([op1, op2])
255
256   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
257
258   logging.debug("Got data from cluster, writing instance status file")
259
260   result = all_results[0]
261   smap = {}
262
263   instances = {}
264
265   # write the upfile
266   up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
267   utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
268
269   for fields in result:
270     (name, status, autostart, snodes) = fields
271
272     # update the secondary node map
273     for node in snodes:
274       if node not in smap:
275         smap[node] = []
276       smap[node].append(name)
277
278     instances[name] = Instance(name, status, autostart)
279
280   nodes =  dict([(name, (bootid, offline))
281                  for name, bootid, offline in all_results[1]])
282
283   client.ArchiveJob(job_id)
284
285   return instances, nodes, smap
286
287
288 class Watcher(object):
289   """Encapsulate the logic for restarting erronously halted virtual machines.
290
291   The calling program should periodically instantiate me and call Run().
292   This will traverse the list of instances, and make up to MAXTRIES attempts
293   to restart machines that are down.
294
295   """
296   def __init__(self, opts, notepad):
297     self.notepad = notepad
298     master = client.QueryConfigValues(["master_node"])[0]
299     if master != utils.HostInfo().name:
300       raise NotMasterError("This is not the master node")
301     self.instances, self.bootids, self.smap = GetClusterData()
302     self.started_instances = set()
303     self.opts = opts
304
305   def Run(self):
306     """Watcher run sequence.
307
308     """
309     notepad = self.notepad
310     self.ArchiveJobs(self.opts.job_age)
311     self.CheckInstances(notepad)
312     self.CheckDisks(notepad)
313     self.VerifyDisks()
314
315   def ArchiveJobs(self, age):
316     """Archive old jobs.
317
318     """
319     arch_count, left_count = client.AutoArchiveJobs(age)
320     logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
321
322   def CheckDisks(self, notepad):
323     """Check all nodes for restarted ones.
324
325     """
326     check_nodes = []
327     for name, (new_id, offline) in self.bootids.iteritems():
328       old = notepad.GetNodeBootID(name)
329       if new_id is None:
330         # Bad node, not returning a boot id
331         if not offline:
332           logging.debug("Node %s missing boot id, skipping secondary checks",
333                         name)
334         continue
335       if old != new_id:
336         # Node's boot ID has changed, proably through a reboot.
337         check_nodes.append(name)
338
339     if check_nodes:
340       # Activate disks for all instances with any of the checked nodes as a
341       # secondary node.
342       for node in check_nodes:
343         if node not in self.smap:
344           continue
345         for instance_name in self.smap[node]:
346           instance = self.instances[instance_name]
347           if not instance.autostart:
348             logging.info(("Skipping disk activation for non-autostart"
349                           " instance %s"), instance.name)
350             continue
351           if instance.name in self.started_instances:
352             # we already tried to start the instance, which should have
353             # activated its drives (if they can be at all)
354             continue
355           try:
356             logging.info("Activating disks for instance %s", instance.name)
357             instance.ActivateDisks()
358           except Exception:
359             logging.exception("Error while activating disks for instance %s",
360                               instance.name)
361
362       # Keep changed boot IDs
363       for name in check_nodes:
364         notepad.SetNodeBootID(name, self.bootids[name][0])
365
366   def CheckInstances(self, notepad):
367     """Make a pass over the list of instances, restarting downed ones.
368
369     """
370     for instance in self.instances.values():
371       if instance.state in BAD_STATES:
372         n = notepad.NumberOfRestartAttempts(instance)
373
374         if n > MAXTRIES:
375           # stay quiet.
376           continue
377         elif n < MAXTRIES:
378           last = " (Attempt #%d)" % (n + 1)
379         else:
380           notepad.RecordRestartAttempt(instance)
381           logging.error("Could not restart %s after %d attempts, giving up",
382                         instance.name, MAXTRIES)
383           continue
384         try:
385           logging.info("Restarting %s%s",
386                         instance.name, last)
387           instance.Restart()
388           self.started_instances.add(instance.name)
389         except Exception:
390           logging.exception("Error while restarting instance %s",
391                             instance.name)
392
393         notepad.RecordRestartAttempt(instance)
394       elif instance.state in HELPLESS_STATES:
395         if notepad.NumberOfRestartAttempts(instance):
396           notepad.RemoveInstance(instance)
397       else:
398         if notepad.NumberOfRestartAttempts(instance):
399           notepad.RemoveInstance(instance)
400           logging.info("Restart of %s succeeded", instance.name)
401
402   @staticmethod
403   def VerifyDisks():
404     """Run gnt-cluster verify-disks.
405
406     """
407     op = opcodes.OpVerifyDisks()
408     job_id = client.SubmitJob([op])
409     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
410     client.ArchiveJob(job_id)
411     if not isinstance(result, (tuple, list)):
412       logging.error("Can't get a valid result from verify-disks")
413       return
414     offline_disk_instances = result[2]
415     if not offline_disk_instances:
416       # nothing to do
417       return
418     logging.debug("Will activate disks for instances %s",
419                   ", ".join(offline_disk_instances))
420     # we submit only one job, and wait for it. not optimal, but spams
421     # less the job queue
422     job = [opcodes.OpActivateInstanceDisks(instance_name=name)
423            for name in offline_disk_instances]
424     job_id = cli.SendJob(job, cl=client)
425
426     cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
427
428
429 def ParseOptions():
430   """Parse the command line options.
431
432   @return: (options, args) as from OptionParser.parse_args()
433
434   """
435   parser = OptionParser(description="Ganeti cluster watcher",
436                         usage="%prog [-d]",
437                         version="%%prog (ganeti) %s" %
438                         constants.RELEASE_VERSION)
439
440   parser.add_option("-d", "--debug", dest="debug",
441                     help="Write all messages to stderr",
442                     default=False, action="store_true")
443   parser.add_option("-A", "--job-age", dest="job_age",
444                     help="Autoarchive jobs older than this age (default"
445                     " 6 hours)", default=6*3600)
446   options, args = parser.parse_args()
447   options.job_age = cli.ParseTimespec(options.job_age)
448   return options, args
449
450
451 def main():
452   """Main function.
453
454   """
455   global client
456
457   options, args = ParseOptions()
458
459   utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
460                      stderr_logging=options.debug)
461
462   update_file = True
463   try:
464     notepad = WatcherState()
465     try:
466       try:
467         client = cli.GetClient()
468       except errors.OpPrereqError:
469         # this is, from cli.GetClient, a not-master case
470         logging.debug("Not on master, exiting")
471         sys.exit(constants.EXIT_SUCCESS)
472       except luxi.NoMasterError, err:
473         logging.warning("Master seems to be down (%s), trying to restart",
474                         str(err))
475         if not StartMaster():
476           logging.critical("Can't start the master, exiting")
477           update_file = False
478           sys.exit(constants.EXIT_FAILURE)
479         # else retry the connection
480         client = cli.GetClient()
481
482       try:
483         watcher = Watcher(options, notepad)
484       except errors.ConfigurationError:
485         # Just exit if there's no configuration
486         sys.exit(constants.EXIT_SUCCESS)
487
488       watcher.Run()
489     finally:
490       if update_file:
491         notepad.Save()
492       else:
493         logging.debug("Not updating status file due to failure")
494   except SystemExit:
495     raise
496   except NotMasterError:
497     logging.debug("Not master, exiting")
498     sys.exit(constants.EXIT_NOTMASTER)
499   except errors.ResolverError, err:
500     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
501     sys.exit(constants.EXIT_NODESETUP_ERROR)
502   except Exception, err:
503     logging.error(str(err), exc_info=True)
504     sys.exit(constants.EXIT_FAILURE)
505
506
507 if __name__ == '__main__':
508   main()