Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ cc962d58

History | View | Annotate | Download (13 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erronously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import time
33
import logging
34
from optparse import OptionParser
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import serializer
39
from ganeti import errors
40
from ganeti import opcodes
41
from ganeti import cli
42

    
43

    
44
MAXTRIES = 5
45
BAD_STATES = ['ERROR_down']
46
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
47
NOTICE = 'NOTICE'
48
ERROR = 'ERROR'
49
KEY_RESTART_COUNT = "restart_count"
50
KEY_RESTART_WHEN = "restart_when"
51
KEY_BOOT_ID = "bootid"
52

    
53

    
54
# Global client object
55
client = None
56

    
57

    
58
class NotMasterError(errors.GenericError):
59
  """Exception raised when this host is not the master."""
60

    
61

    
62
def Indent(s, prefix='| '):
63
  """Indent a piece of text with a given prefix before each line.
64

    
65
  @param s: the string to indent
66
  @param prefix: the string to prepend each line
67

    
68
  """
69
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
70

    
71

    
72
class WatcherState(object):
73
  """Interface to a state file recording restart attempts.
74

    
75
  """
76
  def __init__(self):
77
    """Open, lock, read and parse the file.
78

    
79
    Raises exception on lock contention.
80

    
81
    """
82
    # The two-step dance below is necessary to allow both opening existing
83
    # file read/write and creating if not existing.  Vanilla open will truncate
84
    # an existing file -or- allow creating if not existing.
85
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
86
    self.statefile = os.fdopen(fd, 'w+')
87

    
88
    utils.LockFile(self.statefile.fileno())
89

    
90
    try:
91
      self._data = serializer.Load(self.statefile.read())
92
    except Exception, msg:
93
      # Ignore errors while loading the file and treat it as empty
94
      self._data = {}
95
      logging.warning(("Empty or invalid state file. Using defaults."
96
                       " Error message: %s"), msg)
97

    
98
    if "instance" not in self._data:
99
      self._data["instance"] = {}
100
    if "node" not in self._data:
101
      self._data["node"] = {}
102

    
103
    self._orig_data = serializer.Dump(self._data)
104

    
105
  def Save(self):
106
    """Save state to file, then unlock and close it.
107

    
108
    """
109
    assert self.statefile
110

    
111
    serialized_form = serializer.Dump(self._data)
112
    if self._orig_data == serialized_form:
113
      logging.debug("Data didn't change, just touching status file")
114
      os.utime(constants.WATCHER_STATEFILE, None)
115
      return
116

    
117
    # We need to make sure the file is locked before renaming it, otherwise
118
    # starting ganeti-watcher again at the same time will create a conflict.
119
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
120
                         data=serialized_form,
121
                         prewrite=utils.LockFile, close=False)
122
    self.statefile = os.fdopen(fd, 'w+')
123

    
124
  def Close(self):
125
    """Unlock configuration file and close it.
126

    
127
    """
128
    assert self.statefile
129

    
130
    # Files are automatically unlocked when closing them
131
    self.statefile.close()
132
    self.statefile = None
133

    
134
  def GetNodeBootID(self, name):
135
    """Returns the last boot ID of a node or None.
136

    
137
    """
138
    ndata = self._data["node"]
139

    
140
    if name in ndata and KEY_BOOT_ID in ndata[name]:
141
      return ndata[name][KEY_BOOT_ID]
142
    return None
143

    
144
  def SetNodeBootID(self, name, bootid):
145
    """Sets the boot ID of a node.
146

    
147
    """
148
    assert bootid
149

    
150
    ndata = self._data["node"]
151

    
152
    if name not in ndata:
153
      ndata[name] = {}
154

    
155
    ndata[name][KEY_BOOT_ID] = bootid
156

    
157
  def NumberOfRestartAttempts(self, instance):
158
    """Returns number of previous restart attempts.
159

    
160
    @type instance: L{Instance}
161
    @param instance: the instance to look up
162

    
163
    """
164
    idata = self._data["instance"]
165

    
166
    if instance.name in idata:
167
      return idata[instance.name][KEY_RESTART_COUNT]
168

    
169
    return 0
170

    
171
  def RecordRestartAttempt(self, instance):
172
    """Record a restart attempt.
173

    
174
    @type instance: L{Instance}
175
    @param instance: the instance being restarted
176

    
177
    """
178
    idata = self._data["instance"]
179

    
180
    if instance.name not in idata:
181
      inst = idata[instance.name] = {}
182
    else:
183
      inst = idata[instance.name]
184

    
185
    inst[KEY_RESTART_WHEN] = time.time()
186
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
187

    
188
  def RemoveInstance(self, instance):
189
    """Update state to reflect that a machine is running.
190

    
191
    This method removes the record for a named instance (as we only
192
    track down instances).
193

    
194
    @type instance: L{Instance}
195
    @param instance: the instance to remove from books
196

    
197
    """
198
    idata = self._data["instance"]
199

    
200
    if instance.name in idata:
201
      del idata[instance.name]
202

    
203

    
204
class Instance(object):
205
  """Abstraction for a Virtual Machine instance.
206

    
207
  """
208
  def __init__(self, name, state, autostart):
209
    self.name = name
210
    self.state = state
211
    self.autostart = autostart
212

    
213
  def Restart(self):
214
    """Encapsulates the start of an instance.
215

    
216
    """
217
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
218
    cli.SubmitOpCode(op, cl=client)
219

    
220
  def ActivateDisks(self):
221
    """Encapsulates the activation of all disks of an instance.
222

    
223
    """
224
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
225
    cli.SubmitOpCode(op, cl=client)
226

    
227

    
228
def GetInstanceList(with_secondaries=None):
229
  """Get a list of instances on this cluster.
230

    
231
  """
232
  fields = ["name", "status", "admin_state"]
233

    
234
  if with_secondaries is not None:
235
    fields.append("snodes")
236

    
237
  result = client.QueryInstances([], fields, True)
238

    
239
  instances = []
240
  for fields in result:
241
    if with_secondaries is not None:
242
      (name, status, autostart, snodes) = fields
243

    
244
      if not snodes:
245
        continue
246

    
247
      for node in with_secondaries:
248
        if node in snodes:
249
          break
250
      else:
251
        continue
252

    
253
    else:
254
      (name, status, autostart) = fields
255

    
256
    instances.append(Instance(name, status, autostart))
257

    
258
  return instances
259

    
260

    
261
def GetNodeBootIDs():
262
  """Get a dict mapping nodes to boot IDs.
263

    
264
  """
265
  result = client.QueryNodes([], ["name", "bootid", "offline"], True)
266
  return dict([(name, (bootid, offline)) for name, bootid, offline in result])
267

    
268

    
269
class Watcher(object):
270
  """Encapsulate the logic for restarting erronously halted virtual machines.
271

    
272
  The calling program should periodically instantiate me and call Run().
273
  This will traverse the list of instances, and make up to MAXTRIES attempts
274
  to restart machines that are down.
275

    
276
  """
277
  def __init__(self, opts, notepad):
278
    self.notepad = notepad
279
    master = client.QueryConfigValues(["master_node"])[0]
280
    if master != utils.HostInfo().name:
281
      raise NotMasterError("This is not the master node")
282
    self.instances = GetInstanceList()
283
    self.bootids = GetNodeBootIDs()
284
    self.started_instances = set()
285
    self.opts = opts
286

    
287
  def Run(self):
288
    """Watcher run sequence.
289

    
290
    """
291
    notepad = self.notepad
292
    self.ArchiveJobs(self.opts.job_age)
293
    self.CheckInstances(notepad)
294
    self.CheckDisks(notepad)
295
    self.VerifyDisks()
296

    
297
  def ArchiveJobs(self, age):
298
    """Archive old jobs.
299

    
300
    """
301
    arch_count, left_count = client.AutoArchiveJobs(age)
302
    logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
303

    
304
  def CheckDisks(self, notepad):
305
    """Check all nodes for restarted ones.
306

    
307
    """
308
    check_nodes = []
309
    for name, (new_id, offline) in self.bootids.iteritems():
310
      old = notepad.GetNodeBootID(name)
311
      if new_id is None:
312
        # Bad node, not returning a boot id
313
        if not offline:
314
          logging.debug("Node %s missing boot id, skipping secondary checks",
315
                        name)
316
        continue
317
      if old != new_id:
318
        # Node's boot ID has changed, proably through a reboot.
319
        check_nodes.append(name)
320

    
321
    if check_nodes:
322
      # Activate disks for all instances with any of the checked nodes as a
323
      # secondary node.
324
      for instance in GetInstanceList(with_secondaries=check_nodes):
325
        if not instance.autostart:
326
          logging.info(("Skipping disk activation for non-autostart"
327
                        " instance %s"), instance.name)
328
          continue
329
        if instance.name in self.started_instances:
330
          # we already tried to start the instance, which should have
331
          # activated its drives (if they can be at all)
332
          continue
333
        try:
334
          logging.info("Activating disks for instance %s", instance.name)
335
          instance.ActivateDisks()
336
        except Exception:
337
          logging.exception("Error while activating disks for instance %s",
338
                            instance.name)
339

    
340
      # Keep changed boot IDs
341
      for name in check_nodes:
342
        notepad.SetNodeBootID(name, self.bootids[name][0])
343

    
344
  def CheckInstances(self, notepad):
345
    """Make a pass over the list of instances, restarting downed ones.
346

    
347
    """
348
    for instance in self.instances:
349
      if instance.state in BAD_STATES:
350
        n = notepad.NumberOfRestartAttempts(instance)
351

    
352
        if n > MAXTRIES:
353
          # stay quiet.
354
          continue
355
        elif n < MAXTRIES:
356
          last = " (Attempt #%d)" % (n + 1)
357
        else:
358
          notepad.RecordRestartAttempt(instance)
359
          logging.error("Could not restart %s after %d attempts, giving up",
360
                        instance.name, MAXTRIES)
361
          continue
362
        try:
363
          logging.info("Restarting %s%s",
364
                        instance.name, last)
365
          instance.Restart()
366
          self.started_instances.add(instance.name)
367
        except Exception:
368
          logging.exception("Error while restarting instance %s",
369
                            instance.name)
370

    
371
        notepad.RecordRestartAttempt(instance)
372
      elif instance.state in HELPLESS_STATES:
373
        if notepad.NumberOfRestartAttempts(instance):
374
          notepad.RemoveInstance(instance)
375
      else:
376
        if notepad.NumberOfRestartAttempts(instance):
377
          notepad.RemoveInstance(instance)
378
          logging.info("Restart of %s succeeded", instance.name)
379

    
380
  @staticmethod
381
  def VerifyDisks():
382
    """Run gnt-cluster verify-disks.
383

    
384
    """
385
    op = opcodes.OpVerifyDisks()
386
    result = cli.SubmitOpCode(op, cl=client)
387
    if not isinstance(result, (tuple, list)):
388
      logging.error("Can't get a valid result from verify-disks")
389
      return
390
    offline_disk_instances = result[2]
391
    if not offline_disk_instances:
392
      # nothing to do
393
      return
394
    logging.debug("Will activate disks for instances %s",
395
                  ", ".join(offline_disk_instances))
396
    # we submit only one job, and wait for it. not optimal, but spams
397
    # less the job queue
398
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
399
           for name in offline_disk_instances]
400
    job_id = cli.SendJob(job, cl=client)
401

    
402
    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
403

    
404

    
405
def ParseOptions():
406
  """Parse the command line options.
407

    
408
  @return: (options, args) as from OptionParser.parse_args()
409

    
410
  """
411
  parser = OptionParser(description="Ganeti cluster watcher",
412
                        usage="%prog [-d]",
413
                        version="%%prog (ganeti) %s" %
414
                        constants.RELEASE_VERSION)
415

    
416
  parser.add_option("-d", "--debug", dest="debug",
417
                    help="Write all messages to stderr",
418
                    default=False, action="store_true")
419
  parser.add_option("-A", "--job-age", dest="job_age",
420
                    help="Autoarchive jobs older than this age (default"
421
                    " 6 hours)", default=6*3600)
422
  options, args = parser.parse_args()
423
  options.job_age = cli.ParseTimespec(options.job_age)
424
  return options, args
425

    
426

    
427
def main():
428
  """Main function.
429

    
430
  """
431
  global client
432

    
433
  options, args = ParseOptions()
434

    
435
  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
436
                     stderr_logging=options.debug)
437

    
438
  try:
439
    notepad = WatcherState()
440
    try:
441
      client = cli.GetClient()
442

    
443
      try:
444
        watcher = Watcher(options, notepad)
445
      except errors.ConfigurationError:
446
        # Just exit if there's no configuration
447
        sys.exit(constants.EXIT_SUCCESS)
448

    
449
      watcher.Run()
450
    finally:
451
      notepad.Save()
452
  except SystemExit:
453
    raise
454
  except NotMasterError:
455
    logging.debug("Not master, exiting")
456
    sys.exit(constants.EXIT_NOTMASTER)
457
  except errors.ResolverError, err:
458
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
459
    sys.exit(constants.EXIT_NODESETUP_ERROR)
460
  except Exception, err:
461
    logging.error(str(err), exc_info=True)
462
    sys.exit(constants.EXIT_FAILURE)
463

    
464

    
465
if __name__ == '__main__':
466
  main()