Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ cbfc4681

History | View | Annotate | Download (12.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erroneously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import time
33
import logging
34
from optparse import OptionParser
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import serializer
39
from ganeti import errors
40
from ganeti import opcodes
41
from ganeti import cli
42

    
43

    
44
# Number of restart attempts made for a down instance before giving up
MAXTRIES = 5

# Instance states the watcher tries to recover from by restarting
BAD_STATES = ["ERROR_down"]
# Instance states for which no restart is attempted
HELPLESS_STATES = ["ERROR_nodedown", "ERROR_nodeoffline"]

NOTICE = "NOTICE"
ERROR = "ERROR"

# Keys used inside the watcher state file
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global client object, set up by main()
client = None
56

    
57

    
58
class NotMasterError(errors.GenericError):
  """Raised when the watcher is run on a node that is not the master."""
60

    
61

    
62
def Indent(s, prefix='| '):
  """Put a prefix in front of every line of a text.

  Args:
    s: The text whose lines get prefixed
    prefix: The string placed before each line

  Returns:
    The prefixed lines joined with newlines and terminated by a newline;
    a text without any line yields just the prefix plus a newline.

  """
  lines = s.splitlines()
  separator = '\n' + prefix
  return prefix + separator.join(lines) + "\n"
71

    
72

    
73
class WatcherState(object):
  """Interface to a state file recording restart attempts.

  The parsed state is a dictionary with two sub-dictionaries: "instance"
  (restart bookkeeping, keyed by instance name) and "node" (last recorded
  boot ID, keyed by node name).

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      loaded = serializer.Load(self.statefile.read())
      # Well-formed content that is not a dictionary (e.g. "null" or a list)
      # is just as unusable as unparsable content; route it to the same
      # fallback instead of failing on the key lookups below.
      if not isinstance(loaded, dict):
        raise TypeError("state file content is not a dictionary")
      self._data = loaded
    except Exception:
      # Ignore errors while loading the file and treat it as empty.
      # sys.exc_info() is used instead of binding the exception in the
      # "except" clause, as the binding syntax differs between Python versions.
      msg = sys.exc_info()[1]
      self._data = {}
      logging.warning(("Empty or invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    # Serialized snapshot used by Save() to detect whether anything changed
    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Write the state back to the file if it changed.

    If the state is unchanged, only the modification time of the state file
    is updated.  The state file object is not closed here; use Close().

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    Args:
      name - the node name to look up

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    Args:
      name - the node name
      bootid - the boot ID to record; must not be empty

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    Args:
      instance - the instance to look up.

    Returns:
      The recorded restart count, or 0 if there is no record for the
      instance or the record carries no count.

    """
    idata = self._data["instance"]

    if instance.name in idata:
      # Tolerate a record without a counter, the same way
      # RecordRestartAttempt does when incrementing it.
      return idata[instance.name].get(KEY_RESTART_COUNT, 0)

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    Stores the current time and increments the restart counter of the
    instance, creating the record if needed.

    Args:
      instance - the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running, i.e. remove record.

    Args:
      instance - the instance to remove from books

    This method removes the record for a named instance; an instance without
    a record is silently ignored.

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]
202

    
203

    
204
class Instance(object):
  """A virtual machine instance as seen by the watcher.

  Attributes:
    name: the instance name
    state: the status reported for the instance
    autostart: the autostart flag given at construction time

  Methods:
    Restart(): submit the opcode starting the instance.
    ActivateDisks(): submit the opcode activating the instance's disks.

  """
  def __init__(self, name, state, autostart):
    self.name, self.state, self.autostart = name, state, autostart

  def Restart(self):
    """Submit a non-forced startup opcode for this instance.

    """
    startup_op = opcodes.OpStartupInstance(instance_name=self.name,
                                           force=False,
                                           extra_args=None)
    cli.SubmitOpCode(startup_op, cl=client)

  def ActivateDisks(self):
    """Submit an opcode activating all disks of this instance.

    """
    activate_op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(activate_op, cl=client)
231

    
232

    
233
def GetInstanceList(with_secondaries=None):
  """Query the cluster's instances and wrap them in Instance objects.

  Args:
    with_secondaries: if not None, a collection of node names; only
      instances having at least one of these nodes among their secondary
      nodes are returned

  Returns:
    A list of Instance objects

  """
  filter_by_secondary = with_secondaries is not None

  query_fields = ["name", "status", "admin_state"]
  if filter_by_secondary:
    query_fields.append("snodes")

  selected = []
  for row in client.QueryInstances([], query_fields):
    if filter_by_secondary:
      (name, status, autostart, snodes) = row

      # Instances without secondary nodes can never match
      if not snodes:
        continue

      matching = [node for node in with_secondaries if node in snodes]
      if not matching:
        continue
    else:
      (name, status, autostart) = row

    selected.append(Instance(name, status, autostart))

  return selected
264

    
265

    
266
def GetNodeBootIDs():
  """Query all nodes for their boot ID and offline flag.

  Returns:
    A dict mapping each node name to a (bootid, offline) tuple

  """
  boot_ids = {}
  for (name, bootid, offline) in client.QueryNodes([], ["name", "bootid",
                                                        "offline"]):
    boot_ids[name] = (bootid, offline)
  return boot_ids
272

    
273

    
274
class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self):
    """Verify we run on the master node and query the cluster state.

    Raises:
      NotMasterError - if the configured master node is not this host

    """
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    self.instances = GetInstanceList()
    # Maps each node name to a (boot ID, offline flag) tuple
    self.bootids = GetNodeBootIDs()
    # Names of the instances whose restart was submitted without error
    self.started_instances = set()

  def Run(self):
    """Run all checks; the watcher state is saved even if a check fails.

    """
    notepad = WatcherState()
    try:
      self.CheckInstances(notepad)
      self.CheckDisks(notepad)
      self.VerifyDisks()
    finally:
      notepad.Save()

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    For every node whose boot ID differs from the recorded one, the disks of
    the autostart instances having that node as a secondary are activated,
    and the new boot ID is recorded.

    Args:
      notepad - the WatcherState holding the recorded boot IDs

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.items():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for instance in GetInstanceList(with_secondaries=check_nodes):
        if not instance.autostart:
          logging.info(("Skipping disk activation for non-autostart"
                        " instance %s"), instance.name)
          continue
        if instance.name in self.started_instances:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          continue
        try:
          logging.info("Activating disks for instance %s", instance.name)
          instance.ActivateDisks()
        except Exception:
          logging.exception("Error while activating disks for instance %s",
                            instance.name)

      # Keep changed boot IDs.  self.bootids holds (boot ID, offline) tuples;
      # only the boot ID itself must be stored, otherwise the comparison
      # above never matches and disks get re-activated on every run.
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    Args:
      notepad - the WatcherState recording the restart attempts

    """
    for instance in self.instances:
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          # Exactly MAXTRIES attempts were made: report the failure once and
          # bump the counter so that later runs take the quiet branch above.
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                        instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        # The attempt counts whether or not submitting the restart worked
        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        # Nothing can be done for the instance; just drop the restart record
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    Activates the disks of all instances reported in the third element of
    the verify-disks result, using a single job.

    """
    op = opcodes.OpVerifyDisks()
    result = cli.SubmitOpCode(op, cl=client)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  ", ".join(offline_disk_instances))
    # we submit only one job, and wait for it. not optimal, but spams
    # less the job queue
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
397

    
398

    
399
def ParseOptions():
  """Build the option parser and parse the command line.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION

  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version=version_text)
  parser.add_option("-d", "--debug",
                    dest="debug",
                    action="store_true",
                    default=False,
                    help="Write all messages to stderr")

  (options, args) = parser.parse_args()
  return (options, args)
416

    
417

    
418
def main():
  """Main function.

  Sets up logging, creates the global client and runs the watcher.  Exits
  with EXIT_SUCCESS if there is no configuration, EXIT_NOTMASTER if this is
  not the master node, EXIT_NODESETUP_ERROR if the hostname cannot be
  resolved and EXIT_FAILURE on any other error.

  """
  global client

  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER,
                     debug=options.debug,
                     stderr_logging=options.debug)

  try:
    client = cli.GetClient()

    try:
      watcher = Watcher()
    except errors.ConfigurationError:
      # Just exit if there's no configuration
      sys.exit(constants.EXIT_SUCCESS)

    watcher.Run()
  except SystemExit:
    # Exit requests must pass through the generic handler below untouched
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError:
    resolver_err = sys.exc_info()[1]
    logging.error("Cannot resolve hostname '%s', exiting.",
                  resolver_err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except Exception:
    failure = sys.exc_info()[1]
    logging.error(str(failure), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)
450

    
451

    
452
# Run the watcher only when executed as a script, not when imported
if "__main__" == __name__:
  main()