Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ 2fb96d39

History | View | Annotate | Download (12.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erronously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import re
33
import time
34
import fcntl
35
import errno
36
import logging
37
from optparse import OptionParser
38

    
39
from ganeti import utils
40
from ganeti import constants
41
from ganeti import serializer
42
from ganeti import ssconf
43
from ganeti import errors
44

    
45

    
46
MAXTRIES = 5
47
BAD_STATES = ['stopped']
48
HELPLESS_STATES = ['(node down)']
49
NOTICE = 'NOTICE'
50
ERROR = 'ERROR'
51
KEY_RESTART_COUNT = "restart_count"
52
KEY_RESTART_WHEN = "restart_when"
53
KEY_BOOT_ID = "bootid"
54

    
55

    
56
class Error(Exception):
57
  """Generic custom error class."""
58

    
59

    
60
class NotMasterError(Error):
61
  """Exception raised when this host is not the master."""
62

    
63

    
64
def Indent(s, prefix='| '):
65
  """Indent a piece of text with a given prefix before each line.
66

    
67
  Args:
68
    s: The string to indent
69
    prefix: The string to prepend each line.
70

    
71
  """
72
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
73

    
74

    
75
def DoCmd(cmd):
76
  """Run a shell command.
77

    
78
  Args:
79
    cmd: the command to run.
80

    
81
  Raises CommandError with verbose commentary on error.
82

    
83
  """
84
  res = utils.RunCmd(cmd)
85

    
86
  if res.failed:
87
    raise Error("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" %
88
                (repr(cmd),
89
                 Indent(res.fail_reason),
90
                 Indent(res.stdout),
91
                 Indent(res.stderr)))
92

    
93
  return res
94

    
95

    
96
def LockFile(fd):
97
  """Locks a file using POSIX locks.
98

    
99
  """
100
  try:
101
    fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
102
  except IOError, err:
103
    if err.errno == errno.EAGAIN:
104
      raise StandardError("File already locked")
105
    raise
106

    
107

    
108
class WatcherState(object):
109
  """Interface to a state file recording restart attempts.
110

    
111
  """
112
  def __init__(self):
113
    """Open, lock, read and parse the file.
114

    
115
    Raises StandardError on lock contention.
116

    
117
    """
118
    # The two-step dance below is necessary to allow both opening existing
119
    # file read/write and creating if not existing.  Vanilla open will truncate
120
    # an existing file -or- allow creating if not existing.
121
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
122
    self.statefile = os.fdopen(fd, 'w+')
123

    
124
    LockFile(self.statefile.fileno())
125

    
126
    try:
127
      self._data = serializer.Load(self.statefile.read())
128
    except Exception, msg:
129
      # Ignore errors while loading the file and treat it as empty
130
      self._data = {}
131
      logging.warning(("Empty or invalid state file. Using defaults."
132
                       " Error message: %s"), msg)
133

    
134
    if "instance" not in self._data:
135
      self._data["instance"] = {}
136
    if "node" not in self._data:
137
      self._data["node"] = {}
138

    
139
    self._orig_data = self._data.copy()
140

    
141
  def Save(self):
142
    """Save state to file, then unlock and close it.
143

    
144
    """
145
    assert self.statefile
146

    
147
    if self._orig_data == self._data:
148
      logging.debug("Data didn't change, just touching status file")
149
      os.utime(constants.WATCHER_STATEFILE, None)
150
      return
151

    
152
    # We need to make sure the file is locked before renaming it, otherwise
153
    # starting ganeti-watcher again at the same time will create a conflict.
154
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
155
                         data=serializer.Dump(self._data),
156
                         prewrite=LockFile, close=False)
157
    self.statefile = os.fdopen(fd, 'w+')
158

    
159
  def Close(self):
160
    """Unlock configuration file and close it.
161

    
162
    """
163
    assert self.statefile
164

    
165
    # Files are automatically unlocked when closing them
166
    self.statefile.close()
167
    self.statefile = None
168

    
169
  def GetNodeBootID(self, name):
170
    """Returns the last boot ID of a node or None.
171

    
172
    """
173
    ndata = self._data["node"]
174

    
175
    if name in ndata and KEY_BOOT_ID in ndata[name]:
176
      return ndata[name][KEY_BOOT_ID]
177
    return None
178

    
179
  def SetNodeBootID(self, name, bootid):
180
    """Sets the boot ID of a node.
181

    
182
    """
183
    assert bootid
184

    
185
    ndata = self._data["node"]
186

    
187
    if name not in ndata:
188
      ndata[name] = {}
189

    
190
    ndata[name][KEY_BOOT_ID] = bootid
191

    
192
  def NumberOfRestartAttempts(self, instance):
193
    """Returns number of previous restart attempts.
194

    
195
    Args:
196
      instance - the instance to look up.
197

    
198
    """
199
    idata = self._data["instance"]
200

    
201
    if instance.name in idata:
202
      return idata[instance.name][KEY_RESTART_COUNT]
203

    
204
    return 0
205

    
206
  def RecordRestartAttempt(self, instance):
207
    """Record a restart attempt.
208

    
209
    Args:
210
      instance - the instance being restarted
211

    
212
    """
213
    idata = self._data["instance"]
214

    
215
    if instance.name not in idata:
216
      inst = idata[instance.name] = {}
217
    else:
218
      inst = idata[instance.name]
219

    
220
    inst[KEY_RESTART_WHEN] = time.time()
221
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
222

    
223
  def RemoveInstance(self, instance):
224
    """Update state to reflect that a machine is running, i.e. remove record.
225

    
226
    Args:
227
      instance - the instance to remove from books
228

    
229
    This method removes the record for a named instance.
230

    
231
    """
232
    idata = self._data["instance"]
233

    
234
    if instance.name in idata:
235
      del idata[instance.name]
236

    
237

    
238
class Instance(object):
239
  """Abstraction for a Virtual Machine instance.
240

    
241
  Methods:
242
    Restart(): issue a command to restart the represented machine.
243

    
244
  """
245
  def __init__(self, name, state, autostart):
246
    self.name = name
247
    self.state = state
248
    self.autostart = autostart
249

    
250
  def Restart(self):
251
    """Encapsulates the start of an instance.
252

    
253
    """
254
    DoCmd(['gnt-instance', 'startup', '--lock-retries=15', self.name])
255

    
256
  def ActivateDisks(self):
257
    """Encapsulates the activation of all disks of an instance.
258

    
259
    """
260
    DoCmd(['gnt-instance', 'activate-disks', '--lock-retries=15', self.name])
261

    
262

    
263
def _RunListCmd(cmd):
264
  """Runs a command and parses its output into lists.
265

    
266
  """
267
  for line in DoCmd(cmd).stdout.splitlines():
268
    yield line.split(':')
269

    
270

    
271
def GetInstanceList(with_secondaries=None):
272
  """Get a list of instances on this cluster.
273

    
274
  """
275
  cmd = ['gnt-instance', 'list', '--lock-retries=15', '--no-headers',
276
         '--separator=:']
277

    
278
  fields = 'name,oper_state,admin_state'
279

    
280
  if with_secondaries is not None:
281
    fields += ',snodes'
282

    
283
  cmd.append('-o')
284
  cmd.append(fields)
285

    
286
  instances = []
287
  for fields in _RunListCmd(cmd):
288
    if with_secondaries is not None:
289
      (name, status, autostart, snodes) = fields
290

    
291
      if snodes == "-":
292
        continue
293

    
294
      for node in with_secondaries:
295
        if node in snodes.split(','):
296
          break
297
      else:
298
        continue
299

    
300
    else:
301
      (name, status, autostart) = fields
302

    
303
    instances.append(Instance(name, status, autostart != "no"))
304

    
305
  return instances
306

    
307

    
308
def GetNodeBootIDs():
309
  """Get a dict mapping nodes to boot IDs.
310

    
311
  """
312
  cmd = ['gnt-node', 'list', '--lock-retries=15', '--no-headers',
313
         '--separator=:', '-o', 'name,bootid']
314

    
315
  ids = {}
316
  for fields in _RunListCmd(cmd):
317
    (name, bootid) = fields
318
    ids[name] = bootid
319

    
320
  return ids
321

    
322

    
323
class Watcher(object):
324
  """Encapsulate the logic for restarting erronously halted virtual machines.
325

    
326
  The calling program should periodically instantiate me and call Run().
327
  This will traverse the list of instances, and make up to MAXTRIES attempts
328
  to restart machines that are down.
329

    
330
  """
331
  def __init__(self):
332
    sstore = ssconf.SimpleStore()
333
    master = sstore.GetMasterNode()
334
    if master != utils.HostInfo().name:
335
      raise NotMasterError("This is not the master node")
336
    self.instances = GetInstanceList()
337
    self.bootids = GetNodeBootIDs()
338
    self.started_instances = set()
339

    
340
  def Run(self):
341
    notepad = WatcherState()
342
    try:
343
      self.CheckInstances(notepad)
344
      self.CheckDisks(notepad)
345
      self.VerifyDisks()
346
    finally:
347
      notepad.Save()
348

    
349
  def CheckDisks(self, notepad):
350
    """Check all nodes for restarted ones.
351

    
352
    """
353
    check_nodes = []
354
    for name, id in self.bootids.iteritems():
355
      old = notepad.GetNodeBootID(name)
356
      if old != id:
357
        # Node's boot ID has changed, proably through a reboot.
358
        check_nodes.append(name)
359

    
360
    if check_nodes:
361
      # Activate disks for all instances with any of the checked nodes as a
362
      # secondary node.
363
      for instance in GetInstanceList(with_secondaries=check_nodes):
364
        if not instance.autostart:
365
          logging.info(("Skipping disk activation for non-autostart"
366
                        " instance %s"), instance.name)
367
          continue
368
        if instance.name in self.started_instances:
369
          # we already tried to start the instance, which should have
370
          # activated its drives (if they can be at all)
371
          continue
372
        try:
373
          logging.info("Activating disks for instance %s", instance.name)
374
          instance.ActivateDisks()
375
        except Error, err:
376
          logging.error(str(err), exc_info=True)
377

    
378
      # Keep changed boot IDs
379
      for name in check_nodes:
380
        notepad.SetNodeBootID(name, self.bootids[name])
381

    
382
  def CheckInstances(self, notepad):
383
    """Make a pass over the list of instances, restarting downed ones.
384

    
385
    """
386
    for instance in self.instances:
387
      # Don't care about manually stopped instances
388
      if not instance.autostart:
389
        continue
390

    
391
      if instance.state in BAD_STATES:
392
        n = notepad.NumberOfRestartAttempts(instance)
393

    
394
        if n > MAXTRIES:
395
          # stay quiet.
396
          continue
397
        elif n < MAXTRIES:
398
          last = " (Attempt #%d)" % (n + 1)
399
        else:
400
          notepad.RecordRestartAttempt(instance)
401
          logging.error("Could not restart %s after %d attempts, giving up",
402
                        instance.name, MAXTRIES)
403
          continue
404
        try:
405
          logging.info("Restarting %s%s",
406
                        instance.name, last)
407
          instance.Restart()
408
          self.started_instances.add(instance.name)
409
        except Error, err:
410
          logging.error(str(err), exc_info=True)
411

    
412
        notepad.RecordRestartAttempt(instance)
413
      elif instance.state in HELPLESS_STATES:
414
        if notepad.NumberOfRestartAttempts(instance):
415
          notepad.RemoveInstance(instance)
416
      else:
417
        if notepad.NumberOfRestartAttempts(instance):
418
          notepad.RemoveInstance(instance)
419
          logging.info("Restart of %s succeeded", instance.name)
420

    
421
  def VerifyDisks(self):
422
    """Run gnt-cluster verify-disks.
423

    
424
    """
425
    result = DoCmd(['gnt-cluster', 'verify-disks', '--lock-retries=15'])
426
    if result.output:
427
      logging.info(result.output)
428

    
429

    
430
def ParseOptions():
431
  """Parse the command line options.
432

    
433
  Returns:
434
    (options, args) as from OptionParser.parse_args()
435

    
436
  """
437
  parser = OptionParser(description="Ganeti cluster watcher",
438
                        usage="%prog [-d]",
439
                        version="%%prog (ganeti) %s" %
440
                        constants.RELEASE_VERSION)
441

    
442
  parser.add_option("-d", "--debug", dest="debug",
443
                    help="Write all messages to stderr",
444
                    default=False, action="store_true")
445
  options, args = parser.parse_args()
446
  return options, args
447

    
448

    
449
def SetupLogging(debug):
450
  """Configures the logging module.
451

    
452
  """
453
  formatter = logging.Formatter("%(asctime)s: %(message)s")
454

    
455
  logfile_handler = logging.FileHandler(constants.LOG_WATCHER)
456
  logfile_handler.setFormatter(formatter)
457
  logfile_handler.setLevel(logging.INFO)
458

    
459
  stderr_handler = logging.StreamHandler()
460
  stderr_handler.setFormatter(formatter)
461
  if debug:
462
    stderr_handler.setLevel(logging.NOTSET)
463
  else:
464
    stderr_handler.setLevel(logging.CRITICAL)
465

    
466
  root_logger = logging.getLogger("")
467
  root_logger.setLevel(logging.NOTSET)
468
  root_logger.addHandler(logfile_handler)
469
  root_logger.addHandler(stderr_handler)
470

    
471

    
472
def main():
473
  """Main function.
474

    
475
  """
476
  options, args = ParseOptions()
477

    
478
  SetupLogging(options.debug)
479

    
480
  try:
481
    try:
482
      watcher = Watcher()
483
    except errors.ConfigurationError:
484
      # Just exit if there's no configuration
485
      sys.exit(constants.EXIT_SUCCESS)
486
    watcher.Run()
487
  except SystemExit:
488
    raise
489
  except NotMasterError:
490
    logging.debug("Not master, exiting")
491
    sys.exit(constants.EXIT_NOTMASTER)
492
  except errors.ResolverError, err:
493
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
494
    sys.exit(constants.EXIT_NODESETUP_ERROR)
495
  except Exception, err:
496
    logging.error(str(err), exc_info=True)
497
    sys.exit(constants.EXIT_FAILURE)
498

    
499

    
500
if __name__ == '__main__':
501
  main()