watcher: autoarchive old jobs
[ganeti-local] / daemons / ganeti-watcher
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Tool to restart erronously downed virtual machines.
23
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot.  Run from cron or similar.
27
28 """
29
30 import os
31 import sys
32 import time
33 import logging
34 from optparse import OptionParser
35
36 from ganeti import utils
37 from ganeti import constants
38 from ganeti import serializer
39 from ganeti import errors
40 from ganeti import opcodes
41 from ganeti import cli
42
43
44 MAXTRIES = 5
45 BAD_STATES = ['ERROR_down']
46 HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
47 NOTICE = 'NOTICE'
48 ERROR = 'ERROR'
49 KEY_RESTART_COUNT = "restart_count"
50 KEY_RESTART_WHEN = "restart_when"
51 KEY_BOOT_ID = "bootid"
52
53
54 # Global client object
55 client = None
56
57
58 class NotMasterError(errors.GenericError):
59   """Exception raised when this host is not the master."""
60
61
62 def Indent(s, prefix='| '):
63   """Indent a piece of text with a given prefix before each line.
64
65   @param s: the string to indent
66   @param prefix: the string to prepend each line
67
68   """
69   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
70
71
72 class WatcherState(object):
73   """Interface to a state file recording restart attempts.
74
75   """
76   def __init__(self):
77     """Open, lock, read and parse the file.
78
79     Raises exception on lock contention.
80
81     """
82     # The two-step dance below is necessary to allow both opening existing
83     # file read/write and creating if not existing.  Vanilla open will truncate
84     # an existing file -or- allow creating if not existing.
85     fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
86     self.statefile = os.fdopen(fd, 'w+')
87
88     utils.LockFile(self.statefile.fileno())
89
90     try:
91       self._data = serializer.Load(self.statefile.read())
92     except Exception, msg:
93       # Ignore errors while loading the file and treat it as empty
94       self._data = {}
95       logging.warning(("Empty or invalid state file. Using defaults."
96                        " Error message: %s"), msg)
97
98     if "instance" not in self._data:
99       self._data["instance"] = {}
100     if "node" not in self._data:
101       self._data["node"] = {}
102
103     self._orig_data = serializer.Dump(self._data)
104
105   def Save(self):
106     """Save state to file, then unlock and close it.
107
108     """
109     assert self.statefile
110
111     serialized_form = serializer.Dump(self._data)
112     if self._orig_data == serialized_form:
113       logging.debug("Data didn't change, just touching status file")
114       os.utime(constants.WATCHER_STATEFILE, None)
115       return
116
117     # We need to make sure the file is locked before renaming it, otherwise
118     # starting ganeti-watcher again at the same time will create a conflict.
119     fd = utils.WriteFile(constants.WATCHER_STATEFILE,
120                          data=serialized_form,
121                          prewrite=utils.LockFile, close=False)
122     self.statefile = os.fdopen(fd, 'w+')
123
124   def Close(self):
125     """Unlock configuration file and close it.
126
127     """
128     assert self.statefile
129
130     # Files are automatically unlocked when closing them
131     self.statefile.close()
132     self.statefile = None
133
134   def GetNodeBootID(self, name):
135     """Returns the last boot ID of a node or None.
136
137     """
138     ndata = self._data["node"]
139
140     if name in ndata and KEY_BOOT_ID in ndata[name]:
141       return ndata[name][KEY_BOOT_ID]
142     return None
143
144   def SetNodeBootID(self, name, bootid):
145     """Sets the boot ID of a node.
146
147     """
148     assert bootid
149
150     ndata = self._data["node"]
151
152     if name not in ndata:
153       ndata[name] = {}
154
155     ndata[name][KEY_BOOT_ID] = bootid
156
157   def NumberOfRestartAttempts(self, instance):
158     """Returns number of previous restart attempts.
159
160     @type instance: L{Instance}
161     @param instance: the instance to look up
162
163     """
164     idata = self._data["instance"]
165
166     if instance.name in idata:
167       return idata[instance.name][KEY_RESTART_COUNT]
168
169     return 0
170
171   def RecordRestartAttempt(self, instance):
172     """Record a restart attempt.
173
174     @type instance: L{Instance}
175     @param instance: the instance being restarted
176
177     """
178     idata = self._data["instance"]
179
180     if instance.name not in idata:
181       inst = idata[instance.name] = {}
182     else:
183       inst = idata[instance.name]
184
185     inst[KEY_RESTART_WHEN] = time.time()
186     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
187
188   def RemoveInstance(self, instance):
189     """Update state to reflect that a machine is running.
190
191     This method removes the record for a named instance (as we only
192     track down instances).
193
194     @type instance: L{Instance}
195     @param instance: the instance to remove from books
196
197     """
198     idata = self._data["instance"]
199
200     if instance.name in idata:
201       del idata[instance.name]
202
203
204 class Instance(object):
205   """Abstraction for a Virtual Machine instance.
206
207   """
208   def __init__(self, name, state, autostart):
209     self.name = name
210     self.state = state
211     self.autostart = autostart
212
213   def Restart(self):
214     """Encapsulates the start of an instance.
215
216     """
217     op = opcodes.OpStartupInstance(instance_name=self.name,
218                                    force=False,
219                                    extra_args=None)
220     cli.SubmitOpCode(op, cl=client)
221
222   def ActivateDisks(self):
223     """Encapsulates the activation of all disks of an instance.
224
225     """
226     op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
227     cli.SubmitOpCode(op, cl=client)
228
229
230 def GetInstanceList(with_secondaries=None):
231   """Get a list of instances on this cluster.
232
233   """
234   fields = ["name", "status", "admin_state"]
235
236   if with_secondaries is not None:
237     fields.append("snodes")
238
239   result = client.QueryInstances([], fields, True)
240
241   instances = []
242   for fields in result:
243     if with_secondaries is not None:
244       (name, status, autostart, snodes) = fields
245
246       if not snodes:
247         continue
248
249       for node in with_secondaries:
250         if node in snodes:
251           break
252       else:
253         continue
254
255     else:
256       (name, status, autostart) = fields
257
258     instances.append(Instance(name, status, autostart))
259
260   return instances
261
262
263 def GetNodeBootIDs():
264   """Get a dict mapping nodes to boot IDs.
265
266   """
267   result = client.QueryNodes([], ["name", "bootid", "offline"], True)
268   return dict([(name, (bootid, offline)) for name, bootid, offline in result])
269
270
271 class Watcher(object):
272   """Encapsulate the logic for restarting erronously halted virtual machines.
273
274   The calling program should periodically instantiate me and call Run().
275   This will traverse the list of instances, and make up to MAXTRIES attempts
276   to restart machines that are down.
277
278   """
279   def __init__(self, opts):
280     master = client.QueryConfigValues(["master_node"])[0]
281     if master != utils.HostInfo().name:
282       raise NotMasterError("This is not the master node")
283     self.instances = GetInstanceList()
284     self.bootids = GetNodeBootIDs()
285     self.started_instances = set()
286     self.opts = opts
287
288   def Run(self):
289     notepad = WatcherState()
290     try:
291       self.ArchiveJobs(self.opts.job_age)
292       self.CheckInstances(notepad)
293       self.CheckDisks(notepad)
294       self.VerifyDisks()
295     finally:
296       notepad.Save()
297
298   def ArchiveJobs(self, age):
299     """Archive old jobs.
300
301     """
302     arch_count, left_count = client.AutoArchiveJobs(age)
303     logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
304
305   def CheckDisks(self, notepad):
306     """Check all nodes for restarted ones.
307
308     """
309     check_nodes = []
310     for name, (new_id, offline) in self.bootids.iteritems():
311       old = notepad.GetNodeBootID(name)
312       if new_id is None:
313         # Bad node, not returning a boot id
314         if not offline:
315           logging.debug("Node %s missing boot id, skipping secondary checks",
316                         name)
317         continue
318       if old != new_id:
319         # Node's boot ID has changed, proably through a reboot.
320         check_nodes.append(name)
321
322     if check_nodes:
323       # Activate disks for all instances with any of the checked nodes as a
324       # secondary node.
325       for instance in GetInstanceList(with_secondaries=check_nodes):
326         if not instance.autostart:
327           logging.info(("Skipping disk activation for non-autostart"
328                         " instance %s"), instance.name)
329           continue
330         if instance.name in self.started_instances:
331           # we already tried to start the instance, which should have
332           # activated its drives (if they can be at all)
333           continue
334         try:
335           logging.info("Activating disks for instance %s", instance.name)
336           instance.ActivateDisks()
337         except Exception:
338           logging.exception("Error while activating disks for instance %s",
339                             instance.name)
340
341       # Keep changed boot IDs
342       for name in check_nodes:
343         notepad.SetNodeBootID(name, self.bootids[name])
344
345   def CheckInstances(self, notepad):
346     """Make a pass over the list of instances, restarting downed ones.
347
348     """
349     for instance in self.instances:
350       if instance.state in BAD_STATES:
351         n = notepad.NumberOfRestartAttempts(instance)
352
353         if n > MAXTRIES:
354           # stay quiet.
355           continue
356         elif n < MAXTRIES:
357           last = " (Attempt #%d)" % (n + 1)
358         else:
359           notepad.RecordRestartAttempt(instance)
360           logging.error("Could not restart %s after %d attempts, giving up",
361                         instance.name, MAXTRIES)
362           continue
363         try:
364           logging.info("Restarting %s%s",
365                         instance.name, last)
366           instance.Restart()
367           self.started_instances.add(instance.name)
368         except Exception:
369           logging.exception("Error while restarting instance %s",
370                             instance.name)
371
372         notepad.RecordRestartAttempt(instance)
373       elif instance.state in HELPLESS_STATES:
374         if notepad.NumberOfRestartAttempts(instance):
375           notepad.RemoveInstance(instance)
376       else:
377         if notepad.NumberOfRestartAttempts(instance):
378           notepad.RemoveInstance(instance)
379           logging.info("Restart of %s succeeded", instance.name)
380
381   @staticmethod
382   def VerifyDisks():
383     """Run gnt-cluster verify-disks.
384
385     """
386     op = opcodes.OpVerifyDisks()
387     result = cli.SubmitOpCode(op, cl=client)
388     if not isinstance(result, (tuple, list)):
389       logging.error("Can't get a valid result from verify-disks")
390       return
391     offline_disk_instances = result[2]
392     if not offline_disk_instances:
393       # nothing to do
394       return
395     logging.debug("Will activate disks for instances %s",
396                   ", ".join(offline_disk_instances))
397     # we submit only one job, and wait for it. not optimal, but spams
398     # less the job queue
399     job = [opcodes.OpActivateInstanceDisks(instance_name=name)
400            for name in offline_disk_instances]
401     job_id = cli.SendJob(job, cl=client)
402
403     cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
404
405
406 def ParseOptions():
407   """Parse the command line options.
408
409   @return: (options, args) as from OptionParser.parse_args()
410
411   """
412   parser = OptionParser(description="Ganeti cluster watcher",
413                         usage="%prog [-d]",
414                         version="%%prog (ganeti) %s" %
415                         constants.RELEASE_VERSION)
416
417   parser.add_option("-d", "--debug", dest="debug",
418                     help="Write all messages to stderr",
419                     default=False, action="store_true")
420   parser.add_option("-A", "--job-age", dest="job_age",
421                     help="Autoarchive jobs older than this age (default"
422                     " 6 hours)", default=6*3600)
423   options, args = parser.parse_args()
424   options.job_age = cli.ParseTimespec(options.job_age)
425   return options, args
426
427
428 def main():
429   """Main function.
430
431   """
432   global client
433
434   options, args = ParseOptions()
435
436   utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
437                      stderr_logging=options.debug)
438
439   try:
440     client = cli.GetClient()
441
442     try:
443       watcher = Watcher(options)
444     except errors.ConfigurationError:
445       # Just exit if there's no configuration
446       sys.exit(constants.EXIT_SUCCESS)
447
448     watcher.Run()
449   except SystemExit:
450     raise
451   except NotMasterError:
452     logging.debug("Not master, exiting")
453     sys.exit(constants.EXIT_NOTMASTER)
454   except errors.ResolverError, err:
455     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
456     sys.exit(constants.EXIT_NODESETUP_ERROR)
457   except Exception, err:
458     logging.error(str(err), exc_info=True)
459     sys.exit(constants.EXIT_FAILURE)
460
461
462 if __name__ == '__main__':
463   main()