#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client # pylint: disable-msg=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state


#: Maximum number of restart attempts per instance before giving up
MAXTRIES = 5

#: Instance states for which the watcher attempts a restart
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])

#: Instance states the watcher cannot fix by itself (node down or offline)
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])

NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for the instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-master-candidates it will run in disabled mode
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception: # pylint: disable-msg=W0703
    # logging.exception appends the traceback, so the directory is the only
    # argument the format string needs
    logging.exception("RunParts %s failed", hooks_dir)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts" %
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing a cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries
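

# The watcher keeps its bookkeeping in a small state file (the "notepad",
# a state.WatcherState object): the number of restart attempts per instance
# and the last boot ID seen for every node. _CheckInstances below reads and
# updates that bookkeeping on every run.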
def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        # Record one more attempt so that later runs take the quieter
        # "retries exhausted" branch above instead of logging an error again
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be activated at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks whether the given instance has any offline secondary node.

  @param instance: The instance object
  @return: True if any secondary node is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)
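

# _VerifyDisks runs the per-group OpGroupVerifyDisks opcode, which (among
# other data) reports the instances whose disks are not online, and then
# submits a single job activating the disks of those instances, skipping
# helpless or ignored ones.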
def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state or"
                   " has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable-msg=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children", default=False,
                    action="store_true", help="Wait for child processes")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)
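

# The per-group and global instance status files written below contain one
# "<instance name> <status>" pair per line, sorted by name; for example
# (names and statuses purely illustrative):
#   instance1.example.com running
#   instance2.example.com ERROR_down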
def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


class _StatCb:
  """Helper to store file handle's C{fstat}.

  """
  def __init__(self):
    """Initializes this class.

    """
    self.st = None

  def __call__(self, fh):
    """Calls C{fstat} on file handle.

    """
    self.st = os.fstat(fh.fileno())


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = _StatCb()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    # Split each "<name> <status>" line into at most two fields
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])
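

# When merging, an instance may appear in several per-group status files (for
# example, briefly after it has been moved between groups); the entry coming
# from the most recently written file (largest mtime) wins.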
def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()
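

# Each node group is handled by a separate watcher process: the global
# watcher re-executes its own command line (sys.argv) once per group, adding
# cli.NODEGROUP_OPT_NAME and the group UUID, which makes the child take the
# _GroupWatcher path in Main(). The assertion below guards against a child
# accidentally spawning further children.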
def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable-msg=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@rapi.client.UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS
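

# _GetGroupData uses OpQuery, which returns a (status, value) pair for every
# requested field of every object; compat.snd below keeps only the values.
# Instances whose secondary nodes live in a different group than their
# primary node ("split" instances) are ignored, as the log message below
# explains.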
def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  # TODO: Implement locking
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid]),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    filter=[qlang.OP_EQUAL, "group.uuid", uuid]),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result
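

# Each per-group watcher works against its own state file and instance status
# file, both derived from the group UUID. State is only saved when the run
# completed without an exception, so a failed run is retried from the
# previous state on the next invocation.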
def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the per-group watcher state file
  statefile = state.OpenStateFile(state_path) # pylint: disable-msg=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable-msg=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS