Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ b76f660d

History | View | Annotate | Download (12.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erronously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import re
33
import time
34
import fcntl
35
import errno
36
import logging
37
from optparse import OptionParser
38

    
39
from ganeti import utils
40
from ganeti import constants
41
from ganeti import serializer
42
from ganeti import ssconf
43
from ganeti import errors
44

    
45

    
46
MAXTRIES = 5
47
BAD_STATES = ['stopped']
48
HELPLESS_STATES = ['(node down)']
49
NOTICE = 'NOTICE'
50
ERROR = 'ERROR'
51
KEY_RESTART_COUNT = "restart_count"
52
KEY_RESTART_WHEN = "restart_when"
53
KEY_BOOT_ID = "bootid"
54

    
55

    
56
class Error(Exception):
57
  """Generic custom error class."""
58

    
59

    
60
class NotMasterError(Error):
61
  """Exception raised when this host is not the master."""
62

    
63

    
64
def Indent(s, prefix='| '):
65
  """Indent a piece of text with a given prefix before each line.
66

    
67
  Args:
68
    s: The string to indent
69
    prefix: The string to prepend each line.
70

    
71
  """
72
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
73

    
74

    
75
def DoCmd(cmd):
76
  """Run a shell command.
77

    
78
  Args:
79
    cmd: the command to run.
80

    
81
  Raises CommandError with verbose commentary on error.
82

    
83
  """
84
  res = utils.RunCmd(cmd)
85

    
86
  if res.failed:
87
    raise Error("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" %
88
                (repr(cmd),
89
                 Indent(res.fail_reason),
90
                 Indent(res.stdout),
91
                 Indent(res.stderr)))
92

    
93
  return res
94

    
95

    
96
def LockFile(fd):
97
  """Locks a file using POSIX locks.
98

    
99
  """
100
  try:
101
    fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
102
  except IOError, err:
103
    if err.errno == errno.EAGAIN:
104
      raise StandardError("File already locked")
105
    raise
106

    
107

    
108
class WatcherState(object):
109
  """Interface to a state file recording restart attempts.
110

    
111
  """
112
  def __init__(self):
113
    """Open, lock, read and parse the file.
114

    
115
    Raises StandardError on lock contention.
116

    
117
    """
118
    # The two-step dance below is necessary to allow both opening existing
119
    # file read/write and creating if not existing.  Vanilla open will truncate
120
    # an existing file -or- allow creating if not existing.
121
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
122
    self.statefile = os.fdopen(fd, 'w+')
123

    
124
    LockFile(self.statefile.fileno())
125

    
126
    try:
127
      self._data = serializer.Load(self.statefile.read())
128
    except Exception, msg:
129
      # Ignore errors while loading the file and treat it as empty
130
      self._data = {}
131
      logging.warning(("Empty or invalid state file. Using defaults."
132
                       " Error message: %s"), msg)
133

    
134
    if "instance" not in self._data:
135
      self._data["instance"] = {}
136
    if "node" not in self._data:
137
      self._data["node"] = {}
138

    
139
  def Save(self):
140
    """Save state to file, then unlock and close it.
141

    
142
    """
143
    assert self.statefile
144

    
145
    # We need to make sure the file is locked before renaming it, otherwise
146
    # starting ganeti-watcher again at the same time will create a conflict.
147
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
148
                         data=serializer.Dump(self._data),
149
                         prewrite=LockFile, close=False)
150
    self.statefile = os.fdopen(fd, 'w+')
151

    
152
  def Close(self):
153
    """Unlock configuration file and close it.
154

    
155
    """
156
    assert self.statefile
157

    
158
    # Files are automatically unlocked when closing them
159
    self.statefile.close()
160
    self.statefile = None
161

    
162
  def GetNodeBootID(self, name):
163
    """Returns the last boot ID of a node or None.
164

    
165
    """
166
    ndata = self._data["node"]
167

    
168
    if name in ndata and KEY_BOOT_ID in ndata[name]:
169
      return ndata[name][KEY_BOOT_ID]
170
    return None
171

    
172
  def SetNodeBootID(self, name, bootid):
173
    """Sets the boot ID of a node.
174

    
175
    """
176
    assert bootid
177

    
178
    ndata = self._data["node"]
179

    
180
    if name not in ndata:
181
      ndata[name] = {}
182

    
183
    ndata[name][KEY_BOOT_ID] = bootid
184

    
185
  def NumberOfRestartAttempts(self, instance):
186
    """Returns number of previous restart attempts.
187

    
188
    Args:
189
      instance - the instance to look up.
190

    
191
    """
192
    idata = self._data["instance"]
193

    
194
    if instance.name in idata:
195
      return idata[instance.name][KEY_RESTART_COUNT]
196

    
197
    return 0
198

    
199
  def RecordRestartAttempt(self, instance):
200
    """Record a restart attempt.
201

    
202
    Args:
203
      instance - the instance being restarted
204

    
205
    """
206
    idata = self._data["instance"]
207

    
208
    if instance.name not in idata:
209
      inst = idata[instance.name] = {}
210
    else:
211
      inst = idata[instance.name]
212

    
213
    inst[KEY_RESTART_WHEN] = time.time()
214
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
215

    
216
  def RemoveInstance(self, instance):
217
    """Update state to reflect that a machine is running, i.e. remove record.
218

    
219
    Args:
220
      instance - the instance to remove from books
221

    
222
    This method removes the record for a named instance.
223

    
224
    """
225
    idata = self._data["instance"]
226

    
227
    if instance.name in idata:
228
      del idata[instance.name]
229

    
230

    
231
class Instance(object):
232
  """Abstraction for a Virtual Machine instance.
233

    
234
  Methods:
235
    Restart(): issue a command to restart the represented machine.
236

    
237
  """
238
  def __init__(self, name, state, autostart):
239
    self.name = name
240
    self.state = state
241
    self.autostart = autostart
242

    
243
  def Restart(self):
244
    """Encapsulates the start of an instance.
245

    
246
    """
247
    DoCmd(['gnt-instance', 'startup', '--lock-retries=15', self.name])
248

    
249
  def ActivateDisks(self):
250
    """Encapsulates the activation of all disks of an instance.
251

    
252
    """
253
    DoCmd(['gnt-instance', 'activate-disks', '--lock-retries=15', self.name])
254

    
255

    
256
def _RunListCmd(cmd):
257
  """Runs a command and parses its output into lists.
258

    
259
  """
260
  for line in DoCmd(cmd).stdout.splitlines():
261
    yield line.split(':')
262

    
263

    
264
def GetInstanceList(with_secondaries=None):
265
  """Get a list of instances on this cluster.
266

    
267
  """
268
  cmd = ['gnt-instance', 'list', '--lock-retries=15', '--no-headers',
269
         '--separator=:']
270

    
271
  fields = 'name,oper_state,admin_state'
272

    
273
  if with_secondaries is not None:
274
    fields += ',snodes'
275

    
276
  cmd.append('-o')
277
  cmd.append(fields)
278

    
279
  instances = []
280
  for fields in _RunListCmd(cmd):
281
    if with_secondaries is not None:
282
      (name, status, autostart, snodes) = fields
283

    
284
      if snodes == "-":
285
        continue
286

    
287
      for node in with_secondaries:
288
        if node in snodes.split(','):
289
          break
290
      else:
291
        continue
292

    
293
    else:
294
      (name, status, autostart) = fields
295

    
296
    instances.append(Instance(name, status, autostart != "no"))
297

    
298
  return instances
299

    
300

    
301
def GetNodeBootIDs():
302
  """Get a dict mapping nodes to boot IDs.
303

    
304
  """
305
  cmd = ['gnt-node', 'list', '--lock-retries=15', '--no-headers',
306
         '--separator=:', '-o', 'name,bootid']
307

    
308
  ids = {}
309
  for fields in _RunListCmd(cmd):
310
    (name, bootid) = fields
311
    ids[name] = bootid
312

    
313
  return ids
314

    
315

    
316
class Watcher(object):
317
  """Encapsulate the logic for restarting erronously halted virtual machines.
318

    
319
  The calling program should periodically instantiate me and call Run().
320
  This will traverse the list of instances, and make up to MAXTRIES attempts
321
  to restart machines that are down.
322

    
323
  """
324
  def __init__(self):
325
    sstore = ssconf.SimpleStore()
326
    master = sstore.GetMasterNode()
327
    if master != utils.HostInfo().name:
328
      raise NotMasterError("This is not the master node")
329
    self.instances = GetInstanceList()
330
    self.bootids = GetNodeBootIDs()
331
    self.started_instances = set()
332

    
333
  def Run(self):
334
    notepad = WatcherState()
335
    try:
336
      self.CheckInstances(notepad)
337
      self.CheckDisks(notepad)
338
      self.VerifyDisks()
339
    finally:
340
      notepad.Save()
341

    
342
  def CheckDisks(self, notepad):
343
    """Check all nodes for restarted ones.
344

    
345
    """
346
    check_nodes = []
347
    for name, id in self.bootids.iteritems():
348
      old = notepad.GetNodeBootID(name)
349
      if old != id:
350
        # Node's boot ID has changed, proably through a reboot.
351
        check_nodes.append(name)
352

    
353
    if check_nodes:
354
      # Activate disks for all instances with any of the checked nodes as a
355
      # secondary node.
356
      for instance in GetInstanceList(with_secondaries=check_nodes):
357
        if not instance.autostart:
358
          logging.info(("Skipping disk activation for non-autostart"
359
                        " instance %s"), instance.name)
360
          continue
361
        if instance.name in self.started_instances:
362
          # we already tried to start the instance, which should have
363
          # activated its drives (if they can be at all)
364
          continue
365
        try:
366
          logging.info("Activating disks for instance %s", instance.name)
367
          instance.ActivateDisks()
368
        except Error, err:
369
          logging.error(str(err), exc_info=True)
370

    
371
      # Keep changed boot IDs
372
      for name in check_nodes:
373
        notepad.SetNodeBootID(name, self.bootids[name])
374

    
375
  def CheckInstances(self, notepad):
376
    """Make a pass over the list of instances, restarting downed ones.
377

    
378
    """
379
    for instance in self.instances:
380
      # Don't care about manually stopped instances
381
      if not instance.autostart:
382
        continue
383

    
384
      if instance.state in BAD_STATES:
385
        n = notepad.NumberOfRestartAttempts(instance)
386

    
387
        if n > MAXTRIES:
388
          # stay quiet.
389
          continue
390
        elif n < MAXTRIES:
391
          last = " (Attempt #%d)" % (n + 1)
392
        else:
393
          notepad.RecordRestartAttempt(instance)
394
          logging.error("Could not restart %s after %d attempts, giving up",
395
                        instance.name, MAXTRIES)
396
          continue
397
        try:
398
          logging.info("Restarting %s%s",
399
                        instance.name, last)
400
          instance.Restart()
401
          self.started_instances.add(instance.name)
402
        except Error, err:
403
          logging.error(str(err), exc_info=True)
404

    
405
        notepad.RecordRestartAttempt(instance)
406
      elif instance.state in HELPLESS_STATES:
407
        if notepad.NumberOfRestartAttempts(instance):
408
          notepad.RemoveInstance(instance)
409
      else:
410
        if notepad.NumberOfRestartAttempts(instance):
411
          notepad.RemoveInstance(instance)
412
          logging.info("Restart of %s succeeded", instance.name)
413

    
414
  def VerifyDisks(self):
415
    """Run gnt-cluster verify-disks.
416

    
417
    """
418
    result = DoCmd(['gnt-cluster', 'verify-disks', '--lock-retries=15'])
419
    if result.output:
420
      logging.info(result.output)
421

    
422

    
423
def ParseOptions():
424
  """Parse the command line options.
425

    
426
  Returns:
427
    (options, args) as from OptionParser.parse_args()
428

    
429
  """
430
  parser = OptionParser(description="Ganeti cluster watcher",
431
                        usage="%prog [-d]",
432
                        version="%%prog (ganeti) %s" %
433
                        constants.RELEASE_VERSION)
434

    
435
  parser.add_option("-d", "--debug", dest="debug",
436
                    help="Write all messages to stderr",
437
                    default=False, action="store_true")
438
  options, args = parser.parse_args()
439
  return options, args
440

    
441

    
442
def SetupLogging(debug):
443
  """Configures the logging module.
444

    
445
  """
446
  formatter = logging.Formatter("%(asctime)s: %(message)s")
447

    
448
  logfile_handler = logging.FileHandler(constants.LOG_WATCHER)
449
  logfile_handler.setFormatter(formatter)
450
  logfile_handler.setLevel(logging.INFO)
451

    
452
  stderr_handler = logging.StreamHandler()
453
  stderr_handler.setFormatter(formatter)
454
  if debug:
455
    stderr_handler.setLevel(logging.NOTSET)
456
  else:
457
    stderr_handler.setLevel(logging.CRITICAL)
458

    
459
  root_logger = logging.getLogger("")
460
  root_logger.setLevel(logging.NOTSET)
461
  root_logger.addHandler(logfile_handler)
462
  root_logger.addHandler(stderr_handler)
463

    
464

    
465
def main():
466
  """Main function.
467

    
468
  """
469
  options, args = ParseOptions()
470

    
471
  SetupLogging(options.debug)
472

    
473
  try:
474
    try:
475
      watcher = Watcher()
476
    except errors.ConfigurationError:
477
      # Just exit if there's no configuration
478
      sys.exit(constants.EXIT_SUCCESS)
479
    watcher.Run()
480
  except SystemExit:
481
    raise
482
  except NotMasterError:
483
    logging.debug("Not master, exiting")
484
    sys.exit(constants.EXIT_NOTMASTER)
485
  except errors.ResolverError, err:
486
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
487
    sys.exit(constants.EXIT_NODESETUP_ERROR)
488
  except Exception, err:
489
    logging.error(str(err), exc_info=True)
490
    sys.exit(constants.EXIT_FAILURE)
491

    
492

    
493
if __name__ == '__main__':
494
  main()