lib/watcher/__init__.py @ 83e5e26f

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611


MAXTRIES = 5
# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5 and the watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry counter;
# 8 hours (16 * 1/2h) therefore seems like a reasonable reset time
RETRY_EXPIRATION = 8 * 3600
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
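
# For reference, the state file managed by WatcherState below is a
# serializer-encoded (JSON) document keyed as above; a hypothetical example
# with illustrative values:
#   {"node": {"node1.example.com": {"bootid": "..."}},
#    "instance": {"inst1.example.com": {"restart_count": 1,
#                                       "restart_when": 1300000000.0}}}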


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will run in disabled mode
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, msg: # pylint: disable-msg=W0703
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
    # without results there is nothing left to report on
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)


class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/DRBD devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False

  @staticmethod
  def GetRunningInstances():
    """Compute the list of (instance name, hypervisor) pairs for running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except: # pylint: disable-msg=W0702
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get list of used DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shut down running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary will collapse duplicate instance names (only
      # Xen PVM/HVM) into a single key, which is fine
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shut down active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)

  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = netutils.Hostname.GetSysName()
    req = confd_client.ConfdClientRequest(type=
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
                                          query=my_name)
    self.confd_client.SendRequest(req, async=False, coverage=-1)
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
    if not timed_out:
      # should have a valid response
      status, result = self.store_cb.GetResponse(req.rsalt)
      assert status, "Missing result but received replies"
      if not self.filter_cb.consistent[req.rsalt]:
        logging.warning("Inconsistent replies, not doing anything")
        return
      self.DoMaintenance(result.server_reply.answer)
    else:
      logging.warning("Confd query timed out, cannot do maintenance actions")


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock the state file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]
    # First, delete obsolete instances
    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      del idict[inst]

    # Second, delete expired records
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      del idict[inst]

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (we only keep
    records for instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart, snodes):
    self.name = name
    self.state = state
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get instance and node data for this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)
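
        # Branches below: more than MAXTRIES attempts means the instance was
        # already given up on in an earlier run; fewer than MAXTRIES means we
        # try again; exactly MAXTRIES means this run records the final failed
        # attempt and gives up.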
        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  def _CheckForOfflineNodes(self, instance):
    """Checks whether the given instance has any offline secondary node.

    @param instance: The instance object
    @return: True if any of the secondary nodes is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)

  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpClusterVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[1]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # we submit only one job and wait for it; not optimal, but it spams
    # the job queue less
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.state in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skip instance %s because it is in helpless state or has"
                     " one offline secondary", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening an existing
  # file read/write and creating it if missing. A plain open() would either
  # truncate an existing file or refuse to create a missing one, depending on
  # the mode.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args
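
# Example invocation (illustrative; assumes the flag spellings provided by
# cli.DEBUG_OPT and the timespec suffixes accepted by cli.ParseTimespec):
#   ganeti-watcher -d -A 6h --ignore-pause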


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-d] " % sys.argv[0])
    return constants.EXIT_FAILURE

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()
    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS