# lib/watcher/__init__.py @ 0cc9e018
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state
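

# Maximum number of restart attempts per instance before the watcher gives
# up on it (see Watcher.CheckInstances).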
MAXTRIES = 5


# Global LUXI client object
client = None
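
# Instance states the watcher reacts to: instances in BAD_STATES are
# unexpectedly down and will be restarted; instances in HELPLESS_STATES
# are on a node that is itself down or offline, so restarting them from
# here cannot help.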
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception: # pylint: disable-msg=W0703
    logging.exception("RunParts %s failed", hooks_dir)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.
146

147
  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the instance status file
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_STATUS_FILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
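    # Compare each node's current boot ID with the one recorded in the
    # watcher state; a changed boot ID means the node has rebooted since
    # the last run.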
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            logging.debug("Skipping disk activation for instance %s, as"
                          " it was already started", instance.name)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.status in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)
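
        # n < MAXTRIES: try to restart it now (attempt n + 1);
        # n == MAXTRIES: record a final attempt and give up on the instance;
        # n > MAXTRIES: we already gave up on it in an earlier run.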
        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.status in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  def _CheckForOfflineNodes(self, instance):
    """Checks whether the given instance has any offline secondary node.

    @param instance: The instance object
    @return: True if any of the secondary nodes is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)

  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)

    # Keep track of submitted jobs
    jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)

    archive_jobs = set()
    for (status, job_id) in result[constants.JOB_IDS_KEY]:
      jex.AddJobId(None, status, job_id)
      if status:
        archive_jobs.add(job_id)

    offline_disk_instances = set()

    for (status, result) in jex.GetResults():
      if not status:
        logging.error("Verify-disks job failed: %s", result)
        continue
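
      # Each child verify-disks job returns a single result tuple; the
      # middle element is the list of instances with offline disks (the
      # other fields are not used here).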
      ((_, instances, _), ) = result

      offline_disk_instances.update(instances)

    for job_id in archive_jobs:
      client.ArchiveJob(job_id)

    if not offline_disk_instances:
      # nothing to do
      logging.debug("verify-disks reported no offline disks, nothing to do")
      return

    logging.debug("Will activate disks for instance(s) %s",
                  utils.CommaJoin(offline_disk_instances))

    # We submit only one job and wait for it; not optimal, but it spams the
    # job queue less.
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.status in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skip instance %s because it is in helpless state or has"
                     " one offline secondary", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
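  # The job age may be given with a unit suffix (e.g. "6h"); it is
  # converted to seconds by cli.ParseTimespec below.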
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = state.OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE
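
  # update_file tracks whether the run progressed far enough to safely save
  # the watcher state; on early failure the previous state file is kept.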
  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()
    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if nodemaint.NodeMaintenance.ShouldRun():
      nodemaint.NodeMaintenance().Exec()

    notepad = state.WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS