Cluster: add nicparams, and update them on upgrade
[ganeti-local] / daemons / ganeti-watcher
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erronously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import sys
32 import time
33 import logging
34 from optparse import OptionParser
35
36 from ganeti import utils
37 from ganeti import constants
38 from ganeti import serializer
39 from ganeti import errors
40 from ganeti import opcodes
41 from ganeti import cli
42 from ganeti import luxi
43
44
45 MAXTRIES = 5
46 BAD_STATES = ['ERROR_down']
47 HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
48 NOTICE = 'NOTICE'
49 ERROR = 'ERROR'
50 KEY_RESTART_COUNT = "restart_count"
51 KEY_RESTART_WHEN = "restart_when"
52 KEY_BOOT_ID = "bootid"
53
54
55 # Global client object
56 client = None
57
58
59 class NotMasterError(errors.GenericError):
60   """Exception raised when this host is not the master."""
61
62
63 def Indent(s, prefix='| '):
64   """Indent a piece of text with a given prefix before each line.
65
66   @param s: the string to indent
67   @param prefix: the string to prepend each line
68
69   """
70   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
71
72
73 def StartMaster():
74   """Try to start the master daemon.
75
76   """
77   result = utils.RunCmd(['ganeti-masterd'])
78   if result.failed:
79     logging.error("Can't start the master daemon: output '%s'", result.output)
80   return not result.failed
81
82
83 class WatcherState(object):
84   """Interface to a state file recording restart attempts.
85
86   """
87   def __init__(self):
88     """Open, lock, read and parse the file.
89
90     Raises exception on lock contention.
91
92     """
93     # The two-step dance below is necessary to allow both opening existing
94     # file read/write and creating if not existing.  Vanilla open will truncate
95     # an existing file -or- allow creating if not existing.
96     fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
97     self.statefile = os.fdopen(fd, 'w+')
98
99     utils.LockFile(self.statefile.fileno())
100
101     try:
102       state_data = self.statefile.read()
103       if not state_data:
104         self._data = {}
105       else:
106         self._data = serializer.Load(state_data)
107     except Exception, msg:
108       # Ignore errors while loading the file and treat it as empty
109       self._data = {}
110       logging.warning(("Invalid state file. Using defaults."
111                        " Error message: %s"), msg)
112
113     if "instance" not in self._data:
114       self._data["instance"] = {}
115     if "node" not in self._data:
116       self._data["node"] = {}
117
118     self._orig_data = serializer.Dump(self._data)
119
120   def Save(self):
121     """Save state to file, then unlock and close it.
122
123     """
124     assert self.statefile
125
126     serialized_form = serializer.Dump(self._data)
127     if self._orig_data == serialized_form:
128       logging.debug("Data didn't change, just touching status file")
129       os.utime(constants.WATCHER_STATEFILE, None)
130       return
131
132     # We need to make sure the file is locked before renaming it, otherwise
133     # starting ganeti-watcher again at the same time will create a conflict.
134     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
135                          data=serialized_form,
136                          prewrite=utils.LockFile, close=False)
137     self.statefile = os.fdopen(fd, 'w+')
138
139   def Close(self):
140     """Unlock configuration file and close it.
141
142     """
143     assert self.statefile
144
145     # Files are automatically unlocked when closing them
146     self.statefile.close()
147     self.statefile = None
148
149   def GetNodeBootID(self, name):
150     """Returns the last boot ID of a node or None.
151
152     """
153     ndata = self._data["node"]
154
155     if name in ndata and KEY_BOOT_ID in ndata[name]:
156       return ndata[name][KEY_BOOT_ID]
157     return None
158
159   def SetNodeBootID(self, name, bootid):
160     """Sets the boot ID of a node.
161
162     """
163     assert bootid
164
165     ndata = self._data["node"]
166
167     if name not in ndata:
168       ndata[name] = {}
169
170     ndata[name][KEY_BOOT_ID] = bootid
171
172   def NumberOfRestartAttempts(self, instance):
173     """Returns number of previous restart attempts.
174
175     @type instance: L{Instance}
176     @param instance: the instance to look up
177
178     """
179     idata = self._data["instance"]
180
181     if instance.name in idata:
182       return idata[instance.name][KEY_RESTART_COUNT]
183
184     return 0
185
186   def RecordRestartAttempt(self, instance):
187     """Record a restart attempt.
188
189     @type instance: L{Instance}
190     @param instance: the instance being restarted
191
192     """
193     idata = self._data["instance"]
194
195     if instance.name not in idata:
196       inst = idata[instance.name] = {}
197     else:
198       inst = idata[instance.name]
199
200     inst[KEY_RESTART_WHEN] = time.time()
201     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
202
203   def RemoveInstance(self, instance):
204     """Update state to reflect that a machine is running.
205
206     This method removes the record for a named instance (as we only
207     track down instances).
208
209     @type instance: L{Instance}
210     @param instance: the instance to remove from books
211
212     """
213     idata = self._data["instance"]
214
215     if instance.name in idata:
216       del idata[instance.name]
217
218
219 class Instance(object):
220   """Abstraction for a Virtual Machine instance.
221
222   """
223   def __init__(self, name, state, autostart):
224     self.name = name
225     self.state = state
226     self.autostart = autostart
227
228   def Restart(self):
229     """Encapsulates the start of an instance.
230
231     """
232     op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
233     cli.SubmitOpCode(op, cl=client)
234
235   def ActivateDisks(self):
236     """Encapsulates the activation of all disks of an instance.
237
238     """
239     op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
240     cli.SubmitOpCode(op, cl=client)
241
242
243 def GetClusterData():
244   """Get a list of instances on this cluster.
245
246   """
247   op1_fields = ["name", "status", "admin_state", "snodes"]
248   op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
249                                  use_locking=True)
250   op2_fields = ["name", "bootid", "offline"]
251   op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
252                              use_locking=True)
253
254   job_id = client.SubmitJob([op1, op2])
255
256   all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
257
258   result = all_results[0]
259   smap = {}
260
261   instances = {}
262   for fields in result:
263     (name, status, autostart, snodes) = fields
264
265     # update the secondary node map
266     for node in snodes:
267       if node not in smap:
268         smap[node] = []
269       smap[node].append(name)
270
271     instances[name] = Instance(name, status, autostart)
272
273   nodes =  dict([(name, (bootid, offline))
274                  for name, bootid, offline in all_results[1]])
275
276   client.ArchiveJob(job_id)
277
278   return instances, nodes, smap
279
280
281 class Watcher(object):
282   """Encapsulate the logic for restarting erronously halted virtual machines.
283
284   The calling program should periodically instantiate me and call Run().
285   This will traverse the list of instances, and make up to MAXTRIES attempts
286   to restart machines that are down.
287
288   """
289   def __init__(self, opts, notepad):
290     self.notepad = notepad
291     master = client.QueryConfigValues(["master_node"])[0]
292     if master != utils.HostInfo().name:
293       raise NotMasterError("This is not the master node")
294     self.instances, self.bootids, self.smap = GetClusterData()
295     self.started_instances = set()
296     self.opts = opts
297
298   def Run(self):
299     """Watcher run sequence.
300
301     """
302     notepad = self.notepad
303     self.ArchiveJobs(self.opts.job_age)
304     self.CheckInstances(notepad)
305     self.CheckDisks(notepad)
306     self.VerifyDisks()
307
308   def ArchiveJobs(self, age):
309     """Archive old jobs.
310
311     """
312     arch_count, left_count = client.AutoArchiveJobs(age)
313     logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
314
315   def CheckDisks(self, notepad):
316     """Check all nodes for restarted ones.
317
318     """
319     check_nodes = []
320     for name, (new_id, offline) in self.bootids.iteritems():
321       old = notepad.GetNodeBootID(name)
322       if new_id is None:
323         # Bad node, not returning a boot id
324         if not offline:
325           logging.debug("Node %s missing boot id, skipping secondary checks",
326                         name)
327         continue
328       if old != new_id:
329         # Node's boot ID has changed, proably through a reboot.
330         check_nodes.append(name)
331
332     if check_nodes:
333       # Activate disks for all instances with any of the checked nodes as a
334       # secondary node.
335       for node in check_nodes:
336         if node not in self.smap:
337           continue
338         for instance_name in self.smap[node]:
339           instance = self.instances[instance_name]
340           if not instance.autostart:
341             logging.info(("Skipping disk activation for non-autostart"
342                           " instance %s"), instance.name)
343             continue
344           if instance.name in self.started_instances:
345             # we already tried to start the instance, which should have
346             # activated its drives (if they can be at all)
347             continue
348           try:
349             logging.info("Activating disks for instance %s", instance.name)
350             instance.ActivateDisks()
351           except Exception:
352             logging.exception("Error while activating disks for instance %s",
353                               instance.name)
354
355       # Keep changed boot IDs
356       for name in check_nodes:
357         notepad.SetNodeBootID(name, self.bootids[name][0])
358
359   def CheckInstances(self, notepad):
360     """Make a pass over the list of instances, restarting downed ones.
361
362     """
363     for instance in self.instances.values():
364       if instance.state in BAD_STATES:
365         n = notepad.NumberOfRestartAttempts(instance)
366
367         if n > MAXTRIES:
368           # stay quiet.
369           continue
370         elif n < MAXTRIES:
371           last = " (Attempt #%d)" % (n + 1)
372         else:
373           notepad.RecordRestartAttempt(instance)
374           logging.error("Could not restart %s after %d attempts, giving up",
375                         instance.name, MAXTRIES)
376           continue
377         try:
378           logging.info("Restarting %s%s",
379                         instance.name, last)
380           instance.Restart()
381           self.started_instances.add(instance.name)
382         except Exception:
383           logging.exception("Error while restarting instance %s",
384                             instance.name)
385
386         notepad.RecordRestartAttempt(instance)
387       elif instance.state in HELPLESS_STATES:
388         if notepad.NumberOfRestartAttempts(instance):
389           notepad.RemoveInstance(instance)
390       else:
391         if notepad.NumberOfRestartAttempts(instance):
392           notepad.RemoveInstance(instance)
393           logging.info("Restart of %s succeeded", instance.name)
394
395   @staticmethod
396   def VerifyDisks():
397     """Run gnt-cluster verify-disks.
398
399     """
400     op = opcodes.OpVerifyDisks()
401     job_id = client.SubmitJob([op])
402     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
403     client.ArchiveJob(job_id)
404     if not isinstance(result, (tuple, list)):
405       logging.error("Can't get a valid result from verify-disks")
406       return
407     offline_disk_instances = result[2]
408     if not offline_disk_instances:
409       # nothing to do
410       return
411     logging.debug("Will activate disks for instances %s",
412                   ", ".join(offline_disk_instances))
413     # we submit only one job, and wait for it. not optimal, but spams
414     # less the job queue
415     job = [opcodes.OpActivateInstanceDisks(instance_name=name)
416            for name in offline_disk_instances]
417     job_id = cli.SendJob(job, cl=client)
418
419     cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
420
421
422 def ParseOptions():
423   """Parse the command line options.
424
425   @return: (options, args) as from OptionParser.parse_args()
426
427   """
428   parser = OptionParser(description="Ganeti cluster watcher",
429                         usage="%prog [-d]",
430                         version="%%prog (ganeti) %s" %
431                         constants.RELEASE_VERSION)
432
433   parser.add_option("-d", "--debug", dest="debug",
434                     help="Write all messages to stderr",
435                     default=False, action="store_true")
436   parser.add_option("-A", "--job-age", dest="job_age",
437                     help="Autoarchive jobs older than this age (default"
438                     " 6 hours)", default=6*3600)
439   options, args = parser.parse_args()
440   options.job_age = cli.ParseTimespec(options.job_age)
441   return options, args
442
443
444 def main():
445   """Main function.
446
447   """
448   global client
449
450   options, args = ParseOptions()
451
452   utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
453                      stderr_logging=options.debug)
454
455   update_file = True
456   try:
457     notepad = WatcherState()
458     try:
459       try:
460         client = cli.GetClient()
461       except errors.OpPrereqError:
462         # this is, from cli.GetClient, a not-master case
463         logging.debug("Not on master, exiting")
464         sys.exit(constants.EXIT_SUCCESS)
465       except luxi.NoMasterError, err:
466         logging.warning("Master seems to be down (%s), trying to restart",
467                         str(err))
468         if not StartMaster():
469           logging.critical("Can't start the master, exiting")
470           update_file = False
471           sys.exit(constants.EXIT_FAILURE)
472         # else retry the connection
473         client = cli.GetClient()
474
475       try:
476         watcher = Watcher(options, notepad)
477       except errors.ConfigurationError:
478         # Just exit if there's no configuration
479         sys.exit(constants.EXIT_SUCCESS)
480
481       watcher.Run()
482     finally:
483       if update_file:
484         notepad.Save()
485       else:
486         logging.debug("Not updating status file due to failure")
487   except SystemExit:
488     raise
489   except NotMasterError:
490     logging.debug("Not master, exiting")
491     sys.exit(constants.EXIT_NOTMASTER)
492   except errors.ResolverError, err:
493     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
494     sys.exit(constants.EXIT_NODESETUP_ERROR)
495   except Exception, err:
496     logging.error(str(err), exc_info=True)
497     sys.exit(constants.EXIT_FAILURE)
498
499
500 if __name__ == '__main__':
501   main()