Revision 9f4bb951

b/.gitignore
36 36
/daemons/daemon-util
37 37
/daemons/ensure-dirs
38 38
/daemons/ganeti-cleaner
39
/daemons/ganeti-watcher
39 40

  
40 41
# devel
41 42
/devel/clean-cluster
b/Makefile.am
23 23
masterddir = $(pkgpythondir)/masterd
24 24
confddir = $(pkgpythondir)/confd
25 25
rapidir = $(pkgpythondir)/rapi
26
watcherdir = $(pkgpythondir)/watcher
26 27
impexpddir = $(pkgpythondir)/impexpd
27 28
toolsdir = $(pkglibdir)/tools
28 29
docdir = $(datadir)/doc/$(PACKAGE)
......
47 48
	lib/impexpd \
48 49
	lib/masterd \
49 50
	lib/rapi \
51
	lib/watcher \
50 52
	man \
51 53
	qa \
52 54
	test \
......
188 190
impexpd_PYTHON = \
189 191
	lib/impexpd/__init__.py
190 192

  
193
watcher_PYTHON = \
194
	lib/watcher/__init__.py
195

  
191 196
docrst = \
192 197
	doc/admin.rst \
193 198
	doc/design-2.0.rst \
......
270 275
	scripts/gnt-os
271 276

  
272 277
PYTHON_BOOTSTRAP = \
278
	daemons/ganeti-watcher \
273 279
	scripts/gnt-backup \
274 280
	scripts/gnt-cluster \
275 281
	scripts/gnt-debug \
......
280 286

  
281 287
dist_sbin_SCRIPTS = \
282 288
	daemons/ganeti-noded \
283
	daemons/ganeti-watcher \
284 289
	daemons/ganeti-masterd \
285 290
	daemons/ganeti-confd \
286 291
	daemons/ganeti-rapi
......
468 473
	$(confd_PYTHON) \
469 474
	$(masterd_PYTHON) \
470 475
	$(impexpd_PYTHON) \
476
	$(watcher_PYTHON) \
471 477
	$(noinst_PYTHON)
472 478

  
473 479
srclink_files = \
......
634 640
	} > $@
635 641

  
636 642
# Using deferred evaluation
643
daemons/ganeti-watcher: MODULE = ganeti.watcher
637 644
scripts/%: MODULE = ganeti.client.$(subst -,_,$(notdir $@))
638 645

  
639 646
$(PYTHON_BOOTSTRAP): Makefile | $(all_dirfiles)
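
With ganeti-watcher added to PYTHON_BOOTSTRAP and MODULE set to ganeti.watcher, the installed daemons/ganeti-watcher becomes a small generated stub rather than the full program. The generating recipe (the "{ ... } > $@" block whose end is shown above) is not visible in full in this hunk, so the following is only a minimal sketch of what such a stub plausibly looks like, assuming it simply imports the module named by MODULE and invokes the main() entry point defined in lib/watcher/__init__.py below; the real template in Makefile.am may differ.

#!/usr/bin/python
#
# Hypothetical sketch of the generated daemons/ganeti-watcher bootstrap
# script (MODULE = ganeti.watcher); the actual output of the Makefile
# recipe may differ in detail.

import sys

# The real implementation now lives in the ganeti.watcher package
# (lib/watcher/__init__.py in this revision).
import ganeti.watcher as main

if __name__ == "__main__":
  sys.exit(main.main())
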
/dev/null
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Tool to restart erroneously downed virtual machines.
23

  
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

  
28
"""
29

  
30
# pylint: disable-msg=C0103,W0142
31

  
32
# C0103: Invalid name ganeti-watcher
33

  
34
import os
35
import sys
36
import time
37
import logging
38
from optparse import OptionParser
39

  
40
from ganeti import utils
41
from ganeti import constants
42
from ganeti import serializer
43
from ganeti import errors
44
from ganeti import opcodes
45
from ganeti import cli
46
from ganeti import luxi
47
from ganeti import ssconf
48
from ganeti import bdev
49
from ganeti import hypervisor
50
from ganeti import rapi
51
from ganeti.confd import client as confd_client
52
from ganeti import netutils
53

  
54
import ganeti.rapi.client # pylint: disable-msg=W0611
55

  
56

  
57
MAXTRIES = 5
58
# Delete any record that is older than 8 hours; this value is based on
59
# the fact that the current retry counter is 5, and watcher runs every
60
# 5 minutes, so it takes around half an hour to exceed the retry
61
# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
62
RETRY_EXPIRATION = 8 * 3600
63
BAD_STATES = ['ERROR_down']
64
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
65
NOTICE = 'NOTICE'
66
ERROR = 'ERROR'
67
KEY_RESTART_COUNT = "restart_count"
68
KEY_RESTART_WHEN = "restart_when"
69
KEY_BOOT_ID = "bootid"
70

  
71

  
72
# Global client object
73
client = None
74

  
75

  
76
class NotMasterError(errors.GenericError):
77
  """Exception raised when this host is not the master."""
78

  
79

  
80
def ShouldPause():
81
  """Check whether we should pause.
82

  
83
  """
84
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
85

  
86

  
87
def StartNodeDaemons():
88
  """Start all the daemons that should be running on all nodes.
89

  
90
  """
91
  # on master or not, try to start the node daemon
92
  utils.EnsureDaemon(constants.NODED)
93
  # start confd as well. On non-candidates it will be in disabled mode.
94
  utils.EnsureDaemon(constants.CONFD)
95

  
96

  
97
def RunWatcherHooks():
98
  """Run the watcher hooks.
99

  
100
  """
101
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
102
                             constants.HOOKS_NAME_WATCHER)
103
  if not os.path.isdir(hooks_dir):
104
    return
105

  
106
  try:
107
    results = utils.RunParts(hooks_dir)
108
  except Exception, msg: # pylint: disable-msg=W0703
109
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
110

  
111
  for (relname, status, runresult) in results:
112
    if status == constants.RUNPARTS_SKIP:
113
      logging.debug("Watcher hook %s: skipped", relname)
114
    elif status == constants.RUNPARTS_ERR:
115
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
116
    elif status == constants.RUNPARTS_RUN:
117
      if runresult.failed:
118
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
119
                        relname, runresult.exit_code, runresult.output)
120
      else:
121
        logging.debug("Watcher hook %s: success (output: %s)", relname,
122
                      runresult.output)
123

  
124

  
125
class NodeMaintenance(object):
126
  """Talks to confd daemons and possible shutdown instances/drbd devices.
127

  
128
  """
129
  def __init__(self):
130
    self.store_cb = confd_client.StoreResultCallback()
131
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
132
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)
133

  
134
  @staticmethod
135
  def ShouldRun():
136
    """Checks whether node maintenance should run.
137

  
138
    """
139
    try:
140
      return ssconf.SimpleStore().GetMaintainNodeHealth()
141
    except errors.ConfigurationError, err:
142
      logging.error("Configuration error, not activating node maintenance: %s",
143
                    err)
144
      return False
145

  
146
  @staticmethod
147
  def GetRunningInstances():
148
    """Compute list of hypervisor/running instances.
149

  
150
    """
151
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
152
    results = []
153
    for hv_name in hyp_list:
154
      try:
155
        hv = hypervisor.GetHypervisor(hv_name)
156
        ilist = hv.ListInstances()
157
        results.extend([(iname, hv_name) for iname in ilist])
158
      except: # pylint: disable-msg=W0702
159
        logging.error("Error while listing instances for hypervisor %s",
160
                      hv_name, exc_info=True)
161
    return results
162

  
163
  @staticmethod
164
  def GetUsedDRBDs():
165
    """Get list of used DRBD minors.
166

  
167
    """
168
    return bdev.DRBD8.GetUsedDevs().keys()
169

  
170
  @classmethod
171
  def DoMaintenance(cls, role):
172
    """Maintain the instance list.
173

  
174
    """
175
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
176
      inst_running = cls.GetRunningInstances()
177
      cls.ShutdownInstances(inst_running)
178
      drbd_running = cls.GetUsedDRBDs()
179
      cls.ShutdownDRBD(drbd_running)
180
    else:
181
      logging.debug("Not doing anything for role %s", role)
182

  
183
  @staticmethod
184
  def ShutdownInstances(inst_running):
185
    """Shutdown running instances.
186

  
187
    """
188
    names_running = set([i[0] for i in inst_running])
189
    if names_running:
190
      logging.info("Following instances should not be running,"
191
                   " shutting them down: %s", utils.CommaJoin(names_running))
192
      # this dictionary will collapse duplicate instance names (only
193
      # xen pvm/hvm) into a single key, which is fine
194
      i2h = dict(inst_running)
195
      for name in names_running:
196
        hv_name = i2h[name]
197
        hv = hypervisor.GetHypervisor(hv_name)
198
        hv.StopInstance(None, force=True, name=name)
199

  
200
  @staticmethod
201
  def ShutdownDRBD(drbd_running):
202
    """Shutdown active DRBD devices.
203

  
204
    """
205
    if drbd_running:
206
      logging.info("Following DRBD minors should not be active,"
207
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
208
      for minor in drbd_running:
209
        # pylint: disable-msg=W0212
210
        # using the private method as is, pending enhancements to the DRBD
211
        # interface
212
        bdev.DRBD8._ShutdownAll(minor)
213

  
214
  def Exec(self):
215
    """Check node status versus cluster desired state.
216

  
217
    """
218
    my_name = netutils.Hostname.GetSysName()
219
    req = confd_client.ConfdClientRequest(type=
220
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
221
                                          query=my_name)
222
    self.confd_client.SendRequest(req, async=False, coverage=-1)
223
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
224
    if not timed_out:
225
      # should have a valid response
226
      status, result = self.store_cb.GetResponse(req.rsalt)
227
      assert status, "Missing result but received replies"
228
      if not self.filter_cb.consistent[req.rsalt]:
229
        logging.warning("Inconsistent replies, not doing anything")
230
        return
231
      self.DoMaintenance(result.server_reply.answer)
232
    else:
233
      logging.warning("Confd query timed out, cannot do maintenance actions")
234

  
235

  
236
class WatcherState(object):
237
  """Interface to a state file recording restart attempts.
238

  
239
  """
240
  def __init__(self, statefile):
241
    """Open, lock, read and parse the file.
242

  
243
    @type statefile: file
244
    @param statefile: State file object
245

  
246
    """
247
    self.statefile = statefile
248

  
249
    try:
250
      state_data = self.statefile.read()
251
      if not state_data:
252
        self._data = {}
253
      else:
254
        self._data = serializer.Load(state_data)
255
    except Exception, msg: # pylint: disable-msg=W0703
256
      # Ignore errors while loading the file and treat it as empty
257
      self._data = {}
258
      logging.warning(("Invalid state file. Using defaults."
259
                       " Error message: %s"), msg)
260

  
261
    if "instance" not in self._data:
262
      self._data["instance"] = {}
263
    if "node" not in self._data:
264
      self._data["node"] = {}
265

  
266
    self._orig_data = serializer.Dump(self._data)
267

  
268
  def Save(self):
269
    """Save state to file, then unlock and close it.
270

  
271
    """
272
    assert self.statefile
273

  
274
    serialized_form = serializer.Dump(self._data)
275
    if self._orig_data == serialized_form:
276
      logging.debug("Data didn't change, just touching status file")
277
      os.utime(constants.WATCHER_STATEFILE, None)
278
      return
279

  
280
    # We need to make sure the file is locked before renaming it, otherwise
281
    # starting ganeti-watcher again at the same time will create a conflict.
282
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
283
                         data=serialized_form,
284
                         prewrite=utils.LockFile, close=False)
285
    self.statefile = os.fdopen(fd, 'w+')
286

  
287
  def Close(self):
288
    """Unlock configuration file and close it.
289

  
290
    """
291
    assert self.statefile
292

  
293
    # Files are automatically unlocked when closing them
294
    self.statefile.close()
295
    self.statefile = None
296

  
297
  def GetNodeBootID(self, name):
298
    """Returns the last boot ID of a node or None.
299

  
300
    """
301
    ndata = self._data["node"]
302

  
303
    if name in ndata and KEY_BOOT_ID in ndata[name]:
304
      return ndata[name][KEY_BOOT_ID]
305
    return None
306

  
307
  def SetNodeBootID(self, name, bootid):
308
    """Sets the boot ID of a node.
309

  
310
    """
311
    assert bootid
312

  
313
    ndata = self._data["node"]
314

  
315
    if name not in ndata:
316
      ndata[name] = {}
317

  
318
    ndata[name][KEY_BOOT_ID] = bootid
319

  
320
  def NumberOfRestartAttempts(self, instance):
321
    """Returns number of previous restart attempts.
322

  
323
    @type instance: L{Instance}
324
    @param instance: the instance to look up
325

  
326
    """
327
    idata = self._data["instance"]
328

  
329
    if instance.name in idata:
330
      return idata[instance.name][KEY_RESTART_COUNT]
331

  
332
    return 0
333

  
334
  def MaintainInstanceList(self, instances):
335
    """Perform maintenance on the recorded instances.
336

  
337
    @type instances: list of string
338
    @param instances: the list of currently existing instances
339

  
340
    """
341
    idict = self._data["instance"]
342
    # First, delete obsolete instances
343
    obsolete_instances = set(idict).difference(instances)
344
    for inst in obsolete_instances:
345
      logging.debug("Forgetting obsolete instance %s", inst)
346
      del idict[inst]
347

  
348
    # Second, delete expired records
349
    earliest = time.time() - RETRY_EXPIRATION
350
    expired_instances = [i for i in idict
351
                         if idict[i][KEY_RESTART_WHEN] < earliest]
352
    for inst in expired_instances:
353
      logging.debug("Expiring record for instance %s", inst)
354
      del idict[inst]
355

  
356
  def RecordRestartAttempt(self, instance):
357
    """Record a restart attempt.
358

  
359
    @type instance: L{Instance}
360
    @param instance: the instance being restarted
361

  
362
    """
363
    idata = self._data["instance"]
364

  
365
    if instance.name not in idata:
366
      inst = idata[instance.name] = {}
367
    else:
368
      inst = idata[instance.name]
369

  
370
    inst[KEY_RESTART_WHEN] = time.time()
371
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
372

  
373
  def RemoveInstance(self, instance):
374
    """Update state to reflect that a machine is running.
375

  
376
    This method removes the record for a named instance (as we only
377
    track down instances).
378

  
379
    @type instance: L{Instance}
380
    @param instance: the instance to remove from books
381

  
382
    """
383
    idata = self._data["instance"]
384

  
385
    if instance.name in idata:
386
      del idata[instance.name]
387

  
388

  
389
class Instance(object):
390
  """Abstraction for a Virtual Machine instance.
391

  
392
  """
393
  def __init__(self, name, state, autostart):
394
    self.name = name
395
    self.state = state
396
    self.autostart = autostart
397

  
398
  def Restart(self):
399
    """Encapsulates the start of an instance.
400

  
401
    """
402
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
403
    cli.SubmitOpCode(op, cl=client)
404

  
405
  def ActivateDisks(self):
406
    """Encapsulates the activation of all disks of an instance.
407

  
408
    """
409
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
410
    cli.SubmitOpCode(op, cl=client)
411

  
412

  
413
def GetClusterData():
414
  """Get a list of instances on this cluster.
415

  
416
  """
417
  op1_fields = ["name", "status", "admin_state", "snodes"]
418
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
419
                                 use_locking=True)
420
  op2_fields = ["name", "bootid", "offline"]
421
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
422
                             use_locking=True)
423

  
424
  job_id = client.SubmitJob([op1, op2])
425

  
426
  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
427

  
428
  logging.debug("Got data from cluster, writing instance status file")
429

  
430
  result = all_results[0]
431
  smap = {}
432

  
433
  instances = {}
434

  
435
  # write the upfile
436
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
437
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
438

  
439
  for fields in result:
440
    (name, status, autostart, snodes) = fields
441

  
442
    # update the secondary node map
443
    for node in snodes:
444
      if node not in smap:
445
        smap[node] = []
446
      smap[node].append(name)
447

  
448
    instances[name] = Instance(name, status, autostart)
449

  
450
  nodes =  dict([(name, (bootid, offline))
451
                 for name, bootid, offline in all_results[1]])
452

  
453
  client.ArchiveJob(job_id)
454

  
455
  return instances, nodes, smap
456

  
457

  
458
class Watcher(object):
459
  """Encapsulate the logic for restarting erroneously halted virtual machines.
460

  
461
  The calling program should periodically instantiate me and call Run().
462
  This will traverse the list of instances, and make up to MAXTRIES attempts
463
  to restart machines that are down.
464

  
465
  """
466
  def __init__(self, opts, notepad):
467
    self.notepad = notepad
468
    master = client.QueryConfigValues(["master_node"])[0]
469
    if master != netutils.Hostname.GetSysName():
470
      raise NotMasterError("This is not the master node")
471
    # first archive old jobs
472
    self.ArchiveJobs(opts.job_age)
473
    # and only then submit new ones
474
    self.instances, self.bootids, self.smap = GetClusterData()
475
    self.started_instances = set()
476
    self.opts = opts
477

  
478
  def Run(self):
479
    """Watcher run sequence.
480

  
481
    """
482
    notepad = self.notepad
483
    self.CheckInstances(notepad)
484
    self.CheckDisks(notepad)
485
    self.VerifyDisks()
486

  
487
  @staticmethod
488
  def ArchiveJobs(age):
489
    """Archive old jobs.
490

  
491
    """
492
    arch_count, left_count = client.AutoArchiveJobs(age)
493
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)
494

  
495
  def CheckDisks(self, notepad):
496
    """Check all nodes for restarted ones.
497

  
498
    """
499
    check_nodes = []
500
    for name, (new_id, offline) in self.bootids.iteritems():
501
      old = notepad.GetNodeBootID(name)
502
      if new_id is None:
503
        # Bad node, not returning a boot id
504
        if not offline:
505
          logging.debug("Node %s missing boot id, skipping secondary checks",
506
                        name)
507
        continue
508
      if old != new_id:
509
        # Node's boot ID has changed, probably through a reboot.
510
        check_nodes.append(name)
511

  
512
    if check_nodes:
513
      # Activate disks for all instances with any of the checked nodes as a
514
      # secondary node.
515
      for node in check_nodes:
516
        if node not in self.smap:
517
          continue
518
        for instance_name in self.smap[node]:
519
          instance = self.instances[instance_name]
520
          if not instance.autostart:
521
            logging.info(("Skipping disk activation for non-autostart"
522
                          " instance %s"), instance.name)
523
            continue
524
          if instance.name in self.started_instances:
525
            # we already tried to start the instance, which should have
526
            # activated its drives (if they can be at all)
527
            continue
528
          try:
529
            logging.info("Activating disks for instance %s", instance.name)
530
            instance.ActivateDisks()
531
          except Exception: # pylint: disable-msg=W0703
532
            logging.exception("Error while activating disks for instance %s",
533
                              instance.name)
534

  
535
      # Keep changed boot IDs
536
      for name in check_nodes:
537
        notepad.SetNodeBootID(name, self.bootids[name][0])
538

  
539
  def CheckInstances(self, notepad):
540
    """Make a pass over the list of instances, restarting downed ones.
541

  
542
    """
543
    notepad.MaintainInstanceList(self.instances.keys())
544

  
545
    for instance in self.instances.values():
546
      if instance.state in BAD_STATES:
547
        n = notepad.NumberOfRestartAttempts(instance)
548

  
549
        if n > MAXTRIES:
550
          logging.warning("Not restarting instance %s, retries exhausted",
551
                          instance.name)
552
          continue
553
        elif n < MAXTRIES:
554
          last = " (Attempt #%d)" % (n + 1)
555
        else:
556
          notepad.RecordRestartAttempt(instance)
557
          logging.error("Could not restart %s after %d attempts, giving up",
558
                        instance.name, MAXTRIES)
559
          continue
560
        try:
561
          logging.info("Restarting %s%s",
562
                        instance.name, last)
563
          instance.Restart()
564
          self.started_instances.add(instance.name)
565
        except Exception: # pylint: disable-msg=W0703
566
          logging.exception("Error while restarting instance %s",
567
                            instance.name)
568

  
569
        notepad.RecordRestartAttempt(instance)
570
      elif instance.state in HELPLESS_STATES:
571
        if notepad.NumberOfRestartAttempts(instance):
572
          notepad.RemoveInstance(instance)
573
      else:
574
        if notepad.NumberOfRestartAttempts(instance):
575
          notepad.RemoveInstance(instance)
576
          logging.info("Restart of %s succeeded", instance.name)
577

  
578
  @staticmethod
579
  def VerifyDisks():
580
    """Run gnt-cluster verify-disks.
581

  
582
    """
583
    op = opcodes.OpVerifyDisks()
584
    job_id = client.SubmitJob([op])
585
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
586
    client.ArchiveJob(job_id)
587
    if not isinstance(result, (tuple, list)):
588
      logging.error("Can't get a valid result from verify-disks")
589
      return
590
    offline_disk_instances = result[2]
591
    if not offline_disk_instances:
592
      # nothing to do
593
      return
594
    logging.debug("Will activate disks for instances %s",
595
                  utils.CommaJoin(offline_disk_instances))
596
    # we submit only one job and wait for it; not optimal, but it spams
597
    # the job queue less
598
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
599
           for name in offline_disk_instances]
600
    job_id = cli.SendJob(job, cl=client)
601

  
602
    try:
603
      cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
604
    except Exception: # pylint: disable-msg=W0703
605
      logging.exception("Error while activating disks")
606

  
607

  
608
def OpenStateFile(path):
609
  """Opens the state file and acquires a lock on it.
610

  
611
  @type path: string
612
  @param path: Path to state file
613

  
614
  """
615
  # The two-step dance below is necessary to allow both opening existing
616
  # file read/write and creating if not existing. Vanilla open will truncate
617
  # an existing file -or- allow creating if not existing.
618
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
619

  
620
  # Try to acquire lock on state file. If this fails, another watcher instance
621
  # might already be running or another program is temporarily blocking the
622
  # watcher from running.
623
  try:
624
    utils.LockFile(statefile_fd)
625
  except errors.LockError, err:
626
    logging.error("Can't acquire lock on state file %s: %s", path, err)
627
    return None
628

  
629
  return os.fdopen(statefile_fd, "w+")
630

  
631

  
632
def IsRapiResponding(hostname):
633
  """Connects to RAPI port and does a simple test.
634

  
635
  Connects to RAPI port of hostname and does a simple test. At this time, the
636
  test is GetVersion.
637

  
638
  @type hostname: string
639
  @param hostname: hostname of the node to connect to.
640
  @rtype: bool
641
  @return: Whether RAPI is working properly
642

  
643
  """
644
  curl_config = rapi.client.GenericCurlConfig()
645
  rapi_client = rapi.client.GanetiRapiClient(hostname,
646
                                             curl_config_fn=curl_config)
647
  try:
648
    master_version = rapi_client.GetVersion()
649
  except rapi.client.CertificateError, err:
650
    logging.warning("RAPI Error: CertificateError (%s)", err)
651
    return False
652
  except rapi.client.GanetiApiError, err:
653
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
654
    return False
655
  logging.debug("RAPI Result: master_version is %s", master_version)
656
  return master_version == constants.RAPI_VERSION
657

  
658

  
659
def ParseOptions():
660
  """Parse the command line options.
661

  
662
  @return: (options, args) as from OptionParser.parse_args()
663

  
664
  """
665
  parser = OptionParser(description="Ganeti cluster watcher",
666
                        usage="%prog [-d]",
667
                        version="%%prog (ganeti) %s" %
668
                        constants.RELEASE_VERSION)
669

  
670
  parser.add_option(cli.DEBUG_OPT)
671
  parser.add_option("-A", "--job-age", dest="job_age",
672
                    help="Autoarchive jobs older than this age (default"
673
                    " 6 hours)", default=6*3600)
674
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
675
                    action="store_true", help="Ignore cluster pause setting")
676
  options, args = parser.parse_args()
677
  options.job_age = cli.ParseTimespec(options.job_age)
678
  return options, args
679

  
680

  
681
@rapi.client.UsesRapiClient
682
def main():
683
  """Main function.
684

  
685
  """
686
  global client # pylint: disable-msg=W0603
687

  
688
  options, args = ParseOptions()
689

  
690
  if args: # watcher doesn't take any arguments
691
    print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0])
692
    sys.exit(constants.EXIT_FAILURE)
693

  
694
  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
695
                     stderr_logging=options.debug)
696

  
697
  if ShouldPause() and not options.ignore_pause:
698
    logging.debug("Pause has been set, exiting")
699
    sys.exit(constants.EXIT_SUCCESS)
700

  
701
  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
702
  if not statefile:
703
    sys.exit(constants.EXIT_FAILURE)
704

  
705
  update_file = False
706
  try:
707
    StartNodeDaemons()
708
    RunWatcherHooks()
709
    # run node maintenance in all cases, even if master, so that old
710
    # masters can be properly cleaned up too
711
    if NodeMaintenance.ShouldRun():
712
      NodeMaintenance().Exec()
713

  
714
    notepad = WatcherState(statefile)
715
    try:
716
      try:
717
        client = cli.GetClient()
718
      except errors.OpPrereqError:
719
        # this is, from cli.GetClient, a not-master case
720
        logging.debug("Not on master, exiting")
721
        update_file = True
722
        sys.exit(constants.EXIT_SUCCESS)
723
      except luxi.NoMasterError, err:
724
        logging.warning("Master seems to be down (%s), trying to restart",
725
                        str(err))
726
        if not utils.EnsureDaemon(constants.MASTERD):
727
          logging.critical("Can't start the master, exiting")
728
          sys.exit(constants.EXIT_FAILURE)
729
        # else retry the connection
730
        client = cli.GetClient()
731

  
732
      # we are on master now
733
      utils.EnsureDaemon(constants.RAPI)
734

  
735
      # If RAPI isn't responding to queries, try one restart.
736
      logging.debug("Attempting to talk with RAPI.")
737
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
738
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
739
                        " Restarting Ganeti RAPI.")
740
        utils.StopDaemon(constants.RAPI)
741
        utils.EnsureDaemon(constants.RAPI)
742
        logging.debug("Second attempt to talk with RAPI")
743
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
744
          logging.fatal("RAPI is not responding. Please investigate.")
745
      logging.debug("Successfully talked to RAPI.")
746

  
747
      try:
748
        watcher = Watcher(options, notepad)
749
      except errors.ConfigurationError:
750
        # Just exit if there's no configuration
751
        update_file = True
752
        sys.exit(constants.EXIT_SUCCESS)
753

  
754
      watcher.Run()
755
      update_file = True
756

  
757
    finally:
758
      if update_file:
759
        notepad.Save()
760
      else:
761
        logging.debug("Not updating status file due to failure")
762
  except SystemExit:
763
    raise
764
  except NotMasterError:
765
    logging.debug("Not master, exiting")
766
    sys.exit(constants.EXIT_NOTMASTER)
767
  except errors.ResolverError, err:
768
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
769
    sys.exit(constants.EXIT_NODESETUP_ERROR)
770
  except errors.JobQueueFull:
771
    logging.error("Job queue is full, can't query cluster state")
772
  except errors.JobQueueDrainError:
773
    logging.error("Job queue is drained, can't maintain cluster state")
774
  except Exception, err:
775
    logging.exception(str(err))
776
    sys.exit(constants.EXIT_FAILURE)
777

  
778

  
779
if __name__ == '__main__':
780
  main()
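
The file deleted above reappears, essentially unchanged apart from the dropped shebang line, as lib/watcher/__init__.py below. Since the watcher is now an importable package module rather than a standalone script, callers such as the generated bootstrap stub or unit tests can reach its entry point and constants through a normal import. A small, hypothetical usage sketch:

# Hypothetical illustration of the new import path introduced by this move;
# the names used (MAXTRIES, RETRY_EXPIRATION, main) are defined in the
# module shown in this diff.
from ganeti import watcher

print watcher.MAXTRIES          # 5 restart attempts per instance
print watcher.RETRY_EXPIRATION  # 28800 seconds (8 hours)

if __name__ == "__main__":
  # Same entry point the generated daemons/ganeti-watcher stub would call.
  watcher.main()
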
b/lib/watcher/__init__.py
1
#
2
#
3

  
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Tool to restart erroneously downed virtual machines.
23

  
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

  
28
"""
29

  
30
# pylint: disable-msg=C0103,W0142
31

  
32
# C0103: Invalid name ganeti-watcher
33

  
34
import os
35
import sys
36
import time
37
import logging
38
from optparse import OptionParser
39

  
40
from ganeti import utils
41
from ganeti import constants
42
from ganeti import serializer
43
from ganeti import errors
44
from ganeti import opcodes
45
from ganeti import cli
46
from ganeti import luxi
47
from ganeti import ssconf
48
from ganeti import bdev
49
from ganeti import hypervisor
50
from ganeti import rapi
51
from ganeti.confd import client as confd_client
52
from ganeti import netutils
53

  
54
import ganeti.rapi.client # pylint: disable-msg=W0611
55

  
56

  
57
MAXTRIES = 5
58
# Delete any record that is older than 8 hours; this value is based on
59
# the fact that the current retry counter is 5, and watcher runs every
60
# 5 minutes, so it takes around half an hour to exceed the retry
61
# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
62
RETRY_EXPIRATION = 8 * 3600
63
BAD_STATES = ['ERROR_down']
64
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
65
NOTICE = 'NOTICE'
66
ERROR = 'ERROR'
67
KEY_RESTART_COUNT = "restart_count"
68
KEY_RESTART_WHEN = "restart_when"
69
KEY_BOOT_ID = "bootid"
70

  
71

  
72
# Global client object
73
client = None
74

  
75

  
76
class NotMasterError(errors.GenericError):
77
  """Exception raised when this host is not the master."""
78

  
79

  
80
def ShouldPause():
81
  """Check whether we should pause.
82

  
83
  """
84
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
85

  
86

  
87
def StartNodeDaemons():
88
  """Start all the daemons that should be running on all nodes.
89

  
90
  """
91
  # on master or not, try to start the node daemon
92
  utils.EnsureDaemon(constants.NODED)
93
  # start confd as well. On non-candidates it will be in disabled mode.
94
  utils.EnsureDaemon(constants.CONFD)
95

  
96

  
97
def RunWatcherHooks():
98
  """Run the watcher hooks.
99

  
100
  """
101
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
102
                             constants.HOOKS_NAME_WATCHER)
103
  if not os.path.isdir(hooks_dir):
104
    return
105

  
106
  try:
107
    results = utils.RunParts(hooks_dir)
108
  except Exception, msg: # pylint: disable-msg=W0703
109
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
110

  
111
  for (relname, status, runresult) in results:
112
    if status == constants.RUNPARTS_SKIP:
113
      logging.debug("Watcher hook %s: skipped", relname)
114
    elif status == constants.RUNPARTS_ERR:
115
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
116
    elif status == constants.RUNPARTS_RUN:
117
      if runresult.failed:
118
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
119
                        relname, runresult.exit_code, runresult.output)
120
      else:
121
        logging.debug("Watcher hook %s: success (output: %s)", relname,
122
                      runresult.output)
123

  
124

  
125
class NodeMaintenance(object):
126
  """Talks to confd daemons and possible shutdown instances/drbd devices.
127

  
128
  """
129
  def __init__(self):
130
    self.store_cb = confd_client.StoreResultCallback()
131
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
132
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)
133

  
134
  @staticmethod
135
  def ShouldRun():
136
    """Checks whether node maintenance should run.
137

  
138
    """
139
    try:
140
      return ssconf.SimpleStore().GetMaintainNodeHealth()
141
    except errors.ConfigurationError, err:
142
      logging.error("Configuration error, not activating node maintenance: %s",
143
                    err)
144
      return False
145

  
146
  @staticmethod
147
  def GetRunningInstances():
148
    """Compute list of hypervisor/running instances.
149

  
150
    """
151
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
152
    results = []
153
    for hv_name in hyp_list:
154
      try:
155
        hv = hypervisor.GetHypervisor(hv_name)
156
        ilist = hv.ListInstances()
157
        results.extend([(iname, hv_name) for iname in ilist])
158
      except: # pylint: disable-msg=W0702
159
        logging.error("Error while listing instances for hypervisor %s",
160
                      hv_name, exc_info=True)
161
    return results
162

  
163
  @staticmethod
164
  def GetUsedDRBDs():
165
    """Get list of used DRBD minors.
166

  
167
    """
168
    return bdev.DRBD8.GetUsedDevs().keys()
169

  
170
  @classmethod
171
  def DoMaintenance(cls, role):
172
    """Maintain the instance list.
173

  
174
    """
175
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
176
      inst_running = cls.GetRunningInstances()
177
      cls.ShutdownInstances(inst_running)
178
      drbd_running = cls.GetUsedDRBDs()
179
      cls.ShutdownDRBD(drbd_running)
180
    else:
181
      logging.debug("Not doing anything for role %s", role)
182

  
183
  @staticmethod
184
  def ShutdownInstances(inst_running):
185
    """Shutdown running instances.
186

  
187
    """
188
    names_running = set([i[0] for i in inst_running])
189
    if names_running:
190
      logging.info("Following instances should not be running,"
191
                   " shutting them down: %s", utils.CommaJoin(names_running))
192
      # this dictionary will collapse duplicate instance names (only
193
      # xen pvm/hvm) into a single key, which is fine
194
      i2h = dict(inst_running)
195
      for name in names_running:
196
        hv_name = i2h[name]
197
        hv = hypervisor.GetHypervisor(hv_name)
198
        hv.StopInstance(None, force=True, name=name)
199

  
200
  @staticmethod
201
  def ShutdownDRBD(drbd_running):
202
    """Shutdown active DRBD devices.
203

  
204
    """
205
    if drbd_running:
206
      logging.info("Following DRBD minors should not be active,"
207
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
208
      for minor in drbd_running:
209
        # pylint: disable-msg=W0212
210
        # using the private method as is, pending enhancements to the DRBD
211
        # interface
212
        bdev.DRBD8._ShutdownAll(minor)
213

  
214
  def Exec(self):
215
    """Check node status versus cluster desired state.
216

  
217
    """
218
    my_name = netutils.Hostname.GetSysName()
219
    req = confd_client.ConfdClientRequest(type=
220
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
221
                                          query=my_name)
222
    self.confd_client.SendRequest(req, async=False, coverage=-1)
223
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
224
    if not timed_out:
225
      # should have a valid response
226
      status, result = self.store_cb.GetResponse(req.rsalt)
227
      assert status, "Missing result but received replies"
228
      if not self.filter_cb.consistent[req.rsalt]:
229
        logging.warning("Inconsistent replies, not doing anything")
230
        return
231
      self.DoMaintenance(result.server_reply.answer)
232
    else:
233
      logging.warning("Confd query timed out, cannot do maintenance actions")
234

  
235

  
236
class WatcherState(object):
237
  """Interface to a state file recording restart attempts.
238

  
239
  """
240
  def __init__(self, statefile):
241
    """Open, lock, read and parse the file.
242

  
243
    @type statefile: file
244
    @param statefile: State file object
245

  
246
    """
247
    self.statefile = statefile
248

  
249
    try:
250
      state_data = self.statefile.read()
251
      if not state_data:
252
        self._data = {}
253
      else:
254
        self._data = serializer.Load(state_data)
255
    except Exception, msg: # pylint: disable-msg=W0703
256
      # Ignore errors while loading the file and treat it as empty
257
      self._data = {}
258
      logging.warning(("Invalid state file. Using defaults."
259
                       " Error message: %s"), msg)
260

  
261
    if "instance" not in self._data:
262
      self._data["instance"] = {}
263
    if "node" not in self._data:
264
      self._data["node"] = {}
265

  
266
    self._orig_data = serializer.Dump(self._data)
267

  
268
  def Save(self):
269
    """Save state to file, then unlock and close it.
270

  
271
    """
272
    assert self.statefile
273

  
274
    serialized_form = serializer.Dump(self._data)
275
    if self._orig_data == serialized_form:
276
      logging.debug("Data didn't change, just touching status file")
277
      os.utime(constants.WATCHER_STATEFILE, None)
278
      return
279

  
280
    # We need to make sure the file is locked before renaming it, otherwise
281
    # starting ganeti-watcher again at the same time will create a conflict.
282
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
283
                         data=serialized_form,
284
                         prewrite=utils.LockFile, close=False)
285
    self.statefile = os.fdopen(fd, 'w+')
286

  
287
  def Close(self):
288
    """Unlock configuration file and close it.
289

  
290
    """
291
    assert self.statefile
292

  
293
    # Files are automatically unlocked when closing them
294
    self.statefile.close()
295
    self.statefile = None
296

  
297
  def GetNodeBootID(self, name):
298
    """Returns the last boot ID of a node or None.
299

  
300
    """
301
    ndata = self._data["node"]
302

  
303
    if name in ndata and KEY_BOOT_ID in ndata[name]:
304
      return ndata[name][KEY_BOOT_ID]
305
    return None
306

  
307
  def SetNodeBootID(self, name, bootid):
308
    """Sets the boot ID of a node.
309

  
310
    """
311
    assert bootid
312

  
313
    ndata = self._data["node"]
314

  
315
    if name not in ndata:
316
      ndata[name] = {}
317

  
318
    ndata[name][KEY_BOOT_ID] = bootid
319

  
320
  def NumberOfRestartAttempts(self, instance):
321
    """Returns number of previous restart attempts.
322

  
323
    @type instance: L{Instance}
324
    @param instance: the instance to look up
325

  
326
    """
327
    idata = self._data["instance"]
328

  
329
    if instance.name in idata:
330
      return idata[instance.name][KEY_RESTART_COUNT]
331

  
332
    return 0
333

  
334
  def MaintainInstanceList(self, instances):
335
    """Perform maintenance on the recorded instances.
336

  
337
    @type instances: list of string
338
    @param instances: the list of currently existing instances
339

  
340
    """
341
    idict = self._data["instance"]
342
    # First, delete obsolete instances
343
    obsolete_instances = set(idict).difference(instances)
344
    for inst in obsolete_instances:
345
      logging.debug("Forgetting obsolete instance %s", inst)
346
      del idict[inst]
347

  
348
    # Second, delete expired records
349
    earliest = time.time() - RETRY_EXPIRATION
350
    expired_instances = [i for i in idict
351
                         if idict[i][KEY_RESTART_WHEN] < earliest]
352
    for inst in expired_instances:
353
      logging.debug("Expiring record for instance %s", inst)
354
      del idict[inst]
355

  
356
  def RecordRestartAttempt(self, instance):
357
    """Record a restart attempt.
358

  
359
    @type instance: L{Instance}
360
    @param instance: the instance being restarted
361

  
362
    """
363
    idata = self._data["instance"]
364

  
365
    if instance.name not in idata:
366
      inst = idata[instance.name] = {}
367
    else:
368
      inst = idata[instance.name]
369

  
370
    inst[KEY_RESTART_WHEN] = time.time()
371
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
372

  
373
  def RemoveInstance(self, instance):
374
    """Update state to reflect that a machine is running.
375

  
376
    This method removes the record for a named instance (as we only
377
    track down instances).
378

  
379
    @type instance: L{Instance}
380
    @param instance: the instance to remove from books
381

  
382
    """
383
    idata = self._data["instance"]
384

  
385
    if instance.name in idata:
386
      del idata[instance.name]
387

  
388

  
389
class Instance(object):
390
  """Abstraction for a Virtual Machine instance.
391

  
392
  """
393
  def __init__(self, name, state, autostart):
394
    self.name = name
395
    self.state = state
396
    self.autostart = autostart
397

  
398
  def Restart(self):
399
    """Encapsulates the start of an instance.
400

  
401
    """
402
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
403
    cli.SubmitOpCode(op, cl=client)
404

  
405
  def ActivateDisks(self):
406
    """Encapsulates the activation of all disks of an instance.
407

  
408
    """
409
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
410
    cli.SubmitOpCode(op, cl=client)
411

  
412

  
413
def GetClusterData():
414
  """Get a list of instances on this cluster.
415

  
416
  """
417
  op1_fields = ["name", "status", "admin_state", "snodes"]
418
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
419
                                 use_locking=True)
420
  op2_fields = ["name", "bootid", "offline"]
421
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
422
                             use_locking=True)
423

  
424
  job_id = client.SubmitJob([op1, op2])
425

  
426
  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
427

  
428
  logging.debug("Got data from cluster, writing instance status file")
429

  
430
  result = all_results[0]
431
  smap = {}
432

  
433
  instances = {}
434

  
435
  # write the upfile
436
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
437
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
438

  
439
  for fields in result:
440
    (name, status, autostart, snodes) = fields
441

  
442
    # update the secondary node map
443
    for node in snodes:
444
      if node not in smap:
445
        smap[node] = []
446
      smap[node].append(name)
447

  
448
    instances[name] = Instance(name, status, autostart)
449

  
450
  nodes =  dict([(name, (bootid, offline))
451
                 for name, bootid, offline in all_results[1]])
452

  
453
  client.ArchiveJob(job_id)
454

  
455
  return instances, nodes, smap
456

  
457

  
458
class Watcher(object):
459
  """Encapsulate the logic for restarting erroneously halted virtual machines.
460

  
461
  The calling program should periodically instantiate me and call Run().
462
  This will traverse the list of instances, and make up to MAXTRIES attempts
463
  to restart machines that are down.
464

  
465
  """
466
  def __init__(self, opts, notepad):
467
    self.notepad = notepad
468
    master = client.QueryConfigValues(["master_node"])[0]
469
    if master != netutils.Hostname.GetSysName():
470
      raise NotMasterError("This is not the master node")
471
    # first archive old jobs
472
    self.ArchiveJobs(opts.job_age)
473
    # and only then submit new ones
474
    self.instances, self.bootids, self.smap = GetClusterData()
475
    self.started_instances = set()
476
    self.opts = opts
477

  
478
  def Run(self):
479
    """Watcher run sequence.
480

  
481
    """
482
    notepad = self.notepad
483
    self.CheckInstances(notepad)
484
    self.CheckDisks(notepad)
485
    self.VerifyDisks()
486

  
487
  @staticmethod
488
  def ArchiveJobs(age):
489
    """Archive old jobs.
490

  
491
    """
492
    arch_count, left_count = client.AutoArchiveJobs(age)
493
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)
494

  
495
  def CheckDisks(self, notepad):
496
    """Check all nodes for restarted ones.
497

  
498
    """
499
    check_nodes = []
500
    for name, (new_id, offline) in self.bootids.iteritems():
501
      old = notepad.GetNodeBootID(name)
502
      if new_id is None:
503
        # Bad node, not returning a boot id
504
        if not offline:
505
          logging.debug("Node %s missing boot id, skipping secondary checks",
506
                        name)
507
        continue
508
      if old != new_id:
509
        # Node's boot ID has changed, probably through a reboot.
510
        check_nodes.append(name)
511

  
512
    if check_nodes:
513
      # Activate disks for all instances with any of the checked nodes as a
514
      # secondary node.
515
      for node in check_nodes:
516
        if node not in self.smap:
517
          continue
518
        for instance_name in self.smap[node]:
519
          instance = self.instances[instance_name]
520
          if not instance.autostart:
521
            logging.info(("Skipping disk activation for non-autostart"
522
                          " instance %s"), instance.name)
523
            continue
524
          if instance.name in self.started_instances:
525
            # we already tried to start the instance, which should have
526
            # activated its drives (if they can be at all)
527
            continue
528
          try:
529
            logging.info("Activating disks for instance %s", instance.name)
530
            instance.ActivateDisks()
531
          except Exception: # pylint: disable-msg=W0703
532
            logging.exception("Error while activating disks for instance %s",
533
                              instance.name)
534

  
535
      # Keep changed boot IDs
536
      for name in check_nodes:
537
        notepad.SetNodeBootID(name, self.bootids[name][0])
538

  
539
  def CheckInstances(self, notepad):
540
    """Make a pass over the list of instances, restarting downed ones.
541

  
542
    """
543
    notepad.MaintainInstanceList(self.instances.keys())
544

  
545
    for instance in self.instances.values():
546
      if instance.state in BAD_STATES:
547
        n = notepad.NumberOfRestartAttempts(instance)
548

  
549
        if n > MAXTRIES:
550
          logging.warning("Not restarting instance %s, retries exhausted",
551
                          instance.name)
552
          continue
553
        elif n < MAXTRIES:
554
          last = " (Attempt #%d)" % (n + 1)
555
        else:
556
          notepad.RecordRestartAttempt(instance)
557
          logging.error("Could not restart %s after %d attempts, giving up",
558
                        instance.name, MAXTRIES)
559
          continue
560
        try:
561
          logging.info("Restarting %s%s",
562
                        instance.name, last)
563
          instance.Restart()
564
          self.started_instances.add(instance.name)
565
        except Exception: # pylint: disable-msg=W0703
566
          logging.exception("Error while restarting instance %s",
567
                            instance.name)
568

  
569
        notepad.RecordRestartAttempt(instance)
570
      elif instance.state in HELPLESS_STATES:
571
        if notepad.NumberOfRestartAttempts(instance):
572
          notepad.RemoveInstance(instance)
573
      else:
574
        if notepad.NumberOfRestartAttempts(instance):
575
          notepad.RemoveInstance(instance)
576
          logging.info("Restart of %s succeeded", instance.name)
577

  
578
  @staticmethod
579
  def VerifyDisks():
580
    """Run gnt-cluster verify-disks.
581

  
582
    """
583
    op = opcodes.OpVerifyDisks()
584
    job_id = client.SubmitJob([op])
585
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
586
    client.ArchiveJob(job_id)
587
    if not isinstance(result, (tuple, list)):
588
      logging.error("Can't get a valid result from verify-disks")
589
      return
590
    offline_disk_instances = result[2]
591
    if not offline_disk_instances:
592
      # nothing to do
593
      return
594
    logging.debug("Will activate disks for instances %s",
595
                  utils.CommaJoin(offline_disk_instances))
596
    # we submit only one job and wait for it; not optimal, but it spams
597
    # the job queue less
598
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
599
           for name in offline_disk_instances]
600
    job_id = cli.SendJob(job, cl=client)
601

  
602
    try:
603
      cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
604
    except Exception: # pylint: disable-msg=W0703
605
      logging.exception("Error while activating disks")
606

  
607

  
608
def OpenStateFile(path):
609
  """Opens the state file and acquires a lock on it.
610

  
611
  @type path: string
612
  @param path: Path to state file
613

  
614
  """
615
  # The two-step dance below is necessary to allow both opening existing
616
  # file read/write and creating if not existing. Vanilla open will truncate
617
  # an existing file -or- allow creating if not existing.
618
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
619

  
620
  # Try to acquire lock on state file. If this fails, another watcher instance
621
  # might already be running or another program is temporarily blocking the
622
  # watcher from running.
623
  try:
624
    utils.LockFile(statefile_fd)
625
  except errors.LockError, err:
626
    logging.error("Can't acquire lock on state file %s: %s", path, err)
... This diff was truncated because it exceeds the maximum size that can be displayed.

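For reference, the on-disk state that WatcherState maintains in constants.WATCHER_STATEFILE consists of two top-level dictionaries, "instance" and "node", written out through ganeti.serializer (JSON under the hood). A minimal sketch of its contents, with made-up instance and node names and timestamps:

# Sketch of the watcher state file contents; names, counts and timestamps
# are hypothetical.  serializer.Dump() stores this structure as JSON.
watcher_state = {
  "instance": {
    "instance1.example.com": {
      "restart_count": 2,            # KEY_RESTART_COUNT, compared with MAXTRIES
      "restart_when": 1288000000.0,  # KEY_RESTART_WHEN, pruned after RETRY_EXPIRATION
    },
  },
  "node": {
    "node1.example.com": {
      "bootid": "hypothetical-boot-id",  # KEY_BOOT_ID, used by CheckDisks to spot reboots
    },
  },
}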