Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ ec79568d

History | View | Annotate | Download (12.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erronously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import time
33
import logging
34
from optparse import OptionParser
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import serializer
39
from ganeti import errors
40
from ganeti import opcodes
41
from ganeti import cli
42

    
43

    
44
MAXTRIES = 5
45
BAD_STATES = ['ERROR_down']
46
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
47
NOTICE = 'NOTICE'
48
ERROR = 'ERROR'
49
KEY_RESTART_COUNT = "restart_count"
50
KEY_RESTART_WHEN = "restart_when"
51
KEY_BOOT_ID = "bootid"
52

    
53

    
54
# Global client object
55
client = None
56

    
57

    
58
class NotMasterError(errors.GenericError):
59
  """Exception raised when this host is not the master."""
60

    
61

    
62
def Indent(s, prefix='| '):
63
  """Indent a piece of text with a given prefix before each line.
64

    
65
  @param s: the string to indent
66
  @param prefix: the string to prepend each line
67

    
68
  """
69
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
70

    
71

    
72
class WatcherState(object):
73
  """Interface to a state file recording restart attempts.
74

    
75
  """
76
  def __init__(self):
77
    """Open, lock, read and parse the file.
78

    
79
    Raises exception on lock contention.
80

    
81
    """
82
    # The two-step dance below is necessary to allow both opening existing
83
    # file read/write and creating if not existing.  Vanilla open will truncate
84
    # an existing file -or- allow creating if not existing.
85
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
86
    self.statefile = os.fdopen(fd, 'w+')
87

    
88
    utils.LockFile(self.statefile.fileno())
89

    
90
    try:
91
      self._data = serializer.Load(self.statefile.read())
92
    except Exception, msg:
93
      # Ignore errors while loading the file and treat it as empty
94
      self._data = {}
95
      logging.warning(("Empty or invalid state file. Using defaults."
96
                       " Error message: %s"), msg)
97

    
98
    if "instance" not in self._data:
99
      self._data["instance"] = {}
100
    if "node" not in self._data:
101
      self._data["node"] = {}
102

    
103
    self._orig_data = serializer.Dump(self._data)
104

    
105
  def Save(self):
106
    """Save state to file, then unlock and close it.
107

    
108
    """
109
    assert self.statefile
110

    
111
    serialized_form = serializer.Dump(self._data)
112
    if self._orig_data == serialized_form:
113
      logging.debug("Data didn't change, just touching status file")
114
      os.utime(constants.WATCHER_STATEFILE, None)
115
      return
116

    
117
    # We need to make sure the file is locked before renaming it, otherwise
118
    # starting ganeti-watcher again at the same time will create a conflict.
119
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
120
                         data=serialized_form,
121
                         prewrite=utils.LockFile, close=False)
122
    self.statefile = os.fdopen(fd, 'w+')
123

    
124
  def Close(self):
125
    """Unlock configuration file and close it.
126

    
127
    """
128
    assert self.statefile
129

    
130
    # Files are automatically unlocked when closing them
131
    self.statefile.close()
132
    self.statefile = None
133

    
134
  def GetNodeBootID(self, name):
135
    """Returns the last boot ID of a node or None.
136

    
137
    """
138
    ndata = self._data["node"]
139

    
140
    if name in ndata and KEY_BOOT_ID in ndata[name]:
141
      return ndata[name][KEY_BOOT_ID]
142
    return None
143

    
144
  def SetNodeBootID(self, name, bootid):
145
    """Sets the boot ID of a node.
146

    
147
    """
148
    assert bootid
149

    
150
    ndata = self._data["node"]
151

    
152
    if name not in ndata:
153
      ndata[name] = {}
154

    
155
    ndata[name][KEY_BOOT_ID] = bootid
156

    
157
  def NumberOfRestartAttempts(self, instance):
158
    """Returns number of previous restart attempts.
159

    
160
    @type instance: L{Instance}
161
    @param instance: the instance to look up
162

    
163
    """
164
    idata = self._data["instance"]
165

    
166
    if instance.name in idata:
167
      return idata[instance.name][KEY_RESTART_COUNT]
168

    
169
    return 0
170

    
171
  def RecordRestartAttempt(self, instance):
172
    """Record a restart attempt.
173

    
174
    @type instance: L{Instance}
175
    @param instance: the instance being restarted
176

    
177
    """
178
    idata = self._data["instance"]
179

    
180
    if instance.name not in idata:
181
      inst = idata[instance.name] = {}
182
    else:
183
      inst = idata[instance.name]
184

    
185
    inst[KEY_RESTART_WHEN] = time.time()
186
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
187

    
188
  def RemoveInstance(self, instance):
189
    """Update state to reflect that a machine is running.
190

    
191
    This method removes the record for a named instance (as we only
192
    track down instances).
193

    
194
    @type instance: L{Instance}
195
    @param instance: the instance to remove from books
196

    
197
    """
198
    idata = self._data["instance"]
199

    
200
    if instance.name in idata:
201
      del idata[instance.name]
202

    
203

    
204
class Instance(object):
205
  """Abstraction for a Virtual Machine instance.
206

    
207
  """
208
  def __init__(self, name, state, autostart):
209
    self.name = name
210
    self.state = state
211
    self.autostart = autostart
212

    
213
  def Restart(self):
214
    """Encapsulates the start of an instance.
215

    
216
    """
217
    op = opcodes.OpStartupInstance(instance_name=self.name,
218
                                   force=False,
219
                                   extra_args=None)
220
    cli.SubmitOpCode(op, cl=client)
221

    
222
  def ActivateDisks(self):
223
    """Encapsulates the activation of all disks of an instance.
224

    
225
    """
226
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
227
    cli.SubmitOpCode(op, cl=client)
228

    
229

    
230
def GetInstanceList(with_secondaries=None):
231
  """Get a list of instances on this cluster.
232

    
233
  """
234
  fields = ["name", "status", "admin_state"]
235

    
236
  if with_secondaries is not None:
237
    fields.append("snodes")
238

    
239
  result = client.QueryInstances([], fields, True)
240

    
241
  instances = []
242
  for fields in result:
243
    if with_secondaries is not None:
244
      (name, status, autostart, snodes) = fields
245

    
246
      if not snodes:
247
        continue
248

    
249
      for node in with_secondaries:
250
        if node in snodes:
251
          break
252
      else:
253
        continue
254

    
255
    else:
256
      (name, status, autostart) = fields
257

    
258
    instances.append(Instance(name, status, autostart))
259

    
260
  return instances
261

    
262

    
263
def GetNodeBootIDs():
264
  """Get a dict mapping nodes to boot IDs.
265

    
266
  """
267
  result = client.QueryNodes([], ["name", "bootid", "offline"], True)
268
  return dict([(name, (bootid, offline)) for name, bootid, offline in result])
269

    
270

    
271
class Watcher(object):
272
  """Encapsulate the logic for restarting erronously halted virtual machines.
273

    
274
  The calling program should periodically instantiate me and call Run().
275
  This will traverse the list of instances, and make up to MAXTRIES attempts
276
  to restart machines that are down.
277

    
278
  """
279
  def __init__(self):
280
    master = client.QueryConfigValues(["master_node"])[0]
281
    if master != utils.HostInfo().name:
282
      raise NotMasterError("This is not the master node")
283
    self.instances = GetInstanceList()
284
    self.bootids = GetNodeBootIDs()
285
    self.started_instances = set()
286

    
287
  def Run(self):
288
    notepad = WatcherState()
289
    try:
290
      self.CheckInstances(notepad)
291
      self.CheckDisks(notepad)
292
      self.VerifyDisks()
293
    finally:
294
      notepad.Save()
295

    
296
  def CheckDisks(self, notepad):
297
    """Check all nodes for restarted ones.
298

    
299
    """
300
    check_nodes = []
301
    for name, (new_id, offline) in self.bootids.iteritems():
302
      old = notepad.GetNodeBootID(name)
303
      if new_id is None:
304
        # Bad node, not returning a boot id
305
        if not offline:
306
          logging.debug("Node %s missing boot id, skipping secondary checks",
307
                        name)
308
        continue
309
      if old != new_id:
310
        # Node's boot ID has changed, proably through a reboot.
311
        check_nodes.append(name)
312

    
313
    if check_nodes:
314
      # Activate disks for all instances with any of the checked nodes as a
315
      # secondary node.
316
      for instance in GetInstanceList(with_secondaries=check_nodes):
317
        if not instance.autostart:
318
          logging.info(("Skipping disk activation for non-autostart"
319
                        " instance %s"), instance.name)
320
          continue
321
        if instance.name in self.started_instances:
322
          # we already tried to start the instance, which should have
323
          # activated its drives (if they can be at all)
324
          continue
325
        try:
326
          logging.info("Activating disks for instance %s", instance.name)
327
          instance.ActivateDisks()
328
        except Exception:
329
          logging.exception("Error while activating disks for instance %s",
330
                            instance.name)
331

    
332
      # Keep changed boot IDs
333
      for name in check_nodes:
334
        notepad.SetNodeBootID(name, self.bootids[name])
335

    
336
  def CheckInstances(self, notepad):
337
    """Make a pass over the list of instances, restarting downed ones.
338

    
339
    """
340
    for instance in self.instances:
341
      if instance.state in BAD_STATES:
342
        n = notepad.NumberOfRestartAttempts(instance)
343

    
344
        if n > MAXTRIES:
345
          # stay quiet.
346
          continue
347
        elif n < MAXTRIES:
348
          last = " (Attempt #%d)" % (n + 1)
349
        else:
350
          notepad.RecordRestartAttempt(instance)
351
          logging.error("Could not restart %s after %d attempts, giving up",
352
                        instance.name, MAXTRIES)
353
          continue
354
        try:
355
          logging.info("Restarting %s%s",
356
                        instance.name, last)
357
          instance.Restart()
358
          self.started_instances.add(instance.name)
359
        except Exception:
360
          logging.exception("Error while restarting instance %s",
361
                            instance.name)
362

    
363
        notepad.RecordRestartAttempt(instance)
364
      elif instance.state in HELPLESS_STATES:
365
        if notepad.NumberOfRestartAttempts(instance):
366
          notepad.RemoveInstance(instance)
367
      else:
368
        if notepad.NumberOfRestartAttempts(instance):
369
          notepad.RemoveInstance(instance)
370
          logging.info("Restart of %s succeeded", instance.name)
371

    
372
  @staticmethod
373
  def VerifyDisks():
374
    """Run gnt-cluster verify-disks.
375

    
376
    """
377
    op = opcodes.OpVerifyDisks()
378
    result = cli.SubmitOpCode(op, cl=client)
379
    if not isinstance(result, (tuple, list)):
380
      logging.error("Can't get a valid result from verify-disks")
381
      return
382
    offline_disk_instances = result[2]
383
    if not offline_disk_instances:
384
      # nothing to do
385
      return
386
    logging.debug("Will activate disks for instances %s",
387
                  ", ".join(offline_disk_instances))
388
    # we submit only one job, and wait for it. not optimal, but spams
389
    # less the job queue
390
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
391
           for name in offline_disk_instances]
392
    job_id = cli.SendJob(job, cl=client)
393

    
394
    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
395

    
396

    
397
def ParseOptions():
398
  """Parse the command line options.
399

    
400
  @return: (options, args) as from OptionParser.parse_args()
401

    
402
  """
403
  parser = OptionParser(description="Ganeti cluster watcher",
404
                        usage="%prog [-d]",
405
                        version="%%prog (ganeti) %s" %
406
                        constants.RELEASE_VERSION)
407

    
408
  parser.add_option("-d", "--debug", dest="debug",
409
                    help="Write all messages to stderr",
410
                    default=False, action="store_true")
411
  options, args = parser.parse_args()
412
  return options, args
413

    
414

    
415
def main():
416
  """Main function.
417

    
418
  """
419
  global client
420

    
421
  options, args = ParseOptions()
422

    
423
  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
424
                     stderr_logging=options.debug)
425

    
426
  try:
427
    client = cli.GetClient()
428

    
429
    try:
430
      watcher = Watcher()
431
    except errors.ConfigurationError:
432
      # Just exit if there's no configuration
433
      sys.exit(constants.EXIT_SUCCESS)
434

    
435
    watcher.Run()
436
  except SystemExit:
437
    raise
438
  except NotMasterError:
439
    logging.debug("Not master, exiting")
440
    sys.exit(constants.EXIT_NOTMASTER)
441
  except errors.ResolverError, err:
442
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
443
    sys.exit(constants.EXIT_NODESETUP_ERROR)
444
  except Exception, err:
445
    logging.error(str(err), exc_info=True)
446
    sys.exit(constants.EXIT_FAILURE)
447

    
448

    
449
if __name__ == '__main__':
450
  main()