daemons/ganeti-watcher @ 7dfb83c2

#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
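# Instances in BAD_STATES are candidates for a restart (up to MAXTRIES
# attempts), while HELPLESS_STATES mean the instance's node is down or
# offline and a restart from here cannot help; the KEY_* constants are the
# field names used in the state file kept by WatcherState (see below).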


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  @param s: the string to indent
  @param prefix: the string to prepend to each line

  """
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))


def StartMaster():
  """Try to start the master daemon.

  """
  result = utils.RunCmd(['ganeti-masterd'])
  if result.failed:
    logging.error("Can't start the master daemon: output '%s'", result.output)
  return not result.failed


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises an exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening an existing
    # file read/write and creating it if missing; a vanilla open() would
    # either truncate an existing file or refuse to create a missing one.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)
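    # For reference, the state written out by Save() is roughly of this shape
    # (illustrative names and values only):
    #   {"node": {"node1": {"bootid": "..."}},
    #    "instance": {"inst1": {"restart_count": 2,
    #                           "restart_when": 1234567890.0}}}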

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from the books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get instance and node data for this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  result = all_results[0]
  smap = {}

  instances = {}
  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)
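
  # Shape of the data returned below: "instances" maps instance name to an
  # Instance object, "nodes" maps node name to (bootid, offline), and "smap"
  # maps a node name to the list of instances using it as a secondary node.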
  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.ArchiveJobs(self.opts.job_age)
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  def ArchiveJobs(self, age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
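      # (A changed boot ID means the node most likely rebooted; the
      # assumption is that its instance disks, e.g. DRBD secondaries, came
      # back inactive and need re-activation to restore redundancy.)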
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)
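
        # Restart accounting: up to MAXTRIES restart attempts are made; the
        # pass that finds the count already at MAXTRIES logs a single
        # "giving up" error below, and any higher count is skipped silently.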
        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                        instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
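    # The opcode result is expected to be a sequence whose third element
    # lists the instances that have offline disks (hence result[2] below).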
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  ", ".join(offline_disk_instances))
    # We submit only one job and wait for it: not optimal, but it spams the
    # job queue less.
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-d", "--debug", dest="debug",
                    help="Write all messages to stderr",
                    default=False, action="store_true")
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
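  # The --job-age value may be given as plain seconds or (presumably) with a
  # unit suffix; cli.ParseTimespec normalizes it to seconds before it is
  # passed to Watcher.ArchiveJobs.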
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


def main():
  """Main function.

  """
  global client

  options, args = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  update_file = True
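  # update_file controls whether the finally: block below saves the state
  # file; it is cleared when the master daemon cannot be restarted, so a
  # failed run does not touch the on-disk state.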
  try:
    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # cli.GetClient raises this when we are not the master
        logging.debug("Not on master, exiting")
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not StartMaster():
          logging.critical("Can't start the master, exiting")
          update_file = False
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()