root / lib / watcher / __init__.py @ b8028dcf

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
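#: Instance states in which the watcher considers an instance down and will
#: try to restart it (at most MAXTRIES times, see _CheckInstances below)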
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
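#: Instance states the watcher cannot do anything about (the instance's node
#: is down or offline); such instances are skipped rather than restarted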
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on nodes that are not master candidates it will
  # run in disabled mode
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing a cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)
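
      # The watcher state file ("notepad") remembers how many restarts were
      # already attempted: past MAXTRIES only a warning is logged, at exactly
      # MAXTRIES the failure is recorded one last time and the instance is
      # given up on, otherwise another restart is tried below.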
      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []
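
  # A node whose boot ID changed since the last watcher run has most likely
  # been rebooted, so the disks of instances that use it as a secondary node
  # may need to be re-activated.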
  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be activated at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))
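
  # Each (name, status) tuple is rendered as a "<name> <status>" line by
  # applying the "%s %s\n" format through operator.mod; entries are written
  # in sorted order.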
  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # cli.GetClient raises this when the current node is not the master
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures the current host is the master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
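  # (an instance whose secondary nodes are in a different node group than
  # its primary node is "split" and is only reported, not processed)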
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the per-group watcher state file
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
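  # From here on: talk to the master daemon, fetch this group's nodes and
  # instances, refresh the instance status files, restart instances that
  # are down, re-activate disks where needed, and only then save the state.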
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS