Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ 2c404217

History | View | Annotate | Download (13.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erronously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import time
33
import logging
34
from optparse import OptionParser
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import serializer
39
from ganeti import errors
40
from ganeti import opcodes
41
from ganeti import cli
42

    
43

    
44
MAXTRIES = 5
45
BAD_STATES = ['ERROR_down']
46
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
47
NOTICE = 'NOTICE'
48
ERROR = 'ERROR'
49
KEY_RESTART_COUNT = "restart_count"
50
KEY_RESTART_WHEN = "restart_when"
51
KEY_BOOT_ID = "bootid"
52

    
53

    
54
# Global client object
55
client = None
56

    
57

    
58
class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master.

  The watcher only operates on the cluster master node; Watcher.__init__
  raises this when run anywhere else.

  """
60

    
61

    
62
def Indent(s, prefix='| '):
  """Prepend a prefix to every line of a piece of text.

  A trailing newline is always appended to the result.

  @param s: the string to indent
  @param prefix: the string to prepend each line

  """
  joined = ('\n' + prefix).join(s.splitlines())
  return prefix + joined + '\n'
70

    
71

    
72
class WatcherState(object):
  """Interface to a state file recording restart attempts.

  The state is a serialized dict with two top-level keys, "instance"
  (per-instance restart bookkeeping, keyed by instance name) and "node"
  (last seen boot ID per node name), kept in constants.WATCHER_STATEFILE.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    # Lock before reading so concurrent watcher runs serialize on the file
    utils.LockFile(self.statefile.fileno())

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    # Guarantee the two mandatory top-level keys exist
    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    # Remember the loaded form so Save() can detect a no-change save
    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file.

    If nothing changed since loading, only the file's mtime is updated;
    otherwise the file is rewritten (locked before the rename) and the
    new, still-locked descriptor replaces self.statefile.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    @param name: the node name to look up

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    @param name: the node name
    @param bootid: the new (non-empty) boot ID to record

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    # Unknown instance: never restarted by us
    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    Stores the current timestamp and bumps the attempt counter for the
    given instance.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track down instances).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]
206

    
207

    
208
class Instance(object):
  """Abstraction for a Virtual Machine instance.

  Holds the name, last known state and autostart flag of an instance,
  and wraps the opcodes used to act on it.

  """
  def __init__(self, name, state, autostart):
    # Keep the query results around for the watcher's decisions
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    start_op = opcodes.OpStartupInstance(instance_name=self.name,
                                         force=False)
    cli.SubmitOpCode(start_op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    activate_op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(activate_op, cl=client)
230

    
231

    
232
def GetClusterData():
  """Query the cluster for instance and node data.

  @return: a tuple (instances, nodes, smap) where instances maps each
      instance name to an L{Instance} object, nodes maps each node name
      to a (bootid, offline) tuple, and smap maps each node name to the
      list of instance names using it as a secondary node

  """
  instance_query = opcodes.OpQueryInstances(
    output_fields=["name", "status", "admin_state", "snodes"],
    names=[], use_locking=True)
  node_query = opcodes.OpQueryNodes(
    output_fields=["name", "bootid", "offline"],
    names=[], use_locking=True)

  # Both queries go into a single two-opcode job which we wait for
  job_id = client.SubmitJob([instance_query, node_query])
  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  instances = {}
  smap = {}
  for (name, status, autostart, snodes) in all_results[0]:
    # register this instance under each of its secondary nodes
    for snode in snodes:
      smap.setdefault(snode, []).append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for (name, bootid, offline) in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap
268

    
269

    
270
class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    """Initialize the watcher.

    @param opts: parsed command line options
    @type notepad: L{WatcherState}
    @param notepad: persistent restart-attempt state

    @raise NotMasterError: if this host is not the cluster master

    """
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.ArchiveJobs(self.opts.job_age)
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  def ArchiveJobs(self, age):
    """Archive old jobs.

    @param age: archive jobs older than this many seconds

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    # Pass lazy %-args instead of pre-formatting, matching the logging
    # style used elsewhere in this file and skipping the formatting work
    # when debug logging is disabled
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    For every node whose boot ID changed since the last run, activate the
    disks of instances having that node as secondary, then record the new
    boot IDs in the state file.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            # best-effort: log and carry on with the other instances
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    Instances in BAD_STATES are restarted at most MAXTRIES times (counted
    in the notepad); instances found running again have their restart
    record cleared.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          # n == MAXTRIES: record once more (so we go over the limit and
          # stay quiet on following runs) and log the give-up
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        # Count the attempt even if the restart submission failed
        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        # nothing we can do; just forget earlier restart attempts
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    Submits a cluster-wide disk verification and activates the disks of
    any instances reported with offline disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    # third element of the verify-disks result: instances with offline disks
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  ", ".join(offline_disk_instances))
    # we submit only one job, and wait for it. not optimal, but spams
    # less the job queue
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
409

    
410

    
411
def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version=version_string)

  parser.add_option("-d", "--debug", dest="debug",
                    action="store_true", default=False,
                    help="Write all messages to stderr")
  parser.add_option("-A", "--job-age", dest="job_age",
                    default=6*3600,
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)")
  opts, arguments = parser.parse_args()
  # normalize the job age (may carry a time-unit suffix) into seconds
  opts.job_age = cli.ParseTimespec(opts.job_age)
  return opts, arguments
431

    
432

    
433
def main():
  """Main function.

  Parses options, sets up logging, opens the watcher state file, runs
  the watcher and maps known failures to the proper exit codes.

  """
  # the watcher shares a single luxi client through this module global
  global client

  # args is unused; kept for parse_args() symmetry
  options, args = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  try:
    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        sys.exit(constants.EXIT_SUCCESS)

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
    finally:
      # always persist whatever state we accumulated, even on errors
      notepad.Save()
  except SystemExit:
    # re-raise first so the sys.exit() calls above keep their exit codes
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except Exception, err:
    # catch-all boundary: log with traceback and exit with failure
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)
473

    
474

    
475
# Script entry point: run the watcher when executed directly
if __name__ == '__main__':
  main()