#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.
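
The watcher is usually installed as the "ganeti-watcher" script and run
periodically, for example from a cron entry along the lines of the
following (path and interval are only illustrative and depend on the
installation):

  */5 * * * * root /usr/sbin/ganeti-watcher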

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client # pylint: disable=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return
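
  # utils.RunParts returns a (relative name, status, result) triple for each
  # entry found in the hooks directory; the status constants distinguish
  # entries that were skipped, failed to be run, and actually ran.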
  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)
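
      # Retry bookkeeping: while the number of recorded attempts is below
      # MAXTRIES the instance is restarted and the attempt recorded.  Once
      # the counter reaches MAXTRIES a final "giving up" error is logged and
      # recorded without another restart; after that only a warning is
      # emitted.  The counter is cleared further down once the instance
      # leaves the bad state.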
      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
270

271
  @param instance: The instance object
272
  @return: True if any of the secondary is offline, False otherwise
273

274
  """
275
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
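  # The job holds a single OpGroupVerifyDisks opcode, so PollJob returns a
  # one-element list; that opcode's result is a triple whose second element
  # lists the instances with disks that are not active.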
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state or"
                   " has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false", help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))
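
  # Each (name, status) tuple is rendered as a "<name> <status>" line and the
  # file is rewritten in one go, sorted by instance name.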
  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


class _StatCb:
  """Helper to store file handle's C{fstat}.

  """
  def __init__(self):
    """Initializes this class.

    """
    self.st = None

  def __call__(self, fh):
    """Calls C{fstat} on file handle.

    """
    self.st = os.fstat(fh.fileno())


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = _StatCb()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
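    # Each line is expected to hold "<instance name> <status>", i.e. the
    # format produced by _WriteInstanceStatus above.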
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
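  # (each value is a list of (mtime, status) tuples, so sorting in reverse
  # order puts the entry from the most recently written file first)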
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@rapi.client.UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restaring daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
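    # "Split" instances, i.e. instances whose secondary nodes live in a
    # different node group than the primary node, are only reported and then
    # ignored by this per-group run.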
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous characters
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the state file for this node group
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS