lib/watcher/__init__.py @ 6177890b

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
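# BAD_STATES are instance states the watcher tries to repair by restarting the
# instance (at most MAXTRIES consecutive attempts, tracked per instance in the
# watcher state file).  HELPLESS_STATES are states in which the instance's
# node is itself down or offline, so a restart request cannot help and the
# watcher leaves the instance alone.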
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-master-candidates it will run in disabled mode
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, disks_active, snodes):
    self.name = name
    self.status = status
    self.disks_active = disks_active
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing a cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for reboots and reactivate disks of affected instances.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance '%s': its disks"
                       " are not marked as active", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be activated at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks whether the given instance has any offline secondary node.

  @param nodes: Dictionary of L{Node} objects, indexed by node name
  @param instance: The L{Instance} object to check
  @return: True if any secondary node is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  If RAPI responds with error code "401 Unauthorized", the test is considered
  successful, because the aim of this function is to assess whether RAPI is
  responding at all, not whether it accepts our credentials.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    if err.code == 401:
      # Unauthorized, but RAPI is alive and responding
      return True
    else:
      logging.warning("RAPI error: %s", err)
      return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "disks_active", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, disks_active, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the watcher state file for this node group
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS