Revision adf6301e

b/Makefile.am
245 245

  
246 246
watcher_PYTHON = \
247 247
	lib/watcher/__init__.py \
248
	lib/watcher/nodemaint.py
248
	lib/watcher/nodemaint.py \
249
	lib/watcher/state.py
249 250

  
250 251
server_PYTHON = \
251 252
	lib/server/__init__.py \
b/lib/watcher/__init__.py
37 37
from ganeti import utils
38 38
from ganeti import constants
39 39
from ganeti import compat
40
from ganeti import serializer
41 40
from ganeti import errors
42 41
from ganeti import opcodes
43 42
from ganeti import cli
......
46 45
from ganeti import netutils
47 46

  
48 47
import ganeti.rapi.client # pylint: disable-msg=W0611
49
import ganeti.watcher.nodemaint # pylint: disable-msg=W0611
48

  
49
from ganeti.watcher import nodemaint
50
from ganeti.watcher import state
50 51

  
51 52

  
52 53
MAXTRIES = 5
53
# Delete any record that is older than 8 hours; this value is based on
54
# the fact that the current retry counter is 5, and watcher runs every
55
# 5 minutes, so it takes around half an hour to exceed the retry
56
# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
57
RETRY_EXPIRATION = 8 * 3600
58 54
BAD_STATES = [constants.INSTST_ERRORDOWN]
59 55
HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
60 56
NOTICE = 'NOTICE'
61 57
ERROR = 'ERROR'
62
KEY_RESTART_COUNT = "restart_count"
63
KEY_RESTART_WHEN = "restart_when"
64
KEY_BOOT_ID = "bootid"
65 58

  
66 59

  
67 60
# Global LUXI client object
......
118 111
                      runresult.output)
119 112

  
120 113

  
121
class WatcherState(object):
122
  """Interface to a state file recording restart attempts.
123

  
124
  """
125
  def __init__(self, statefile):
126
    """Open, lock, read and parse the file.
127

  
128
    @type statefile: file
129
    @param statefile: State file object
130

  
131
    """
132
    self.statefile = statefile
133

  
134
    try:
135
      state_data = self.statefile.read()
136
      if not state_data:
137
        self._data = {}
138
      else:
139
        self._data = serializer.Load(state_data)
140
    except Exception, msg: # pylint: disable-msg=W0703
141
      # Ignore errors while loading the file and treat it as empty
142
      self._data = {}
143
      logging.warning(("Invalid state file. Using defaults."
144
                       " Error message: %s"), msg)
145

  
146
    if "instance" not in self._data:
147
      self._data["instance"] = {}
148
    if "node" not in self._data:
149
      self._data["node"] = {}
150

  
151
    self._orig_data = serializer.Dump(self._data)
152

  
153
  def Save(self):
154
    """Save state to file, then unlock and close it.
155

  
156
    """
157
    assert self.statefile
158

  
159
    serialized_form = serializer.Dump(self._data)
160
    if self._orig_data == serialized_form:
161
      logging.debug("Data didn't change, just touching status file")
162
      os.utime(constants.WATCHER_STATEFILE, None)
163
      return
164

  
165
    # We need to make sure the file is locked before renaming it, otherwise
166
    # starting ganeti-watcher again at the same time will create a conflict.
167
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
168
                         data=serialized_form,
169
                         prewrite=utils.LockFile, close=False)
170
    self.statefile = os.fdopen(fd, 'w+')
171

  
172
  def Close(self):
173
    """Unlock configuration file and close it.
174

  
175
    """
176
    assert self.statefile
177

  
178
    # Files are automatically unlocked when closing them
179
    self.statefile.close()
180
    self.statefile = None
181

  
182
  def GetNodeBootID(self, name):
183
    """Returns the last boot ID of a node or None.
184

  
185
    """
186
    ndata = self._data["node"]
187

  
188
    if name in ndata and KEY_BOOT_ID in ndata[name]:
189
      return ndata[name][KEY_BOOT_ID]
190
    return None
191

  
192
  def SetNodeBootID(self, name, bootid):
193
    """Sets the boot ID of a node.
194

  
195
    """
196
    assert bootid
197

  
198
    ndata = self._data["node"]
199

  
200
    if name not in ndata:
201
      ndata[name] = {}
202

  
203
    ndata[name][KEY_BOOT_ID] = bootid
204

  
205
  def NumberOfRestartAttempts(self, instance):
206
    """Returns number of previous restart attempts.
207

  
208
    @type instance: L{Instance}
209
    @param instance: the instance to look up
210

  
211
    """
212
    idata = self._data["instance"]
213

  
214
    if instance.name in idata:
215
      return idata[instance.name][KEY_RESTART_COUNT]
216

  
217
    return 0
218

  
219
  def MaintainInstanceList(self, instances):
220
    """Perform maintenance on the recorded instances.
221

  
222
    @type instances: list of string
223
    @param instances: the list of currently existing instances
224

  
225
    """
226
    idict = self._data["instance"]
227
    # First, delete obsolete instances
228
    obsolete_instances = set(idict).difference(instances)
229
    for inst in obsolete_instances:
230
      logging.debug("Forgetting obsolete instance %s", inst)
231
      del idict[inst]
232

  
233
    # Second, delete expired records
234
    earliest = time.time() - RETRY_EXPIRATION
235
    expired_instances = [i for i in idict
236
                         if idict[i][KEY_RESTART_WHEN] < earliest]
237
    for inst in expired_instances:
238
      logging.debug("Expiring record for instance %s", inst)
239
      del idict[inst]
240

  
241
  def RecordRestartAttempt(self, instance):
242
    """Record a restart attempt.
243

  
244
    @type instance: L{Instance}
245
    @param instance: the instance being restarted
246

  
247
    """
248
    idata = self._data["instance"]
249

  
250
    if instance.name not in idata:
251
      inst = idata[instance.name] = {}
252
    else:
253
      inst = idata[instance.name]
254

  
255
    inst[KEY_RESTART_WHEN] = time.time()
256
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
257

  
258
  def RemoveInstance(self, instance):
259
    """Update state to reflect that a machine is running.
260

  
261
    This method removes the record for a named instance (as we only
262
    track down instances).
263

  
264
    @type instance: L{Instance}
265
    @param instance: the instance to remove from books
266

  
267
    """
268
    idata = self._data["instance"]
269

  
270
    if instance.name in idata:
271
      del idata[instance.name]
272

  
273

  
274 114
class Instance(object):
275 115
  """Abstraction for a Virtual Machine instance.
276 116

  
277 117
  """
278
  def __init__(self, name, state, autostart, snodes):
118
  def __init__(self, name, status, autostart, snodes):
279 119
    self.name = name
280
    self.state = state
120
    self.status = status
281 121
    self.autostart = autostart
282 122
    self.snodes = snodes
283 123

  
......
431 271
    notepad.MaintainInstanceList(self.instances.keys())
432 272

  
433 273
    for instance in self.instances.values():
434
      if instance.state in BAD_STATES:
274
      if instance.status in BAD_STATES:
435 275
        n = notepad.NumberOfRestartAttempts(instance)
436 276

  
437 277
        if n > MAXTRIES:
......
454 294
                            instance.name)
455 295

  
456 296
        notepad.RecordRestartAttempt(instance)
457
      elif instance.state in HELPLESS_STATES:
297
      elif instance.status in HELPLESS_STATES:
458 298
        if notepad.NumberOfRestartAttempts(instance):
459 299
          notepad.RemoveInstance(instance)
460 300
      else:
......
519 359
    job = []
520 360
    for name in offline_disk_instances:
521 361
      instance = self.instances[name]
522
      if (instance.state in HELPLESS_STATES or
362
      if (instance.status in HELPLESS_STATES or
523 363
          self._CheckForOfflineNodes(instance)):
524 364
        logging.info("Skip instance %s because it is in helpless state or has"
525 365
                     " one offline secondary", name)
......
535 375
        logging.exception("Error while activating disks")
536 376

  
537 377

  
538
def OpenStateFile(path):
539
  """Opens the state file and acquires a lock on it.
540

  
541
  @type path: string
542
  @param path: Path to state file
543

  
544
  """
545
  # The two-step dance below is necessary to allow both opening existing
546
  # file read/write and creating if not existing. Vanilla open will truncate
547
  # an existing file -or- allow creating if not existing.
548
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
549

  
550
  # Try to acquire lock on state file. If this fails, another watcher instance
551
  # might already be running or another program is temporarily blocking the
552
  # watcher from running.
553
  try:
554
    utils.LockFile(statefile_fd)
555
  except errors.LockError, err:
556
    logging.error("Can't acquire lock on state file %s: %s", path, err)
557
    return None
558

  
559
  return os.fdopen(statefile_fd, "w+")
560

  
561

  
562 378
def IsRapiResponding(hostname):
563 379
  """Connects to RAPI port and does a simple test.
564 380

  
......
628 444
    logging.debug("Pause has been set, exiting")
629 445
    return constants.EXIT_SUCCESS
630 446

  
631
  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
447
  statefile = \
448
    state.OpenStateFile(constants.WATCHER_STATEFILE)
632 449
  if not statefile:
633 450
    return constants.EXIT_FAILURE
634 451

  
......
638 455
    RunWatcherHooks()
639 456
    # run node maintenance in all cases, even if master, so that old
640 457
    # masters can be properly cleaned up too
641
    if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
642
      nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602
458
    if nodemaint.NodeMaintenance.ShouldRun():
459
      nodemaint.NodeMaintenance().Exec()
643 460

  
644
    notepad = WatcherState(statefile)
461
    notepad = state.WatcherState(statefile)
645 462
    try:
646 463
      try:
647 464
        client = cli.GetClient()
b/lib/watcher/state.py
1
#
2
#
3

  
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Module keeping state for Ganeti watcher.
23

  
24
"""
25

  
26
import os
27
import time
28
import logging
29

  
30
from ganeti import utils
31
from ganeti import constants
32
from ganeti import serializer
33
from ganeti import errors
34

  
35

  
36
# Delete any record that is older than 8 hours; this value is based on
37
# the fact that the current retry counter is 5, and watcher runs every
38
# 5 minutes, so it takes around half an hour to exceed the retry
39
# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
40
RETRY_EXPIRATION = 8 * 3600
41

  
42
KEY_RESTART_COUNT = "restart_count"
43
KEY_RESTART_WHEN = "restart_when"
44
KEY_BOOT_ID = "bootid"
45

  
46

  
47
def OpenStateFile(path):
48
  """Opens the state file and acquires a lock on it.
49

  
50
  @type path: string
51
  @param path: Path to state file
52

  
53
  """
54
  # The two-step dance below is necessary to allow both opening existing
55
  # file read/write and creating if not existing. Vanilla open will truncate
56
  # an existing file -or- allow creating if not existing.
57
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
58

  
59
  # Try to acquire lock on state file. If this fails, another watcher instance
60
  # might already be running or another program is temporarily blocking the
61
  # watcher from running.
62
  try:
63
    utils.LockFile(statefile_fd)
64
  except errors.LockError, err:
65
    logging.error("Can't acquire lock on state file %s: %s", path, err)
66
    return None
67

  
68
  return os.fdopen(statefile_fd, "w+")
69

  
70

  
71
class WatcherState(object):
72
  """Interface to a state file recording restart attempts.
73

  
74
  """
75
  def __init__(self, statefile):
76
    """Open, lock, read and parse the file.
77

  
78
    @type statefile: file
79
    @param statefile: State file object
80

  
81
    """
82
    self.statefile = statefile
83

  
84
    try:
85
      state_data = self.statefile.read()
86
      if not state_data:
87
        self._data = {}
88
      else:
89
        self._data = serializer.Load(state_data)
90
    except Exception, msg: # pylint: disable-msg=W0703
91
      # Ignore errors while loading the file and treat it as empty
92
      self._data = {}
93
      logging.warning(("Invalid state file. Using defaults."
94
                       " Error message: %s"), msg)
95

  
96
    if "instance" not in self._data:
97
      self._data["instance"] = {}
98
    if "node" not in self._data:
99
      self._data["node"] = {}
100

  
101
    self._orig_data = serializer.Dump(self._data)
102

  
103
  def Save(self):
104
    """Save state to file, then unlock and close it.
105

  
106
    """
107
    assert self.statefile
108

  
109
    serialized_form = serializer.Dump(self._data)
110
    if self._orig_data == serialized_form:
111
      logging.debug("Data didn't change, just touching status file")
112
      os.utime(constants.WATCHER_STATEFILE, None)
113
      return
114

  
115
    # We need to make sure the file is locked before renaming it, otherwise
116
    # starting ganeti-watcher again at the same time will create a conflict.
117
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
118
                         data=serialized_form,
119
                         prewrite=utils.LockFile, close=False)
120
    self.statefile = os.fdopen(fd, 'w+')
121

  
122
  def Close(self):
123
    """Unlock configuration file and close it.
124

  
125
    """
126
    assert self.statefile
127

  
128
    # Files are automatically unlocked when closing them
129
    self.statefile.close()
130
    self.statefile = None
131

  
132
  def GetNodeBootID(self, name):
133
    """Returns the last boot ID of a node or None.
134

  
135
    """
136
    ndata = self._data["node"]
137

  
138
    if name in ndata and KEY_BOOT_ID in ndata[name]:
139
      return ndata[name][KEY_BOOT_ID]
140
    return None
141

  
142
  def SetNodeBootID(self, name, bootid):
143
    """Sets the boot ID of a node.
144

  
145
    """
146
    assert bootid
147

  
148
    ndata = self._data["node"]
149

  
150
    if name not in ndata:
151
      ndata[name] = {}
152

  
153
    ndata[name][KEY_BOOT_ID] = bootid
154

  
155
  def NumberOfRestartAttempts(self, instance):
156
    """Returns number of previous restart attempts.
157

  
158
    @type instance: L{Instance}
159
    @param instance: the instance to look up
160

  
161
    """
162
    idata = self._data["instance"]
163

  
164
    if instance.name in idata:
165
      return idata[instance.name][KEY_RESTART_COUNT]
166

  
167
    return 0
168

  
169
  def MaintainInstanceList(self, instances):
170
    """Perform maintenance on the recorded instances.
171

  
172
    @type instances: list of string
173
    @param instances: the list of currently existing instances
174

  
175
    """
176
    idict = self._data["instance"]
177
    # First, delete obsolete instances
178
    obsolete_instances = set(idict).difference(instances)
179
    for inst in obsolete_instances:
180
      logging.debug("Forgetting obsolete instance %s", inst)
181
      del idict[inst]
182

  
183
    # Second, delete expired records
184
    earliest = time.time() - RETRY_EXPIRATION
185
    expired_instances = [i for i in idict
186
                         if idict[i][KEY_RESTART_WHEN] < earliest]
187
    for inst in expired_instances:
188
      logging.debug("Expiring record for instance %s", inst)
189
      del idict[inst]
190

  
191
  def RecordRestartAttempt(self, instance):
192
    """Record a restart attempt.
193

  
194
    @type instance: L{Instance}
195
    @param instance: the instance being restarted
196

  
197
    """
198
    idata = self._data["instance"]
199

  
200
    if instance.name not in idata:
201
      inst = idata[instance.name] = {}
202
    else:
203
      inst = idata[instance.name]
204

  
205
    inst[KEY_RESTART_WHEN] = time.time()
206
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
207

  
208
  def RemoveInstance(self, instance):
209
    """Update state to reflect that a machine is running.
210

  
211
    This method removes the record for a named instance (as we only
212
    track down instances).
213

  
214
    @type instance: L{Instance}
215
    @param instance: the instance to remove from books
216

  
217
    """
218
    idata = self._data["instance"]
219

  
220
    if instance.name in idata:
221
      del idata[instance.name]

Also available in: Unified diff