root / lib / watcher / __init__.py @ 61a980a9

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher
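
# The watcher is meant to be invoked periodically (see the module docstring:
# "Run from cron or similar"). An illustrative cron.d entry, where the path
# and interval are assumptions that vary by installation, would be:
#   */5 * * * * root ganeti-watcher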

import os
import os.path
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611


MAXTRIES = 5
# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5 and the watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry counter;
# 8 hours (16*1/2h) therefore seems like a reasonable reset time
RETRY_EXPIRATION = 8 * 3600
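# States the watcher acts on (the instance is reported as down although it
# should be running) and states it cannot help with (the instance's node is
# itself down or marked offline); see CheckInstances() below.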
BAD_STATES = [constants.INSTST_ERRORDOWN]
HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will run in disabled mode
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, msg: # pylint: disable-msg=W0703
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
    # "results" is undefined at this point, so there is nothing to iterate over
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)

class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/DRBD devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False

  @staticmethod
  def GetRunningInstances():
    """Compute list of hypervisor/running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except: # pylint: disable-msg=W0702
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get list of used DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shutdown running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary will collapse duplicate instance names (only
      # xen pvm/hvm) into a single key, which is fine
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shutdown active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)

  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = netutils.Hostname.GetSysName()
    req = confd_client.ConfdClientRequest(type=
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
                                          query=my_name)
    self.confd_client.SendRequest(req, async=False, coverage=-1)
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
    if not timed_out:
      # should have a valid response
      status, result = self.store_cb.GetResponse(req.rsalt)
      assert status, "Missing result but received replies"
      if not self.filter_cb.consistent[req.rsalt]:
        logging.warning("Inconsistent replies, not doing anything")
        return
      self.DoMaintenance(result.server_reply.answer)
    else:
      logging.warning("Confd query timed out, cannot do maintenance actions")


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]
    # First, delete obsolete instances
    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      del idict[inst]

    # Second, delete expired records
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      del idict[inst]

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart, snodes):
    self.name = name
    self.state = state
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)
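
  # Illustrative (assumed) shapes of the values returned below:
  #   instances = {"inst1.example.com": Instance(...), ...}
  #   nodes     = {"node1.example.com": (boot_id_or_None, offline_flag), ...}
  #   smap      = {"node1.example.com": ["inst1.example.com", ...], ...}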

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)
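        # Retry policy as implemented below: fewer than MAXTRIES previous
        # attempts means another restart is tried; exactly MAXTRIES means
        # this run records the final attempt and gives up; more than
        # MAXTRIES means we already gave up on an earlier run.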

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  def _CheckForOfflineNodes(self, instance):
    """Checks whether the given instance has any secondary node offline.

    @param instance: The instance object
    @return: True if any of the secondary nodes is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)

  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpClusterVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[1]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # we submit only one job and wait for it; not optimal, but it spams
    # the job queue less
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.state in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skipping instance %s because it is in a helpless state"
                     " or has an offline secondary node", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening existing
  # file read/write and creating if not existing. Vanilla open will truncate
  # an existing file -or- allow creating if not existing.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-d]" % sys.argv[0])
    return constants.EXIT_FAILURE

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()
    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS