lib/watcher/__init__.py @ 2e5c33db

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client # pylint: disable=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
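
# Together these constants drive the restart policy below: instances reported
# in BAD_STATES are restarted at most MAXTRIES times (the attempt count is
# kept in the per-group watcher state file), while instances in
# HELPLESS_STATES are left alone, since their node is down or offline and a
# restart attempt cannot help.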


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non-candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


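# Watcher hooks are plain executables collected from a single directory
# (constants.HOOKS_BASE_DIR plus the watcher-specific subdirectory) and run
# in run-parts fashion. A hook that fails or is skipped is only logged below;
# it never aborts the watcher itself.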
def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts" %
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing a cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

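      # More than MAXTRIES recorded attempts means an earlier run has already
      # given up on this instance; skip it with a warning only. At exactly
      # MAXTRIES the counter is bumped one final time (so future runs take
      # the branch above) and the failure is logged as an error, but no
      # further restart is attempted.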
      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


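# A node whose boot ID changed since the last watcher run has most likely
# rebooted; any instance using it as a secondary node may need its disks
# re-activated there, which is what the loop below requests for eligible
# instances.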
def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be activated at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
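  # The job consists of a single opcode, hence PollJob returns a one-element
  # sequence; the opcode's result is itself a triple and only its middle
  # element, the list of instances with offline disks, is used here.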
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state or"
                   " has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false", help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

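  # Each (name, status) tuple is rendered as "<name> <status>\n";
  # compat.partial(operator.mod, "%s %s\n") behaves like
  # lambda t: "%s %s\n" % t applied to every sorted tuple.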
  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


class _StatCb:
  """Helper to store file handle's C{fstat}.

  """
  def __init__(self):
    """Initializes this class.

    """
    self.st = None

  def __call__(self, fh):
    """Calls C{fstat} on file handle.

    """
    self.st = os.fstat(fh.fileno())


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = _StatCb()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


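# The global watcher re-executes its own command line once per node group,
# appending the node group option with the group's UUID, so that groups are
# handled by independent child processes and one slow or broken group does
# not hold up the others.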
def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@rapi.client.UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
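  # (each cell returned by OpQuery is a (status, value) pair; compat.snd
  # keeps only the value, and the two result sets come back in the order the
  # opcodes were submitted: instances first, nodes second)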
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


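# The per-group worker keeps its bookkeeping in the group's state file (the
# "notepad") and only saves it back once the whole pass has succeeded, so a
# failed run does not persist partial restart counts.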
def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous characters
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open this group's watcher state file
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
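  # (shared mode, so that the per-group children spawned by the global
  # watcher, which go through Main() as well, can hold the lock concurrently
  # with this process; an exclusive holder would block the watcher entirely)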
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS