Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ 37b77b18

History | View | Annotate | Download (12.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erronously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import time
33
import logging
34
from optparse import OptionParser
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import serializer
39
from ganeti import ssconf
40
from ganeti import errors
41
from ganeti import opcodes
42
from ganeti import logger
43
from ganeti import cli
44

    
45

    
46
MAXTRIES = 5
47
BAD_STATES = ['ERROR_down']
48
HELPLESS_STATES = ['ERROR_nodedown']
49
NOTICE = 'NOTICE'
50
ERROR = 'ERROR'
51
KEY_RESTART_COUNT = "restart_count"
52
KEY_RESTART_WHEN = "restart_when"
53
KEY_BOOT_ID = "bootid"
54

    
55

    
56
# Global client object
57
client = None
58

    
59

    
60
class NotMasterError(errors.GenericError):
61
  """Exception raised when this host is not the master."""
62

    
63

    
64
def Indent(s, prefix='| '):
65
  """Indent a piece of text with a given prefix before each line.
66

    
67
  Args:
68
    s: The string to indent
69
    prefix: The string to prepend each line.
70

    
71
  """
72
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
73

    
74

    
75
class WatcherState(object):
76
  """Interface to a state file recording restart attempts.
77

    
78
  """
79
  def __init__(self):
80
    """Open, lock, read and parse the file.
81

    
82
    Raises exception on lock contention.
83

    
84
    """
85
    # The two-step dance below is necessary to allow both opening existing
86
    # file read/write and creating if not existing.  Vanilla open will truncate
87
    # an existing file -or- allow creating if not existing.
88
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
89
    self.statefile = os.fdopen(fd, 'w+')
90

    
91
    utils.LockFile(self.statefile.fileno())
92

    
93
    try:
94
      self._data = serializer.Load(self.statefile.read())
95
    except Exception, msg:
96
      # Ignore errors while loading the file and treat it as empty
97
      self._data = {}
98
      logging.warning(("Empty or invalid state file. Using defaults."
99
                       " Error message: %s"), msg)
100

    
101
    if "instance" not in self._data:
102
      self._data["instance"] = {}
103
    if "node" not in self._data:
104
      self._data["node"] = {}
105

    
106
    self._orig_data = serializer.Dump(self._data)
107

    
108
  def Save(self):
109
    """Save state to file, then unlock and close it.
110

    
111
    """
112
    assert self.statefile
113

    
114
    serialized_form = serializer.Dump(self._data)
115
    if self._orig_data == serialized_form:
116
      logging.debug("Data didn't change, just touching status file")
117
      os.utime(constants.WATCHER_STATEFILE, None)
118
      return
119

    
120
    # We need to make sure the file is locked before renaming it, otherwise
121
    # starting ganeti-watcher again at the same time will create a conflict.
122
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
123
                         data=serialized_form,
124
                         prewrite=utils.LockFile, close=False)
125
    self.statefile = os.fdopen(fd, 'w+')
126

    
127
  def Close(self):
128
    """Unlock configuration file and close it.
129

    
130
    """
131
    assert self.statefile
132

    
133
    # Files are automatically unlocked when closing them
134
    self.statefile.close()
135
    self.statefile = None
136

    
137
  def GetNodeBootID(self, name):
138
    """Returns the last boot ID of a node or None.
139

    
140
    """
141
    ndata = self._data["node"]
142

    
143
    if name in ndata and KEY_BOOT_ID in ndata[name]:
144
      return ndata[name][KEY_BOOT_ID]
145
    return None
146

    
147
  def SetNodeBootID(self, name, bootid):
148
    """Sets the boot ID of a node.
149

    
150
    """
151
    assert bootid
152

    
153
    ndata = self._data["node"]
154

    
155
    if name not in ndata:
156
      ndata[name] = {}
157

    
158
    ndata[name][KEY_BOOT_ID] = bootid
159

    
160
  def NumberOfRestartAttempts(self, instance):
161
    """Returns number of previous restart attempts.
162

    
163
    Args:
164
      instance - the instance to look up.
165

    
166
    """
167
    idata = self._data["instance"]
168

    
169
    if instance.name in idata:
170
      return idata[instance.name][KEY_RESTART_COUNT]
171

    
172
    return 0
173

    
174
  def RecordRestartAttempt(self, instance):
175
    """Record a restart attempt.
176

    
177
    Args:
178
      instance - the instance being restarted
179

    
180
    """
181
    idata = self._data["instance"]
182

    
183
    if instance.name not in idata:
184
      inst = idata[instance.name] = {}
185
    else:
186
      inst = idata[instance.name]
187

    
188
    inst[KEY_RESTART_WHEN] = time.time()
189
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
190

    
191
  def RemoveInstance(self, instance):
192
    """Update state to reflect that a machine is running, i.e. remove record.
193

    
194
    Args:
195
      instance - the instance to remove from books
196

    
197
    This method removes the record for a named instance.
198

    
199
    """
200
    idata = self._data["instance"]
201

    
202
    if instance.name in idata:
203
      del idata[instance.name]
204

    
205

    
206
class Instance(object):
207
  """Abstraction for a Virtual Machine instance.
208

    
209
  Methods:
210
    Restart(): issue a command to restart the represented machine.
211

    
212
  """
213
  def __init__(self, name, state, autostart):
214
    self.name = name
215
    self.state = state
216
    self.autostart = autostart
217

    
218
  def Restart(self):
219
    """Encapsulates the start of an instance.
220

    
221
    """
222
    op = opcodes.OpStartupInstance(instance_name=self.name,
223
                                   force=False,
224
                                   extra_args=None)
225
    cli.SubmitOpCode(op, cl=client)
226

    
227
  def ActivateDisks(self):
228
    """Encapsulates the activation of all disks of an instance.
229

    
230
    """
231
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
232
    cli.SubmitOpCode(op, cl=client)
233

    
234

    
235
def GetInstanceList(with_secondaries=None):
236
  """Get a list of instances on this cluster.
237

    
238
  """
239
  fields = ["name", "status", "admin_state"]
240

    
241
  if with_secondaries is not None:
242
    fields.append("snodes")
243

    
244
  result = client.QueryInstances([], fields)
245

    
246
  instances = []
247
  for fields in result:
248
    if with_secondaries is not None:
249
      (name, status, autostart, snodes) = fields
250

    
251
      if not snodes:
252
        continue
253

    
254
      for node in with_secondaries:
255
        if node in snodes:
256
          break
257
      else:
258
        continue
259

    
260
    else:
261
      (name, status, autostart) = fields
262

    
263
    instances.append(Instance(name, status, autostart))
264

    
265
  return instances
266

    
267

    
268
def GetNodeBootIDs():
269
  """Get a dict mapping nodes to boot IDs.
270

    
271
  """
272
  result = client.QueryNodes([], ["name", "bootid"])
273
  return dict([(name, bootid) for name, bootid in result])
274

    
275

    
276
class Watcher(object):
277
  """Encapsulate the logic for restarting erronously halted virtual machines.
278

    
279
  The calling program should periodically instantiate me and call Run().
280
  This will traverse the list of instances, and make up to MAXTRIES attempts
281
  to restart machines that are down.
282

    
283
  """
284
  def __init__(self):
285
    sstore = ssconf.SimpleStore()
286
    master = sstore.GetMasterNode()
287
    if master != utils.HostInfo().name:
288
      raise NotMasterError("This is not the master node")
289
    self.instances = GetInstanceList()
290
    self.bootids = GetNodeBootIDs()
291
    self.started_instances = set()
292

    
293
  def Run(self):
294
    notepad = WatcherState()
295
    try:
296
      self.CheckInstances(notepad)
297
      self.CheckDisks(notepad)
298
      self.VerifyDisks()
299
    finally:
300
      notepad.Save()
301

    
302
  def CheckDisks(self, notepad):
303
    """Check all nodes for restarted ones.
304

    
305
    """
306
    check_nodes = []
307
    for name, new_id in self.bootids.iteritems():
308
      old = notepad.GetNodeBootID(name)
309
      if new_id is None:
310
        # Bad node, not returning a boot id
311
        logging.debug("Node %s missing boot id, skipping secondary checks",
312
                      name)
313
        continue
314
      if old != new_id:
315
        # Node's boot ID has changed, proably through a reboot.
316
        check_nodes.append(name)
317

    
318
    if check_nodes:
319
      # Activate disks for all instances with any of the checked nodes as a
320
      # secondary node.
321
      for instance in GetInstanceList(with_secondaries=check_nodes):
322
        if not instance.autostart:
323
          logging.info(("Skipping disk activation for non-autostart"
324
                        " instance %s"), instance.name)
325
          continue
326
        if instance.name in self.started_instances:
327
          # we already tried to start the instance, which should have
328
          # activated its drives (if they can be at all)
329
          continue
330
        try:
331
          logging.info("Activating disks for instance %s", instance.name)
332
          instance.ActivateDisks()
333
        except Exception:
334
          logging.exception("Error while activating disks for instance %s",
335
                            instance.name)
336

    
337
      # Keep changed boot IDs
338
      for name in check_nodes:
339
        notepad.SetNodeBootID(name, self.bootids[name])
340

    
341
  def CheckInstances(self, notepad):
342
    """Make a pass over the list of instances, restarting downed ones.
343

    
344
    """
345
    for instance in self.instances:
346
      if instance.state in BAD_STATES:
347
        n = notepad.NumberOfRestartAttempts(instance)
348

    
349
        if n > MAXTRIES:
350
          # stay quiet.
351
          continue
352
        elif n < MAXTRIES:
353
          last = " (Attempt #%d)" % (n + 1)
354
        else:
355
          notepad.RecordRestartAttempt(instance)
356
          logging.error("Could not restart %s after %d attempts, giving up",
357
                        instance.name, MAXTRIES)
358
          continue
359
        try:
360
          logging.info("Restarting %s%s",
361
                        instance.name, last)
362
          instance.Restart()
363
          self.started_instances.add(instance.name)
364
        except Exception:
365
          logging.exception("Erro while restarting instance %s", instance.name)
366

    
367
        notepad.RecordRestartAttempt(instance)
368
      elif instance.state in HELPLESS_STATES:
369
        if notepad.NumberOfRestartAttempts(instance):
370
          notepad.RemoveInstance(instance)
371
      else:
372
        if notepad.NumberOfRestartAttempts(instance):
373
          notepad.RemoveInstance(instance)
374
          logging.info("Restart of %s succeeded", instance.name)
375

    
376
  @staticmethod
377
  def VerifyDisks():
378
    """Run gnt-cluster verify-disks.
379

    
380
    """
381
    op = opcodes.OpVerifyDisks()
382
    result = cli.SubmitOpCode(op, cl=client)
383
    if not isinstance(result, (tuple, list)):
384
      logging.error("Can't get a valid result from verify-disks")
385
      return
386
    offline_disk_instances = result[2]
387
    if not offline_disk_instances:
388
      # nothing to do
389
      return
390
    logging.debug("Will activate disks for instances %s",
391
                  ", ".join(offline_disk_instances))
392
    # we submit only one job, and wait for it. not optimal, but spams
393
    # less the job queue
394
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
395
           for name in offline_disk_instances]
396
    job_id = cli.SendJob(job, cl=client)
397

    
398
    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
399

    
400

    
401
def ParseOptions():
402
  """Parse the command line options.
403

    
404
  Returns:
405
    (options, args) as from OptionParser.parse_args()
406

    
407
  """
408
  parser = OptionParser(description="Ganeti cluster watcher",
409
                        usage="%prog [-d]",
410
                        version="%%prog (ganeti) %s" %
411
                        constants.RELEASE_VERSION)
412

    
413
  parser.add_option("-d", "--debug", dest="debug",
414
                    help="Write all messages to stderr",
415
                    default=False, action="store_true")
416
  options, args = parser.parse_args()
417
  return options, args
418

    
419

    
420
def main():
421
  """Main function.
422

    
423
  """
424
  global client
425

    
426
  options, args = ParseOptions()
427

    
428
  logger.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
429
                      stderr_logging=options.debug)
430

    
431
  try:
432
    client = cli.GetClient()
433

    
434
    try:
435
      watcher = Watcher()
436
    except errors.ConfigurationError:
437
      # Just exit if there's no configuration
438
      sys.exit(constants.EXIT_SUCCESS)
439

    
440
    watcher.Run()
441
  except SystemExit:
442
    raise
443
  except NotMasterError:
444
    logging.debug("Not master, exiting")
445
    sys.exit(constants.EXIT_NOTMASTER)
446
  except errors.ResolverError, err:
447
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
448
    sys.exit(constants.EXIT_NODESETUP_ERROR)
449
  except Exception, err:
450
    logging.error(str(err), exc_info=True)
451
    sys.exit(constants.EXIT_FAILURE)
452

    
453

    
454
if __name__ == '__main__':
455
  main()