#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non-candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = os.path.join(constants.HOOKS_BASE_DIR,
                           constants.HOOKS_NAME_WATCHER)

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, msg: # pylint: disable-msg=W0703
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
    # without results there is nothing to report on, so give up here
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening an existing
    # file read/write and creating it if missing.  A vanilla open() would
    # either truncate an existing file ('w+') or fail to create a missing
    # one ('r+').
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock state file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get instance and node data for this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # We submit only one job and wait for it; not optimal, but this spams
    # the job queue less
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    try:
      cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
    except Exception: # pylint: disable-msg=W0703
      logging.exception("Error while activating disks")


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


def main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()

    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()