#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import sys
import time
import logging
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


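# A down instance is restarted at most MAXTRIES times; BAD_STATES are
# handled by restarting the instance, while for HELPLESS_STATES only the
# node coming back can help.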
MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  @param s: the string to indent
  @param prefix: the string to prepend each line

  """
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartMaster():
  """Try to start the master daemon.

  """
  result = utils.RunCmd(['ganeti-masterd'])
  if result.failed:
    logging.error("Can't start the master daemon: output '%s'", result.output)
  return not result.failed


def EnsureDaemon(daemon):
  """Check for and start daemon if not alive.

  """
  pidfile = utils.DaemonPidFileName(daemon)
  pid = utils.ReadPidFile(pidfile)
  if pid == 0 or not utils.IsProcessAlive(pid): # no file or dead pid
    logging.debug("Daemon '%s' not alive, trying to restart", daemon)
    result = utils.RunCmd([daemon])
    if result.failed:
      logging.error("Can't start daemon '%s', failure %s, output: %s",
                    daemon, result.fail_reason, result.output)


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

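    # Remember the serialized form of the loaded data, so that Save() can
    # detect whether anything actually changed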
    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
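    # Keep the new, already locked file open so that Close() can unlock
    # and close it later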
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (we only keep
    records for instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances and nodes on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

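  # Submit both queries as a single job; PollJob then returns one result
  # list per opcode, in submission order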
  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
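    # A node whose boot ID differs from the one in the state file has
    # rebooted since the last watcher run; instances with a secondary on
    # such a node may need their disks reactivated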
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep the changed boot IDs, so rebooted nodes aren't checked again
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # retries exhausted; the give-up message was already logged
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
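          # n == MAXTRIES: record one final attempt and log the error once;
          # future runs will see n > MAXTRIES and stay quiet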
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  ", ".join(offline_disk_instances))
    # We submit only one job with all the opcodes and wait for it: not
    # optimal, but it spams the job queue less
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
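  # the age may be given with a time-unit suffix; cli.ParseTimespec
  # normalizes it to seconds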
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


def main():
  """Main function.

  """
  global client

  options, args = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

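  # update_file tracks whether the run got far enough for the on-disk
  # watcher state to be worth saving; on early failures we leave it alone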
  update_file = False
  try:
    # on master or not, try to start the node daemon
    EnsureDaemon(constants.NODED)

    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not StartMaster():
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      EnsureDaemon(constants.RAPI)

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()