#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import sys
import time
import logging
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


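# MAXTRIES is the maximum number of restart attempts per instance before the
# watcher gives up.  BAD_STATES are instance states the watcher tries to fix
# by restarting the instance; HELPLESS_STATES are caused by the node being
# down or offline, where a restart from here cannot help.  The KEY_*
# constants name fields in the watcher state file.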
MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  @param s: the string to indent
  @param prefix: the string to prepend to each line

  """
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def EnsureDaemon(name):
  """Check for and start daemon if not alive.

  """
  result = utils.RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
  if result.failed:
    logging.error("Can't start daemon '%s', failure %s, output: %s",
                  name, result.fail_reason, result.output)
    return False

  return True


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

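  # Both queries are submitted as a single job; the code below relies on
  # PollJob returning the per-opcode results in submission order, so
  # all_results[0] is the instance data and all_results[1] the node data.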
  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
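  # (one "<instance name> <status>" line per instance, written to
  # constants.INSTANCE_UPFILE)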
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
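    # Collect nodes whose boot ID changed since the last run; those have
    # presumably rebooted, so disks of instances using them as secondary
    # nodes may need to be re-activated.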
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

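        # n is the number of attempts already recorded: below MAXTRIES we
        # try again (logging the attempt number), at exactly MAXTRIES we
        # record one final attempt and log that we are giving up, and above
        # MAXTRIES we stay silent on every subsequent run.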
        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                        instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
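    # The code below expects the third element of the verify-disks result
    # to be the list of instances with offline disks.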
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # we submit only one job, and wait for it. not optimal, but it spams
    # the job queue less
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


def main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

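  # update_file tracks whether this run got far enough that the state file
  # should be saved (and its timestamp updated); on early failures the old
  # state is left untouched.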
  update_file = False
  try:
    # on master or not, try to start the node daemon
    EnsureDaemon(constants.NODED)

    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      EnsureDaemon(constants.RAPI)

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()