#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

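# The watcher is designed to be run periodically, e.g. from cron. Purely as
# an illustration (the 5-minute interval matches the RETRY_EXPIRATION comment
# below; the installed path may differ), an /etc/cron.d-style entry could be:
#   */5 * * * * root /usr/sbin/ganeti-watcher
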
# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611


MAXTRIES = 5
# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5, and watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry
# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
RETRY_EXPIRATION = 8 * 3600
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non-candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, msg: # pylint: disable-msg=W0703
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
    # there are no results to process below, so bail out early
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)


class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/DRBD devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False

  @staticmethod
  def GetRunningInstances():
    """Compute list of hypervisor/running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except: # pylint: disable-msg=W0702
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get list of used DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shutdown running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary will collapse duplicate instance names (only
      # xen pvm/hvm) into a single key, which is fine
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shutdown active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)

  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = netutils.Hostname.GetSysName()
    req = confd_client.ConfdClientRequest(type=
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
                                          query=my_name)
    self.confd_client.SendRequest(req, async=False, coverage=-1)
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
    if not timed_out:
      # should have a valid response
      status, result = self.store_cb.GetResponse(req.rsalt)
      assert status, "Missing result but received replies"
      if not self.filter_cb.consistent[req.rsalt]:
        logging.warning("Inconsistent replies, not doing anything")
        return
      self.DoMaintenance(result.server_reply.answer)
    else:
      logging.warning("Confd query timed out, cannot do maintenance actions")


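# The watcher state (kept by WatcherState below) is a serialized dictionary
# with two top-level keys, "instance" and "node". As an illustration only
# (the values here are made up), it looks roughly like:
#   {"instance": {"inst1": {"restart_count": 1, "restart_when": 1300000000.0}},
#    "node": {"node1": {"bootid": "some-boot-id"}}}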
class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]
    # First, delete obsolete instances
    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      del idict[inst]

    # Second, delete expired records
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      del idict[inst]

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
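    # Compare each node's current boot ID against the one recorded in the
    # state file; a change means the node has rebooted, so disks of instances
    # that use it as a secondary node may need to be re-activated.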
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

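    # Instances in BAD_STATES get up to MAXTRIES restart attempts; once the
    # counter reaches MAXTRIES the failure is logged and the instance is left
    # alone until its record expires or it is seen running again.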
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # we submit only one job and wait for it; not optimal, but it spams
    # the job queue less
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    try:
      cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
    except Exception: # pylint: disable-msg=W0703
      logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening existing
  # file read/write and creating if not existing. Vanilla open will truncate
  # an existing file -or- allow creating if not existing.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig(cafile=constants.RAPI_CERT_FILE)
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


@rapi.client.UsesRapiClient
def main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-d] " % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    sys.exit(constants.EXIT_FAILURE)

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()
    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()