Revision 16e0b9c9 lib/watcher/__init__.py

--- a/lib/watcher/__init__.py
+++ b/lib/watcher/__init__.py
 import sys
 import time
 import logging
+import operator
 from optparse import OptionParser
 
 from ganeti import utils
......
 from ganeti import luxi
 from ganeti import rapi
 from ganeti import netutils
+from ganeti import qlang
+from ganeti import objects
+from ganeti import ssconf
+from ganeti import ht
 
 import ganeti.rapi.client # pylint: disable-msg=W0611
 
......
 
 
 MAXTRIES = 5
-
-
-# Global LUXI client object
-client = None
 BAD_STATES = frozenset([
   constants.INSTST_ERRORDOWN,
   ])
......
 NOTICE = "NOTICE"
 ERROR = "ERROR"
 
+#: Number of seconds to wait between starting child processes for node groups
+CHILD_PROCESS_DELAY = 1.0
+
 
 class NotMasterError(errors.GenericError):
   """Exception raised when this host is not the master."""
......
     self.autostart = autostart
     self.snodes = snodes
 
-  def Restart(self):
+  def Restart(self, cl):
     """Encapsulates the start of an instance.
 
     """
     op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
-    cli.SubmitOpCode(op, cl=client)
+    cli.SubmitOpCode(op, cl=cl)
 
-  def ActivateDisks(self):
+  def ActivateDisks(self, cl):
     """Encapsulates the activation of all disks of an instance.
 
     """
     op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
-    cli.SubmitOpCode(op, cl=client)
+    cli.SubmitOpCode(op, cl=cl)
 
 
-def GetClusterData():
-  """Get a list of instances on this cluster.
+class Node:
+  """Data container representing cluster node.
 
   """
-  op1_fields = ["name", "status", "admin_state", "snodes"]
-  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
-                                use_locking=True)
-  op2_fields = ["name", "bootid", "offline"]
-  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
-                            use_locking=True)
-
-  job_id = client.SubmitJob([op1, op2])
+  def __init__(self, name, bootid, offline, secondaries):
+    """Initializes this class.
 
-  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
+    """
+    self.name = name
+    self.bootid = bootid
+    self.offline = offline
+    self.secondaries = secondaries
 
-  logging.debug("Got data from cluster, writing instance status file")
 
-  result = all_results[0]
-  smap = {}
+def _CheckInstances(cl, notepad, instances):
+  """Make a pass over the list of instances, restarting downed ones.
 
-  instances = {}
+  """
+  notepad.MaintainInstanceList(instances.keys())
 
-  _UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE)
+  started = set()
 
-  for fields in result:
-    (name, status, autostart, snodes) = fields
+  for inst in instances.values():
+    if inst.status in BAD_STATES:
+      n = notepad.NumberOfRestartAttempts(inst.name)
 
-    # update the secondary node map
-    for node in snodes:
-      if node not in smap:
-        smap[node] = []
-      smap[node].append(name)
+      if n > MAXTRIES:
+        logging.warning("Not restarting instance '%s', retries exhausted",
+                        inst.name)
+        continue
 
-    instances[name] = Instance(name, status, autostart, snodes)
+      if n == MAXTRIES:
+        notepad.RecordRestartAttempt(inst.name)
+        logging.error("Could not restart instance '%s' after %s attempts,"
+                      " giving up", inst.name, MAXTRIES)
+        continue
 
-  nodes =  dict([(name, (bootid, offline))
-                 for name, bootid, offline in all_results[1]])
+      try:
+        logging.info("Restarting instance '%s' (attempt #%s)",
+                     inst.name, n + 1)
+        inst.Restart(cl)
+      except Exception: # pylint: disable-msg=W0703
+        logging.exception("Error while restarting instance '%s'", inst.name)
+      else:
+        started.add(inst.name)
 
-  client.ArchiveJob(job_id)
+      notepad.RecordRestartAttempt(inst.name)
 
-  return instances, nodes, smap
+    else:
+      if notepad.NumberOfRestartAttempts(inst.name):
+        notepad.RemoveInstance(inst.name)
+        if inst.status not in HELPLESS_STATES:
+          logging.info("Restart of instance '%s' succeeded", inst.name)
 
+  return started
 
-class Watcher(object):
-  """Encapsulate the logic for restarting erroneously halted virtual machines.
 
-  The calling program should periodically instantiate me and call Run().
-  This will traverse the list of instances, and make up to MAXTRIES attempts
-  to restart machines that are down.
+def _CheckDisks(cl, notepad, nodes, instances, started):
+  """Check all nodes for restarted ones.
 
   """
-  def __init__(self, opts, notepad):
-    self.notepad = notepad
-    master = client.QueryConfigValues(["master_node"])[0]
-    if master != netutils.Hostname.GetSysName():
-      raise NotMasterError("This is not the master node")
-    # first archive old jobs
-    self.ArchiveJobs(opts.job_age)
-    # and only then submit new ones
-    self.instances, self.bootids, self.smap = GetClusterData()
-    self.started_instances = set()
-    self.opts = opts
-
-  def Run(self):
-    """Watcher run sequence.
-
-    """
-    notepad = self.notepad
-    self.CheckInstances(notepad)
-    self.CheckDisks(notepad)
-    self.VerifyDisks()
-
-  @staticmethod
-  def ArchiveJobs(age):
-    """Archive old jobs.
-
-    """
-    arch_count, left_count = client.AutoArchiveJobs(age)
-    logging.debug("Archived %s jobs, left %s", arch_count, left_count)
-
-  def CheckDisks(self, notepad):
-    """Check all nodes for restarted ones.
-
-    """
-    check_nodes = []
-    for name, (new_id, offline) in self.bootids.iteritems():
-      old = notepad.GetNodeBootID(name)
-      if new_id is None:
-        # Bad node, not returning a boot id
-        if not offline:
-          logging.debug("Node %s missing boot id, skipping secondary checks",
-                        name)
-        continue
-      if old != new_id:
-        # Node's boot ID has changed, proably through a reboot.
-        check_nodes.append(name)
-
-    if check_nodes:
-      # Activate disks for all instances with any of the checked nodes as a
-      # secondary node.
-      for node in check_nodes:
-        if node not in self.smap:
+  check_nodes = []
+
+  for node in nodes.values():
+    old = notepad.GetNodeBootID(node.name)
+    if not node.bootid:
+      # Bad node, not returning a boot id
+      if not node.offline:
+        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
+                      node.name)
+      continue
+
+    if old != node.bootid:
+      # Node's boot ID has changed, probably through a reboot
+      check_nodes.append(node)
+
+  if check_nodes:
+    # Activate disks for all instances with any of the checked nodes as a
+    # secondary node.
+    for node in check_nodes:
+      for instance_name in node.secondaries:
+        try:
+          inst = instances[instance_name]
+        except KeyError:
+          logging.info("Can't find instance '%s', maybe it was ignored",
+                       instance_name)
           continue
-        for instance_name in self.smap[node]:
-          instance = self.instances[instance_name]
-          if not instance.autostart:
-            logging.info(("Skipping disk activation for non-autostart"
-                          " instance %s"), instance.name)
-            continue
-          if instance.name in self.started_instances:
-            # we already tried to start the instance, which should have
-            # activated its drives (if they can be at all)
-            logging.debug("Skipping disk activation for instance %s, as"
-                          " it was already started", instance.name)
-            continue
-          try:
-            logging.info("Activating disks for instance %s", instance.name)
-            instance.ActivateDisks()
-          except Exception: # pylint: disable-msg=W0703
-            logging.exception("Error while activating disks for instance %s",
-                              instance.name)
-
-      # Keep changed boot IDs
-      for name in check_nodes:
-        notepad.SetNodeBootID(name, self.bootids[name][0])
-
-  def CheckInstances(self, notepad):
-    """Make a pass over the list of instances, restarting downed ones.
-
-    """
-    notepad.MaintainInstanceList(self.instances.keys())
-
-    for instance in self.instances.values():
-      if instance.status in BAD_STATES:
-        n = notepad.NumberOfRestartAttempts(instance.name)
 
-        if n > MAXTRIES:
-          logging.warning("Not restarting instance %s, retries exhausted",
-                          instance.name)
+        if not inst.autostart:
+          logging.info("Skipping disk activation for non-autostart"
+                       " instance '%s'", inst.name)
           continue
-        elif n < MAXTRIES:
-          last = " (Attempt #%d)" % (n + 1)
-        else:
-          notepad.RecordRestartAttempt(instance.name)
-          logging.error("Could not restart %s after %d attempts, giving up",
-                        instance.name, MAXTRIES)
+
+        if inst.name in started:
+          # we already tried to start the instance, which should have
+          # activated its drives (if they can be at all)
+          logging.debug("Skipping disk activation for instance '%s' as"
+                        " it was already started", inst.name)
           continue
+
         try:
-          logging.info("Restarting %s%s", instance.name, last)
-          instance.Restart()
-          self.started_instances.add(instance.name)
+          logging.info("Activating disks for instance '%s'", inst.name)
+          inst.ActivateDisks(cl)
         except Exception: # pylint: disable-msg=W0703
-          logging.exception("Error while restarting instance %s",
-                            instance.name)
+          logging.exception("Error while activating disks for instance '%s'",
+                            inst.name)
 
-        notepad.RecordRestartAttempt(instance.name)
-      elif instance.status in HELPLESS_STATES:
-        if notepad.NumberOfRestartAttempts(instance.name):
-          notepad.RemoveInstance(instance.name)
-      else:
-        if notepad.NumberOfRestartAttempts(instance.name):
-          notepad.RemoveInstance(instance.name)
-          logging.info("Restart of %s succeeded", instance.name)
+    # Keep changed boot IDs
+    for node in check_nodes:
+      notepad.SetNodeBootID(node.name, node.bootid)
 
-  def _CheckForOfflineNodes(self, instance):
-    """Checks if given instances has any secondary in offline status.
 
-    @param instance: The instance object
-    @return: True if any of the secondary is offline, False otherwise
-
-    """
-    bootids = []
-    for node in instance.snodes:
-      bootids.append(self.bootids[node])
-
-    return compat.any(offline for (_, offline) in bootids)
-
-  def VerifyDisks(self):
-    """Run gnt-cluster verify-disks.
-
-    """
-    job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
-    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
-    client.ArchiveJob(job_id)
+def _CheckForOfflineNodes(nodes, instance):
+  """Checks if given instances has any secondary in offline status.
 
-    # Keep track of submitted jobs
-    jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
+  @param instance: The instance object
+  @return: True if any of the secondary is offline, False otherwise
 
-    archive_jobs = set()
-    for (status, job_id) in result[constants.JOB_IDS_KEY]:
-      jex.AddJobId(None, status, job_id)
-      if status:
-        archive_jobs.add(job_id)
+  """
+  return compat.any(nodes[node_name].offline for node_name in instance.snodes)
 
-    offline_disk_instances = set()
 
-    for (status, result) in jex.GetResults():
-      if not status:
-        logging.error("Verify-disks job failed: %s", result)
-        continue
+def _VerifyDisks(cl, uuid, nodes, instances):
+  """Run a per-group "gnt-cluster verify-disks".
 
-      ((_, instances, _), ) = result
+  """
+  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
+  ((_, offline_disk_instances, _), ) = \
+    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
+  cl.ArchiveJob(job_id)
 
-      offline_disk_instances.update(instances)
+  if not offline_disk_instances:
+    # nothing to do
+    logging.debug("Verify-disks reported no offline disks, nothing to do")
+    return
 
-    for job_id in archive_jobs:
-      client.ArchiveJob(job_id)
+  logging.debug("Will activate disks for instance(s) %s",
+                utils.CommaJoin(offline_disk_instances))
 
-    if not offline_disk_instances:
-      # nothing to do
-      logging.debug("verify-disks reported no offline disks, nothing to do")
-      return
+  # We submit only one job, and wait for it. Not optimal, but this puts less
+  # load on the job queue.
+  job = []
+  for name in offline_disk_instances:
+    try:
+      inst = instances[name]
+    except KeyError:
+      logging.info("Can't find instance '%s', maybe it was ignored", name)
+      continue
 
-    logging.debug("Will activate disks for instance(s) %s",
-                  utils.CommaJoin(offline_disk_instances))
+    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
+      logging.info("Skipping instance '%s' because it is in a helpless state or"
+                   " has offline secondaries", name)
+      continue
 
-    # we submit only one job, and wait for it. not optimal, but spams
-    # less the job queue
-    job = []
-    for name in offline_disk_instances:
-      instance = self.instances[name]
-      if (instance.status in HELPLESS_STATES or
-          self._CheckForOfflineNodes(instance)):
-        logging.info("Skip instance %s because it is in helpless state or has"
-                     " one offline secondary", name)
-        continue
-      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
+    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
 
-    if job:
-      job_id = cli.SendJob(job, cl=client)
+  if job:
+    job_id = cli.SendJob(job, cl=cl)
 
-      try:
-        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
-      except Exception: # pylint: disable-msg=W0703
-        logging.exception("Error while activating disks")
+    try:
+      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
+    except Exception: # pylint: disable-msg=W0703
+      logging.exception("Error while activating disks")
 
 
 def IsRapiResponding(hostname):
......
                         constants.RELEASE_VERSION)
 
   parser.add_option(cli.DEBUG_OPT)
+  parser.add_option(cli.NODEGROUP_OPT)
   parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                     help="Autoarchive jobs older than this age (default"
                           " 6 hours)")
   parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                     action="store_true", help="Ignore cluster pause setting")
+  parser.add_option("--wait-children", dest="wait_children", default=False,
+                    action="store_true", help="Wait for child processes")
   options, args = parser.parse_args()
   options.job_age = cli.ParseTimespec(options.job_age)
 
......
                                          for (name, status) in result))
 
 
+def GetLuxiClient(try_restart):
+  """Tries to connect to the master daemon.
+
+  @type try_restart: bool
+  @param try_restart: Whether to attempt to restart the master daemon
+
+  """
+  try:
+    return cli.GetClient()
+  except errors.OpPrereqError, err:
+    # this is, from cli.GetClient, a not-master case
+    raise NotMasterError("Not on master node (%s)" % err)
+
+  except luxi.NoMasterError, err:
+    if not try_restart:
+      raise
+
+    logging.warning("Master daemon seems to be down (%s), trying to restart",
+                    err)
+
+    if not utils.EnsureDaemon(constants.MASTERD):
+      raise errors.GenericError("Can't start the master daemon")
+
+    # Retry the connection
+    return cli.GetClient()
+
+
+def _StartGroupChildren(cl, wait):
+  """Starts a new instance of the watcher for every node group.
+
+  """
+  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
+                        for arg in sys.argv)
+
+  result = cl.QueryGroups([], ["name", "uuid"], False)
+
+  children = []
+
+  for (idx, (name, uuid)) in enumerate(result):
+    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
+
+    if idx > 0:
+      # Let's not kill the system
+      time.sleep(CHILD_PROCESS_DELAY)
+
+    logging.debug("Spawning child for group '%s' (%s), arguments %s",
+                  name, uuid, args)
+
+    try:
+      # TODO: Should utils.StartDaemon be used instead?
+      pid = os.spawnv(os.P_NOWAIT, args[0], args)
+    except Exception: # pylint: disable-msg=W0703
+      logging.exception("Failed to start child for group '%s' (%s)",
+                        name, uuid)
+    else:
+      logging.debug("Started with PID %s", pid)
+      children.append(pid)
+
+  if wait:
+    for pid in children:
+      logging.debug("Waiting for child PID %s", pid)
+      try:
+        result = utils.RetryOnSignal(os.waitpid, pid, 0)
+      except EnvironmentError, err:
+        result = str(err)
+
+      logging.debug("Child PID %s exited with status %s", pid, result)
+
+
+def _ArchiveJobs(cl, age):
+  """Archives old jobs.
+
+  """
+  (arch_count, left_count) = cl.AutoArchiveJobs(age)
+  logging.debug("Archived %s jobs, left %s", arch_count, left_count)
+
+
+def _CheckMaster(cl):
+  """Ensures current host is master node.
+
+  """
+  (master, ) = cl.QueryConfigValues(["master_node"])
+  if master != netutils.Hostname.GetSysName():
+    raise NotMasterError("This is not the master node")
+
+
 @rapi.client.UsesRapiClient
+def _GlobalWatcher(opts):
+  """Main function for global watcher.
+
+  At the end child processes are spawned for every node group.
+
+  """
+  StartNodeDaemons()
+  RunWatcherHooks()
+
+  # Run node maintenance in all cases, even if master, so that old masters can
+  # be properly cleaned up
+  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
+    nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602
+
+  try:
+    client = GetLuxiClient(True)
+  except NotMasterError:
+    # Don't proceed on non-master nodes
+    return constants.EXIT_SUCCESS
+
+  # we are on master now
+  utils.EnsureDaemon(constants.RAPI)
+
+  # If RAPI isn't responding to queries, try one restart
+  logging.debug("Attempting to talk to remote API on %s",
+                constants.IP4_ADDRESS_LOCALHOST)
+  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+    logging.warning("Couldn't get answer from remote API, restaring daemon")
+    utils.StopDaemon(constants.RAPI)
+    utils.EnsureDaemon(constants.RAPI)
+    logging.debug("Second attempt to talk to remote API")
+    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+      logging.fatal("RAPI is not responding")
+  logging.debug("Successfully talked to remote API")
+
+  _CheckMaster(client)
+  _ArchiveJobs(client, opts.job_age)
+  _UpdateInstanceStatus(client, constants.INSTANCE_STATUS_FILE)
+
+  # Spawn child processes for all node groups
+  _StartGroupChildren(client, opts.wait_children)
+
+  return constants.EXIT_SUCCESS
+
+
+def _GetGroupData(cl, uuid):
+  """Retrieves instances and nodes per node group.
+
+  """
+  # TODO: Implement locking
+  job = [
+    # Get all primary instances in group
+    opcodes.OpQuery(what=constants.QR_INSTANCE,
+                    fields=["name", "status", "admin_state", "snodes",
+                            "pnode.group.uuid", "snodes.group.uuid"],
+                    filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid]),
+
+    # Get all nodes in group
+    opcodes.OpQuery(what=constants.QR_NODE,
+                    fields=["name", "bootid", "offline"],
+                    filter=[qlang.OP_EQUAL, "group.uuid", uuid]),
+    ]
+
+  job_id = cl.SubmitJob(job)
+  results = map(objects.QueryResponse.FromDict,
+                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
+  cl.ArchiveJob(job_id)
+
+  results_data = map(operator.attrgetter("data"), results)
+
+  # Ensure results are tuples with two values
+  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
+
+  # Extract values ignoring result status
+  (raw_instances, raw_nodes) = [[map(compat.snd, values)
+                                 for values in res]
+                                for res in results_data]
+
+  secondaries = {}
+  instances = []
+
+  # Load all instances
+  for (name, status, autostart, snodes, pnode_group_uuid,
+       snodes_group_uuid) in raw_instances:
+    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
+      logging.error("Ignoring split instance '%s', primary group %s, secondary"
+                    " groups %s", name, pnode_group_uuid,
+                    utils.CommaJoin(snodes_group_uuid))
+    else:
+      instances.append(Instance(name, status, autostart, snodes))
+
+      for node in snodes:
+        secondaries.setdefault(node, set()).add(name)
+
+  # Load all nodes
+  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
+           for (name, bootid, offline) in raw_nodes]
+
+  return (dict((node.name, node) for node in nodes),
+          dict((inst.name, inst) for inst in instances))
+
+
+def _KnownGroup(uuid):
+  """Checks if a group UUID is known by ssconf.
+
+  """
+  groups = ssconf.SimpleStore().GetNodegroupList()
+
+  return compat.any(line.strip() and line.split()[0] == uuid
+                    for line in groups)
+
+
+def _GroupWatcher(opts):
+  """Main function for per-group watcher process.
+
+  """
+  group_uuid = opts.nodegroup.lower()
+
+  if not utils.UUID_RE.match(group_uuid):
+    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
+                              " got '%s'" %
+                              (cli.NODEGROUP_OPT_NAME, group_uuid))
+
+  logging.info("Watcher for node group '%s'", group_uuid)
+
+  # Check if node group is known
+  if not _KnownGroup(group_uuid):
+    raise errors.GenericError("Node group '%s' is not known by ssconf" %
+                              group_uuid)
+
+  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
+
+  logging.debug("Using state file %s", state_path)
+
+  # Global watcher
+  statefile = state.OpenStateFile(state_path) # pylint: disable-msg=E0602
+  if not statefile:
+    return constants.EXIT_FAILURE
+
+  notepad = state.WatcherState(statefile) # pylint: disable-msg=E0602
+  try:
+    # Connect to master daemon
+    client = GetLuxiClient(False)
+
+    _CheckMaster(client)
+
+    (nodes, instances) = _GetGroupData(client, group_uuid)
+
+    started = _CheckInstances(client, notepad, instances)
+    _CheckDisks(client, notepad, nodes, instances, started)
+    _VerifyDisks(client, group_uuid, nodes, instances)
+  except Exception, err:
+    logging.info("Not updating status file due to failure: %s", err)
+    raise
+  else:
+    # Save changes for next run
+    notepad.Save(state_path)
+
+  return constants.EXIT_SUCCESS
+
+
 def Main():
   """Main function.
 
   """
-  global client # pylint: disable-msg=W0603
-
   (options, _) = ParseOptions()
 
   utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
......
     logging.debug("Pause has been set, exiting")
     return constants.EXIT_SUCCESS
 
-  statefile = \
-    state.OpenStateFile(constants.WATCHER_STATEFILE)
-  if not statefile:
-    return constants.EXIT_FAILURE
-
-  update_file = False
+  # Try to acquire global watcher lock in shared mode
+  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
   try:
-    StartNodeDaemons()
-    RunWatcherHooks()
-    # run node maintenance in all cases, even if master, so that old
-    # masters can be properly cleaned up too
-    if nodemaint.NodeMaintenance.ShouldRun():
-      nodemaint.NodeMaintenance().Exec()
-
-    notepad = state.WatcherState(statefile)
-    try:
-      try:
-        client = cli.GetClient()
-      except errors.OpPrereqError:
-        # this is, from cli.GetClient, a not-master case
-        logging.debug("Not on master, exiting")
-        update_file = True
-        return constants.EXIT_SUCCESS
-      except luxi.NoMasterError, err:
-        logging.warning("Master seems to be down (%s), trying to restart",
-                        str(err))
-        if not utils.EnsureDaemon(constants.MASTERD):
-          logging.critical("Can't start the master, exiting")
-          return constants.EXIT_FAILURE
-        # else retry the connection
-        client = cli.GetClient()
-
-      # we are on master now
-      utils.EnsureDaemon(constants.RAPI)
-
-      # If RAPI isn't responding to queries, try one restart.
-      logging.debug("Attempting to talk with RAPI.")
-      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
-        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
-                        " Restarting Ganeti RAPI.")
-        utils.StopDaemon(constants.RAPI)
-        utils.EnsureDaemon(constants.RAPI)
-        logging.debug("Second attempt to talk with RAPI")
-        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
-          logging.fatal("RAPI is not responding. Please investigate.")
-      logging.debug("Successfully talked to RAPI.")
+    lock.Shared(blocking=False)
+  except (EnvironmentError, errors.LockError), err:
+    logging.error("Can't acquire lock on %s: %s",
+                  constants.WATCHER_LOCK_FILE, err)
+    return constants.EXIT_SUCCESS
 
-      try:
-        watcher = Watcher(options, notepad)
-      except errors.ConfigurationError:
-        # Just exit if there's no configuration
-        update_file = True
-        return constants.EXIT_SUCCESS
-
-      watcher.Run()
-      update_file = True
-
-    finally:
-      if update_file:
-        notepad.Save(constants.WATCHER_STATEFILE)
-      else:
-        logging.debug("Not updating status file due to failure")
-  except SystemExit:
+  if options.nodegroup is None:
+    fn = _GlobalWatcher
+  else:
+    # Per-nodegroup watcher
+    fn = _GroupWatcher
+
+  try:
+    return fn(options)
+  except (SystemExit, KeyboardInterrupt):
     raise
   except NotMasterError:
     logging.debug("Not master, exiting")
