# lib/watcher/__init__.py @ adf6301e
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = [constants.INSTST_ERRORDOWN]
HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
NOTICE = 'NOTICE'
ERROR = 'ERROR'


# Global LUXI client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-master-candidate nodes it will run in
  # disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception: # pylint: disable-msg=W0703
    logging.exception("RunParts %s failed", hooks_dir)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)
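
# A minimal usage sketch for the helper above (hypothetical names; assumes
# the global LUXI `client` has already been set up, as Main() does via
# cli.GetClient()):
#
#   inst = Instance("instance1.example.com", constants.INSTST_ERRORDOWN,
#                   True, ["node2.example.com"])
#   inst.ActivateDisks()  # submits OpInstanceActivateDisks
#   inst.Restart()        # submits OpInstanceStartup with force=False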


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the instance status file
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_STATUS_FILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes =  dict([(name, (bootid, offline))
                 for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            logging.debug("Skipping disk activation for instance %s, as"
                          " it was already started", instance.name)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.status in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.status in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  def _CheckForOfflineNodes(self, instance):
    """Checks whether the given instance has any offline secondary node.

    @param instance: The instance object
    @return: True if any secondary node is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)

  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)

    # Keep track of submitted jobs
    jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)

    archive_jobs = set()
    for (status, job_id) in result[constants.JOB_IDS_KEY]:
      jex.AddJobId(None, status, job_id)
      if status:
        archive_jobs.add(job_id)

    offline_disk_instances = set()

    for (status, result) in jex.GetResults():
      if not status:
        logging.error("Verify-disks job failed: %s", result)
        continue

      ((_, instances, _), ) = result

      offline_disk_instances.update(instances)

    for job_id in archive_jobs:
      client.ArchiveJob(job_id)

    if not offline_disk_instances:
      # nothing to do
      logging.debug("verify-disks reported no offline disks, nothing to do")
      return

    logging.debug("Will activate disks for instance(s) %s",
                  utils.CommaJoin(offline_disk_instances))

    # we submit only one job and wait for it; not optimal, but it spams the
    # job queue less
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.status in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skip instance %s because it is in helpless state or has"
                     " one offline secondary", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)
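
# A hypothetical invocation (the wrapper script name depends on the
# installation; the job age is parsed by cli.ParseTimespec, so a plain
# number of seconds always works):
#
#   ganeti-watcher -d --job-age=21600 --ignore-pause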


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = \
    state.OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()
    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if nodemaint.NodeMaintenance.ShouldRun():
      nodemaint.NodeMaintenance().Exec()

    notepad = state.WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS