lib/watcher/__init__.py @ 16e0b9c9

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client # pylint: disable-msg=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state


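# Instance states handled below: instances in BAD_STATES are candidates for
# an automatic restart (at most MAXTRIES attempts, tracked in the watcher
# state file), while instances in HELPLESS_STATES are left alone because
# their nodes are down or offline and a restart cannot succeed.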
MAXTRIES = 5
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception: # pylint: disable-msg=W0703
    logging.exception("RunParts %s failed", hooks_dir)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


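# Restart bookkeeping: every restart attempt is recorded in the watcher state
# file ("notepad").  Once MAXTRIES attempts have been recorded the instance is
# only reported, not restarted again; the counter is cleared once the instance
# is no longer in a bad state.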
def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


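# A node whose boot ID differs from the one recorded in the state file is
# assumed to have rebooted; disks of instances using it as a secondary node
# are then re-activated, unless the instance was just restarted above (which
# should already have activated its disks).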
def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks whether the given instance has any secondary node that is offline.

  @param nodes: Dictionary mapping node names to L{Node} objects
  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


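# Disk verification is delegated to the master via OpGroupVerifyDisks; only
# instances reported as having offline disks (and that are neither helpless
# nor relying on offline secondaries) get their disks re-activated, batched
# into a single job to keep the load on the job queue low.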
def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state or"
                   " has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable-msg=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


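# The watcher is normally invoked periodically without arguments (e.g. from
# cron); the options below are mainly for debugging and for the per-group
# child processes.  An illustrative cron entry (path and interval depend on
# the installation) could look like:
#   */5 * * * * root /usr/sbin/ganeti-watcher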
def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children", default=False,
                    action="store_true", help="Wait for child processes")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _UpdateInstanceStatus(cl, filename):
  """Get a list of instances on this cluster and write it to a status file.

  @todo: Think about doing this per nodegroup, too

  """
  op = opcodes.OpInstanceQuery(output_fields=["name", "status"], names=[],
                               use_locking=True)
  job_id = cl.SubmitJob([op])
  (result, ) = cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)

  cl.ArchiveJob(job_id)

  logging.debug("Got instance data, writing status file %s", filename)

  utils.WriteFile(filename, data="".join("%s %s\n" % (name, status)
                                         for (name, status) in result))


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


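# Every node group is handled by a separate child process: the watcher
# re-executes its own command line with the node group option and the group's
# UUID appended, staggering the children by CHILD_PROCESS_DELAY seconds.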
def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable-msg=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


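# The "global" watcher runs on every node: it ensures the node daemons are up
# and runs the watcher hooks everywhere, but only continues on the current
# master node, where it additionally checks RAPI, archives old jobs, writes
# the instance status file and spawns the per-group children.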
@rapi.client.UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)
  _UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


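# Note that "split" instances (secondary nodes in a different group than the
# primary) are deliberately ignored here, presumably because a single
# per-group child cannot act on them consistently.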
def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  # TODO: Implement locking
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid]),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    filter=[qlang.OP_EQUAL, "group.uuid", uuid]),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _KnownGroup(uuid):
  """Checks if a group UUID is known by ssconf.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  return compat.any(line.strip() and line.split()[0] == uuid
                    for line in groups)


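# The per-group watcher keeps its own state file (one per node group UUID)
# and talks to the master daemon directly; unlike the global watcher it does
# not try to restart a downed master daemon.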
def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  # Check if node group is known
  if not _KnownGroup(group_uuid):
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the per-group watcher state file
  statefile = state.OpenStateFile(state_path) # pylint: disable-msg=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable-msg=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


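# Main() is the entry point used by the ganeti-watcher script.  The watcher
# lock is taken in shared, non-blocking mode so that the global watcher and
# its per-group children can run concurrently; presumably tools that must
# keep the watcher from running take the same lock exclusively.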
def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS