lib/watcher/__init__.py @ 604c175c

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import os.path
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611


MAXTRIES = 5
# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5, and watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry
# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
RETRY_EXPIRATION = 8 * 3600
BAD_STATES = [constants.INSTST_ERRORDOWN]
HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
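
# Illustrative sketch of the data kept in the watcher state file, using the
# keys above (example names are made up; the real file is whatever
# serializer.Dump produces):
#   {"node": {"node1.example.com": {"bootid": "..."}},
#    "instance": {"inst1.example.com": {"restart_count": 2,
#                                       "restart_when": 1234567890.0}}}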


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
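
# (The pause file checked above is normally managed via "gnt-cluster watcher
# pause <duration>" and "gnt-cluster watcher continue"; this note is only a
# pointer, the authoritative reference is the gnt-cluster documentation.)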


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, msg: # pylint: disable-msg=W0703
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
    # without a result list there is nothing left to report on
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
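
# Watcher hooks are plain executables dropped into the directory above and run
# via utils.RunParts; e.g. a hypothetical script named "50-check-storage" that
# exits non-zero would be reported by the warning branch above.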


class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/DRBD devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False

  @staticmethod
  def GetRunningInstances():
    """Compute list of hypervisor/running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except: # pylint: disable-msg=W0702
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get list of used DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shutdown running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary will collapse duplicate instance names (only
      # xen pvm/hvm) into a single key, which is fine
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shutdown active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)

  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = netutils.Hostname.GetSysName()
    req = confd_client.ConfdClientRequest(type=
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
                                          query=my_name)
    self.confd_client.SendRequest(req, async=False, coverage=-1)
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
    if not timed_out:
      # should have a valid response
      status, result = self.store_cb.GetResponse(req.rsalt)
      assert status, "Missing result but received replies"
      if not self.filter_cb.consistent[req.rsalt]:
        logging.warning("Inconsistent replies, not doing anything")
        return
      self.DoMaintenance(result.server_reply.answer)
    else:
      logging.warning("Confd query timed out, cannot do maintenance actions")


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]
    # First, delete obsolete instances
    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      del idict[inst]

    # Second, delete expired records
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      del idict[inst]

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart, snodes):
    self.name = name
    self.state = state
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap
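
# Quick sketch of what GetClusterData returns, derived from the code above:
# "instances" maps instance name -> Instance object, "nodes" maps node name ->
# (boot id, offline flag), and "smap" maps a node name -> list of instances
# using it as a secondary node.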


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            logging.debug("Skipping disk activation for instance %s, as"
                          " it was already started", instance.name)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
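
        # Note on the bookkeeping above: while the recorded count is below
        # MAXTRIES another restart is attempted; when it reaches MAXTRIES
        # exactly, one final attempt is recorded and the instance is reported
        # as given up on; counts above MAXTRIES are only logged and skipped.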
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  def _CheckForOfflineNodes(self, instance):
    """Checks if the given instance has any secondary node in offline status.

    @param instance: The instance object
    @return: True if any of the secondary nodes is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)

  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpClusterVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[1]
    if not offline_disk_instances:
      # nothing to do
      logging.debug("verify-disks reported no offline disks, nothing to do")
      return
    logging.debug("Will activate disks for instance(s) %s",
                  utils.CommaJoin(offline_disk_instances))
    # we submit only one job and wait for it; not optimal, but it spams the
    # job queue less
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.state in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skip instance %s because it is in helpless state or has"
                     " one offline secondary", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening existing
  # file read/write and creating if not existing. Vanilla open will truncate
  # an existing file -or- allow creating if not existing.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")
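
# (The lock taken in OpenStateFile is advisory, assuming utils.LockFile uses
# the usual fcntl-style locking; as noted in WatcherState.Close, it is
# released automatically when the file object is closed.)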


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-d]" % sys.argv[0])
    return constants.EXIT_FAILURE

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()
    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS