Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ 2859b87b

History | View | Annotate | Download (12.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erronously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import time
33
import logging
34
from optparse import OptionParser
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import serializer
39
from ganeti import errors
40
from ganeti import opcodes
41
from ganeti import logger
42
from ganeti import cli
43

    
44

    
45
MAXTRIES = 5
46
BAD_STATES = ['ERROR_down']
47
HELPLESS_STATES = ['ERROR_nodedown']
48
NOTICE = 'NOTICE'
49
ERROR = 'ERROR'
50
KEY_RESTART_COUNT = "restart_count"
51
KEY_RESTART_WHEN = "restart_when"
52
KEY_BOOT_ID = "bootid"
53

    
54

    
55
# Global client object
56
client = None
57

    
58

    
59
class NotMasterError(errors.GenericError):
60
  """Exception raised when this host is not the master."""
61

    
62

    
63
def Indent(s, prefix='| '):
64
  """Indent a piece of text with a given prefix before each line.
65

    
66
  Args:
67
    s: The string to indent
68
    prefix: The string to prepend each line.
69

    
70
  """
71
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
72

    
73

    
74
class WatcherState(object):
75
  """Interface to a state file recording restart attempts.
76

    
77
  """
78
  def __init__(self):
79
    """Open, lock, read and parse the file.
80

    
81
    Raises exception on lock contention.
82

    
83
    """
84
    # The two-step dance below is necessary to allow both opening existing
85
    # file read/write and creating if not existing.  Vanilla open will truncate
86
    # an existing file -or- allow creating if not existing.
87
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
88
    self.statefile = os.fdopen(fd, 'w+')
89

    
90
    utils.LockFile(self.statefile.fileno())
91

    
92
    try:
93
      self._data = serializer.Load(self.statefile.read())
94
    except Exception, msg:
95
      # Ignore errors while loading the file and treat it as empty
96
      self._data = {}
97
      logging.warning(("Empty or invalid state file. Using defaults."
98
                       " Error message: %s"), msg)
99

    
100
    if "instance" not in self._data:
101
      self._data["instance"] = {}
102
    if "node" not in self._data:
103
      self._data["node"] = {}
104

    
105
    self._orig_data = serializer.Dump(self._data)
106

    
107
  def Save(self):
108
    """Save state to file, then unlock and close it.
109

    
110
    """
111
    assert self.statefile
112

    
113
    serialized_form = serializer.Dump(self._data)
114
    if self._orig_data == serialized_form:
115
      logging.debug("Data didn't change, just touching status file")
116
      os.utime(constants.WATCHER_STATEFILE, None)
117
      return
118

    
119
    # We need to make sure the file is locked before renaming it, otherwise
120
    # starting ganeti-watcher again at the same time will create a conflict.
121
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
122
                         data=serialized_form,
123
                         prewrite=utils.LockFile, close=False)
124
    self.statefile = os.fdopen(fd, 'w+')
125

    
126
  def Close(self):
127
    """Unlock configuration file and close it.
128

    
129
    """
130
    assert self.statefile
131

    
132
    # Files are automatically unlocked when closing them
133
    self.statefile.close()
134
    self.statefile = None
135

    
136
  def GetNodeBootID(self, name):
137
    """Returns the last boot ID of a node or None.
138

    
139
    """
140
    ndata = self._data["node"]
141

    
142
    if name in ndata and KEY_BOOT_ID in ndata[name]:
143
      return ndata[name][KEY_BOOT_ID]
144
    return None
145

    
146
  def SetNodeBootID(self, name, bootid):
147
    """Sets the boot ID of a node.
148

    
149
    """
150
    assert bootid
151

    
152
    ndata = self._data["node"]
153

    
154
    if name not in ndata:
155
      ndata[name] = {}
156

    
157
    ndata[name][KEY_BOOT_ID] = bootid
158

    
159
  def NumberOfRestartAttempts(self, instance):
160
    """Returns number of previous restart attempts.
161

    
162
    Args:
163
      instance - the instance to look up.
164

    
165
    """
166
    idata = self._data["instance"]
167

    
168
    if instance.name in idata:
169
      return idata[instance.name][KEY_RESTART_COUNT]
170

    
171
    return 0
172

    
173
  def RecordRestartAttempt(self, instance):
174
    """Record a restart attempt.
175

    
176
    Args:
177
      instance - the instance being restarted
178

    
179
    """
180
    idata = self._data["instance"]
181

    
182
    if instance.name not in idata:
183
      inst = idata[instance.name] = {}
184
    else:
185
      inst = idata[instance.name]
186

    
187
    inst[KEY_RESTART_WHEN] = time.time()
188
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
189

    
190
  def RemoveInstance(self, instance):
191
    """Update state to reflect that a machine is running, i.e. remove record.
192

    
193
    Args:
194
      instance - the instance to remove from books
195

    
196
    This method removes the record for a named instance.
197

    
198
    """
199
    idata = self._data["instance"]
200

    
201
    if instance.name in idata:
202
      del idata[instance.name]
203

    
204

    
205
class Instance(object):
206
  """Abstraction for a Virtual Machine instance.
207

    
208
  Methods:
209
    Restart(): issue a command to restart the represented machine.
210

    
211
  """
212
  def __init__(self, name, state, autostart):
213
    self.name = name
214
    self.state = state
215
    self.autostart = autostart
216

    
217
  def Restart(self):
218
    """Encapsulates the start of an instance.
219

    
220
    """
221
    op = opcodes.OpStartupInstance(instance_name=self.name,
222
                                   force=False,
223
                                   extra_args=None)
224
    cli.SubmitOpCode(op, cl=client)
225

    
226
  def ActivateDisks(self):
227
    """Encapsulates the activation of all disks of an instance.
228

    
229
    """
230
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
231
    cli.SubmitOpCode(op, cl=client)
232

    
233

    
234
def GetInstanceList(with_secondaries=None):
235
  """Get a list of instances on this cluster.
236

    
237
  """
238
  fields = ["name", "status", "admin_state"]
239

    
240
  if with_secondaries is not None:
241
    fields.append("snodes")
242

    
243
  result = client.QueryInstances([], fields)
244

    
245
  instances = []
246
  for fields in result:
247
    if with_secondaries is not None:
248
      (name, status, autostart, snodes) = fields
249

    
250
      if not snodes:
251
        continue
252

    
253
      for node in with_secondaries:
254
        if node in snodes:
255
          break
256
      else:
257
        continue
258

    
259
    else:
260
      (name, status, autostart) = fields
261

    
262
    instances.append(Instance(name, status, autostart))
263

    
264
  return instances
265

    
266

    
267
def GetNodeBootIDs():
268
  """Get a dict mapping nodes to boot IDs.
269

    
270
  """
271
  result = client.QueryNodes([], ["name", "bootid"])
272
  return dict([(name, bootid) for name, bootid in result])
273

    
274

    
275
class Watcher(object):
276
  """Encapsulate the logic for restarting erronously halted virtual machines.
277

    
278
  The calling program should periodically instantiate me and call Run().
279
  This will traverse the list of instances, and make up to MAXTRIES attempts
280
  to restart machines that are down.
281

    
282
  """
283
  def __init__(self):
284
    master = client.QueryConfigValues(["master_node"])[0]
285
    if master != utils.HostInfo().name:
286
      raise NotMasterError("This is not the master node")
287
    self.instances = GetInstanceList()
288
    self.bootids = GetNodeBootIDs()
289
    self.started_instances = set()
290

    
291
  def Run(self):
292
    notepad = WatcherState()
293
    try:
294
      self.CheckInstances(notepad)
295
      self.CheckDisks(notepad)
296
      self.VerifyDisks()
297
    finally:
298
      notepad.Save()
299

    
300
  def CheckDisks(self, notepad):
301
    """Check all nodes for restarted ones.
302

    
303
    """
304
    check_nodes = []
305
    for name, new_id in self.bootids.iteritems():
306
      old = notepad.GetNodeBootID(name)
307
      if new_id is None:
308
        # Bad node, not returning a boot id
309
        logging.debug("Node %s missing boot id, skipping secondary checks",
310
                      name)
311
        continue
312
      if old != new_id:
313
        # Node's boot ID has changed, proably through a reboot.
314
        check_nodes.append(name)
315

    
316
    if check_nodes:
317
      # Activate disks for all instances with any of the checked nodes as a
318
      # secondary node.
319
      for instance in GetInstanceList(with_secondaries=check_nodes):
320
        if not instance.autostart:
321
          logging.info(("Skipping disk activation for non-autostart"
322
                        " instance %s"), instance.name)
323
          continue
324
        if instance.name in self.started_instances:
325
          # we already tried to start the instance, which should have
326
          # activated its drives (if they can be at all)
327
          continue
328
        try:
329
          logging.info("Activating disks for instance %s", instance.name)
330
          instance.ActivateDisks()
331
        except Exception:
332
          logging.exception("Error while activating disks for instance %s",
333
                            instance.name)
334

    
335
      # Keep changed boot IDs
336
      for name in check_nodes:
337
        notepad.SetNodeBootID(name, self.bootids[name])
338

    
339
  def CheckInstances(self, notepad):
340
    """Make a pass over the list of instances, restarting downed ones.
341

    
342
    """
343
    for instance in self.instances:
344
      if instance.state in BAD_STATES:
345
        n = notepad.NumberOfRestartAttempts(instance)
346

    
347
        if n > MAXTRIES:
348
          # stay quiet.
349
          continue
350
        elif n < MAXTRIES:
351
          last = " (Attempt #%d)" % (n + 1)
352
        else:
353
          notepad.RecordRestartAttempt(instance)
354
          logging.error("Could not restart %s after %d attempts, giving up",
355
                        instance.name, MAXTRIES)
356
          continue
357
        try:
358
          logging.info("Restarting %s%s",
359
                        instance.name, last)
360
          instance.Restart()
361
          self.started_instances.add(instance.name)
362
        except Exception:
363
          logging.exception("Erro while restarting instance %s", instance.name)
364

    
365
        notepad.RecordRestartAttempt(instance)
366
      elif instance.state in HELPLESS_STATES:
367
        if notepad.NumberOfRestartAttempts(instance):
368
          notepad.RemoveInstance(instance)
369
      else:
370
        if notepad.NumberOfRestartAttempts(instance):
371
          notepad.RemoveInstance(instance)
372
          logging.info("Restart of %s succeeded", instance.name)
373

    
374
  @staticmethod
375
  def VerifyDisks():
376
    """Run gnt-cluster verify-disks.
377

    
378
    """
379
    op = opcodes.OpVerifyDisks()
380
    result = cli.SubmitOpCode(op, cl=client)
381
    if not isinstance(result, (tuple, list)):
382
      logging.error("Can't get a valid result from verify-disks")
383
      return
384
    offline_disk_instances = result[2]
385
    if not offline_disk_instances:
386
      # nothing to do
387
      return
388
    logging.debug("Will activate disks for instances %s",
389
                  ", ".join(offline_disk_instances))
390
    # we submit only one job, and wait for it. not optimal, but spams
391
    # less the job queue
392
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
393
           for name in offline_disk_instances]
394
    job_id = cli.SendJob(job, cl=client)
395

    
396
    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
397

    
398

    
399
def ParseOptions():
400
  """Parse the command line options.
401

    
402
  Returns:
403
    (options, args) as from OptionParser.parse_args()
404

    
405
  """
406
  parser = OptionParser(description="Ganeti cluster watcher",
407
                        usage="%prog [-d]",
408
                        version="%%prog (ganeti) %s" %
409
                        constants.RELEASE_VERSION)
410

    
411
  parser.add_option("-d", "--debug", dest="debug",
412
                    help="Write all messages to stderr",
413
                    default=False, action="store_true")
414
  options, args = parser.parse_args()
415
  return options, args
416

    
417

    
418
def main():
419
  """Main function.
420

    
421
  """
422
  global client
423

    
424
  options, args = ParseOptions()
425

    
426
  logger.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
427
                      stderr_logging=options.debug)
428

    
429
  try:
430
    client = cli.GetClient()
431

    
432
    try:
433
      watcher = Watcher()
434
    except errors.ConfigurationError:
435
      # Just exit if there's no configuration
436
      sys.exit(constants.EXIT_SUCCESS)
437

    
438
    watcher.Run()
439
  except SystemExit:
440
    raise
441
  except NotMasterError:
442
    logging.debug("Not master, exiting")
443
    sys.exit(constants.EXIT_NOTMASTER)
444
  except errors.ResolverError, err:
445
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
446
    sys.exit(constants.EXIT_NODESETUP_ERROR)
447
  except Exception, err:
448
    logging.error(str(err), exc_info=True)
449
    sys.exit(constants.EXIT_FAILURE)
450

    
451

    
452
if __name__ == '__main__':
453
  main()