Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ a2370b24

History | View | Annotate | Download (12.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""
29

    
30
import os
31
import sys
32
import time
33
import logging
34
from optparse import OptionParser
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import serializer
39
from ganeti import errors
40
from ganeti import opcodes
41
from ganeti import cli
42

    
43

    
44
# Restart attempts made for a down instance before the watcher gives up
# (see Watcher.CheckInstances).
MAXTRIES = 5

# Instance states for which Watcher.CheckInstances attempts a restart.
BAD_STATES = ["ERROR_down"]

# Instance states for which no restart is attempted; any restart bookkeeping
# kept for such an instance is silently dropped.
HELPLESS_STATES = ["ERROR_nodedown", "ERROR_nodeoffline"]

# Severity labels.  NOTE(review): not referenced anywhere in this file;
# presumably kept for external users -- confirm before removing.
NOTICE = "NOTICE"
ERROR = "ERROR"

# Keys used inside the per-instance and per-node records of the state file.
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global client object; assigned by main() before any query is made.
client = None
56

    
57

    
58
class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master.

  Raised by L{Watcher.__init__} when the configured master node name differs
  from the local host name; L{main} turns it into the
  C{constants.EXIT_NOTMASTER} exit code.

  """
60

    
61

    
62
def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  The text is split with C{splitlines()} and re-joined with single newline
  characters, so the result always ends with exactly one newline regardless
  of how the input was terminated.  An empty input yields the bare prefix
  followed by a newline.

  @param s: the string to indent
  @param prefix: the string to prepend each line
  @return: the indented, newline-terminated text

  """
  # Joining on "newline + prefix" puts the prefix in front of every line but
  # the first one, which gets it from the format string below.
  separator = "\n" + prefix
  return "%s%s\n" % (prefix, separator.join(s.splitlines()))
70

    
71

    
72
class WatcherState(object):
  """Interface to a state file recording restart attempts.

  The state is a dictionary with two top-level entries: C{"instance"}, which
  maps instance names to their restart bookkeeping (C{KEY_RESTART_COUNT} and
  C{KEY_RESTART_WHEN}), and C{"node"}, which maps node names to a record
  holding the last seen boot ID (C{KEY_BOOT_ID}).

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.  An empty, unparseable or
    non-dictionary state file is not an error: it is logged and treated as
    empty state.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      data = serializer.Load(self.statefile.read())
      # A file holding valid but non-dictionary data (e.g. "null" or "[]")
      # would otherwise break every lookup below on each run.
      if not isinstance(data, dict):
        raise TypeError("state file content is not a dictionary")
    except Exception:
      # Ignore errors while loading the file and treat it as empty.
      # sys.exc_info() is used instead of binding the exception in the
      # "except" clause so the statement parses on every Python version.
      msg = sys.exc_info()[1]
      data = {}
      logging.warning(("Empty or invalid state file. Using defaults."
                       " Error message: %s"), msg)

    self._data = data
    self._data.setdefault("instance", {})
    self._data.setdefault("node", {})

    # Snapshot used by Save() to detect whether anything changed.
    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file.

    If the state is unchanged since it was loaded (or last saved), only the
    timestamps of the state file are refreshed.  Otherwise the state is
    written through C{utils.WriteFile}; the descriptor it returns becomes the
    new state file handle and the previous handle is closed.  The handle is
    left open; call L{Close} to release it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)

    # Swap in the new handle first, then explicitly close the old one rather
    # than relying on garbage collection to do it.
    previous = self.statefile
    self.statefile = os.fdopen(fd, 'w+')
    previous.close()

    # The file now matches the in-memory state.
    self._orig_data = serialized_form

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    @param name: the node name to look up

    """
    return self._data["node"].get(name, {}).get(KEY_BOOT_ID)

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    @param name: the node name
    @param bootid: the boot ID to remember; must not be empty

    """
    assert bootid

    self._data["node"].setdefault(name, {})[KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    An instance without a record, or whose record lacks a restart counter,
    counts as zero attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    # Same tolerant lookup as RecordRestartAttempt(): a record without a
    # counter must not raise KeyError.
    return idata.get(instance.name, {}).get(KEY_RESTART_COUNT, 0)

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    Stores the current time and increments the restart counter, creating
    the record if needed.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    inst = self._data["instance"].setdefault(instance.name, {})

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track down instances).  Removing an unknown instance is a no-op.

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    self._data["instance"].pop(instance.name, None)
202

    
203

    
204
class Instance(object):
  """Abstraction for a Virtual Machine instance.

  @ivar name: the instance name
  @ivar state: the instance status as returned by the instance query
  @ivar autostart: the C{admin_state} value as returned by the instance
      query (see L{GetInstanceList})

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    Submits a non-forced startup opcode through the global client.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    Submits a disk activation opcode through the global client.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)
226

    
227

    
228
def GetInstanceList(with_secondaries=None):
  """Get a list of instances on this cluster.

  @param with_secondaries: if not None, an iterable of node names; only
      instances having at least one of these nodes among their secondary
      nodes are returned (an empty iterable therefore selects nothing)
  @rtype: list of L{Instance}
  @return: one L{Instance} per selected row of the instance query

  """
  query_fields = ["name", "status", "admin_state"]
  filter_by_secondary = with_secondaries is not None

  if filter_by_secondary:
    query_fields.append("snodes")
    # Built once so that the node list is not rescanned for every instance
    # and so that a one-shot iterable is consumed a single time.
    wanted_nodes = frozenset(with_secondaries)

  instances = []
  for row in client.QueryInstances([], query_fields, True):
    if filter_by_secondary:
      (name, status, autostart, snodes) = row

      # Skip instances without secondaries or without a matching one
      if not snodes or not wanted_nodes.intersection(snodes):
        continue
    else:
      (name, status, autostart) = row

    instances.append(Instance(name, status, autostart))

  return instances
259

    
260

    
261
def GetNodeBootIDs():
  """Get a dict mapping nodes to boot IDs.

  @rtype: dict
  @return: dictionary mapping each node name to a C{(bootid, offline)}
      tuple, built from the C{name}, C{bootid} and C{offline} query fields

  """
  result = client.QueryNodes([], ["name", "bootid", "offline"], True)
  return dict((name, (bootid, offline)) for (name, bootid, offline) in result)
267

    
268

    
269
class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts):
    """Check that we run on the master and fetch the cluster data.

    @param opts: the parsed command line options; L{Run} reads its
        C{job_age} attribute
    @raise NotMasterError: if the configured master node is not this host

    """
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    self.instances = GetInstanceList()
    self.bootids = GetNodeBootIDs()
    # Names of the instances successfully restarted during this run
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Run all watcher checks once.

    The state file is saved even if one of the checks raises.

    """
    notepad = WatcherState()
    try:
      self.ArchiveJobs(self.opts.job_age)
      self.CheckInstances(notepad)
      self.CheckDisks(notepad)
      self.VerifyDisks()
    finally:
      notepad.Save()

  def ArchiveJobs(self, age):
    """Archive old jobs.

    @param age: the age limit handed to the client's C{AutoArchiveJobs}

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    # Let the logging module do the formatting, and only if needed
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    For every node whose boot ID differs from the recorded one, the disks of
    the autostart instances using it as a secondary node are activated, and
    the new boot ID is recorded afterwards.

    @type notepad: L{WatcherState}
    @param notepad: the state object holding the known boot IDs

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if not check_nodes:
      return

    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for instance in GetInstanceList(with_secondaries=check_nodes):
      if not instance.autostart:
        logging.info(("Skipping disk activation for non-autostart"
                      " instance %s"), instance.name)
        continue
      if instance.name in self.started_instances:
        # we already tried to start the instance, which should have
        # activated its drives (if they can be at all)
        continue
      try:
        logging.info("Activating disks for instance %s", instance.name)
        instance.ActivateDisks()
      except Exception:
        # Best effort: a failure for one instance must not stop the others
        logging.exception("Error while activating disks for instance %s",
                          instance.name)

    # Keep changed boot IDs
    for name in check_nodes:
      notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    Instances in one of the BAD_STATES are restarted unless MAXTRIES attempts
    were already recorded; every attempt is recorded whether or not it
    succeeded.  For all other instances any existing restart record is
    removed.

    @type notepad: L{WatcherState}
    @param notepad: the state object holding the restart bookkeeping

    """
    for instance in self.instances:
      if instance.state not in BAD_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          # Nothing can be concluded about instances in a helpless state, so
          # success is only reported for the remaining ones.
          if instance.state not in HELPLESS_STATES:
            logging.info("Restart of %s succeeded", instance.name)
        continue

      n = notepad.NumberOfRestartAttempts(instance)

      if n > MAXTRIES:
        # stay quiet.
        continue

      if n == MAXTRIES:
        # Recording once more pushes the counter above MAXTRIES, so that
        # this message is logged only once.
        notepad.RecordRestartAttempt(instance)
        logging.error("Could not restart %s after %d attempts, giving up",
                      instance.name, MAXTRIES)
        continue

      last = " (Attempt #%d)" % (n + 1)
      try:
        logging.info("Restarting %s%s",
                     instance.name, last)
        instance.Restart()
        self.started_instances.add(instance.name)
      except Exception:
        logging.exception("Error while restarting instance %s",
                          instance.name)

      notepad.RecordRestartAttempt(instance)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    Submits the verify-disks opcode and, if it reports instances with
    offline disks, activates the disks of all of them in a single job.

    """
    op = opcodes.OpVerifyDisks()
    result = cli.SubmitOpCode(op, cl=client)
    # Only the third element of the result is used below, so anything
    # shorter is as unusable as a non-sequence.
    if not isinstance(result, (tuple, list)) or len(result) < 3:
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  ", ".join(offline_disk_instances))
    # we submit only one job, and wait for it. not optimal, but spams
    # less the job queue
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
402

    
403

    
404
def ParseOptions():
  """Parse the command line options.

  The C{job_age} option is passed through C{cli.ParseTimespec} before being
  returned, whether it was given on the command line or left at its default
  of six hours (expressed in seconds).

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-d", "--debug", dest="debug",
                    help="Write all messages to stderr",
                    default=False, action="store_true")
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args
424

    
425

    
426
def main():
  """Main function.

  Parses the options, sets up logging, creates the global client and runs
  the watcher once.  The process exits with:
    - C{constants.EXIT_SUCCESS} if there is no cluster configuration
    - C{constants.EXIT_NOTMASTER} if this host is not the master
    - C{constants.EXIT_NODESETUP_ERROR} if a hostname cannot be resolved
    - C{constants.EXIT_FAILURE} on any other error

  """
  global client

  options, _ = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  try:
    client = cli.GetClient()

    try:
      watcher = Watcher(options)
    except errors.ConfigurationError:
      # Just exit if there's no configuration
      sys.exit(constants.EXIT_SUCCESS)

    watcher.Run()
  except SystemExit:
    # Must come before the generic handler: on Python versions older than
    # 2.5 SystemExit derives from Exception and would be reported as a
    # failure.
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError:
    # sys.exc_info() instead of binding the exception in the "except"
    # clause, so the statement parses on every Python version.
    err = sys.exc_info()[1]
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except Exception:
    err = sys.exc_info()[1]
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)
458

    
459

    
460
# Run the watcher only when executed as a script, not when imported.
if __name__ == '__main__':
  main()