Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-watcher @ 6dfcc47b

History | View | Annotate | Download (13.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Tool to restart erronously downed virtual machines.
23

    
24
This program and set of classes implement a watchdog to restart
25
virtual machines in a Ganeti cluster that have crashed or been killed
26
by a node reboot.  Run from cron or similar.
27

    
28
"""
29

    
30
import os
31
import sys
32
import time
33
import logging
34
from optparse import OptionParser
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import serializer
39
from ganeti import errors
40
from ganeti import opcodes
41
from ganeti import cli
42

    
43

    
44
MAXTRIES = 5
45
BAD_STATES = ['ERROR_down']
46
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
47
NOTICE = 'NOTICE'
48
ERROR = 'ERROR'
49
KEY_RESTART_COUNT = "restart_count"
50
KEY_RESTART_WHEN = "restart_when"
51
KEY_BOOT_ID = "bootid"
52

    
53

    
54
# Global client object
55
client = None
56

    
57

    
58
class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""
60

    
61

    
62
def Indent(s, prefix='| '):
  """Prepend a prefix to each line of a piece of text.

  @param s: the string to indent
  @param prefix: the string to prepend each line

  """
  # An empty input still produces one prefixed (empty) line, matching
  # the historical behavior of this helper
  lines = s.splitlines() or ['']
  return "\n".join([prefix + line for line in lines]) + "\n"
70

    
71

    
72
class WatcherState(object):
  """Interface to a state file recording restart attempts.

  The state file persists, across watcher runs, the per-instance restart
  bookkeeping (count and timestamp) and the last seen boot ID of each
  node.  The file is kept locked between __init__ and Close/Save so two
  concurrent watcher runs cannot corrupt it.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      self._data = serializer.Load(self.statefile.read())
    except Exception, msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Empty or invalid state file. Using defaults."
                       " Error message: %s"), msg)

    # Make sure both top-level sections exist so later accesses need no
    # existence checks
    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    # Keep the serialized form around so Save() can cheaply detect
    # whether anything actually changed
    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      # Nothing changed; just bump the mtime so the file's age reflects
      # the last successful watcher run
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    @param name: the node name to look up

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    @param name: the node name
    @param bootid: the new boot ID (must be non-empty)

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    Bumps the restart counter for the instance and stamps the current
    time as the moment of the attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track down instances).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]
202

    
203

    
204
class Instance(object):
  """Abstraction for a Virtual Machine instance.

  Holds just enough information (name, state, autostart flag) for the
  watcher to decide whether and how to repair the instance, plus thin
  wrappers submitting the relevant opcodes.

  """
  def __init__(self, name, state, autostart):
    # Plain record initialization; no cluster calls happen here
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Submit a (non-forced) startup opcode for this instance."""
    startup_op = opcodes.OpStartupInstance(instance_name=self.name,
                                           force=False)
    cli.SubmitOpCode(startup_op, cl=client)

  def ActivateDisks(self):
    """Submit an opcode activating all disks of this instance."""
    activate_op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(activate_op, cl=client)
226

    
227

    
228
def GetClusterData():
  """Get a list of instances on this cluster.

  @return: a tuple (instances, nodes, smap) where instances maps
      instance name to an L{Instance} object, nodes maps node name to
      a (bootid, offline) tuple, and smap maps a secondary node name to
      the list of instance names using it as secondary

  """
  instance_fields = ["name", "status", "admin_state", "snodes"]
  instance_query = opcodes.OpQueryInstances(output_fields=instance_fields,
                                            names=[], use_locking=True)
  node_fields = ["name", "bootid", "offline"]
  node_query = opcodes.OpQueryNodes(output_fields=node_fields, names=[],
                                    use_locking=True)

  # Both queries go into one job so we only wait once
  job_id = client.SubmitJob([instance_query, node_query])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  smap = {}
  instances = {}
  for (name, status, autostart, snodes) in all_results[0]:
    # Register this instance under each of its secondary nodes
    for node in snodes:
      smap.setdefault(node, []).append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap
264

    
265

    
266
class Watcher(object):
  """Encapsulate the logic for restarting erronously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    """Initialize the watcher, verifying mastership and fetching cluster data.

    @param opts: parsed command-line options (job_age is used by Run)
    @param notepad: a WatcherState object used to persist restart
        attempts and node boot IDs across runs

    @raise NotMasterError: if this host is not the cluster master

    """
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    self.instances, self.bootids, self.smap = GetClusterData()
    # Names of instances restarted during this run; CheckDisks skips
    # these since a successful start already activates the disks
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.ArchiveJobs(self.opts.job_age)
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  def ArchiveJobs(self, age):
    """Archive old jobs.

    @param age: age (in seconds) beyond which jobs are auto-archived

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    A node whose boot ID changed since the last recorded one is assumed
    to have rebooted; disks of instances having it as secondary are
    re-activated, and the new boot ID is stored.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, proably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            # best-effort: log and continue with the remaining instances
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    Restarts are attempted at most MAXTRIES times per instance (as
    recorded in the notepad); instances in HELPLESS_STATES or healthy
    states get their restart bookkeeping cleared.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          # n == MAXTRIES: record one final attempt so the count goes
          # above MAXTRIES and future passes stay quiet
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                        instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        # NOTE: the attempt is recorded even if Restart() raised above
        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        # instance is healthy; clear any pending restart bookkeeping
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    Submits a verify-disks opcode and re-activates the disks of any
    instances the verification reports as having offline disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    # result[2] holds the list of instances with offline disks
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  ", ".join(offline_disk_instances))
    # we submit only one job, and wait for it. not optimal, but spams
    # less the job queue
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
405

    
406

    
407
def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  opt_parser = OptionParser(description="Ganeti cluster watcher",
                            usage="%prog [-d]",
                            version=version_string)

  opt_parser.add_option("-d", "--debug", dest="debug",
                        help="Write all messages to stderr",
                        default=False, action="store_true")
  opt_parser.add_option("-A", "--job-age", dest="job_age",
                        help="Autoarchive jobs older than this age (default"
                        " 6 hours)", default=6*3600)
  options, args = opt_parser.parse_args()
  # Convert the textual timespec (e.g. "6h") into seconds
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args
427

    
428

    
429
def main():
  """Main function.

  Parses options, sets up logging, instantiates the watcher and runs a
  single pass, translating the known failure modes into the documented
  exit codes.

  """
  global client

  options, args = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  try:
    notepad = WatcherState()
    try:
      client = cli.GetClient()

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
    finally:
      # Persist (and unlock) the state file even on failure so restart
      # counts are not lost
      notepad.Save()
  except SystemExit:
    raise
  except NotMasterError:
    # Running on a non-master node is normal operation, not an error
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except Exception, err:
    # Catch-all boundary: log with traceback and exit non-zero
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)
465

    
466

    
467
# Script entry point: run one watcher pass when invoked directly
if __name__ == '__main__':
  main()