#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
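#: Instance states for which the watcher will attempt a restart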
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
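#: Instance states the watcher cannot fix by itself, as the node the instance
#: runs on is down or offline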
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non-candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)
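
      # Up to MAXTRIES restart attempts are made; once they are exhausted
      # (n == MAXTRIES) the failure is recorded one last time and an error
      # logged, and afterwards (n > MAXTRIES) only a warning is emitted.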
      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
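      # The instance is no longer in a bad state; forget any recorded
      # restart attempts so future problems start with a clean slate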
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be activated at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param nodes: Dictionary of L{Node} objects, indexed by node name
  @param instance: The L{Instance} object to check
  @return: True if any secondary node is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
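  # The job contains a single opcode, so it yields exactly one result; only
  # the list of instances with offline disks (the second element) is used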
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))
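
  # Each line in the status file is "<instance name> <status>", sorted by
  # instance name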
  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)
  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]
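
  # Map each secondary node name to the set of instance names that use it as
  # a secondary node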
  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()
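
  # Each line starts with the group UUID; anything after the first field is
  # ignored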
  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the per-group watcher state file
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())
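
    # Merge the per-group status files into the cluster-wide instance status
    # file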
    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE
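
  # A full or drained job queue is not considered a watcher failure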
  return constants.EXIT_SUCCESS