#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
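# BAD_STATES are instance states the watcher tries to fix by restarting the
# instance, while HELPLESS_STATES mean the node itself is down, so a restart
# would not help.  The KEY_* constants are the per-record keys used in the
# watcher state file; an illustrative (not authoritative) layout of the
# serialized state is:
#   {"node": {"node1.example.com": {"bootid": "..."}},
#    "instance": {"inst1.example.com": {"restart_count": 2,
#                                       "restart_when": 1234567890.0}}}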


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will run in disabled mode
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, msg: # pylint: disable-msg=W0703
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock the state file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from the books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get instance and node data from this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)
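        # While n < MAXTRIES the instance is restarted and the attempt is
        # recorded; when n reaches MAXTRIES one final attempt is recorded
        # together with a "giving up" error, and for anything above that the
        # watcher stays quiet about this instance.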
        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
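    # As used here, the third element of the verify-disks result is the list
    # of instances whose disks are offline and need to be re-activated.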
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # We submit only one job and wait for it; not optimal, but this spams
    # the job queue less.
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    try:
      cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
    except Exception: # pylint: disable-msg=W0703
      logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening an existing
  # file read/write and creating it if missing. A vanilla open() would either
  # truncate an existing file or only allow creating a missing one.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


def main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    sys.exit(constants.EXIT_FAILURE)

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # cli.GetClient raises this when we are not the master node
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      # we are on the master node now
      utils.EnsureDaemon(constants.RAPI)

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()