#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
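
# The state file managed by WatcherState (below) is a serializer-encoded
# dict of the form:
#   {"instance": {<name>: {"restart_count": ..., "restart_when": ...}},
#    "node": {<name>: {"bootid": ...}}}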


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  @param s: the string to indent
  @param prefix: the string to prepend to each line

  """
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def EnsureDaemon(name):
  """Check for and start daemon if not alive.

  """
  result = utils.RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
  if result.failed:
    logging.error("Can't start daemon '%s', failure %s, output: %s",
                  name, result.fail_reason, result.output)
    return False

  return True


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from the books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)
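
  # Both queries are submitted as a single job; the poll below returns one
  # result per opcode, in the same order (instance data first, node data
  # second).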

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)
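
        # n < MAXTRIES: attempt another restart; n == MAXTRIES: the allowed
        # attempts are exhausted, so log the give-up message and record it;
        # n > MAXTRIES: the give-up was already logged on a previous run,
        # stay quiet.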

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
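    # the third field of the verify-disks result is the list of instances
    # with offline disks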
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # we submit only one job and wait for it; not optimal, but it spams
    # the job queue less
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


def main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

  update_file = False
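  # update_file is only set to True once the run reaches a point where
  # writing the state file back is safe; on early failures the on-disk
  # state is left untouched (see the finally: clause below)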
  try:
    # on master or not, try to start the node daemon
    EnsureDaemon(constants.NODED)

    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      EnsureDaemon(constants.RAPI)

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()