Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ b7309a0d

History | View | Annotate | Download (12.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erronously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import time
33
import logging
34
from optparse import OptionParser
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import serializer
39
from ganeti import ssconf
40
from ganeti import errors
41
from ganeti import opcodes
42
from ganeti import logger
43
from ganeti import cli
44

    
45

    
46
MAXTRIES = 5
47
BAD_STATES = ['ERROR_down']
48
HELPLESS_STATES = ['ERROR_nodedown']
49
NOTICE = 'NOTICE'
50
ERROR = 'ERROR'
51
KEY_RESTART_COUNT = "restart_count"
52
KEY_RESTART_WHEN = "restart_when"
53
KEY_BOOT_ID = "bootid"
54

    
55

    
56
# Global client object
57
client = None
58

    
59

    
60
class NotMasterError(errors.GenericError):
61
  """Exception raised when this host is not the master."""
62

    
63

    
64
def Indent(s, prefix='| '):
65
  """Indent a piece of text with a given prefix before each line.
66

    
67
  Args:
68
    s: The string to indent
69
    prefix: The string to prepend each line.
70

    
71
  """
72
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
73

    
74

    
75
class WatcherState(object):
76
  """Interface to a state file recording restart attempts.
77

    
78
  """
79
  def __init__(self):
80
    """Open, lock, read and parse the file.
81

    
82
    Raises exception on lock contention.
83

    
84
    """
85
    # The two-step dance below is necessary to allow both opening existing
86
    # file read/write and creating if not existing.  Vanilla open will truncate
87
    # an existing file -or- allow creating if not existing.
88
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
89
    self.statefile = os.fdopen(fd, 'w+')
90

    
91
    utils.LockFile(self.statefile.fileno())
92

    
93
    try:
94
      self._data = serializer.Load(self.statefile.read())
95
    except Exception, msg:
96
      # Ignore errors while loading the file and treat it as empty
97
      self._data = {}
98
      logging.warning(("Empty or invalid state file. Using defaults."
99
                       " Error message: %s"), msg)
100

    
101
    if "instance" not in self._data:
102
      self._data["instance"] = {}
103
    if "node" not in self._data:
104
      self._data["node"] = {}
105

    
106
    self._orig_data = serializer.Dump(self._data)
107

    
108
  def Save(self):
109
    """Save state to file, then unlock and close it.
110

    
111
    """
112
    assert self.statefile
113

    
114
    serialized_form = serializer.Dump(self._data)
115
    if self._orig_data == serialized_form:
116
      logging.debug("Data didn't change, just touching status file")
117
      os.utime(constants.WATCHER_STATEFILE, None)
118
      return
119

    
120
    # We need to make sure the file is locked before renaming it, otherwise
121
    # starting ganeti-watcher again at the same time will create a conflict.
122
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
123
                         data=serialized_form,
124
                         prewrite=utils.LockFile, close=False)
125
    self.statefile = os.fdopen(fd, 'w+')
126

    
127
  def Close(self):
128
    """Unlock configuration file and close it.
129

    
130
    """
131
    assert self.statefile
132

    
133
    # Files are automatically unlocked when closing them
134
    self.statefile.close()
135
    self.statefile = None
136

    
137
  def GetNodeBootID(self, name):
138
    """Returns the last boot ID of a node or None.
139

    
140
    """
141
    ndata = self._data["node"]
142

    
143
    if name in ndata and KEY_BOOT_ID in ndata[name]:
144
      return ndata[name][KEY_BOOT_ID]
145
    return None
146

    
147
  def SetNodeBootID(self, name, bootid):
148
    """Sets the boot ID of a node.
149

    
150
    """
151
    assert bootid
152

    
153
    ndata = self._data["node"]
154

    
155
    if name not in ndata:
156
      ndata[name] = {}
157

    
158
    ndata[name][KEY_BOOT_ID] = bootid
159

    
160
  def NumberOfRestartAttempts(self, instance):
161
    """Returns number of previous restart attempts.
162

    
163
    Args:
164
      instance - the instance to look up.
165

    
166
    """
167
    idata = self._data["instance"]
168

    
169
    if instance.name in idata:
170
      return idata[instance.name][KEY_RESTART_COUNT]
171

    
172
    return 0
173

    
174
  def RecordRestartAttempt(self, instance):
175
    """Record a restart attempt.
176

    
177
    Args:
178
      instance - the instance being restarted
179

    
180
    """
181
    idata = self._data["instance"]
182

    
183
    if instance.name not in idata:
184
      inst = idata[instance.name] = {}
185
    else:
186
      inst = idata[instance.name]
187

    
188
    inst[KEY_RESTART_WHEN] = time.time()
189
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
190

    
191
  def RemoveInstance(self, instance):
192
    """Update state to reflect that a machine is running, i.e. remove record.
193

    
194
    Args:
195
      instance - the instance to remove from books
196

    
197
    This method removes the record for a named instance.
198

    
199
    """
200
    idata = self._data["instance"]
201

    
202
    if instance.name in idata:
203
      del idata[instance.name]
204

    
205

    
206
class Instance(object):
207
  """Abstraction for a Virtual Machine instance.
208

    
209
  Methods:
210
    Restart(): issue a command to restart the represented machine.
211

    
212
  """
213
  def __init__(self, name, state, autostart):
214
    self.name = name
215
    self.state = state
216
    self.autostart = autostart
217

    
218
  def Restart(self):
219
    """Encapsulates the start of an instance.
220

    
221
    """
222
    op = opcodes.OpStartupInstance(instance_name=self.name,
223
                                   force=False,
224
                                   extra_args=None)
225
    cli.SubmitOpCode(op, cl=client)
226

    
227
  def ActivateDisks(self):
228
    """Encapsulates the activation of all disks of an instance.
229

    
230
    """
231
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
232
    cli.SubmitOpCode(op, cl=client)
233

    
234

    
235
def GetInstanceList(with_secondaries=None):
236
  """Get a list of instances on this cluster.
237

    
238
  """
239
  fields = ["name", "status", "admin_state"]
240

    
241
  if with_secondaries is not None:
242
    fields.append("snodes")
243

    
244
  result = client.QueryInstances([], fields)
245

    
246
  instances = []
247
  for fields in result:
248
    if with_secondaries is not None:
249
      (name, status, autostart, snodes) = fields
250

    
251
      if not snodes:
252
        continue
253

    
254
      for node in with_secondaries:
255
        if node in snodes:
256
          break
257
      else:
258
        continue
259

    
260
    else:
261
      (name, status, autostart) = fields
262

    
263
    instances.append(Instance(name, status, autostart))
264

    
265
  return instances
266

    
267

    
268
def GetNodeBootIDs():
269
  """Get a dict mapping nodes to boot IDs.
270

    
271
  """
272
  result = client.QueryNodes([], ["name", "bootid"])
273
  return dict([(name, bootid) for name, bootid in result])
274

    
275

    
276
class Watcher(object):
277
  """Encapsulate the logic for restarting erronously halted virtual machines.
278

    
279
  The calling program should periodically instantiate me and call Run().
280
  This will traverse the list of instances, and make up to MAXTRIES attempts
281
  to restart machines that are down.
282

    
283
  """
284
  def __init__(self):
285
    sstore = ssconf.SimpleStore()
286
    master = sstore.GetMasterNode()
287
    if master != utils.HostInfo().name:
288
      raise NotMasterError("This is not the master node")
289
    self.instances = GetInstanceList()
290
    self.bootids = GetNodeBootIDs()
291
    self.started_instances = set()
292

    
293
  def Run(self):
294
    notepad = WatcherState()
295
    try:
296
      self.CheckInstances(notepad)
297
      self.CheckDisks(notepad)
298
      self.VerifyDisks()
299
    finally:
300
      notepad.Save()
301

    
302
  def CheckDisks(self, notepad):
303
    """Check all nodes for restarted ones.
304

    
305
    """
306
    check_nodes = []
307
    for name, new_id in self.bootids.iteritems():
308
      old = notepad.GetNodeBootID(name)
309
      if old != new_id:
310
        # Node's boot ID has changed, proably through a reboot.
311
        check_nodes.append(name)
312

    
313
    if check_nodes:
314
      # Activate disks for all instances with any of the checked nodes as a
315
      # secondary node.
316
      for instance in GetInstanceList(with_secondaries=check_nodes):
317
        if not instance.autostart:
318
          logging.info(("Skipping disk activation for non-autostart"
319
                        " instance %s"), instance.name)
320
          continue
321
        if instance.name in self.started_instances:
322
          # we already tried to start the instance, which should have
323
          # activated its drives (if they can be at all)
324
          continue
325
        try:
326
          logging.info("Activating disks for instance %s", instance.name)
327
          instance.ActivateDisks()
328
        except Exception:
329
          logging.exception("Error while activating disks for instance %s",
330
                            instance.name)
331

    
332
      # Keep changed boot IDs
333
      for name in check_nodes:
334
        notepad.SetNodeBootID(name, self.bootids[name])
335

    
336
  def CheckInstances(self, notepad):
337
    """Make a pass over the list of instances, restarting downed ones.
338

    
339
    """
340
    for instance in self.instances:
341
      if instance.state in BAD_STATES:
342
        n = notepad.NumberOfRestartAttempts(instance)
343

    
344
        if n > MAXTRIES:
345
          # stay quiet.
346
          continue
347
        elif n < MAXTRIES:
348
          last = " (Attempt #%d)" % (n + 1)
349
        else:
350
          notepad.RecordRestartAttempt(instance)
351
          logging.error("Could not restart %s after %d attempts, giving up",
352
                        instance.name, MAXTRIES)
353
          continue
354
        try:
355
          logging.info("Restarting %s%s",
356
                        instance.name, last)
357
          instance.Restart()
358
          self.started_instances.add(instance.name)
359
        except Exception:
360
          logging.exception("Erro while restarting instance %s", instance.name)
361

    
362
        notepad.RecordRestartAttempt(instance)
363
      elif instance.state in HELPLESS_STATES:
364
        if notepad.NumberOfRestartAttempts(instance):
365
          notepad.RemoveInstance(instance)
366
      else:
367
        if notepad.NumberOfRestartAttempts(instance):
368
          notepad.RemoveInstance(instance)
369
          logging.info("Restart of %s succeeded", instance.name)
370

    
371
  @staticmethod
372
  def VerifyDisks():
373
    """Run gnt-cluster verify-disks.
374

    
375
    """
376
    op = opcodes.OpVerifyDisks()
377
    result = cli.SubmitOpCode(op, cl=client)
378
    if not isinstance(result, (tuple, list)):
379
      logging.error("Can't get a valid result from verify-disks")
380
      return
381
    offline_disk_instances = result[2]
382
    if not offline_disk_instances:
383
      # nothing to do
384
      return
385
    logging.debug("Will activate disks for instances %s",
386
                  ", ".join(offline_disk_instances))
387
    # we submit only one job, and wait for it. not optimal, but spams
388
    # less the job queue
389
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
390
           for name in offline_disk_instances]
391
    job_id = cli.SendJob(job, cl=client)
392

    
393
    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
394

    
395

    
396
def ParseOptions():
397
  """Parse the command line options.
398

    
399
  Returns:
400
    (options, args) as from OptionParser.parse_args()
401

    
402
  """
403
  parser = OptionParser(description="Ganeti cluster watcher",
404
                        usage="%prog [-d]",
405
                        version="%%prog (ganeti) %s" %
406
                        constants.RELEASE_VERSION)
407

    
408
  parser.add_option("-d", "--debug", dest="debug",
409
                    help="Write all messages to stderr",
410
                    default=False, action="store_true")
411
  options, args = parser.parse_args()
412
  return options, args
413

    
414

    
415
def main():
416
  """Main function.
417

    
418
  """
419
  global client
420

    
421
  options, args = ParseOptions()
422

    
423
  logger.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
424
                      stderr_logging=options.debug)
425

    
426
  try:
427
    client = cli.GetClient()
428

    
429
    try:
430
      watcher = Watcher()
431
    except errors.ConfigurationError:
432
      # Just exit if there's no configuration
433
      sys.exit(constants.EXIT_SUCCESS)
434

    
435
    watcher.Run()
436
  except SystemExit:
437
    raise
438
  except NotMasterError:
439
    logging.debug("Not master, exiting")
440
    sys.exit(constants.EXIT_NOTMASTER)
441
  except errors.ResolverError, err:
442
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
443
    sys.exit(constants.EXIT_NODESETUP_ERROR)
444
  except Exception, err:
445
    logging.error(str(err), exc_info=True)
446
    sys.exit(constants.EXIT_FAILURE)
447

    
448

    
449
if __name__ == '__main__':
450
  main()