Revision 5a3103e9 daemons/ganeti-watcher

b/daemons/ganeti-watcher
24 24
This program and set of classes implement a watchdog to restart
25 25
virtual machines in a Ganeti cluster that have crashed or been killed
26 26
by a node reboot.  Run from cron or similar.
27
"""
28

  
29 27

  
30
LOGFILE = '/var/log/ganeti/watcher.log'
31
MAXTRIES = 5
32
BAD_STATES = ['stopped']
33
HELPLESS_STATES = ['(node down)']
34
NOTICE = 'NOTICE'
35
ERROR = 'ERROR'
28
"""
36 29

  
37 30
import os
38 31
import sys
32
import re
39 33
import time
40 34
import fcntl
41 35
import errno
36
import simplejson
42 37
from optparse import OptionParser
43 38

  
44

  
45 39
from ganeti import utils
46 40
from ganeti import constants
47 41
from ganeti import ssconf
48 42
from ganeti import errors
49 43

  
50 44

  
45
MAXTRIES = 5
46
BAD_STATES = ['stopped']
47
HELPLESS_STATES = ['(node down)']
48
NOTICE = 'NOTICE'
49
ERROR = 'ERROR'
50

  
51

  
51 52
class Error(Exception):
52 53
  """Generic custom error class."""
53 54

  
......
88 89
  return res
89 90

  
90 91

  
91
class RestarterState(object):
92
class WatcherState(object):
92 93
  """Interface to a state file recording restart attempts.
93 94

  
94
  Methods:
95
    Open(): open, lock, read and parse the file.
96
            Raises StandardError on lock contention.
97

  
98
    NumberOfAttempts(name): returns the number of times in succession
99
                            a restart has been attempted of the named instance.
100

  
101
    RecordAttempt(name, when): records one restart attempt of name at
102
                               time in when.
103

  
104
    Remove(name): remove record given by name, if exists.
105

  
106
    Save(name): saves all records to file, releases lock and closes file.
107

  
108 95
  """
109 96
  def __init__(self):
97
    """Open, lock, read and parse the file.
98

  
99
    Raises StandardError on lock contention.
100

  
101
    """
110 102
    # The two-step dance below is necessary to allow both opening existing
111 103
    # file read/write and creating if not existing.  Vanilla open will truncate
112 104
    # an existing file -or- allow creating if not existing.
......
121 113
      raise
122 114

  
123 115
    self.statefile = f
124
    self.inst_map = {}
125 116

  
126
    for line in f:
127
      name, when, count = line.rstrip().split(':')
117
    try:
118
      self.data = simplejson.load(self.statefile)
119
    except Exception, msg:
120
      # Ignore errors while loading the file and treat it as empty
121
      self.data = {}
122
      sys.stderr.write("Empty or invalid state file. "
123
          "Using defaults. Error message: %s\n" % msg)
124

  
125
    if "instance" not in self.data:
126
      self.data["instance"] = {}
127
    if "node" not in self.data:
128
      self.data["node"] = {}
129

  
130
  def __del__(self):
131
    """Called on destruction.
132

  
133
    """
134
    if self.statefile:
135
      self._Close()
136

  
137
  def _Close(self):
138
    """Unlock configuration file and close it.
139

  
140
    """
141
    assert self.statefile
142

  
143
    fcntl.flock(self.statefile.fileno(), fcntl.LOCK_UN)
144

  
145
    self.statefile.close()
146
    self.statefile = None
147

  
148
  def GetNodeBootID(self, name):
149
    """Returns the last boot ID of a node or None.
128 150

  
129
      when = int(when)
130
      count = int(count)
151
    """
152
    ndata = self.data["node"]
153

  
154
    if name in ndata and "bootid" in ndata[name]:
155
      return ndata[name]["bootid"]
156
    return None
157

  
158
  def SetNodeBootID(self, name, bootid):
159
    """Sets the boot ID of a node.
160

  
161
    """
162
    assert bootid
131 163

  
132
      self.inst_map[name] = (when, count)
164
    ndata = self.data["node"]
133 165

  
134
  def NumberOfAttempts(self, instance):
166
    if name not in ndata:
167
      ndata[name] = {}
168

  
169
    ndata[name]["bootid"] = bootid
170

  
171
  def NumberOfRestartAttempts(self, instance):
135 172
    """Returns number of previous restart attempts.
136 173

  
137 174
    Args:
138 175
      instance - the instance to look up.
139 176

  
140 177
    """
141
    assert self.statefile
178
    idata = self.data["instance"]
142 179

  
143
    if instance.name in self.inst_map:
144
      return self.inst_map[instance.name][1]
180
    if instance.name in idata:
181
      return idata[instance.name]["restart_count"]
145 182

  
146 183
    return 0
147 184

  
148
  def RecordAttempt(self, instance):
185
  def RecordRestartAttempt(self, instance):
149 186
    """Record a restart attempt.
150 187

  
151 188
    Args:
152 189
      instance - the instance being restarted
153 190

  
154 191
    """
155
    assert self.statefile
192
    idata = self.data["instance"]
156 193

  
157
    when = time.time()
194
    if instance.name not in idata:
195
      inst = idata[instance.name] = {}
196
    else:
197
      inst = idata[instance.name]
158 198

  
159
    self.inst_map[instance.name] = (when, 1 + self.NumberOfAttempts(instance))
199
    inst["restart_when"] = time.time()
200
    inst["restart_count"] = idata.get("restart_count", 0) + 1
160 201

  
161
  def Remove(self, instance):
202
  def RemoveInstance(self, instance):
162 203
    """Update state to reflect that a machine is running, i.e. remove record.
163 204

  
164 205
    Args:
......
167 208
    This method removes the record for a named instance.
168 209

  
169 210
    """
170
    assert self.statefile
211
    idata = self.data["instance"]
171 212

  
172
    if instance.name in self.inst_map:
173
      del self.inst_map[instance.name]
213
    if instance.name in idata:
214
      del idata[instance.name]
174 215

  
175 216
  def Save(self):
176
    """Save records to file, then unlock and close file.
217
    """Save state to file, then unlock and close it.
177 218

  
178 219
    """
179 220
    assert self.statefile
......
181 222
    self.statefile.seek(0)
182 223
    self.statefile.truncate()
183 224

  
184
    for name in self.inst_map:
185
      print >> self.statefile, "%s:%d:%d" % ((name,) + self.inst_map[name])
225
    simplejson.dump(self.data, self.statefile)
186 226

  
187
    fcntl.flock(self.statefile.fileno(), fcntl.LOCK_UN)
188

  
189
    self.statefile.close()
190
    self.statefile = None
227
    self._Close()
191 228

  
192 229

  
193 230
class Instance(object):
......
197 234
    Restart(): issue a command to restart the represented machine.
198 235

  
199 236
  """
200
  def __init__(self, name, state):
237
  def __init__(self, name, state, autostart):
201 238
    self.name = name
202 239
    self.state = state
240
    self.autostart = autostart
203 241

  
204 242
  def Restart(self):
205 243
    """Encapsulates the start of an instance.
206 244

  
207
    This is currently done using the command line interface and not
208
    the Ganeti modules.
209

  
210 245
    """
211 246
    DoCmd(['gnt-instance', 'startup', '--lock-retries=15', self.name])
212 247

  
248
  def ActivateDisks(self):
249
    """Encapsulates the activation of all disks of an instance.
250

  
251
    """
252
    DoCmd(['gnt-instance', 'activate-disks', '--lock-retries=15', self.name])
253

  
213 254

  
214
class InstanceList(object):
215
  """The set of Virtual Machine instances on a cluster.
255
def _RunListCmd(cmd):
256
  """Runs a command and parses its output into lists.
216 257

  
217 258
  """
218
  cmd = ['gnt-instance', 'list', '--lock-retries=15',
219
         '-o', 'name,admin_state,oper_state', '--no-headers', '--separator=:']
259
  for line in DoCmd(cmd).stdout.splitlines():
260
    yield line.split(':')
220 261

  
221
  def __init__(self):
222
    res = DoCmd(self.cmd)
223 262

  
224
    lines = res.stdout.splitlines()
263
def GetInstanceList(with_secondaries=None):
264
  """Get a list of instances on this cluster.
265

  
266
  """
267
  cmd = ['gnt-instance', 'list', '--lock-retries=15', '--no-headers',
268
         '--separator=:']
269

  
270
  fields = 'name,oper_state,admin_state'
225 271

  
226
    self.instances = []
227
    for line in lines:
228
      fields = [fld.strip() for fld in line.split(':')]
272
  if with_secondaries is not None:
273
    fields += ',snodes'
229 274

  
230
      if len(fields) != 3:
275
  cmd.append('-o')
276
  cmd.append(fields)
277

  
278
  instances = []
279
  for fields in _RunListCmd(cmd):
280
    if with_secondaries is not None:
281
      (name, status, autostart, snodes) = fields
282

  
283
      if snodes == "-":
231 284
        continue
232
      if fields[1] == "no": #no autostart, we don't care about this instance
285

  
286
      for node in with_secondaries:
287
        if node in snodes.split(','):
288
          break
289
      else:
233 290
        continue
234
      name, status = fields[0], fields[2]
235 291

  
236
      self.instances.append(Instance(name, status))
292
    else:
293
      (name, status, autostart) = fields
294

  
295
    instances.append(Instance(name, status, autostart != "no"))
237 296

  
238
  def __iter__(self):
239
    return self.instances.__iter__()
297
  return instances
298

  
299

  
300
def GetNodeBootIDs():
301
  """Get a dict mapping nodes to boot IDs.
302

  
303
  """
304
  cmd = ['gnt-node', 'list', '--lock-retries=15', '--no-headers',
305
         '--separator=:', '-o', 'name,bootid']
306

  
307
  ids = {}
308
  for fields in _RunListCmd(cmd):
309
    (name, bootid) = fields
310
    ids[name] = bootid
311

  
312
  return ids
240 313

  
241 314

  
242 315
class Message(object):
......
252 325
    return self.level + ' ' + time.ctime(self.when) + '\n' + Indent(self.msg)
253 326

  
254 327

  
255
class Restarter(object):
328
class Watcher(object):
256 329
  """Encapsulate the logic for restarting erronously halted virtual machines.
257 330

  
258 331
  The calling program should periodically instantiate me and call Run().
......
265 338
    master = sstore.GetMasterNode()
266 339
    if master != utils.HostInfo().name:
267 340
      raise NotMasterError("This is not the master node")
268
    self.instances = InstanceList()
341
    self.instances = GetInstanceList()
342
    self.bootids = GetNodeBootIDs()
269 343
    self.messages = []
270 344

  
271 345
  def Run(self):
272
    """Make a pass over the list of instances, restarting downed ones.
346
    notepad = WatcherState()
347
    self.CheckInstances(notepad)
348
    self.CheckDisks(notepad)
349
    notepad.Save()
350

  
351
  def CheckDisks(self, notepad):
352
    """Check all nodes for restarted ones.
273 353

  
274 354
    """
275
    notepad = RestarterState()
355
    check_nodes = []
356
    for name, id in self.bootids.iteritems():
357
      old = notepad.GetNodeBootID(name)
358
      if old != id:
359
        # Node's boot ID has changed, proably through a reboot.
360
        check_nodes.append(name)
361

  
362
    if check_nodes:
363
      # Activate disks for all instances with any of the checked nodes as a
364
      # secondary node.
365
      for instance in GetInstanceList(with_secondaries=check_nodes):
366
        try:
367
          self.messages.append(Message(NOTICE,
368
                                       "Activating disks for %s." %
369
                                       instance.name))
370
          instance.ActivateDisks()
371
        except Error, x:
372
          self.messages.append(Message(ERROR, str(x)))
373

  
374
      # Keep changed boot IDs
375
      for name in check_nodes:
376
        notepad.SetNodeBootID(name, self.bootids[name])
276 377

  
378
  def CheckInstances(self, notepad):
379
    """Make a pass over the list of instances, restarting downed ones.
380

  
381
    """
277 382
    for instance in self.instances:
383
      # Don't care about manually stopped instances
384
      if not instance.autostart:
385
        continue
386

  
278 387
      if instance.state in BAD_STATES:
279
        n = notepad.NumberOfAttempts(instance)
388
        n = notepad.NumberOfRestartAttempts(instance)
280 389

  
281 390
        if n > MAXTRIES:
282 391
          # stay quiet.
......
284 393
        elif n < MAXTRIES:
285 394
          last = " (Attempt #%d)" % (n + 1)
286 395
        else:
287
          notepad.RecordAttempt(instance)
396
          notepad.RecordRestartAttempt(instance)
288 397
          self.messages.append(Message(ERROR, "Could not restart %s for %d"
289 398
                                       " times, giving up..." %
290 399
                                       (instance.name, MAXTRIES)))
......
297 406
        except Error, x:
298 407
          self.messages.append(Message(ERROR, str(x)))
299 408

  
300
        notepad.RecordAttempt(instance)
409
        notepad.RecordRestartAttempt(instance)
301 410
      elif instance.state in HELPLESS_STATES:
302
        if notepad.NumberOfAttempts(instance):
303
          notepad.Remove(instance)
411
        if notepad.NumberOfRestartAttempts(instance):
412
          notepad.RemoveInstance(instance)
304 413
      else:
305
        if notepad.NumberOfAttempts(instance):
306
          notepad.Remove(instance)
414
        if notepad.NumberOfRestartAttempts(instance):
415
          notepad.RemoveInstance(instance)
307 416
          msg = Message(NOTICE,
308 417
                        "Restart of %s succeeded." % instance.name)
309 418
          self.messages.append(msg)
310 419

  
311
    notepad.Save()
312

  
313 420
  def WriteReport(self, logfile):
314 421
    """Log all messages to file.
315 422

  
......
347 454
  options, args = ParseOptions()
348 455

  
349 456
  if not options.debug:
350
    sys.stderr = sys.stdout = open(LOGFILE, 'a')
457
    sys.stderr = sys.stdout = open(constants.LOG_WATCHER, 'a')
351 458

  
352 459
  try:
353
    restarter = Restarter()
354
    restarter.Run()
355
    restarter.WriteReport(sys.stdout)
460
    watcher = Watcher()
461
    watcher.Run()
462
    watcher.WriteReport(sys.stdout)
356 463
  except NotMasterError:
357 464
    if options.debug:
358 465
      sys.stderr.write("Not master, exiting.\n")
......
363 470
  except Error, err:
364 471
    print err
365 472

  
473

  
366 474
if __name__ == '__main__':
367 475
  main()

Also available in: Unified diff