Revision adf6301e lib/watcher/__init__.py

b/lib/watcher/__init__.py
37 37
from ganeti import utils
38 38
from ganeti import constants
39 39
from ganeti import compat
40
from ganeti import serializer
41 40
from ganeti import errors
42 41
from ganeti import opcodes
43 42
from ganeti import cli
......
46 45
from ganeti import netutils
47 46

  
48 47
import ganeti.rapi.client # pylint: disable-msg=W0611
49
import ganeti.watcher.nodemaint # pylint: disable-msg=W0611
48

  
49
from ganeti.watcher import nodemaint
50
from ganeti.watcher import state
50 51

  
51 52

  
52 53
MAXTRIES = 5
53
# Delete any record that is older than 8 hours; this value is based on
54
# the fact that the current retry counter is 5, and watcher runs every
55
# 5 minutes, so it takes around half an hour to exceed the retry
56
# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
57
RETRY_EXPIRATION = 8 * 3600
58 54
BAD_STATES = [constants.INSTST_ERRORDOWN]
59 55
HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
60 56
NOTICE = 'NOTICE'
61 57
ERROR = 'ERROR'
62
KEY_RESTART_COUNT = "restart_count"
63
KEY_RESTART_WHEN = "restart_when"
64
KEY_BOOT_ID = "bootid"
65 58

  
66 59

  
67 60
# Global LUXI client object
......
118 111
                      runresult.output)
119 112

  
120 113

  
121
class WatcherState(object):
122
  """Interface to a state file recording restart attempts.
123

  
124
  """
125
  def __init__(self, statefile):
126
    """Open, lock, read and parse the file.
127

  
128
    @type statefile: file
129
    @param statefile: State file object
130

  
131
    """
132
    self.statefile = statefile
133

  
134
    try:
135
      state_data = self.statefile.read()
136
      if not state_data:
137
        self._data = {}
138
      else:
139
        self._data = serializer.Load(state_data)
140
    except Exception, msg: # pylint: disable-msg=W0703
141
      # Ignore errors while loading the file and treat it as empty
142
      self._data = {}
143
      logging.warning(("Invalid state file. Using defaults."
144
                       " Error message: %s"), msg)
145

  
146
    if "instance" not in self._data:
147
      self._data["instance"] = {}
148
    if "node" not in self._data:
149
      self._data["node"] = {}
150

  
151
    self._orig_data = serializer.Dump(self._data)
152

  
153
  def Save(self):
154
    """Save state to file, then unlock and close it.
155

  
156
    """
157
    assert self.statefile
158

  
159
    serialized_form = serializer.Dump(self._data)
160
    if self._orig_data == serialized_form:
161
      logging.debug("Data didn't change, just touching status file")
162
      os.utime(constants.WATCHER_STATEFILE, None)
163
      return
164

  
165
    # We need to make sure the file is locked before renaming it, otherwise
166
    # starting ganeti-watcher again at the same time will create a conflict.
167
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
168
                         data=serialized_form,
169
                         prewrite=utils.LockFile, close=False)
170
    self.statefile = os.fdopen(fd, 'w+')
171

  
172
  def Close(self):
173
    """Unlock configuration file and close it.
174

  
175
    """
176
    assert self.statefile
177

  
178
    # Files are automatically unlocked when closing them
179
    self.statefile.close()
180
    self.statefile = None
181

  
182
  def GetNodeBootID(self, name):
183
    """Returns the last boot ID of a node or None.
184

  
185
    """
186
    ndata = self._data["node"]
187

  
188
    if name in ndata and KEY_BOOT_ID in ndata[name]:
189
      return ndata[name][KEY_BOOT_ID]
190
    return None
191

  
192
  def SetNodeBootID(self, name, bootid):
193
    """Sets the boot ID of a node.
194

  
195
    """
196
    assert bootid
197

  
198
    ndata = self._data["node"]
199

  
200
    if name not in ndata:
201
      ndata[name] = {}
202

  
203
    ndata[name][KEY_BOOT_ID] = bootid
204

  
205
  def NumberOfRestartAttempts(self, instance):
206
    """Returns number of previous restart attempts.
207

  
208
    @type instance: L{Instance}
209
    @param instance: the instance to look up
210

  
211
    """
212
    idata = self._data["instance"]
213

  
214
    if instance.name in idata:
215
      return idata[instance.name][KEY_RESTART_COUNT]
216

  
217
    return 0
218

  
219
  def MaintainInstanceList(self, instances):
220
    """Perform maintenance on the recorded instances.
221

  
222
    @type instances: list of string
223
    @param instances: the list of currently existing instances
224

  
225
    """
226
    idict = self._data["instance"]
227
    # First, delete obsolete instances
228
    obsolete_instances = set(idict).difference(instances)
229
    for inst in obsolete_instances:
230
      logging.debug("Forgetting obsolete instance %s", inst)
231
      del idict[inst]
232

  
233
    # Second, delete expired records
234
    earliest = time.time() - RETRY_EXPIRATION
235
    expired_instances = [i for i in idict
236
                         if idict[i][KEY_RESTART_WHEN] < earliest]
237
    for inst in expired_instances:
238
      logging.debug("Expiring record for instance %s", inst)
239
      del idict[inst]
240

  
241
  def RecordRestartAttempt(self, instance):
242
    """Record a restart attempt.
243

  
244
    @type instance: L{Instance}
245
    @param instance: the instance being restarted
246

  
247
    """
248
    idata = self._data["instance"]
249

  
250
    if instance.name not in idata:
251
      inst = idata[instance.name] = {}
252
    else:
253
      inst = idata[instance.name]
254

  
255
    inst[KEY_RESTART_WHEN] = time.time()
256
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
257

  
258
  def RemoveInstance(self, instance):
259
    """Update state to reflect that a machine is running.
260

  
261
    This method removes the record for a named instance (as we only
262
    track down instances).
263

  
264
    @type instance: L{Instance}
265
    @param instance: the instance to remove from books
266

  
267
    """
268
    idata = self._data["instance"]
269

  
270
    if instance.name in idata:
271
      del idata[instance.name]
272

  
273

  
274 114
class Instance(object):
275 115
  """Abstraction for a Virtual Machine instance.
276 116

  
277 117
  """
278
  def __init__(self, name, state, autostart, snodes):
118
  def __init__(self, name, status, autostart, snodes):
279 119
    self.name = name
280
    self.state = state
120
    self.status = status
281 121
    self.autostart = autostart
282 122
    self.snodes = snodes
283 123

  
......
431 271
    notepad.MaintainInstanceList(self.instances.keys())
432 272

  
433 273
    for instance in self.instances.values():
434
      if instance.state in BAD_STATES:
274
      if instance.status in BAD_STATES:
435 275
        n = notepad.NumberOfRestartAttempts(instance)
436 276

  
437 277
        if n > MAXTRIES:
......
454 294
                            instance.name)
455 295

  
456 296
        notepad.RecordRestartAttempt(instance)
457
      elif instance.state in HELPLESS_STATES:
297
      elif instance.status in HELPLESS_STATES:
458 298
        if notepad.NumberOfRestartAttempts(instance):
459 299
          notepad.RemoveInstance(instance)
460 300
      else:
......
519 359
    job = []
520 360
    for name in offline_disk_instances:
521 361
      instance = self.instances[name]
522
      if (instance.state in HELPLESS_STATES or
362
      if (instance.status in HELPLESS_STATES or
523 363
          self._CheckForOfflineNodes(instance)):
524 364
        logging.info("Skip instance %s because it is in helpless state or has"
525 365
                     " one offline secondary", name)
......
535 375
        logging.exception("Error while activating disks")
536 376

  
537 377

  
538
def OpenStateFile(path):
539
  """Opens the state file and acquires a lock on it.
540

  
541
  @type path: string
542
  @param path: Path to state file
543

  
544
  """
545
  # The two-step dance below is necessary to allow both opening existing
546
  # file read/write and creating if not existing. Vanilla open will truncate
547
  # an existing file -or- allow creating if not existing.
548
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
549

  
550
  # Try to acquire lock on state file. If this fails, another watcher instance
551
  # might already be running or another program is temporarily blocking the
552
  # watcher from running.
553
  try:
554
    utils.LockFile(statefile_fd)
555
  except errors.LockError, err:
556
    logging.error("Can't acquire lock on state file %s: %s", path, err)
557
    return None
558

  
559
  return os.fdopen(statefile_fd, "w+")
560

  
561

  
562 378
def IsRapiResponding(hostname):
563 379
  """Connects to RAPI port and does a simple test.
564 380

  
......
628 444
    logging.debug("Pause has been set, exiting")
629 445
    return constants.EXIT_SUCCESS
630 446

  
631
  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
447
  statefile = \
448
    state.OpenStateFile(constants.WATCHER_STATEFILE)
632 449
  if not statefile:
633 450
    return constants.EXIT_FAILURE
634 451

  
......
638 455
    RunWatcherHooks()
639 456
    # run node maintenance in all cases, even if master, so that old
640 457
    # masters can be properly cleaned up too
641
    if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
642
      nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602
458
    if nodemaint.NodeMaintenance.ShouldRun():
459
      nodemaint.NodeMaintenance().Exec()
643 460

  
644
    notepad = WatcherState(statefile)
461
    notepad = state.WatcherState(statefile)
645 462
    try:
646 463
      try:
647 464
        client = cli.GetClient()

Also available in: Unified diff