Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ e125c67c

History | View | Annotate | Download (12.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erronously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import time
33
import fcntl
34
import errno
35
import logging
36
from optparse import OptionParser
37

    
38
from ganeti import utils
39
from ganeti import constants
40
from ganeti import serializer
41
from ganeti import ssconf
42
from ganeti import errors
43
from ganeti import opcodes
44
from ganeti import logger
45
from ganeti import cli
46

    
47

    
48
# Upper bound on the restart attempts made for a single instance.
MAXTRIES = 5
# Instance states for which a restart is attempted.
BAD_STATES = ["stopped"]
# Instance states for which no restart is attempted.
HELPLESS_STATES = ["(node down)"]
NOTICE = "NOTICE"
ERROR = "ERROR"

# Keys of the per-instance and per-node records kept in the state file.
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Client object shared by the whole module; main() assigns the real one.
client = None
60

    
61

    
62
class NotMasterError(errors.GenericError):
  """Raised when the watcher runs on a host that is not the master node.

  """
64

    
65

    
66
def Indent(s, prefix='| '):
  """Put a prefix in front of every line of a text.

  Args:
    s: The text to indent
    prefix: The string placed before each line

  Returns:
    The prefixed lines joined by newlines and terminated by a single
    newline.  A text without any line yields the bare prefix and a newline.

  """
  lines = s.splitlines()
  separator = '\n' + prefix
  return prefix + separator.join(lines) + "\n"
75

    
76

    
77
def DoCmd(cmd):
  """Run a command and insist on its success.

  Args:
    cmd: the command to run; passed unchanged to utils.RunCmd.

  Returns:
    The result object produced by utils.RunCmd.

  Raises CommandError with verbose commentary (failure reason, stdout and
  stderr, each indented) if the result is flagged as failed.

  """
  result = utils.RunCmd(cmd)

  if not result.failed:
    return result

  details = (repr(cmd),
             Indent(result.fail_reason),
             Indent(result.stdout),
             Indent(result.stderr))
  raise errors.CommandError("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" %
                            details)
97

    
98

    
99
class WatcherState(object):
  """Interface to a state file recording restart attempts.

  The state is a dictionary with two sections: "instance" maps instance
  names to their restart bookkeeping (KEY_RESTART_COUNT, KEY_RESTART_WHEN)
  and "node" maps node names to the last boot ID seen (KEY_BOOT_ID).

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    An empty, unparsable or structurally unexpected file is treated as
    empty state.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      data = serializer.Load(self.statefile.read())
    except Exception:
      # Ignore errors while loading the file and treat it as empty.
      # sys.exc_info() is used instead of a named except target so that
      # this parses on every Python version.
      data = None
      logging.warning(("Empty or invalid state file. Using defaults."
                       " Error message: %s"), sys.exc_info()[1])
    else:
      if not isinstance(data, dict):
        # A file that parses but does not hold a dictionary is as useless
        # as a corrupt one; the section handling below needs a dictionary.
        data = None
        logging.warning("State file does not contain a dictionary."
                        " Using defaults.")

    if data is None:
      data = {}
    self._data = data

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    # Snapshot used by Save() to detect whether anything was modified
    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Write the state back to the file if it was modified.

    If the serialized state equals the one read at construction time, only
    the file's timestamps are updated.  Otherwise the new content is written
    through utils.WriteFile and self.statefile is re-pointed at the
    descriptor it returns.  The file is not closed here; use Close().

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    Args:
      name - the node name to look up

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    Args:
      name - the node name
      bootid - the boot ID to remember; must not be empty

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    Args:
      instance - the instance to look up.

    Returns 0 for an unknown instance and for a record that carries no
    restart counter.

    """
    idata = self._data["instance"]

    if instance.name in idata:
      # Same tolerance as RecordRestartAttempt: a record without a counter
      # (e.g. from a damaged state file) counts as zero attempts.
      return idata[instance.name].get(KEY_RESTART_COUNT, 0)

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    Stores the current time and increments the restart counter, creating
    the record if the instance has none yet.

    Args:
      instance - the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running, i.e. remove record.

    Args:
      instance - the instance to remove from books

    This method removes the record for a named instance; an instance
    without a record is silently ignored.

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]
228

    
229

    
230
class Instance(object):
  """A virtual machine instance as handled by the watcher.

  Attributes:
    name: the instance name
    state: the state the instance was reported in
    autostart: whether the watcher is allowed to act on the instance

  Methods:
    Restart(): submit the opcode that starts the represented machine.
    ActivateDisks(): submit the opcode that activates all its disks.

  """
  def __init__(self, name, state, autostart):
    self.name, self.state, self.autostart = name, state, autostart

  def Restart(self):
    """Submit a (non-forced) startup opcode for this instance.

    """
    startup = opcodes.OpStartupInstance(extra_args=None,
                                        force=False,
                                        instance_name=self.name)
    cli.SubmitOpCode(startup, cl=client)

  def ActivateDisks(self):
    """Submit an opcode activating all disks of this instance.

    """
    activate = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(activate, cl=client)
257

    
258

    
259
def GetInstanceList(with_secondaries=None):
  """Query the cluster's instances and wrap them in Instance objects.

  Args:
    with_secondaries: if not None, a collection of node names; only
      instances having at least one of these nodes among their secondary
      nodes are returned.

  Returns:
    A list of Instance objects built from the name, oper_state and
    admin_state fields of each queried instance.

  """
  filter_by_secondary = with_secondaries is not None

  wanted = ["name", "oper_state", "admin_state"]
  if filter_by_secondary:
    wanted.append("snodes")

  instances = []
  for row in client.QueryInstances([], wanted):
    if filter_by_secondary:
      (name, status, autostart, snodes) = row

      # Instances without secondary nodes can never match
      if not snodes:
        continue

      matched = False
      for node in with_secondaries:
        if node in snodes:
          matched = True
          break
      if not matched:
        continue
    else:
      (name, status, autostart) = row

    instances.append(Instance(name, status, autostart))

  return instances
290

    
291

    
292
def GetNodeBootIDs():
  """Query all nodes and return a dict mapping node name to boot ID.

  """
  boot_ids = {}
  for name, bootid in client.QueryNodes([], ["name", "bootid"]):
    boot_ids[name] = bootid
  return boot_ids
298

    
299

    
300
class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self):
    """Collect the instance list and node boot IDs.

    Raises NotMasterError if this host is not the master node.

    """
    sstore = ssconf.SimpleStore()
    master = sstore.GetMasterNode()
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    self.instances = GetInstanceList()
    self.bootids = GetNodeBootIDs()
    # Names of the instances a start was successfully submitted for
    self.started_instances = set()

  def Run(self):
    """Run all checks; the watcher state is saved even if a check fails.

    """
    notepad = WatcherState()
    try:
      self.CheckInstances(notepad)
      self.CheckDisks(notepad)
      self.VerifyDisks()
    finally:
      notepad.Save()

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    For every node whose boot ID differs from the recorded one, the disks
    of the autostart instances using it as secondary node are activated and
    the new boot ID is recorded.

    Args:
      notepad - the WatcherState holding the recorded boot IDs

    """
    check_nodes = []
    for name, new_id in self.bootids.items():
      if not new_id:
        # The node reported no boot ID.  There is nothing to compare and
        # nothing to record (SetNodeBootID refuses empty IDs), so keep the
        # previously recorded ID and look at the node again on the next run.
        logging.debug("Node %s did not report a boot ID, skipping it", name)
        continue

      old = notepad.GetNodeBootID(name)
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for instance in GetInstanceList(with_secondaries=check_nodes):
        if not instance.autostart:
          logging.info(("Skipping disk activation for non-autostart"
                        " instance %s"), instance.name)
          continue
        if instance.name in self.started_instances:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          continue
        try:
          logging.info("Activating disks for instance %s", instance.name)
          instance.ActivateDisks()
        except Exception:
          # A failure for one instance must not stop the others
          logging.error(str(sys.exc_info()[1]), exc_info=True)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    Args:
      notepad - the WatcherState recording the restart attempts

    """
    for instance in self.instances:
      # Don't care about manually stopped instances
      if not instance.autostart:
        continue

      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          # Exactly MAXTRIES attempts were made: record one more so that the
          # "giving up" message is logged only once.
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                        instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.error(str(sys.exc_info()[1]), exc_info=True)

        # The attempt is counted whether or not submitting it succeeded
        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    Any output of the command is logged at info level; a failing command
    makes DoCmd raise.

    """
    # TODO: What should we do here?
    result = DoCmd(['gnt-cluster', 'verify-disks'])
    if result.output:
      logging.info(result.output)
406

    
407

    
408
def ParseOptions():
  """Build the option parser and parse the command line.

  The only option is -d/--debug, which defaults to False.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(usage="%prog [-d]",
                        description="Ganeti cluster watcher",
                        version=version_text)

  parser.add_option("-d", "--debug",
                    action="store_true", dest="debug", default=False,
                    help="Write all messages to stderr")

  return parser.parse_args()
425

    
426

    
427
def main():
  """Set up logging and the client, then run the watcher once.

  Exits with EXIT_SUCCESS if there is no configuration, EXIT_NOTMASTER if
  this host is not the master, EXIT_NODESETUP_ERROR if a hostname cannot be
  resolved and EXIT_FAILURE on any other error.

  """
  global client

  (options, _) = ParseOptions()

  logger.SetupLogging(constants.LOG_WATCHER, debug=options.debug)

  try:
    client = cli.GetClient()

    try:
      watcher = Watcher()
    except errors.ConfigurationError:
      # Just exit if there's no configuration
      sys.exit(constants.EXIT_SUCCESS)

    watcher.Run()
  except SystemExit:
    # Must stay ahead of the generic handler so exits are passed through
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError:
    unresolved = sys.exc_info()[1].args[0]
    logging.error("Cannot resolve hostname '%s', exiting.", unresolved)
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except Exception:
    logging.error(str(sys.exc_info()[1]), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)
458

    
459

    
460
# Run the watcher only when executed as a script, not when imported.
if __name__ == "__main__":
  main()