lib/watcher/__init__.py @ 2237687b

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611


MAXTRIES = 5
# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5, and watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry
# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
RETRY_EXPIRATION = 8 * 3600
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, msg: # pylint: disable-msg=W0703
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)


class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/DRBD devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False

  @staticmethod
  def GetRunningInstances():
    """Compute list of hypervisor/running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except: # pylint: disable-msg=W0702
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get list of used DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shutdown running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary will collapse duplicate instance names (only
      # xen pvm/hvm) into a single key, which is fine
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shutdown active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)

  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = netutils.Hostname.GetSysName()
    req = confd_client.ConfdClientRequest(type=
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
                                          query=my_name)
    self.confd_client.SendRequest(req, async=False, coverage=-1)
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
    if not timed_out:
      # should have a valid response
      status, result = self.store_cb.GetResponse(req.rsalt)
      assert status, "Missing result but received replies"
      if not self.filter_cb.consistent[req.rsalt]:
        logging.warning("Inconsistent replies, not doing anything")
        return
      self.DoMaintenance(result.server_reply.answer)
    else:
      logging.warning("Confd query timed out, cannot do maintenance actions")


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)
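    # kept so that Save() can tell whether the data actually changed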

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]
    # First, delete obsolete instances
    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      del idict[inst]

    # Second, delete expired records
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      del idict[inst]

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])
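  # the instance query and the node query run as a single job; PollJob
  # below waits for both results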

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          # n == MAXTRIES: record the final attempt and give up on this
          # instance until its record expires
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                        instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      # the node hosting the instance is down or offline; nothing can be
      # done except dropping any stale restart record
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpClusterVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # we submit only one job and wait for it; not optimal, but it spams
    # the job queue less
    job = [opcodes.OpInstanceActivateDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    try:
      cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
    except Exception: # pylint: disable-msg=W0703
      logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening existing
  # file read/write and creating if not existing. Vanilla open will truncate
  # an existing file -or- allow creating if not existing.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-d]" % sys.argv[0])
    return constants.EXIT_FAILURE

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
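  # update_file is set to True only when a run completes (or is cleanly
  # skipped); the finally: clause below saves the state file only then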
  try:
    StartNodeDaemons()
    RunWatcherHooks()
    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS