root / lib / watcher / __init__.py @ 54ca6e4b


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5


# Global LUXI client object
client = None
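# BAD_STATES are instance states that the watcher tries to fix by restarting
# the instance; HELPLESS_STATES are states it can do nothing about because
# the node hosting the instance is itself down or offline.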
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will run in disabled mode
  utils.EnsureDaemon(constants.CONFD)

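# Watcher hooks are looked up in HOOKS_BASE_DIR under HOOKS_NAME_WATCHER and
# executed via utils.RunParts before the main watcher logic; a failing or
# crashing hook is only logged and never aborts the watcher run.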
def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception: # pylint: disable-msg=W0703
    logging.exception("RunParts %s failed", hooks_dir)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)

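# GetClusterData returns three structures: a dict of Instance objects keyed
# by name, a dict mapping node names to (bootid, offline) tuples, and the
# secondary map (smap) mapping each node name to the list of instances that
# use it as a secondary node.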
def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the instance status file
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_STATUS_FILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

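  # A node whose boot ID changed since the last watcher run has rebooted, so
  # the secondary storage of instances kept on it may need to be
  # re-activated; CheckDisks below handles exactly that.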
  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            logging.debug("Skipping disk activation for instance %s, as"
                          " it was already started", instance.name)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.status in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance.name)

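        # Restart-attempt bookkeeping: a counter above MAXTRIES means we
        # already gave up on this instance in an earlier run; exactly
        # MAXTRIES means this run records the final failed attempt and gives
        # up; below MAXTRIES we try another restart.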
        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance.name)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance.name)
      elif instance.status in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance.name):
          notepad.RemoveInstance(instance.name)
      else:
        if notepad.NumberOfRestartAttempts(instance.name):
          notepad.RemoveInstance(instance.name)
          logging.info("Restart of %s succeeded", instance.name)

  def _CheckForOfflineNodes(self, instance):
    """Checks if a given instance has any secondary node in offline status.

    @param instance: The instance object
    @return: True if any of the secondary nodes is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)

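  # VerifyDisks submits a single OpClusterVerifyDisks opcode, waits for the
  # child jobs it spawns, and then activates disks in one combined job for
  # every instance reported with offline disks, unless the instance is in a
  # helpless state or has an offline secondary node.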
  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)

    # Keep track of submitted jobs
    jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)

    archive_jobs = set()
    for (status, job_id) in result[constants.JOB_IDS_KEY]:
      jex.AddJobId(None, status, job_id)
      if status:
        archive_jobs.add(job_id)

    offline_disk_instances = set()

    for (status, result) in jex.GetResults():
      if not status:
        logging.error("Verify-disks job failed: %s", result)
        continue

      ((_, instances, _), ) = result

      offline_disk_instances.update(instances)

    for job_id in archive_jobs:
      client.ArchiveJob(job_id)

    if not offline_disk_instances:
      # nothing to do
      logging.debug("verify-disks reported no offline disks, nothing to do")
      return

    logging.debug("Will activate disks for instance(s) %s",
                  utils.CommaJoin(offline_disk_instances))

    # We submit only one job and wait for it; not optimal, but it spams the
    # job queue less
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.status in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skip instance %s because it is in helpless state or has"
                     " one offline secondary", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)

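# Main drives a single watcher pass: start the node daemons, run the watcher
# hooks and node maintenance, connect to the master daemon (restarting it
# once if it seems to be down), make sure RAPI responds, then run the
# Watcher itself and save its state file on success.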
@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = state.OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()
    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if nodemaint.NodeMaintenance.ShouldRun():
      nodemaint.NodeMaintenance().Exec()

    notepad = state.WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save(constants.WATCHER_STATEFILE)
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS