Switch gnt-debug submit-job to JobExecutor
[ganeti-local] / daemons / ganeti-watcher
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erronously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import sys
32 import time
33 import logging
34 from optparse import OptionParser
35
36 from ganeti import utils
37 from ganeti import constants
38 from ganeti import serializer
39 from ganeti import errors
40 from ganeti import opcodes
41 from ganeti import cli
42 from ganeti import luxi
43
44
45 MAXTRIES = 5
46 BAD_STATES = ['ERROR_down']
47 HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
48 NOTICE = 'NOTICE'
49 ERROR = 'ERROR'
50 KEY_RESTART_COUNT = "restart_count"
51 KEY_RESTART_WHEN = "restart_when"
52 KEY_BOOT_ID = "bootid"
53
54
55 # Global client object
56 client = None
57
58
59 class NotMasterError(errors.GenericError):
60   """Exception raised when this host is not the master."""
61
62
63 def Indent(s, prefix='| '):
64   """Indent a piece of text with a given prefix before each line.
65
66   @param s: the string to indent
67   @param prefix: the string to prepend each line
68
69   """
70   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
71
72
73 def StartMaster():
74   """Try to start the master daemon.
75
76   """
77   result = utils.RunCmd(['ganeti-masterd'])
78   if result.failed:
79     logging.error("Can't start the master daemon: output '%s'", result.output)
80   return not result.failed
81
82
83 def EnsureDaemon(daemon):
84   """Check for and start daemon if not alive.
85
86   """
87   pidfile = utils.DaemonPidFileName(daemon)
88   pid = utils.ReadPidFile(pidfile)
89   if pid == 0 or not utils.IsProcessAlive(pid): # no file or dead pid
90     logging.debug("Daemon '%s' not alive, trying to restart", daemon)
91     result = utils.RunCmd([daemon])
92     if not result:
93       logging.error("Can't start daemon '%s', failure %s, output: %s",
94                     daemon, result.fail_reason, result.output)
95
96
97 class WatcherState(object):
98   """Interface to a state file recording restart attempts.
99
100   """
101   def __init__(self):
102     """Open, lock, read and parse the file.
103
104     Raises exception on lock contention.
105
106     """
107     # The two-step dance below is necessary to allow both opening existing
108     # file read/write and creating if not existing.  Vanilla open will truncate
109     # an existing file -or- allow creating if not existing.
110     fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
111     self.statefile = os.fdopen(fd, 'w+')
112
113     utils.LockFile(self.statefile.fileno())
114
115     try:
116       state_data = self.statefile.read()
117       if not state_data:
118         self._data = {}
119       else:
120         self._data = serializer.Load(state_data)
121     except Exception, msg:
122       # Ignore errors while loading the file and treat it as empty
123       self._data = {}
124       logging.warning(("Invalid state file. Using defaults."
125                        " Error message: %s"), msg)
126
127     if "instance" not in self._data:
128       self._data["instance"] = {}
129     if "node" not in self._data:
130       self._data["node"] = {}
131
132     self._orig_data = serializer.Dump(self._data)
133
134   def Save(self):
135     """Save state to file, then unlock and close it.
136
137     """
138     assert self.statefile
139
140     serialized_form = serializer.Dump(self._data)
141     if self._orig_data == serialized_form:
142       logging.debug("Data didn't change, just touching status file")
143       os.utime(constants.WATCHER_STATEFILE, None)
144       return
145
146     # We need to make sure the file is locked before renaming it, otherwise
147     # starting ganeti-watcher again at the same time will create a conflict.
148     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
149                          data=serialized_form,
150                          prewrite=utils.LockFile, close=False)
151     self.statefile = os.fdopen(fd, 'w+')
152
153   def Close(self):
154     """Unlock configuration file and close it.
155
156     """
157     assert self.statefile
158
159     # Files are automatically unlocked when closing them
160     self.statefile.close()
161     self.statefile = None
162
163   def GetNodeBootID(self, name):
164     """Returns the last boot ID of a node or None.
165
166     """
167     ndata = self._data["node"]
168
169     if name in ndata and KEY_BOOT_ID in ndata[name]:
170       return ndata[name][KEY_BOOT_ID]
171     return None
172
173   def SetNodeBootID(self, name, bootid):
174     """Sets the boot ID of a node.
175
176     """
177     assert bootid
178
179     ndata = self._data["node"]
180
181     if name not in ndata:
182       ndata[name] = {}
183
184     ndata[name][KEY_BOOT_ID] = bootid
185
186   def NumberOfRestartAttempts(self, instance):
187     """Returns number of previous restart attempts.
188
189     @type instance: L{Instance}
190     @param instance: the instance to look up
191
192     """
193     idata = self._data["instance"]
194
195     if instance.name in idata:
196       return idata[instance.name][KEY_RESTART_COUNT]
197
198     return 0
199
200   def RecordRestartAttempt(self, instance):
201     """Record a restart attempt.
202
203     @type instance: L{Instance}
204     @param instance: the instance being restarted
205
206     """
207     idata = self._data["instance"]
208
209     if instance.name not in idata:
210       inst = idata[instance.name] = {}
211     else:
212       inst = idata[instance.name]
213
214     inst[KEY_RESTART_WHEN] = time.time()
215     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
216
217   def RemoveInstance(self, instance):
218     """Update state to reflect that a machine is running.
219
220     This method removes the record for a named instance (as we only
221     track down instances).
222
223     @type instance: L{Instance}
224     @param instance: the instance to remove from books
225
226     """
227     idata = self._data["instance"]
228
229     if instance.name in idata:
230       del idata[instance.name]
231
232
233 class Instance(object):
234   """Abstraction for a Virtual Machine instance.
235
236   """
237   def __init__(self, name, state, autostart):
238     self.name = name
239     self.state = state
240     self.autostart = autostart
241
242   def Restart(self):
243     """Encapsulates the start of an instance.
244
245     """
246     op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
247     cli.SubmitOpCode(op, cl=client)
248
249   def ActivateDisks(self):
250     """Encapsulates the activation of all disks of an instance.
251
252     """
253     op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
254     cli.SubmitOpCode(op, cl=client)
255
256
257 def GetClusterData():
258   """Get a list of instances on this cluster.
259
260   """
261   op1_fields = ["name", "status", "admin_state", "snodes"]
262   op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
263                                  use_locking=True)
264   op2_fields = ["name", "bootid", "offline"]
265   op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
266                              use_locking=True)
267
268   job_id = client.SubmitJob([op1, op2])
269
270   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
271
272   logging.debug("Got data from cluster, writing instance status file")
273
274   result = all_results[0]
275   smap = {}
276
277   instances = {}
278
279   # write the upfile
280   up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
281   utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
282
283   for fields in result:
284     (name, status, autostart, snodes) = fields
285
286     # update the secondary node map
287     for node in snodes:
288       if node not in smap:
289         smap[node] = []
290       smap[node].append(name)
291
292     instances[name] = Instance(name, status, autostart)
293
294   nodes =  dict([(name, (bootid, offline))
295                  for name, bootid, offline in all_results[1]])
296
297   client.ArchiveJob(job_id)
298
299   return instances, nodes, smap
300
301
302 class Watcher(object):
303   """Encapsulate the logic for restarting erronously halted virtual machines.
304
305   The calling program should periodically instantiate me and call Run().
306   This will traverse the list of instances, and make up to MAXTRIES attempts
307   to restart machines that are down.
308
309   """
310   def __init__(self, opts, notepad):
311     self.notepad = notepad
312     master = client.QueryConfigValues(["master_node"])[0]
313     if master != utils.HostInfo().name:
314       raise NotMasterError("This is not the master node")
315     # first archive old jobs
316     self.ArchiveJobs(opts.job_age)
317     # and only then submit new ones
318     self.instances, self.bootids, self.smap = GetClusterData()
319     self.started_instances = set()
320     self.opts = opts
321
322   def Run(self):
323     """Watcher run sequence.
324
325     """
326     notepad = self.notepad
327     self.CheckInstances(notepad)
328     self.CheckDisks(notepad)
329     self.VerifyDisks()
330
331   @staticmethod
332   def ArchiveJobs(age):
333     """Archive old jobs.
334
335     """
336     arch_count, left_count = client.AutoArchiveJobs(age)
337     logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
338
339   def CheckDisks(self, notepad):
340     """Check all nodes for restarted ones.
341
342     """
343     check_nodes = []
344     for name, (new_id, offline) in self.bootids.iteritems():
345       old = notepad.GetNodeBootID(name)
346       if new_id is None:
347         # Bad node, not returning a boot id
348         if not offline:
349           logging.debug("Node %s missing boot id, skipping secondary checks",
350                         name)
351         continue
352       if old != new_id:
353         # Node's boot ID has changed, proably through a reboot.
354         check_nodes.append(name)
355
356     if check_nodes:
357       # Activate disks for all instances with any of the checked nodes as a
358       # secondary node.
359       for node in check_nodes:
360         if node not in self.smap:
361           continue
362         for instance_name in self.smap[node]:
363           instance = self.instances[instance_name]
364           if not instance.autostart:
365             logging.info(("Skipping disk activation for non-autostart"
366                           " instance %s"), instance.name)
367             continue
368           if instance.name in self.started_instances:
369             # we already tried to start the instance, which should have
370             # activated its drives (if they can be at all)
371             continue
372           try:
373             logging.info("Activating disks for instance %s", instance.name)
374             instance.ActivateDisks()
375           except Exception:
376             logging.exception("Error while activating disks for instance %s",
377                               instance.name)
378
379       # Keep changed boot IDs
380       for name in check_nodes:
381         notepad.SetNodeBootID(name, self.bootids[name][0])
382
383   def CheckInstances(self, notepad):
384     """Make a pass over the list of instances, restarting downed ones.
385
386     """
387     for instance in self.instances.values():
388       if instance.state in BAD_STATES:
389         n = notepad.NumberOfRestartAttempts(instance)
390
391         if n > MAXTRIES:
392           # stay quiet.
393           continue
394         elif n < MAXTRIES:
395           last = " (Attempt #%d)" % (n + 1)
396         else:
397           notepad.RecordRestartAttempt(instance)
398           logging.error("Could not restart %s after %d attempts, giving up",
399                         instance.name, MAXTRIES)
400           continue
401         try:
402           logging.info("Restarting %s%s",
403                         instance.name, last)
404           instance.Restart()
405           self.started_instances.add(instance.name)
406         except Exception:
407           logging.exception("Error while restarting instance %s",
408                             instance.name)
409
410         notepad.RecordRestartAttempt(instance)
411       elif instance.state in HELPLESS_STATES:
412         if notepad.NumberOfRestartAttempts(instance):
413           notepad.RemoveInstance(instance)
414       else:
415         if notepad.NumberOfRestartAttempts(instance):
416           notepad.RemoveInstance(instance)
417           logging.info("Restart of %s succeeded", instance.name)
418
419   @staticmethod
420   def VerifyDisks():
421     """Run gnt-cluster verify-disks.
422
423     """
424     op = opcodes.OpVerifyDisks()
425     job_id = client.SubmitJob([op])
426     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
427     client.ArchiveJob(job_id)
428     if not isinstance(result, (tuple, list)):
429       logging.error("Can't get a valid result from verify-disks")
430       return
431     offline_disk_instances = result[2]
432     if not offline_disk_instances:
433       # nothing to do
434       return
435     logging.debug("Will activate disks for instances %s",
436                   ", ".join(offline_disk_instances))
437     # we submit only one job, and wait for it. not optimal, but spams
438     # less the job queue
439     job = [opcodes.OpActivateInstanceDisks(instance_name=name)
440            for name in offline_disk_instances]
441     job_id = cli.SendJob(job, cl=client)
442
443     cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
444
445
446 def ParseOptions():
447   """Parse the command line options.
448
449   @return: (options, args) as from OptionParser.parse_args()
450
451   """
452   parser = OptionParser(description="Ganeti cluster watcher",
453                         usage="%prog [-d]",
454                         version="%%prog (ganeti) %s" %
455                         constants.RELEASE_VERSION)
456
457   parser.add_option("-d", "--debug", dest="debug",
458                     help="Write all messages to stderr",
459                     default=False, action="store_true")
460   parser.add_option("-A", "--job-age", dest="job_age",
461                     help="Autoarchive jobs older than this age (default"
462                     " 6 hours)", default=6*3600)
463   options, args = parser.parse_args()
464   options.job_age = cli.ParseTimespec(options.job_age)
465   return options, args
466
467
468 def main():
469   """Main function.
470
471   """
472   global client
473
474   options, args = ParseOptions()
475
476   utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
477                      stderr_logging=options.debug)
478
479   update_file = False
480   try:
481     # on master or not, try to start the node dameon (use _PID but is
482     # the same as daemon name)
483     EnsureDaemon(constants.NODED_PID)
484
485     notepad = WatcherState()
486     try:
487       try:
488         client = cli.GetClient()
489       except errors.OpPrereqError:
490         # this is, from cli.GetClient, a not-master case
491         logging.debug("Not on master, exiting")
492         update_file = True
493         sys.exit(constants.EXIT_SUCCESS)
494       except luxi.NoMasterError, err:
495         logging.warning("Master seems to be down (%s), trying to restart",
496                         str(err))
497         if not StartMaster():
498           logging.critical("Can't start the master, exiting")
499           sys.exit(constants.EXIT_FAILURE)
500         # else retry the connection
501         client = cli.GetClient()
502
503       # we are on master now (use _PID but is the same as daemon name)
504       EnsureDaemon(constants.RAPI_PID)
505
506       try:
507         watcher = Watcher(options, notepad)
508       except errors.ConfigurationError:
509         # Just exit if there's no configuration
510         update_file = True
511         sys.exit(constants.EXIT_SUCCESS)
512
513       watcher.Run()
514       update_file = True
515
516     finally:
517       if update_file:
518         notepad.Save()
519       else:
520         logging.debug("Not updating status file due to failure")
521   except SystemExit:
522     raise
523   except NotMasterError:
524     logging.debug("Not master, exiting")
525     sys.exit(constants.EXIT_NOTMASTER)
526   except errors.ResolverError, err:
527     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
528     sys.exit(constants.EXIT_NODESETUP_ERROR)
529   except errors.JobQueueFull:
530     logging.error("Job queue is full, can't query cluster state")
531   except errors.JobQueueDrainError:
532     logging.error("Job queue is drained, can't maintain cluster state")
533   except Exception, err:
534     logging.error(str(err), exc_info=True)
535     sys.exit(constants.EXIT_FAILURE)
536
537
538 if __name__ == '__main__':
539   main()