#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5


# Global LUXI client object
client = None
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
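
# The pause file checked above (constants.WATCHER_PAUSEFILE) is normally
# managed via "gnt-cluster watcher pause <duration>" and "gnt-cluster watcher
# continue"; treat the exact command names as an assumption for this revision.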


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will be in disabled mode
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception: # pylint: disable-msg=W0703
    logging.exception("RunParts %s failed", hooks_dir)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)
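
# Illustrative sketch only: a watcher hook is simply an executable placed in
# the hooks directory computed above (HOOKS_BASE_DIR/HOOKS_NAME_WATCHER);
# RunParts() runs each one and the loop above logs its outcome.  The file name
# and contents below are assumptions, not shipped with Ganeti:
#
#   #!/bin/sh
#   # 50-example-check: do nothing, just note that the watcher ran
#   logger "ganeti-watcher hook ran on $(hostname)"
#   exit 0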


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  _UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap
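
# For reference, the three structures returned by GetClusterData() are:
#   instances: {instance_name: Instance}
#   nodes:     {node_name: (bootid, offline)}
#   smap:      {secondary_node_name: [instance_name, ...]}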


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            logging.debug("Skipping disk activation for instance %s, as"
                          " it was already started", instance.name)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.status in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance.name)

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance.name)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance.name)
      elif instance.status in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance.name):
          notepad.RemoveInstance(instance.name)
      else:
        if notepad.NumberOfRestartAttempts(instance.name):
          notepad.RemoveInstance(instance.name)
          logging.info("Restart of %s succeeded", instance.name)

  def _CheckForOfflineNodes(self, instance):
    """Check whether the given instance has any secondary node offline.

    @param instance: The instance object
    @return: True if any of the secondary nodes is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)

  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)

    # Keep track of submitted jobs
    jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)

    archive_jobs = set()
    for (status, job_id) in result[constants.JOB_IDS_KEY]:
      jex.AddJobId(None, status, job_id)
      if status:
        archive_jobs.add(job_id)

    offline_disk_instances = set()

    for (status, result) in jex.GetResults():
      if not status:
        logging.error("Verify-disks job failed: %s", result)
        continue

      ((_, instances, _), ) = result

      offline_disk_instances.update(instances)

    for job_id in archive_jobs:
      client.ArchiveJob(job_id)

    if not offline_disk_instances:
      # nothing to do
      logging.debug("verify-disks reported no offline disks, nothing to do")
      return

    logging.debug("Will activate disks for instance(s) %s",
                  utils.CommaJoin(offline_disk_instances))

    # We submit only one job and wait for it; not optimal, but it spams the
    # job queue less
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.status in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skip instance %s because it is in helpless state or has"
                     " one offline secondary", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)
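
# Example invocation of the watcher built around ParseOptions() above (the
# installed script name "ganeti-watcher" is an assumption for this revision;
# the flags are the ones defined above):
#   ganeti-watcher -d --ignore-pause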


def _UpdateInstanceStatus(cl, filename):
  """Get the status of the instances on this cluster and write it to a file.

  @todo: Think about doing this per nodegroup, too

  """
  op = opcodes.OpInstanceQuery(output_fields=["name", "status"], names=[],
                               use_locking=True)
  job_id = cl.SubmitJob([op])
  (result, ) = cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)

  cl.ArchiveJob(job_id)

  logging.debug("Got instance data, writing status file %s", filename)

  utils.WriteFile(filename, data="".join("%s %s\n" % (name, status)
                                         for (name, status) in result))
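
# The status file written above contains one "<name> <status>" pair per line;
# as an illustration only (instance names and states are made up), it might
# read:
#   instance1.example.com running
#   instance2.example.com ERROR_down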


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = state.OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()
    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if nodemaint.NodeMaintenance.ShouldRun():
      nodemaint.NodeMaintenance().Exec()

    notepad = state.WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save(constants.WATCHER_STATEFILE)
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS