#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht
from ganeti import pathutils

import ganeti.rapi.client # pylint: disable=W0611
from ganeti.rapi.client import UsesRapiClient

from ganeti.watcher import nodemaint
from ganeti.watcher import state


MAXTRIES = 5
BAD_STATES = compat.UniqueFrozenset([
  constants.INSTST_ERRORDOWN,
  ])
HELPLESS_STATES = compat.UniqueFrozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])
NOTICE = "NOTICE"
ERROR = "ERROR"

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non-candidates it will be in disabled mode.
  if constants.ENABLE_CONFD:
    utils.EnsureDaemon(constants.CONFD)
  # start mond as well: all nodes need monitoring
  if constants.ENABLE_MOND:
    utils.EnsureDaemon(constants.MOND)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, status, disks_active, snodes):
    self.name = name
    self.status = status
    self.disks_active = disks_active
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)


class Node:
  """Data container representing a cluster node.

  """
  def __init__(self, name, bootid, offline, secondaries):
    """Initializes this class.

    """
    self.name = name
    self.bootid = bootid
    self.offline = offline
    self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones.

  """
  notepad.MaintainInstanceList(instances.keys())

  started = set()

  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)

    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started
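# Worked example of the retry bookkeeping above, assuming MAXTRIES == 5 as
# defined at module level: an instance stuck in ERROR_down is restarted on
# watcher runs 1-5 (n == 0..4), each run recording one attempt; on run 6
# (n == 5) one more attempt is recorded without restarting and "giving up"
# is logged; from run 7 on (n > 5) only the "retries exhausted" warning is
# emitted.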


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones.

  """
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.disks_active:
          logging.info("Skipping disk activation for instance with not"
                       " activated disks '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be activated at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)
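# Boot-ID sketch (IDs and names hypothetical): if the notepad remembered boot
# ID "aaaa" for node2 but the node now reports "bbbb", node2 is assumed to
# have rebooted; the disks of every instance listing node2 as a secondary are
# then re-activated (unless the instance was just restarted above) and the
# new boot ID is stored for the next run.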


def _CheckForOfflineNodes(nodes, instance):
  """Checks if the given instance has any secondary node in offline status.

  @param instance: The instance object
  @return: True if any of the secondaries is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks".

  """
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    # nothing to do
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.NODEGROUP_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                          " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  parser.add_option("--wait-children", dest="wait_children",
                    action="store_true", help="Wait for child processes")
  parser.add_option("--no-wait-children", dest="wait_children",
                    action="store_false",
                    help="Don't wait for child processes")
  # See optparse documentation for why default values are not set by options
  parser.set_defaults(wait_children=True)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))
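# Illustrative sketch (instance names are hypothetical): calling
#
#   _WriteInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
#                        [("inst2.example.com", "running"),
#                         ("inst1.example.com", "ERROR_down")])
#
# produces a file with the sorted, space-separated lines
#
#   inst1.example.com ERROR_down
#   inst2.example.com running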


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = utils.FileStatHelper()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])
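# For the example file contents shown above _WriteInstanceStatus, this reader
# would return roughly (the mtime value is hypothetical):
#
#   (1455130205.3, [["inst1.example.com", "ERROR_down"],
#                   ["inst2.example.com", "running"]])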


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)
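# Merge sketch (hypothetical data): if group A's per-group file (mtime 100)
# lists "inst1 ERROR_down" while group B's newer file (mtime 200) lists
# "inst1 running", the entry with the newest mtime wins, so the merged global
# file records inst1 as running.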


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)

  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group.

  """
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)

      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs.

  """
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures the current host is the master node.

  """
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group.

  """
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "disks_active", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
                    use_locking=True),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
                    use_locking=True),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances
  for (name, status, disks_active, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, disks_active, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))
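# Shape of the intermediate data above, as an illustrative sketch (values are
# hypothetical): after compat.snd strips the per-field status codes,
# raw_instances holds rows such as
#
#   ["inst1.example.com", "running", True, ["node2.example.com"],
#    "<pnode group uuid>", ["<snode group uuid>"]]
#
# and raw_nodes holds rows such as ["node2.example.com", "<boot id>", False].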


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}.

  """
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result
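# The ssconf node group list is expected to contain one whitespace-separated
# "uuid name" pair per line; only the first token (the UUID) is kept here and
# each kept value is validated against utils.UUID_RE.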


def _GroupWatcher(opts):
  """Main function for per-group watcher process.

  """
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  # Open the per-group watcher state file
  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
                         pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function.

  """
  (options, _) = ParseOptions()

  utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  pathutils.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS