lib/watcher/__init__.py @ ae1a845c

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import os.path
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611


MAXTRIES = 5
# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5, and watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry
# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
RETRY_EXPIRATION = 8 * 3600
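# BAD_STATES are error states the watcher tries to fix by restarting the
# instance; HELPLESS_STATES are states it can only observe, because the
# instance's node itself is down or offline.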
BAD_STATES = [constants.INSTST_ERRORDOWN]
HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global LUXI client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception: # pylint: disable-msg=W0703
    logging.exception("RunParts %s failed", hooks_dir)
    return
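  # Each hook's outcome is only reported below; a failing hook is logged but
  # never aborts the watcher run itself.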

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)


class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/DRBD devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False

  @staticmethod
  def GetRunningInstances():
    """Compute list of hypervisor/running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except: # pylint: disable-msg=W0702
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get list of used DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shutdown running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary will collapse duplicate instance names (only
      # xen pvm/hvm) into a single key, which is fine
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shutdown active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)

  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = netutils.Hostname.GetSysName()
222
    req = confd_client.ConfdClientRequest(type=
223
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
224
                                          query=my_name)
225
    self.confd_client.SendRequest(req, async=False, coverage=-1)
226
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
227
    if not timed_out:
228
      # should have a valid response
229
      status, result = self.store_cb.GetResponse(req.rsalt)
230
      assert status, "Missing result but received replies"
231
      if not self.filter_cb.consistent[req.rsalt]:
232
        logging.warning("Inconsistent replies, not doing anything")
233
        return
234
      self.DoMaintenance(result.server_reply.answer)
235
    else:
236
      logging.warning("Confd query timed out, cannot do maintenance actions")
237

    
238

    
239
class WatcherState(object):
240
  """Interface to a state file recording restart attempts.
241

242
  """
243
  def __init__(self, statefile):
244
    """Open, lock, read and parse the file.
245

246
    @type statefile: file
247
    @param statefile: State file object
248

249
    """
250
    self.statefile = statefile
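    # The state is a serialized dict with two top-level maps: "instance"
    # (per-instance restart bookkeeping) and "node" (last seen boot IDs).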

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]
    # First, delete obsolete instances
    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      del idict[inst]

    # Second, delete expired records
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      del idict[inst]

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (we only keep
    records for instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart, snodes):
    self.name = name
    self.state = state
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes =  dict([(name, (bootid, offline))
                 for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            logging.debug("Skipping disk activation for instance %s, as"
                          " it was already started", instance.name)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)
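        # Up to MAXTRIES restarts are attempted; when the counter reaches
        # MAXTRIES one final failure is recorded and the instance is left
        # alone until its record expires (see RETRY_EXPIRATION).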

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  def _CheckForOfflineNodes(self, instance):
    """Checks whether any secondary node of the given instance is offline.

    @param instance: The instance object
    @return: True if any of the secondary nodes is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)

  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)

    # Keep track of submitted jobs
    jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)

    archive_jobs = set()
    for (status, job_id) in result[constants.JOB_IDS_KEY]:
      jex.AddJobId(None, status, job_id)
      if status:
        archive_jobs.add(job_id)

    offline_disk_instances = set()

    for (status, result) in jex.GetResults():
      if not status:
        logging.error("Verify-disks job failed: %s", result)
        continue

      ((_, instances, _), ) = result

      offline_disk_instances.update(instances)

    for job_id in archive_jobs:
      client.ArchiveJob(job_id)

    if not offline_disk_instances:
      # nothing to do
      logging.debug("verify-disks reported no offline disks, nothing to do")
      return

    logging.debug("Will activate disks for instance(s) %s",
                  utils.CommaJoin(offline_disk_instances))

    # we submit only one job and wait for it; not optimal, but it spams
    # the job queue less
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.state in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skip instance %s because it is in helpless state or has"
                     " one offline secondary", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening existing
  # file read/write and creating if not existing. Vanilla open will truncate
  # an existing file -or- allow creating if not existing.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
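  # The state file is saved only if the run got far enough for its data to
  # be meaningful; on unexpected failures the previous state is kept.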
  try:
    StartNodeDaemons()
    RunWatcherHooks()
    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS