Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ 3448aa22

History | View | Annotate | Download (13 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erroneously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import time
33
import logging
34
from optparse import OptionParser
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import serializer
39
from ganeti import errors
40
from ganeti import opcodes
41
from ganeti import cli
42

    
43

    
44
# Number of restart attempts made for a down instance before the watcher
# gives up on it (see Watcher.CheckInstances).
MAXTRIES = 5

# Instance states for which a restart is attempted.
BAD_STATES = ["ERROR_down"]

# Instance states for which no restart is attempted; any restart record kept
# for such an instance is dropped.
HELPLESS_STATES = ["ERROR_nodedown", "ERROR_nodeoffline"]

# Message level labels.
NOTICE = "NOTICE"
ERROR = "ERROR"

# Keys used inside the per-instance and per-node records of the state file.
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global client object; main() assigns the result of cli.GetClient() to it
client = None
56

    
57

    
58
class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master.

  Watcher.__init__ raises it when the configured "master_node" value differs
  from the local host name; main() catches it and exits with
  constants.EXIT_NOTMASTER.

  """
60

    
61

    
62
def Indent(s, prefix='| '):
  """Put a prefix in front of every line of a piece of text.

  The text is split on line boundaries, every line gets C{prefix} prepended
  and the result always ends with a single newline.  Empty input yields just
  the prefix followed by a newline.

  @param s: the string to indent
  @param prefix: the string to prepend each line
  @return: the indented, newline-terminated text

  """
  prefixed = [prefix + line for line in s.splitlines()]
  if not prefixed:
    # No lines at all still produces one (empty) prefixed line
    prefixed = [prefix]
  return "\n".join(prefixed) + "\n"
70

    
71

    
72
class WatcherState(object):
  """Interface to a state file recording restart attempts.

  The parsed state is a dictionary with two sections: "instance", holding the
  restart bookkeeping per instance name, and "node", holding the last seen
  boot ID per node name.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    An empty or unparsable file is treated as empty state and a warning is
    logged.  Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      self._data = serializer.Load(self.statefile.read())
    except Exception as msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Empty or invalid state file. Using defaults."
                       " Error message: %s"), msg)

    # Make sure both sections exist so the accessors can index them directly
    self._data.setdefault("instance", {})
    self._data.setdefault("node", {})

    # Serialized snapshot used by Save() to detect whether anything changed
    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file.

    If the data is unchanged since it was loaded, only the timestamps of the
    state file are refreshed.  Otherwise the new contents are written through
    utils.WriteFile and the handle returned by it replaces the one held by
    this object.  In both cases the file stays open; call Close() to release
    it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    previous = self.statefile
    self.statefile = os.fdopen(fd, 'w+')
    # Release the superseded handle explicitly instead of relying on garbage
    # collection; the new handle is already in place at this point
    previous.close()

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    @param name: the node name to look up

    """
    return self._data["node"].get(name, {}).get(KEY_BOOT_ID)

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    @param name: the node name
    @param bootid: the boot ID to remember, must not be empty

    """
    assert bootid

    self._data["node"].setdefault(name, {})[KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up
    @return: the recorded count; 0 if the instance has no record or its
        record carries no count

    """
    record = self._data["instance"].get(instance.name, {})
    # Tolerate records without a counter, like RecordRestartAttempt does
    return record.get(KEY_RESTART_COUNT, 0)

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    Stores the current time and increments the attempt counter, creating the
    record for the instance if needed.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    inst = self._data["instance"].setdefault(instance.name, {})

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track down instances).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    self._data["instance"].pop(instance.name, None)
202

    
203

    
204
class Instance(object):
  """Abstraction for a Virtual Machine instance.

  @ivar name: the instance name
  @ivar state: the status value reported by the instance query
  @ivar autostart: the "admin_state" value reported by the instance query

  """
  def __init__(self, name, state, autostart):
    """Remember the queried attributes of the instance.

    """
    self.autostart = autostart
    self.state = state
    self.name = name

  def Restart(self):
    """Submit an opcode starting this instance, without forcing it.

    """
    startup_op = opcodes.OpStartupInstance(extra_args=None,
                                           force=False,
                                           instance_name=self.name)
    cli.SubmitOpCode(startup_op, cl=client)

  def ActivateDisks(self):
    """Submit an opcode activating all disks of this instance.

    """
    activate_op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(activate_op, cl=client)
228

    
229

    
230
def GetInstanceList(with_secondaries=None):
  """Query the cluster for its instances.

  @param with_secondaries: if not None, a collection of node names; only
      instances having at least one of these nodes among their secondary
      nodes are returned
  @return: list of L{Instance} objects

  """
  filter_by_secondary = with_secondaries is not None

  query_fields = ["name", "status", "admin_state"]
  if filter_by_secondary:
    query_fields.append("snodes")

  found = []
  for row in client.QueryInstances([], query_fields, True):
    if filter_by_secondary:
      (name, status, autostart, snodes) = row

      # Instances without secondary nodes can never match
      if not snodes:
        continue

      # Neither can instances not using any of the requested nodes
      if not any(node in snodes for node in with_secondaries):
        continue
    else:
      (name, status, autostart) = row

    found.append(Instance(name, status, autostart))

  return found
261

    
262

    
263
def GetNodeBootIDs():
  """Query the cluster nodes for their boot IDs.

  @return: dict mapping each node name to a (bootid, offline) tuple

  """
  node_rows = client.QueryNodes([], ["name", "bootid", "offline"], True)

  boot_ids = {}
  for (node_name, boot_id, is_offline) in node_rows:
    boot_ids[node_name] = (boot_id, is_offline)
  return boot_ids
269

    
270

    
271
class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts):
    """Check that we run on the master and collect the cluster data.

    @param opts: the parsed command line options; Run() reads C{job_age}
        from it
    @raise NotMasterError: if the configured master node is not this host

    """
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    self.instances = GetInstanceList()
    self.bootids = GetNodeBootIDs()
    # Names of the instances a restart was submitted for during this run
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Run all watcher checks and save the state afterwards.

    The state is saved even if one of the checks raises an exception.

    """
    notepad = WatcherState()
    try:
      self.ArchiveJobs(self.opts.job_age)
      self.CheckInstances(notepad)
      self.CheckDisks(notepad)
      self.VerifyDisks()
    finally:
      notepad.Save()

  def ArchiveJobs(self, age):
    """Archive old jobs.

    @param age: the age value passed on to the client's AutoArchiveJobs

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    # Let the logging module do the formatting, only when debug is enabled
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    For every node whose boot ID differs from the recorded one, the disks of
    the instances using it as secondary node are activated and the new boot
    ID is recorded.

    @type notepad: L{WatcherState}
    @param notepad: the watcher state to read and update

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for instance in GetInstanceList(with_secondaries=check_nodes):
        if not instance.autostart:
          logging.info(("Skipping disk activation for non-autostart"
                        " instance %s"), instance.name)
          continue
        if instance.name in self.started_instances:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          continue
        try:
          logging.info("Activating disks for instance %s", instance.name)
          instance.ActivateDisks()
        except Exception:
          logging.exception("Error while activating disks for instance %s",
                            instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    @type notepad: L{WatcherState}
    @param notepad: the watcher state to read and update

    """
    for instance in self.instances:
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          # Exactly MAXTRIES attempts were made: bump the counter past the
          # limit so this error is reported only once
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                        instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        # The attempt is counted whether or not the restart went through
        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    The third element of the opcode result is taken as the names of the
    instances with offline disks; their disks are activated in a single job.

    """
    op = opcodes.OpVerifyDisks()
    result = cli.SubmitOpCode(op, cl=client)
    # Also reject results too short to hold the element used below
    if not isinstance(result, (tuple, list)) or len(result) < 3:
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  ", ".join(offline_disk_instances))
    # we submit only one job, and wait for it. not optimal, but spams
    # less the job queue
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
404

    
405

    
406
def ParseOptions():
  """Parse the command line options.

  The job age option is passed through cli.ParseTimespec before the options
  are returned.

  @return: (options, args) as from OptionParser.parse_args()

  """
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  default_job_age = 6 * 3600

  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version=version_string)

  parser.add_option("-d", "--debug", dest="debug",
                    action="store_true", default=False,
                    help="Write all messages to stderr")
  parser.add_option("-A", "--job-age", dest="job_age",
                    default=default_job_age,
                    help=("Autoarchive jobs older than this age (default"
                          " 6 hours)"))

  (options, args) = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return (options, args)
426

    
427

    
428
def main():
  """Main function.

  Parses the options, sets up logging, creates the global client and runs
  the watcher.  Failures are logged and turned into exit codes.

  """
  global client

  (opts, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=opts.debug,
                     stderr_logging=opts.debug)

  try:
    client = cli.GetClient()

    try:
      watcher = Watcher(opts)
    except errors.ConfigurationError:
      # Just exit if there's no configuration
      sys.exit(constants.EXIT_SUCCESS)
    else:
      watcher.Run()
  except SystemExit:
    # Exit requests must pass through untouched
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError as resolver_err:
    logging.error("Cannot resolve hostname '%s', exiting.",
                  resolver_err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except Exception as unexpected_err:
    logging.error(str(unexpected_err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)


if __name__ == "__main__":
  main()