#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli


MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
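# The KEY_* names above are the fields kept in the watcher state file:
# per-instance restart count and time of the last restart attempt, plus the
# last seen boot ID per node (see the WatcherState class below).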


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  @param s: the string to indent
  @param prefix: the string to prepend each line

  """
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())
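    # The LockFile call above raises an exception on contention (see the
    # docstring), so two watcher processes never read or write the state
    # concurrently.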

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')
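    # With close=False, WriteFile returns the new file's descriptor instead of
    # closing it; we keep it open (and thus locked, via prewrite) as the
    # current state file handle.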

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])
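  # The job above carries both opcodes; PollJob below returns one result list
  # per opcode: instances first, then nodes (name, boot id, offline flag).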

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])
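  # nodes maps each node name to a (boot id, offline flag) tuple taken from
  # the node query results.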

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.ArchiveJobs(self.opts.job_age)
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  def ArchiveJobs(self, age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)
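
        # n < MAXTRIES: attempt another restart; n == MAXTRIES: record the
        # final attempt and log that we are giving up; n > MAXTRIES: we
        # already gave up on a previous run, so stay quiet.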
        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
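    # The verify-disks result is a tuple; its third element is the list of
    # instances whose disks are offline and need to be re-activated.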
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  ", ".join(offline_disk_instances))
    # we submit only one job, and wait for it. not optimal, but it spams the
    # job queue less
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-d", "--debug", dest="debug",
                    help="Write all messages to stderr",
                    default=False, action="store_true")
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


def main():
  """Main function.

  """
  global client

  options, args = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)
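
  # The nested try blocks below make sure the state file is saved via
  # notepad.Save() even when the run fails, while the outer handlers map
  # known error conditions to their specific exit codes.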
  try:
    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        sys.exit(constants.EXIT_SUCCESS)

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
    finally:
      notepad.Save()
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()