#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher
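
# Note: the watcher is normally invoked periodically from cron, as the module
# docstring says.  An illustrative crontab entry (the actual path and interval
# depend on how Ganeti was installed) might be:
#
#   */5 * * * * root /usr/sbin/ganeti-watcher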

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti.confd import client as confd_client


MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
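
# For reference, the state file managed by WatcherState below is a
# serializer-encoded (JSON) document using the keys above.  An illustrative
# layout (hypothetical names and values):
#
#   {
#     "node": {"node1.example.com": {"bootid": "9f323f71-..."}},
#     "instance": {"instance1.example.com": {"restart_count": 2,
#                                            "restart_when": 1268000000.0}}
#   }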


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will be in disabled mode
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
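  # Watcher hooks are simply executables dropped into
  # HOOKS_BASE_DIR/HOOKS_NAME_WATCHER; utils.RunParts runs each of them in
  # turn and the per-hook result is only logged below, so a failing hook does
  # not abort the watcher run.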
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, msg: # pylint: disable-msg=W0703
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
    # bail out, otherwise 'results' would be undefined below
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)


class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/DRBD devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False

  @staticmethod
  def GetRunningInstances():
    """Compute list of hypervisor/running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except: # pylint: disable-msg=W0702
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get list of used DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shutdown running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary will collapse duplicate instance names (only
      # xen pvm/hvm) into a single key, which is fine
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shutdown active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)

  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = utils.HostInfo().name
    req = confd_client.ConfdClientRequest(type=
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
                                          query=my_name)
    self.confd_client.SendRequest(req, async=False)
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
    if not timed_out:
      # should have a valid response
      status, result = self.store_cb.GetResponse(req.rsalt)
      assert status, "Missing result but received replies"
      if not self.filter_cb.consistent[req.rsalt]:
        logging.warning("Inconsistent replies, not doing anything")
        return
      self.DoMaintenance(result.server_reply.answer)
    else:
      logging.warning("Confd query timed out, cannot do maintenance actions")


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
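  # A single job with two queries is submitted: instance data (name, status,
  # admin_state, snodes) and node data (name, bootid, offline).  The return
  # value is the tuple (instances, nodes, smap) where:
  #   instances: instance name -> Instance object
  #   nodes:     node name -> (bootid, offline)
  #   smap:      secondary node name -> list of instances using it as secondary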
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes =  dict([(name, (bootid, offline))
                 for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
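    # A node whose boot ID differs from the one recorded in the state file is
    # assumed to have rebooted since the last run; disks of instances that
    # have it as secondary node are re-activated and the new boot ID is saved.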
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
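    # Instances in BAD_STATES are restarted up to MAXTRIES times, with each
    # attempt recorded in the state file; after that the watcher logs a final
    # error and stays quiet.  The bookkeeping entry is dropped again once the
    # instance is running (or its node is down, i.e. HELPLESS_STATES).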
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                        instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # we submit only one job and wait for it; not optimal, but it spams
    # the job queue less
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    try:
      cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
    except Exception: # pylint: disable-msg=W0703
      logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening an existing
  # file read/write and creating it if it doesn't exist yet: a plain open()
  # would either truncate an existing file or not create a missing one.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
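  # Illustrative invocations (the --job-age value goes through
  # cli.ParseTimespec, so suffixed values like "1d" should be accepted in
  # addition to plain seconds):
  #
  #   ganeti-watcher              # normal run, e.g. from cron
  #   ganeti-watcher -d -A 1d     # debug logging, archive jobs older than 1d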
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


def main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    sys.exit(constants.EXIT_FAILURE)

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()
    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()