lib/watcher/__init__.py @ fc3f75dd

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


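#: Maximal number of restart attempts per instance before giving up
#: (see _CheckInstances)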
MAXTRIES = 5
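#: Instance states that the watcher tries to fix by restarting the instance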
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
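#: Instance states the watcher can do nothing about (the node itself is down
#: or offline)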
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will be in disabled mode
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing a cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

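      # Retry bookkeeping: n counts earlier restart attempts.  Once the
      # budget is exhausted the instance is skipped; at exactly MAXTRIES one
      # final attempt is recorded so the error below is only logged once.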
      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for reboots and reactivate instance disks as needed.

  """
  check_nodes = []

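  # Compare each node's current boot ID with the one remembered from the
  # previous run; a change means the node has rebooted in the meantime.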
  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be activated at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks whether the given instance has any offline secondary node.

  @param instance: The instance object
  @return: True if any secondary node is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
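  # Submit a single per-group verify-disks job; only the list of instances
  # with offline disks is used from its result.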
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

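  # Render each (name, status) tuple as one "name status" line; sorting
  # keeps the file content stable between runs.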
  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

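  # FileStatHelper records the file's stat data as part of the read, so the
  # mtime returned below corresponds to the content that was just read.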
  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
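    # Re-run this same watcher invocation with the node group option added,
    # so the child process acts as a per-group watcher for that group.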
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given as a"
                              " UUID, got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the state file for this node group
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

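  # Without a node group option the global watcher runs (and later spawns
  # one child per group); with it, a single per-group watcher runs instead.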
  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS