#!/usr/bin/python
#
# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

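# The docstring above suggests running this tool from cron. As an
# illustration only (the install path and interval below are assumptions,
# not defined in this file), a cron entry could look like:
#
#   */5 * * * * root /usr/sbin/ganeti-watcher
#
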
import os
import sys
import time
import logging
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


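# Tunables and state-file key names. MAXTRIES caps how many restart
# attempts the watcher makes per instance; BAD_STATES are instance states
# it will try to fix, while HELPLESS_STATES are states it can only note
# (the instance's node is down or offline). The KEY_* strings are field
# names used inside the watcher state file (see WatcherState below).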
MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  @param s: the string to indent
  @param prefix: the string to prepend to each line

  """
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def EnsureDaemon(name):
  """Check for and start daemon if not alive.

  """
  result = utils.RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
  if result.failed:
    logging.error("Can't start daemon '%s', failure %s, output: %s",
                  name, result.fail_reason, result.output)
    return False

  return True


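# Note on the state file layout (inferred from the code below rather than a
# formal spec): it holds a dict serialized via ganeti.serializer, roughly
#   {"instance": {<name>: {"restart_count": ..., "restart_when": ...}},
#    "node": {<name>: {"bootid": ...}}}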
class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening an existing
    # file read/write and creating it if missing.  A plain open() either
    # truncates an existing file or fails to create a missing one, depending
    # on the mode.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    keep track of instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get instance and node data for this cluster.

  @return: a tuple (instances, nodes, smap), where instances maps instance
      names to L{Instance} objects, nodes maps node names to (bootid,
      offline) tuples and smap maps secondary node names to the list of
      instance names using that node as secondary

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the instance status ("up") file: one "<name> <status>" line per
  # instance
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

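        # Restart policy: below MAXTRIES we retry (logging the attempt
        # number); at exactly MAXTRIES we record the attempt and log a final
        # "giving up" error; past MAXTRIES we stay quiet.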
        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # We submit only one job and wait for it; not optimal, but it spams the
    # job queue less.
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


def main():
  """Main function.

  """
  global client

  options, args = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

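  # update_file stays False until a watcher pass (or a clean early exit)
  # completes; the state file is only saved in that case, so a failed run
  # leaves the previous restart bookkeeping untouched.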
  update_file = False
  try:
    # on master or not, try to start the node daemon
    EnsureDaemon(constants.NODED)

    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      EnsureDaemon(constants.RAPI)

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()