Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ 5188ab37

History | View | Annotate | Download (12.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erronously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import time
33
import fcntl
34
import errno
35
import logging
36
from optparse import OptionParser
37

    
38
from ganeti import utils
39
from ganeti import constants
40
from ganeti import serializer
41
from ganeti import ssconf
42
from ganeti import errors
43
from ganeti import opcodes
44
from ganeti import logger
45
from ganeti import cli
46

    
47

    
48
MAXTRIES = 5
49
BAD_STATES = ['stopped']
50
HELPLESS_STATES = ['(node down)']
51
NOTICE = 'NOTICE'
52
ERROR = 'ERROR'
53
KEY_RESTART_COUNT = "restart_count"
54
KEY_RESTART_WHEN = "restart_when"
55
KEY_BOOT_ID = "bootid"
56

    
57

    
58
# Global client object
59
client = None
60

    
61

    
62
class NotMasterError(errors.GenericError):
63
  """Exception raised when this host is not the master."""
64

    
65

    
66
def Indent(s, prefix='| '):
67
  """Indent a piece of text with a given prefix before each line.
68

    
69
  Args:
70
    s: The string to indent
71
    prefix: The string to prepend each line.
72

    
73
  """
74
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
75

    
76

    
77
class WatcherState(object):
78
  """Interface to a state file recording restart attempts.
79

    
80
  """
81
  def __init__(self):
82
    """Open, lock, read and parse the file.
83

    
84
    Raises exception on lock contention.
85

    
86
    """
87
    # The two-step dance below is necessary to allow both opening existing
88
    # file read/write and creating if not existing.  Vanilla open will truncate
89
    # an existing file -or- allow creating if not existing.
90
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
91
    self.statefile = os.fdopen(fd, 'w+')
92

    
93
    utils.LockFile(self.statefile.fileno())
94

    
95
    try:
96
      self._data = serializer.Load(self.statefile.read())
97
    except Exception, msg:
98
      # Ignore errors while loading the file and treat it as empty
99
      self._data = {}
100
      logging.warning(("Empty or invalid state file. Using defaults."
101
                       " Error message: %s"), msg)
102

    
103
    if "instance" not in self._data:
104
      self._data["instance"] = {}
105
    if "node" not in self._data:
106
      self._data["node"] = {}
107

    
108
    self._orig_data = serializer.Dump(self._data)
109

    
110
  def Save(self):
111
    """Save state to file, then unlock and close it.
112

    
113
    """
114
    assert self.statefile
115

    
116
    serialized_form = serializer.Dump(self._data)
117
    if self._orig_data == serialized_form:
118
      logging.debug("Data didn't change, just touching status file")
119
      os.utime(constants.WATCHER_STATEFILE, None)
120
      return
121

    
122
    # We need to make sure the file is locked before renaming it, otherwise
123
    # starting ganeti-watcher again at the same time will create a conflict.
124
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
125
                         data=serialized_form,
126
                         prewrite=utils.LockFile, close=False)
127
    self.statefile = os.fdopen(fd, 'w+')
128

    
129
  def Close(self):
130
    """Unlock configuration file and close it.
131

    
132
    """
133
    assert self.statefile
134

    
135
    # Files are automatically unlocked when closing them
136
    self.statefile.close()
137
    self.statefile = None
138

    
139
  def GetNodeBootID(self, name):
140
    """Returns the last boot ID of a node or None.
141

    
142
    """
143
    ndata = self._data["node"]
144

    
145
    if name in ndata and KEY_BOOT_ID in ndata[name]:
146
      return ndata[name][KEY_BOOT_ID]
147
    return None
148

    
149
  def SetNodeBootID(self, name, bootid):
150
    """Sets the boot ID of a node.
151

    
152
    """
153
    assert bootid
154

    
155
    ndata = self._data["node"]
156

    
157
    if name not in ndata:
158
      ndata[name] = {}
159

    
160
    ndata[name][KEY_BOOT_ID] = bootid
161

    
162
  def NumberOfRestartAttempts(self, instance):
163
    """Returns number of previous restart attempts.
164

    
165
    Args:
166
      instance - the instance to look up.
167

    
168
    """
169
    idata = self._data["instance"]
170

    
171
    if instance.name in idata:
172
      return idata[instance.name][KEY_RESTART_COUNT]
173

    
174
    return 0
175

    
176
  def RecordRestartAttempt(self, instance):
177
    """Record a restart attempt.
178

    
179
    Args:
180
      instance - the instance being restarted
181

    
182
    """
183
    idata = self._data["instance"]
184

    
185
    if instance.name not in idata:
186
      inst = idata[instance.name] = {}
187
    else:
188
      inst = idata[instance.name]
189

    
190
    inst[KEY_RESTART_WHEN] = time.time()
191
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
192

    
193
  def RemoveInstance(self, instance):
194
    """Update state to reflect that a machine is running, i.e. remove record.
195

    
196
    Args:
197
      instance - the instance to remove from books
198

    
199
    This method removes the record for a named instance.
200

    
201
    """
202
    idata = self._data["instance"]
203

    
204
    if instance.name in idata:
205
      del idata[instance.name]
206

    
207

    
208
class Instance(object):
209
  """Abstraction for a Virtual Machine instance.
210

    
211
  Methods:
212
    Restart(): issue a command to restart the represented machine.
213

    
214
  """
215
  def __init__(self, name, state, autostart):
216
    self.name = name
217
    self.state = state
218
    self.autostart = autostart
219

    
220
  def Restart(self):
221
    """Encapsulates the start of an instance.
222

    
223
    """
224
    op = opcodes.OpStartupInstance(instance_name=self.name,
225
                                   force=False,
226
                                   extra_args=None)
227
    cli.SubmitOpCode(op, cl=client)
228

    
229
  def ActivateDisks(self):
230
    """Encapsulates the activation of all disks of an instance.
231

    
232
    """
233
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
234
    cli.SubmitOpCode(op, cl=client)
235

    
236

    
237
def GetInstanceList(with_secondaries=None):
238
  """Get a list of instances on this cluster.
239

    
240
  """
241
  fields = ["name", "oper_state", "admin_state"]
242

    
243
  if with_secondaries is not None:
244
    fields.append("snodes")
245

    
246
  result = client.QueryInstances([], fields)
247

    
248
  instances = []
249
  for fields in result:
250
    if with_secondaries is not None:
251
      (name, status, autostart, snodes) = fields
252

    
253
      if not snodes:
254
        continue
255

    
256
      for node in with_secondaries:
257
        if node in snodes:
258
          break
259
      else:
260
        continue
261

    
262
    else:
263
      (name, status, autostart) = fields
264

    
265
    instances.append(Instance(name, status, autostart))
266

    
267
  return instances
268

    
269

    
270
def GetNodeBootIDs():
271
  """Get a dict mapping nodes to boot IDs.
272

    
273
  """
274
  result = client.QueryNodes([], ["name", "bootid"])
275
  return dict([(name, bootid) for name, bootid in result])
276

    
277

    
278
class Watcher(object):
279
  """Encapsulate the logic for restarting erronously halted virtual machines.
280

    
281
  The calling program should periodically instantiate me and call Run().
282
  This will traverse the list of instances, and make up to MAXTRIES attempts
283
  to restart machines that are down.
284

    
285
  """
286
  def __init__(self):
287
    sstore = ssconf.SimpleStore()
288
    master = sstore.GetMasterNode()
289
    if master != utils.HostInfo().name:
290
      raise NotMasterError("This is not the master node")
291
    self.instances = GetInstanceList()
292
    self.bootids = GetNodeBootIDs()
293
    self.started_instances = set()
294

    
295
  def Run(self):
296
    notepad = WatcherState()
297
    try:
298
      self.CheckInstances(notepad)
299
      self.CheckDisks(notepad)
300
      self.VerifyDisks()
301
    finally:
302
      notepad.Save()
303

    
304
  def CheckDisks(self, notepad):
305
    """Check all nodes for restarted ones.
306

    
307
    """
308
    check_nodes = []
309
    for name, new_id in self.bootids.iteritems():
310
      old = notepad.GetNodeBootID(name)
311
      if old != new_id:
312
        # Node's boot ID has changed, proably through a reboot.
313
        check_nodes.append(name)
314

    
315
    if check_nodes:
316
      # Activate disks for all instances with any of the checked nodes as a
317
      # secondary node.
318
      for instance in GetInstanceList(with_secondaries=check_nodes):
319
        if not instance.autostart:
320
          logging.info(("Skipping disk activation for non-autostart"
321
                        " instance %s"), instance.name)
322
          continue
323
        if instance.name in self.started_instances:
324
          # we already tried to start the instance, which should have
325
          # activated its drives (if they can be at all)
326
          continue
327
        try:
328
          logging.info("Activating disks for instance %s", instance.name)
329
          instance.ActivateDisks()
330
        except Exception, err:
331
          logging.error(str(err), exc_info=True)
332

    
333
      # Keep changed boot IDs
334
      for name in check_nodes:
335
        notepad.SetNodeBootID(name, self.bootids[name])
336

    
337
  def CheckInstances(self, notepad):
338
    """Make a pass over the list of instances, restarting downed ones.
339

    
340
    """
341
    for instance in self.instances:
342
      # Don't care about manually stopped instances
343
      if not instance.autostart:
344
        continue
345

    
346
      if instance.state in BAD_STATES:
347
        n = notepad.NumberOfRestartAttempts(instance)
348

    
349
        if n > MAXTRIES:
350
          # stay quiet.
351
          continue
352
        elif n < MAXTRIES:
353
          last = " (Attempt #%d)" % (n + 1)
354
        else:
355
          notepad.RecordRestartAttempt(instance)
356
          logging.error("Could not restart %s after %d attempts, giving up",
357
                        instance.name, MAXTRIES)
358
          continue
359
        try:
360
          logging.info("Restarting %s%s",
361
                        instance.name, last)
362
          instance.Restart()
363
          self.started_instances.add(instance.name)
364
        except Exception, err:
365
          logging.error(str(err), exc_info=True)
366

    
367
        notepad.RecordRestartAttempt(instance)
368
      elif instance.state in HELPLESS_STATES:
369
        if notepad.NumberOfRestartAttempts(instance):
370
          notepad.RemoveInstance(instance)
371
      else:
372
        if notepad.NumberOfRestartAttempts(instance):
373
          notepad.RemoveInstance(instance)
374
          logging.info("Restart of %s succeeded", instance.name)
375

    
376
  def VerifyDisks(self):
377
    """Run gnt-cluster verify-disks.
378

    
379
    """
380
    op = opcodes.OpVerifyDisks()
381
    result = cli.SubmitOpCode(op, cl=client)
382
    if not isinstance(result, (tuple, list)):
383
      logging.error("Can't get a valid result from verify-disks")
384
      return
385
    offline_disk_instances = result[2]
386
    if not offline_disk_instances:
387
      # nothing to do
388
      return
389
    logging.debug("Will activate disks for instances %s",
390
                  ", ".join(offline_disk_instances))
391
    # we submit only one job, and wait for it. not optimal, but spams
392
    # less the job queue
393
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
394
           for name in offline_disk_instances]
395
    job_id = cli.SendJob(job, cl=client)
396

    
397
    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
398

    
399

    
400
def ParseOptions():
401
  """Parse the command line options.
402

    
403
  Returns:
404
    (options, args) as from OptionParser.parse_args()
405

    
406
  """
407
  parser = OptionParser(description="Ganeti cluster watcher",
408
                        usage="%prog [-d]",
409
                        version="%%prog (ganeti) %s" %
410
                        constants.RELEASE_VERSION)
411

    
412
  parser.add_option("-d", "--debug", dest="debug",
413
                    help="Write all messages to stderr",
414
                    default=False, action="store_true")
415
  options, args = parser.parse_args()
416
  return options, args
417

    
418

    
419
def main():
420
  """Main function.
421

    
422
  """
423
  global client
424

    
425
  options, args = ParseOptions()
426

    
427
  logger.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
428
                      stderr_logging=options.debug)
429

    
430
  try:
431
    client = cli.GetClient()
432

    
433
    try:
434
      watcher = Watcher()
435
    except errors.ConfigurationError:
436
      # Just exit if there's no configuration
437
      sys.exit(constants.EXIT_SUCCESS)
438

    
439
    watcher.Run()
440
  except SystemExit:
441
    raise
442
  except NotMasterError:
443
    logging.debug("Not master, exiting")
444
    sys.exit(constants.EXIT_NOTMASTER)
445
  except errors.ResolverError, err:
446
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
447
    sys.exit(constants.EXIT_NODESETUP_ERROR)
448
  except Exception, err:
449
    logging.error(str(err), exc_info=True)
450
    sys.exit(constants.EXIT_FAILURE)
451

    
452

    
453
if __name__ == '__main__':
454
  main()