watcher: handle full and drained queue cases
[ganeti-local] / daemons / ganeti-watcher
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erronously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import sys
32 import time
33 import logging
34 from optparse import OptionParser
35
36 from ganeti import utils
37 from ganeti import constants
38 from ganeti import serializer
39 from ganeti import errors
40 from ganeti import opcodes
41 from ganeti import cli
42 from ganeti import luxi
43
44
45 MAXTRIES = 5
46 BAD_STATES = ['ERROR_down']
47 HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
48 NOTICE = 'NOTICE'
49 ERROR = 'ERROR'
50 KEY_RESTART_COUNT = "restart_count"
51 KEY_RESTART_WHEN = "restart_when"
52 KEY_BOOT_ID = "bootid"
53
54
55 # Global client object
56 client = None
57
58
59 class NotMasterError(errors.GenericError):
60   """Exception raised when this host is not the master."""
61
62
63 def Indent(s, prefix='| '):
64   """Indent a piece of text with a given prefix before each line.
65
66   @param s: the string to indent
67   @param prefix: the string to prepend each line
68
69   """
70   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
71
72
73 def StartMaster():
74   """Try to start the master daemon.
75
76   """
77   result = utils.RunCmd(['ganeti-masterd'])
78   if result.failed:
79     logging.error("Can't start the master daemon: output '%s'", result.output)
80   return not result.failed
81
82
83 class WatcherState(object):
84   """Interface to a state file recording restart attempts.
85
86   """
87   def __init__(self):
88     """Open, lock, read and parse the file.
89
90     Raises exception on lock contention.
91
92     """
93     # The two-step dance below is necessary to allow both opening existing
94     # file read/write and creating if not existing.  Vanilla open will truncate
95     # an existing file -or- allow creating if not existing.
96     fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
97     self.statefile = os.fdopen(fd, 'w+')
98
99     utils.LockFile(self.statefile.fileno())
100
101     try:
102       state_data = self.statefile.read()
103       if not state_data:
104         self._data = {}
105       else:
106         self._data = serializer.Load(state_data)
107     except Exception, msg:
108       # Ignore errors while loading the file and treat it as empty
109       self._data = {}
110       logging.warning(("Invalid state file. Using defaults."
111                        " Error message: %s"), msg)
112
113     if "instance" not in self._data:
114       self._data["instance"] = {}
115     if "node" not in self._data:
116       self._data["node"] = {}
117
118     self._orig_data = serializer.Dump(self._data)
119
120   def Save(self):
121     """Save state to file, then unlock and close it.
122
123     """
124     assert self.statefile
125
126     serialized_form = serializer.Dump(self._data)
127     if self._orig_data == serialized_form:
128       logging.debug("Data didn't change, just touching status file")
129       os.utime(constants.WATCHER_STATEFILE, None)
130       return
131
132     # We need to make sure the file is locked before renaming it, otherwise
133     # starting ganeti-watcher again at the same time will create a conflict.
134     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
135                          data=serialized_form,
136                          prewrite=utils.LockFile, close=False)
137     self.statefile = os.fdopen(fd, 'w+')
138
139   def Close(self):
140     """Unlock configuration file and close it.
141
142     """
143     assert self.statefile
144
145     # Files are automatically unlocked when closing them
146     self.statefile.close()
147     self.statefile = None
148
149   def GetNodeBootID(self, name):
150     """Returns the last boot ID of a node or None.
151
152     """
153     ndata = self._data["node"]
154
155     if name in ndata and KEY_BOOT_ID in ndata[name]:
156       return ndata[name][KEY_BOOT_ID]
157     return None
158
159   def SetNodeBootID(self, name, bootid):
160     """Sets the boot ID of a node.
161
162     """
163     assert bootid
164
165     ndata = self._data["node"]
166
167     if name not in ndata:
168       ndata[name] = {}
169
170     ndata[name][KEY_BOOT_ID] = bootid
171
172   def NumberOfRestartAttempts(self, instance):
173     """Returns number of previous restart attempts.
174
175     @type instance: L{Instance}
176     @param instance: the instance to look up
177
178     """
179     idata = self._data["instance"]
180
181     if instance.name in idata:
182       return idata[instance.name][KEY_RESTART_COUNT]
183
184     return 0
185
186   def RecordRestartAttempt(self, instance):
187     """Record a restart attempt.
188
189     @type instance: L{Instance}
190     @param instance: the instance being restarted
191
192     """
193     idata = self._data["instance"]
194
195     if instance.name not in idata:
196       inst = idata[instance.name] = {}
197     else:
198       inst = idata[instance.name]
199
200     inst[KEY_RESTART_WHEN] = time.time()
201     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
202
203   def RemoveInstance(self, instance):
204     """Update state to reflect that a machine is running.
205
206     This method removes the record for a named instance (as we only
207     track down instances).
208
209     @type instance: L{Instance}
210     @param instance: the instance to remove from books
211
212     """
213     idata = self._data["instance"]
214
215     if instance.name in idata:
216       del idata[instance.name]
217
218
219 class Instance(object):
220   """Abstraction for a Virtual Machine instance.
221
222   """
223   def __init__(self, name, state, autostart):
224     self.name = name
225     self.state = state
226     self.autostart = autostart
227
228   def Restart(self):
229     """Encapsulates the start of an instance.
230
231     """
232     op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
233     cli.SubmitOpCode(op, cl=client)
234
235   def ActivateDisks(self):
236     """Encapsulates the activation of all disks of an instance.
237
238     """
239     op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
240     cli.SubmitOpCode(op, cl=client)
241
242
243 def GetClusterData():
244   """Get a list of instances on this cluster.
245
246   """
247   op1_fields = ["name", "status", "admin_state", "snodes"]
248   op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
249                                  use_locking=True)
250   op2_fields = ["name", "bootid", "offline"]
251   op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
252                              use_locking=True)
253
254   job_id = client.SubmitJob([op1, op2])
255
256   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
257
258   logging.debug("Got data from cluster, writing instance status file")
259
260   result = all_results[0]
261   smap = {}
262
263   instances = {}
264
265   # write the upfile
266   up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
267   utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
268
269   for fields in result:
270     (name, status, autostart, snodes) = fields
271
272     # update the secondary node map
273     for node in snodes:
274       if node not in smap:
275         smap[node] = []
276       smap[node].append(name)
277
278     instances[name] = Instance(name, status, autostart)
279
280   nodes =  dict([(name, (bootid, offline))
281                  for name, bootid, offline in all_results[1]])
282
283   client.ArchiveJob(job_id)
284
285   return instances, nodes, smap
286
287
288 class Watcher(object):
289   """Encapsulate the logic for restarting erronously halted virtual machines.
290
291   The calling program should periodically instantiate me and call Run().
292   This will traverse the list of instances, and make up to MAXTRIES attempts
293   to restart machines that are down.
294
295   """
296   def __init__(self, opts, notepad):
297     self.notepad = notepad
298     master = client.QueryConfigValues(["master_node"])[0]
299     if master != utils.HostInfo().name:
300       raise NotMasterError("This is not the master node")
301     # first archive old jobs
302     self.ArchiveJobs(opts.job_age)
303     # and only then submit new ones
304     self.instances, self.bootids, self.smap = GetClusterData()
305     self.started_instances = set()
306     self.opts = opts
307
308   def Run(self):
309     """Watcher run sequence.
310
311     """
312     notepad = self.notepad
313     self.CheckInstances(notepad)
314     self.CheckDisks(notepad)
315     self.VerifyDisks()
316
317   @staticmethod
318   def ArchiveJobs(age):
319     """Archive old jobs.
320
321     """
322     arch_count, left_count = client.AutoArchiveJobs(age)
323     logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
324
325   def CheckDisks(self, notepad):
326     """Check all nodes for restarted ones.
327
328     """
329     check_nodes = []
330     for name, (new_id, offline) in self.bootids.iteritems():
331       old = notepad.GetNodeBootID(name)
332       if new_id is None:
333         # Bad node, not returning a boot id
334         if not offline:
335           logging.debug("Node %s missing boot id, skipping secondary checks",
336                         name)
337         continue
338       if old != new_id:
339         # Node's boot ID has changed, proably through a reboot.
340         check_nodes.append(name)
341
342     if check_nodes:
343       # Activate disks for all instances with any of the checked nodes as a
344       # secondary node.
345       for node in check_nodes:
346         if node not in self.smap:
347           continue
348         for instance_name in self.smap[node]:
349           instance = self.instances[instance_name]
350           if not instance.autostart:
351             logging.info(("Skipping disk activation for non-autostart"
352                           " instance %s"), instance.name)
353             continue
354           if instance.name in self.started_instances:
355             # we already tried to start the instance, which should have
356             # activated its drives (if they can be at all)
357             continue
358           try:
359             logging.info("Activating disks for instance %s", instance.name)
360             instance.ActivateDisks()
361           except Exception:
362             logging.exception("Error while activating disks for instance %s",
363                               instance.name)
364
365       # Keep changed boot IDs
366       for name in check_nodes:
367         notepad.SetNodeBootID(name, self.bootids[name][0])
368
369   def CheckInstances(self, notepad):
370     """Make a pass over the list of instances, restarting downed ones.
371
372     """
373     for instance in self.instances.values():
374       if instance.state in BAD_STATES:
375         n = notepad.NumberOfRestartAttempts(instance)
376
377         if n > MAXTRIES:
378           # stay quiet.
379           continue
380         elif n < MAXTRIES:
381           last = " (Attempt #%d)" % (n + 1)
382         else:
383           notepad.RecordRestartAttempt(instance)
384           logging.error("Could not restart %s after %d attempts, giving up",
385                         instance.name, MAXTRIES)
386           continue
387         try:
388           logging.info("Restarting %s%s",
389                         instance.name, last)
390           instance.Restart()
391           self.started_instances.add(instance.name)
392         except Exception:
393           logging.exception("Error while restarting instance %s",
394                             instance.name)
395
396         notepad.RecordRestartAttempt(instance)
397       elif instance.state in HELPLESS_STATES:
398         if notepad.NumberOfRestartAttempts(instance):
399           notepad.RemoveInstance(instance)
400       else:
401         if notepad.NumberOfRestartAttempts(instance):
402           notepad.RemoveInstance(instance)
403           logging.info("Restart of %s succeeded", instance.name)
404
405   @staticmethod
406   def VerifyDisks():
407     """Run gnt-cluster verify-disks.
408
409     """
410     op = opcodes.OpVerifyDisks()
411     job_id = client.SubmitJob([op])
412     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
413     client.ArchiveJob(job_id)
414     if not isinstance(result, (tuple, list)):
415       logging.error("Can't get a valid result from verify-disks")
416       return
417     offline_disk_instances = result[2]
418     if not offline_disk_instances:
419       # nothing to do
420       return
421     logging.debug("Will activate disks for instances %s",
422                   ", ".join(offline_disk_instances))
423     # we submit only one job, and wait for it. not optimal, but spams
424     # less the job queue
425     job = [opcodes.OpActivateInstanceDisks(instance_name=name)
426            for name in offline_disk_instances]
427     job_id = cli.SendJob(job, cl=client)
428
429     cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
430
431
432 def ParseOptions():
433   """Parse the command line options.
434
435   @return: (options, args) as from OptionParser.parse_args()
436
437   """
438   parser = OptionParser(description="Ganeti cluster watcher",
439                         usage="%prog [-d]",
440                         version="%%prog (ganeti) %s" %
441                         constants.RELEASE_VERSION)
442
443   parser.add_option("-d", "--debug", dest="debug",
444                     help="Write all messages to stderr",
445                     default=False, action="store_true")
446   parser.add_option("-A", "--job-age", dest="job_age",
447                     help="Autoarchive jobs older than this age (default"
448                     " 6 hours)", default=6*3600)
449   options, args = parser.parse_args()
450   options.job_age = cli.ParseTimespec(options.job_age)
451   return options, args
452
453
454 def main():
455   """Main function.
456
457   """
458   global client
459
460   options, args = ParseOptions()
461
462   utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
463                      stderr_logging=options.debug)
464
465   update_file = False
466   try:
467     notepad = WatcherState()
468     try:
469       try:
470         client = cli.GetClient()
471       except errors.OpPrereqError:
472         # this is, from cli.GetClient, a not-master case
473         logging.debug("Not on master, exiting")
474         update_file = True
475         sys.exit(constants.EXIT_SUCCESS)
476       except luxi.NoMasterError, err:
477         logging.warning("Master seems to be down (%s), trying to restart",
478                         str(err))
479         if not StartMaster():
480           logging.critical("Can't start the master, exiting")
481           sys.exit(constants.EXIT_FAILURE)
482         # else retry the connection
483         client = cli.GetClient()
484
485       try:
486         watcher = Watcher(options, notepad)
487       except errors.ConfigurationError:
488         # Just exit if there's no configuration
489         update_file = True
490         sys.exit(constants.EXIT_SUCCESS)
491
492       watcher.Run()
493       update_file = True
494
495     finally:
496       if update_file:
497         notepad.Save()
498       else:
499         logging.debug("Not updating status file due to failure")
500   except SystemExit:
501     raise
502   except NotMasterError:
503     logging.debug("Not master, exiting")
504     sys.exit(constants.EXIT_NOTMASTER)
505   except errors.ResolverError, err:
506     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
507     sys.exit(constants.EXIT_NODESETUP_ERROR)
508   except errors.JobQueueFull:
509     logging.error("Job queue is full, can't query cluster state")
510   except errors.JobQueueDrainError:
511     logging.error("Job queue is drained, can't maintain cluster state")
512   except Exception, err:
513     logging.error(str(err), exc_info=True)
514     sys.exit(constants.EXIT_FAILURE)
515
516
517 if __name__ == '__main__':
518   main()