Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ 82d9caef

History | View | Annotate | Download (12.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erronously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import time
33
import logging
34
from optparse import OptionParser
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import serializer
39
from ganeti import errors
40
from ganeti import opcodes
41
from ganeti import cli
42

    
43

    
44
MAXTRIES = 5
45
BAD_STATES = ['ERROR_down']
46
HELPLESS_STATES = ['ERROR_nodedown']
47
NOTICE = 'NOTICE'
48
ERROR = 'ERROR'
49
KEY_RESTART_COUNT = "restart_count"
50
KEY_RESTART_WHEN = "restart_when"
51
KEY_BOOT_ID = "bootid"
52

    
53

    
54
# Global client object
55
client = None
56

    
57

    
58
class NotMasterError(errors.GenericError):
59
  """Exception raised when this host is not the master."""
60

    
61

    
62
def Indent(s, prefix='| '):
63
  """Indent a piece of text with a given prefix before each line.
64

    
65
  Args:
66
    s: The string to indent
67
    prefix: The string to prepend each line.
68

    
69
  """
70
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
71

    
72

    
73
class WatcherState(object):
74
  """Interface to a state file recording restart attempts.
75

    
76
  """
77
  def __init__(self):
78
    """Open, lock, read and parse the file.
79

    
80
    Raises exception on lock contention.
81

    
82
    """
83
    # The two-step dance below is necessary to allow both opening existing
84
    # file read/write and creating if not existing.  Vanilla open will truncate
85
    # an existing file -or- allow creating if not existing.
86
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
87
    self.statefile = os.fdopen(fd, 'w+')
88

    
89
    utils.LockFile(self.statefile.fileno())
90

    
91
    try:
92
      self._data = serializer.Load(self.statefile.read())
93
    except Exception, msg:
94
      # Ignore errors while loading the file and treat it as empty
95
      self._data = {}
96
      logging.warning(("Empty or invalid state file. Using defaults."
97
                       " Error message: %s"), msg)
98

    
99
    if "instance" not in self._data:
100
      self._data["instance"] = {}
101
    if "node" not in self._data:
102
      self._data["node"] = {}
103

    
104
    self._orig_data = serializer.Dump(self._data)
105

    
106
  def Save(self):
107
    """Save state to file, then unlock and close it.
108

    
109
    """
110
    assert self.statefile
111

    
112
    serialized_form = serializer.Dump(self._data)
113
    if self._orig_data == serialized_form:
114
      logging.debug("Data didn't change, just touching status file")
115
      os.utime(constants.WATCHER_STATEFILE, None)
116
      return
117

    
118
    # We need to make sure the file is locked before renaming it, otherwise
119
    # starting ganeti-watcher again at the same time will create a conflict.
120
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
121
                         data=serialized_form,
122
                         prewrite=utils.LockFile, close=False)
123
    self.statefile = os.fdopen(fd, 'w+')
124

    
125
  def Close(self):
126
    """Unlock configuration file and close it.
127

    
128
    """
129
    assert self.statefile
130

    
131
    # Files are automatically unlocked when closing them
132
    self.statefile.close()
133
    self.statefile = None
134

    
135
  def GetNodeBootID(self, name):
136
    """Returns the last boot ID of a node or None.
137

    
138
    """
139
    ndata = self._data["node"]
140

    
141
    if name in ndata and KEY_BOOT_ID in ndata[name]:
142
      return ndata[name][KEY_BOOT_ID]
143
    return None
144

    
145
  def SetNodeBootID(self, name, bootid):
146
    """Sets the boot ID of a node.
147

    
148
    """
149
    assert bootid
150

    
151
    ndata = self._data["node"]
152

    
153
    if name not in ndata:
154
      ndata[name] = {}
155

    
156
    ndata[name][KEY_BOOT_ID] = bootid
157

    
158
  def NumberOfRestartAttempts(self, instance):
159
    """Returns number of previous restart attempts.
160

    
161
    Args:
162
      instance - the instance to look up.
163

    
164
    """
165
    idata = self._data["instance"]
166

    
167
    if instance.name in idata:
168
      return idata[instance.name][KEY_RESTART_COUNT]
169

    
170
    return 0
171

    
172
  def RecordRestartAttempt(self, instance):
173
    """Record a restart attempt.
174

    
175
    Args:
176
      instance - the instance being restarted
177

    
178
    """
179
    idata = self._data["instance"]
180

    
181
    if instance.name not in idata:
182
      inst = idata[instance.name] = {}
183
    else:
184
      inst = idata[instance.name]
185

    
186
    inst[KEY_RESTART_WHEN] = time.time()
187
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
188

    
189
  def RemoveInstance(self, instance):
190
    """Update state to reflect that a machine is running, i.e. remove record.
191

    
192
    Args:
193
      instance - the instance to remove from books
194

    
195
    This method removes the record for a named instance.
196

    
197
    """
198
    idata = self._data["instance"]
199

    
200
    if instance.name in idata:
201
      del idata[instance.name]
202

    
203

    
204
class Instance(object):
205
  """Abstraction for a Virtual Machine instance.
206

    
207
  Methods:
208
    Restart(): issue a command to restart the represented machine.
209

    
210
  """
211
  def __init__(self, name, state, autostart):
212
    self.name = name
213
    self.state = state
214
    self.autostart = autostart
215

    
216
  def Restart(self):
217
    """Encapsulates the start of an instance.
218

    
219
    """
220
    op = opcodes.OpStartupInstance(instance_name=self.name,
221
                                   force=False,
222
                                   extra_args=None)
223
    cli.SubmitOpCode(op, cl=client)
224

    
225
  def ActivateDisks(self):
226
    """Encapsulates the activation of all disks of an instance.
227

    
228
    """
229
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
230
    cli.SubmitOpCode(op, cl=client)
231

    
232

    
233
def GetInstanceList(with_secondaries=None):
234
  """Get a list of instances on this cluster.
235

    
236
  """
237
  fields = ["name", "status", "admin_state"]
238

    
239
  if with_secondaries is not None:
240
    fields.append("snodes")
241

    
242
  result = client.QueryInstances([], fields)
243

    
244
  instances = []
245
  for fields in result:
246
    if with_secondaries is not None:
247
      (name, status, autostart, snodes) = fields
248

    
249
      if not snodes:
250
        continue
251

    
252
      for node in with_secondaries:
253
        if node in snodes:
254
          break
255
      else:
256
        continue
257

    
258
    else:
259
      (name, status, autostart) = fields
260

    
261
    instances.append(Instance(name, status, autostart))
262

    
263
  return instances
264

    
265

    
266
def GetNodeBootIDs():
267
  """Get a dict mapping nodes to boot IDs.
268

    
269
  """
270
  result = client.QueryNodes([], ["name", "bootid"])
271
  return dict([(name, bootid) for name, bootid in result])
272

    
273

    
274
class Watcher(object):
275
  """Encapsulate the logic for restarting erronously halted virtual machines.
276

    
277
  The calling program should periodically instantiate me and call Run().
278
  This will traverse the list of instances, and make up to MAXTRIES attempts
279
  to restart machines that are down.
280

    
281
  """
282
  def __init__(self):
283
    master = client.QueryConfigValues(["master_node"])[0]
284
    if master != utils.HostInfo().name:
285
      raise NotMasterError("This is not the master node")
286
    self.instances = GetInstanceList()
287
    self.bootids = GetNodeBootIDs()
288
    self.started_instances = set()
289

    
290
  def Run(self):
291
    notepad = WatcherState()
292
    try:
293
      self.CheckInstances(notepad)
294
      self.CheckDisks(notepad)
295
      self.VerifyDisks()
296
    finally:
297
      notepad.Save()
298

    
299
  def CheckDisks(self, notepad):
300
    """Check all nodes for restarted ones.
301

    
302
    """
303
    check_nodes = []
304
    for name, new_id in self.bootids.iteritems():
305
      old = notepad.GetNodeBootID(name)
306
      if new_id is None:
307
        # Bad node, not returning a boot id
308
        logging.debug("Node %s missing boot id, skipping secondary checks",
309
                      name)
310
        continue
311
      if old != new_id:
312
        # Node's boot ID has changed, proably through a reboot.
313
        check_nodes.append(name)
314

    
315
    if check_nodes:
316
      # Activate disks for all instances with any of the checked nodes as a
317
      # secondary node.
318
      for instance in GetInstanceList(with_secondaries=check_nodes):
319
        if not instance.autostart:
320
          logging.info(("Skipping disk activation for non-autostart"
321
                        " instance %s"), instance.name)
322
          continue
323
        if instance.name in self.started_instances:
324
          # we already tried to start the instance, which should have
325
          # activated its drives (if they can be at all)
326
          continue
327
        try:
328
          logging.info("Activating disks for instance %s", instance.name)
329
          instance.ActivateDisks()
330
        except Exception:
331
          logging.exception("Error while activating disks for instance %s",
332
                            instance.name)
333

    
334
      # Keep changed boot IDs
335
      for name in check_nodes:
336
        notepad.SetNodeBootID(name, self.bootids[name])
337

    
338
  def CheckInstances(self, notepad):
339
    """Make a pass over the list of instances, restarting downed ones.
340

    
341
    """
342
    for instance in self.instances:
343
      if instance.state in BAD_STATES:
344
        n = notepad.NumberOfRestartAttempts(instance)
345

    
346
        if n > MAXTRIES:
347
          # stay quiet.
348
          continue
349
        elif n < MAXTRIES:
350
          last = " (Attempt #%d)" % (n + 1)
351
        else:
352
          notepad.RecordRestartAttempt(instance)
353
          logging.error("Could not restart %s after %d attempts, giving up",
354
                        instance.name, MAXTRIES)
355
          continue
356
        try:
357
          logging.info("Restarting %s%s",
358
                        instance.name, last)
359
          instance.Restart()
360
          self.started_instances.add(instance.name)
361
        except Exception:
362
          logging.exception("Erro while restarting instance %s", instance.name)
363

    
364
        notepad.RecordRestartAttempt(instance)
365
      elif instance.state in HELPLESS_STATES:
366
        if notepad.NumberOfRestartAttempts(instance):
367
          notepad.RemoveInstance(instance)
368
      else:
369
        if notepad.NumberOfRestartAttempts(instance):
370
          notepad.RemoveInstance(instance)
371
          logging.info("Restart of %s succeeded", instance.name)
372

    
373
  @staticmethod
374
  def VerifyDisks():
375
    """Run gnt-cluster verify-disks.
376

    
377
    """
378
    op = opcodes.OpVerifyDisks()
379
    result = cli.SubmitOpCode(op, cl=client)
380
    if not isinstance(result, (tuple, list)):
381
      logging.error("Can't get a valid result from verify-disks")
382
      return
383
    offline_disk_instances = result[2]
384
    if not offline_disk_instances:
385
      # nothing to do
386
      return
387
    logging.debug("Will activate disks for instances %s",
388
                  ", ".join(offline_disk_instances))
389
    # we submit only one job, and wait for it. not optimal, but spams
390
    # less the job queue
391
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
392
           for name in offline_disk_instances]
393
    job_id = cli.SendJob(job, cl=client)
394

    
395
    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
396

    
397

    
398
def ParseOptions():
399
  """Parse the command line options.
400

    
401
  Returns:
402
    (options, args) as from OptionParser.parse_args()
403

    
404
  """
405
  parser = OptionParser(description="Ganeti cluster watcher",
406
                        usage="%prog [-d]",
407
                        version="%%prog (ganeti) %s" %
408
                        constants.RELEASE_VERSION)
409

    
410
  parser.add_option("-d", "--debug", dest="debug",
411
                    help="Write all messages to stderr",
412
                    default=False, action="store_true")
413
  options, args = parser.parse_args()
414
  return options, args
415

    
416

    
417
def main():
418
  """Main function.
419

    
420
  """
421
  global client
422

    
423
  options, args = ParseOptions()
424

    
425
  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
426
                     stderr_logging=options.debug)
427

    
428
  try:
429
    client = cli.GetClient()
430

    
431
    try:
432
      watcher = Watcher()
433
    except errors.ConfigurationError:
434
      # Just exit if there's no configuration
435
      sys.exit(constants.EXIT_SUCCESS)
436

    
437
    watcher.Run()
438
  except SystemExit:
439
    raise
440
  except NotMasterError:
441
    logging.debug("Not master, exiting")
442
    sys.exit(constants.EXIT_NOTMASTER)
443
  except errors.ResolverError, err:
444
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
445
    sys.exit(constants.EXIT_NODESETUP_ERROR)
446
  except Exception, err:
447
    logging.error(str(err), exc_info=True)
448
    sys.exit(constants.EXIT_FAILURE)
449

    
450

    
451
if __name__ == '__main__':
452
  main()