Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 0ed468d3

History | View | Annotate | Download (9 kB)

1 685ee993 Iustin Pop
#!/usr/bin/python -u
2 ffeffa1d Iustin Pop
#
3 ffeffa1d Iustin Pop
4 ffeffa1d Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 ffeffa1d Iustin Pop
#
6 ffeffa1d Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 ffeffa1d Iustin Pop
# it under the terms of the GNU General Public License as published by
8 ffeffa1d Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 ffeffa1d Iustin Pop
# (at your option) any later version.
10 ffeffa1d Iustin Pop
#
11 ffeffa1d Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 ffeffa1d Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 ffeffa1d Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 ffeffa1d Iustin Pop
# General Public License for more details.
15 ffeffa1d Iustin Pop
#
16 ffeffa1d Iustin Pop
# You should have received a copy of the GNU General Public License
17 ffeffa1d Iustin Pop
# along with this program; if not, write to the Free Software
18 ffeffa1d Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 ffeffa1d Iustin Pop
# 02110-1301, USA.
20 ffeffa1d Iustin Pop
21 ffeffa1d Iustin Pop
22 ffeffa1d Iustin Pop
"""Master daemon program.
23 ffeffa1d Iustin Pop
24 ffeffa1d Iustin Pop
Some classes deviates from the standard style guide since the
25 ffeffa1d Iustin Pop
inheritance from parent classes requires it.
26 ffeffa1d Iustin Pop
27 ffeffa1d Iustin Pop
"""
28 ffeffa1d Iustin Pop
29 ffeffa1d Iustin Pop
30 c1f2901b Iustin Pop
import sys
31 ffeffa1d Iustin Pop
import SocketServer
32 ffeffa1d Iustin Pop
import threading
33 ffeffa1d Iustin Pop
import time
34 ffeffa1d Iustin Pop
import collections
35 ffeffa1d Iustin Pop
import Queue
36 ffeffa1d Iustin Pop
import random
37 ffeffa1d Iustin Pop
import signal
38 ffeffa1d Iustin Pop
import simplejson
39 96cb3986 Michael Hanselmann
import logging
40 ffeffa1d Iustin Pop
41 ffeffa1d Iustin Pop
from cStringIO import StringIO
42 c1f2901b Iustin Pop
from optparse import OptionParser
43 ffeffa1d Iustin Pop
44 39dcf2ef Guido Trotter
from ganeti import config
45 ffeffa1d Iustin Pop
from ganeti import constants
46 ffeffa1d Iustin Pop
from ganeti import mcpu
47 ffeffa1d Iustin Pop
from ganeti import opcodes
48 ffeffa1d Iustin Pop
from ganeti import jqueue
49 39dcf2ef Guido Trotter
from ganeti import locking
50 ffeffa1d Iustin Pop
from ganeti import luxi
51 ffeffa1d Iustin Pop
from ganeti import utils
52 c1f2901b Iustin Pop
from ganeti import errors
53 c1f2901b Iustin Pop
from ganeti import ssconf
54 96cb3986 Michael Hanselmann
from ganeti import logger
55 c1f2901b Iustin Pop
56 c1f2901b Iustin Pop
57 c1f2901b Iustin Pop
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
58 c1f2901b Iustin Pop
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
59 ffeffa1d Iustin Pop
60 ffeffa1d Iustin Pop
61 ffeffa1d Iustin Pop
class IOServer(SocketServer.UnixStreamServer):
62 ffeffa1d Iustin Pop
  """IO thread class.
63 ffeffa1d Iustin Pop
64 ffeffa1d Iustin Pop
  This class takes care of initializing the other threads, setting
65 ffeffa1d Iustin Pop
  signal handlers (which are processed only in this thread), and doing
66 ffeffa1d Iustin Pop
  cleanup at shutdown.
67 ffeffa1d Iustin Pop
68 ffeffa1d Iustin Pop
  """
69 39dcf2ef Guido Trotter
  def __init__(self, address, rqhandler, context):
70 ce862cd5 Guido Trotter
    """IOServer constructor
71 ce862cd5 Guido Trotter
72 ce862cd5 Guido Trotter
    Args:
73 ce862cd5 Guido Trotter
      address: the address to bind this IOServer to
74 ce862cd5 Guido Trotter
      rqhandler: RequestHandler type object
75 39dcf2ef Guido Trotter
      context: Context Object common to all worker threads
76 ce862cd5 Guido Trotter
77 ce862cd5 Guido Trotter
    """
78 ffeffa1d Iustin Pop
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
79 ffeffa1d Iustin Pop
    self.do_quit = False
80 39dcf2ef Guido Trotter
    self.context = context
81 50a3fbb2 Michael Hanselmann
82 50a3fbb2 Michael Hanselmann
    # We'll only start threads once we've forked.
83 50a3fbb2 Michael Hanselmann
    self.jobqueue = None
84 50a3fbb2 Michael Hanselmann
85 c1f2901b Iustin Pop
    signal.signal(signal.SIGINT, self.handle_quit_signals)
86 c1f2901b Iustin Pop
    signal.signal(signal.SIGTERM, self.handle_quit_signals)
87 c1f2901b Iustin Pop
88 50a3fbb2 Michael Hanselmann
  def setup_queue(self):
89 50a3fbb2 Michael Hanselmann
    self.jobqueue = jqueue.JobQueue(self.context)
90 50a3fbb2 Michael Hanselmann
91 ffeffa1d Iustin Pop
  def process_request_thread(self, request, client_address):
92 ffeffa1d Iustin Pop
    """Process the request.
93 ffeffa1d Iustin Pop
94 ffeffa1d Iustin Pop
    This is copied from the code in ThreadingMixIn.
95 ffeffa1d Iustin Pop
96 ffeffa1d Iustin Pop
    """
97 ffeffa1d Iustin Pop
    try:
98 ffeffa1d Iustin Pop
      self.finish_request(request, client_address)
99 ffeffa1d Iustin Pop
      self.close_request(request)
100 ffeffa1d Iustin Pop
    except:
101 ffeffa1d Iustin Pop
      self.handle_error(request, client_address)
102 ffeffa1d Iustin Pop
      self.close_request(request)
103 ffeffa1d Iustin Pop
104 ffeffa1d Iustin Pop
  def process_request(self, request, client_address):
105 ffeffa1d Iustin Pop
    """Start a new thread to process the request.
106 ffeffa1d Iustin Pop
107 ffeffa1d Iustin Pop
    This is copied from the coode in ThreadingMixIn.
108 ffeffa1d Iustin Pop
109 ffeffa1d Iustin Pop
    """
110 ffeffa1d Iustin Pop
    t = threading.Thread(target=self.process_request_thread,
111 ffeffa1d Iustin Pop
                         args=(request, client_address))
112 ffeffa1d Iustin Pop
    t.start()
113 ffeffa1d Iustin Pop
114 c1f2901b Iustin Pop
  def handle_quit_signals(self, signum, frame):
115 ffeffa1d Iustin Pop
    print "received %s in %s" % (signum, frame)
116 ffeffa1d Iustin Pop
    self.do_quit = True
117 ffeffa1d Iustin Pop
118 ffeffa1d Iustin Pop
  def serve_forever(self):
119 ffeffa1d Iustin Pop
    """Handle one request at a time until told to quit."""
120 ffeffa1d Iustin Pop
    while not self.do_quit:
121 ffeffa1d Iustin Pop
      self.handle_request()
122 c1f2901b Iustin Pop
      print "served request, quit=%s" % (self.do_quit)
123 c1f2901b Iustin Pop
124 c1f2901b Iustin Pop
  def server_cleanup(self):
125 c1f2901b Iustin Pop
    """Cleanup the server.
126 c1f2901b Iustin Pop
127 c1f2901b Iustin Pop
    This involves shutting down the processor threads and the master
128 c1f2901b Iustin Pop
    socket.
129 c1f2901b Iustin Pop
130 c1f2901b Iustin Pop
    """
131 50a3fbb2 Michael Hanselmann
    try:
132 50a3fbb2 Michael Hanselmann
      self.server_close()
133 50a3fbb2 Michael Hanselmann
      utils.RemoveFile(constants.MASTER_SOCKET)
134 50a3fbb2 Michael Hanselmann
    finally:
135 50a3fbb2 Michael Hanselmann
      if self.jobqueue:
136 50a3fbb2 Michael Hanselmann
        self.jobqueue.Shutdown()
137 ffeffa1d Iustin Pop
138 ffeffa1d Iustin Pop
139 ffeffa1d Iustin Pop
class ClientRqHandler(SocketServer.BaseRequestHandler):
140 ffeffa1d Iustin Pop
  """Client handler"""
141 ffeffa1d Iustin Pop
  EOM = '\3'
142 ffeffa1d Iustin Pop
  READ_SIZE = 4096
143 ffeffa1d Iustin Pop
144 ffeffa1d Iustin Pop
  def setup(self):
145 ffeffa1d Iustin Pop
    self._buffer = ""
146 ffeffa1d Iustin Pop
    self._msgs = collections.deque()
147 ffeffa1d Iustin Pop
    self._ops = ClientOps(self.server)
148 ffeffa1d Iustin Pop
149 ffeffa1d Iustin Pop
  def handle(self):
150 ffeffa1d Iustin Pop
    while True:
151 ffeffa1d Iustin Pop
      msg = self.read_message()
152 ffeffa1d Iustin Pop
      if msg is None:
153 3d8548c4 Michael Hanselmann
        logging.info("client closed connection")
154 ffeffa1d Iustin Pop
        break
155 3d8548c4 Michael Hanselmann
156 ffeffa1d Iustin Pop
      request = simplejson.loads(msg)
157 3d8548c4 Michael Hanselmann
      logging.debug("request: %s", request)
158 ffeffa1d Iustin Pop
      if not isinstance(request, dict):
159 3d8548c4 Michael Hanselmann
        logging.error("wrong request received: %s", msg)
160 ffeffa1d Iustin Pop
        break
161 3d8548c4 Michael Hanselmann
162 3d8548c4 Michael Hanselmann
      method = request.get(luxi.KEY_METHOD, None)
163 3d8548c4 Michael Hanselmann
      args = request.get(luxi.KEY_ARGS, None)
164 3d8548c4 Michael Hanselmann
      if method is None or args is None:
165 3d8548c4 Michael Hanselmann
        logging.error("no method or args in request")
166 ffeffa1d Iustin Pop
        break
167 3d8548c4 Michael Hanselmann
168 3d8548c4 Michael Hanselmann
      success = False
169 3d8548c4 Michael Hanselmann
      try:
170 3d8548c4 Michael Hanselmann
        result = self._ops.handle_request(method, args)
171 3d8548c4 Michael Hanselmann
        success = True
172 3d8548c4 Michael Hanselmann
      except:
173 3d8548c4 Michael Hanselmann
        logging.error("Unexpected exception", exc_info=True)
174 3d8548c4 Michael Hanselmann
        err = sys.exc_info()
175 3d8548c4 Michael Hanselmann
        result = "Caught exception: %s" % str(err[1])
176 3d8548c4 Michael Hanselmann
177 3d8548c4 Michael Hanselmann
      response = {
178 3d8548c4 Michael Hanselmann
        luxi.KEY_SUCCESS: success,
179 3d8548c4 Michael Hanselmann
        luxi.KEY_RESULT: result,
180 3d8548c4 Michael Hanselmann
        }
181 3d8548c4 Michael Hanselmann
      logging.debug("response: %s", response)
182 3d8548c4 Michael Hanselmann
      self.send_message(simplejson.dumps(response))
183 ffeffa1d Iustin Pop
184 ffeffa1d Iustin Pop
  def read_message(self):
185 ffeffa1d Iustin Pop
    while not self._msgs:
186 ffeffa1d Iustin Pop
      data = self.request.recv(self.READ_SIZE)
187 ffeffa1d Iustin Pop
      if not data:
188 ffeffa1d Iustin Pop
        return None
189 ffeffa1d Iustin Pop
      new_msgs = (self._buffer + data).split(self.EOM)
190 ffeffa1d Iustin Pop
      self._buffer = new_msgs.pop()
191 ffeffa1d Iustin Pop
      self._msgs.extend(new_msgs)
192 ffeffa1d Iustin Pop
    return self._msgs.popleft()
193 ffeffa1d Iustin Pop
194 ffeffa1d Iustin Pop
  def send_message(self, msg):
195 ffeffa1d Iustin Pop
    #print "sending", msg
196 ffeffa1d Iustin Pop
    self.request.sendall(msg + self.EOM)
197 ffeffa1d Iustin Pop
198 ffeffa1d Iustin Pop
199 ffeffa1d Iustin Pop
class ClientOps:
200 ffeffa1d Iustin Pop
  """Class holding high-level client operations."""
201 ffeffa1d Iustin Pop
  def __init__(self, server):
202 ffeffa1d Iustin Pop
    self.server = server
203 ffeffa1d Iustin Pop
204 0bbe448c Michael Hanselmann
  def handle_request(self, method, args):
205 0bbe448c Michael Hanselmann
    queue = self.server.jobqueue
206 0bbe448c Michael Hanselmann
207 0bbe448c Michael Hanselmann
    # TODO: Parameter validation
208 0bbe448c Michael Hanselmann
209 0bbe448c Michael Hanselmann
    if method == luxi.REQ_SUBMIT_JOB:
210 0bbe448c Michael Hanselmann
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
211 0bbe448c Michael Hanselmann
      return queue.SubmitJob(ops)
212 ffeffa1d Iustin Pop
213 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_CANCEL_JOB:
214 0bbe448c Michael Hanselmann
      (job_id, ) = args
215 0bbe448c Michael Hanselmann
      return queue.CancelJob(job_id)
216 ffeffa1d Iustin Pop
217 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_ARCHIVE_JOB:
218 0bbe448c Michael Hanselmann
      (job_id, ) = args
219 0bbe448c Michael Hanselmann
      return queue.ArchiveJob(job_id)
220 0bbe448c Michael Hanselmann
221 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_QUERY_JOBS:
222 0bbe448c Michael Hanselmann
      (job_ids, fields) = args
223 0bbe448c Michael Hanselmann
      return queue.QueryJobs(job_ids, fields)
224 0bbe448c Michael Hanselmann
225 0bbe448c Michael Hanselmann
    else:
226 0bbe448c Michael Hanselmann
      raise ValueError("Invalid operation")
227 ffeffa1d Iustin Pop
228 ffeffa1d Iustin Pop
229 39dcf2ef Guido Trotter
class GanetiContext(object):
230 39dcf2ef Guido Trotter
  """Context common to all ganeti threads.
231 39dcf2ef Guido Trotter
232 39dcf2ef Guido Trotter
  This class creates and holds common objects shared by all threads.
233 39dcf2ef Guido Trotter
234 39dcf2ef Guido Trotter
  """
235 39dcf2ef Guido Trotter
  _instance = None
236 39dcf2ef Guido Trotter
237 39dcf2ef Guido Trotter
  def __init__(self):
238 39dcf2ef Guido Trotter
    """Constructs a new GanetiContext object.
239 39dcf2ef Guido Trotter
240 39dcf2ef Guido Trotter
    There should be only a GanetiContext object at any time, so this
241 39dcf2ef Guido Trotter
    function raises an error if this is not the case.
242 39dcf2ef Guido Trotter
243 39dcf2ef Guido Trotter
    """
244 39dcf2ef Guido Trotter
    assert self.__class__._instance is None, "double GanetiContext instance"
245 39dcf2ef Guido Trotter
246 39dcf2ef Guido Trotter
    # Create a ConfigWriter...
247 39dcf2ef Guido Trotter
    self.cfg = config.ConfigWriter()
248 39dcf2ef Guido Trotter
    # And a GanetiLockingManager...
249 984f7c32 Guido Trotter
    self.glm = locking.GanetiLockManager(
250 39dcf2ef Guido Trotter
                self.cfg.GetNodeList(),
251 39dcf2ef Guido Trotter
                self.cfg.GetInstanceList())
252 39dcf2ef Guido Trotter
253 39dcf2ef Guido Trotter
    # setting this also locks the class against attribute modifications
254 39dcf2ef Guido Trotter
    self.__class__._instance = self
255 39dcf2ef Guido Trotter
256 39dcf2ef Guido Trotter
  def __setattr__(self, name, value):
257 39dcf2ef Guido Trotter
    """Setting GanetiContext attributes is forbidden after initialization.
258 39dcf2ef Guido Trotter
259 39dcf2ef Guido Trotter
    """
260 39dcf2ef Guido Trotter
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
261 39dcf2ef Guido Trotter
    object.__setattr__(self, name, value)
262 39dcf2ef Guido Trotter
263 39dcf2ef Guido Trotter
264 c1f2901b Iustin Pop
def CheckMaster(debug):
265 c1f2901b Iustin Pop
  """Checks the node setup.
266 c1f2901b Iustin Pop
267 c1f2901b Iustin Pop
  If this is the master, the function will return. Otherwise it will
268 c1f2901b Iustin Pop
  exit with an exit code based on the node status.
269 c1f2901b Iustin Pop
270 c1f2901b Iustin Pop
  """
271 c1f2901b Iustin Pop
  try:
272 c1f2901b Iustin Pop
    ss = ssconf.SimpleStore()
273 c1f2901b Iustin Pop
    master_name = ss.GetMasterNode()
274 c1f2901b Iustin Pop
  except errors.ConfigurationError, err:
275 c1f2901b Iustin Pop
    print "Cluster configuration incomplete: '%s'" % str(err)
276 c1f2901b Iustin Pop
    sys.exit(EXIT_NODESETUP_ERROR)
277 c1f2901b Iustin Pop
278 c1f2901b Iustin Pop
  try:
279 c1f2901b Iustin Pop
    myself = utils.HostInfo()
280 c1f2901b Iustin Pop
  except errors.ResolverError, err:
281 c1f2901b Iustin Pop
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
282 c1f2901b Iustin Pop
    sys.exit(EXIT_NODESETUP_ERROR)
283 c1f2901b Iustin Pop
284 c1f2901b Iustin Pop
  if myself.name != master_name:
285 c1f2901b Iustin Pop
    if debug:
286 c1f2901b Iustin Pop
      sys.stderr.write("Not master, exiting.\n")
287 c1f2901b Iustin Pop
    sys.exit(EXIT_NOTMASTER)
288 c1f2901b Iustin Pop
289 c1f2901b Iustin Pop
290 c1f2901b Iustin Pop
def ParseOptions():
291 c1f2901b Iustin Pop
  """Parse the command line options.
292 c1f2901b Iustin Pop
293 c1f2901b Iustin Pop
  Returns:
294 c1f2901b Iustin Pop
    (options, args) as from OptionParser.parse_args()
295 c1f2901b Iustin Pop
296 c1f2901b Iustin Pop
  """
297 c1f2901b Iustin Pop
  parser = OptionParser(description="Ganeti master daemon",
298 c1f2901b Iustin Pop
                        usage="%prog [-f] [-d]",
299 c1f2901b Iustin Pop
                        version="%%prog (ganeti) %s" %
300 c1f2901b Iustin Pop
                        constants.RELEASE_VERSION)
301 c1f2901b Iustin Pop
302 c1f2901b Iustin Pop
  parser.add_option("-f", "--foreground", dest="fork",
303 c1f2901b Iustin Pop
                    help="Don't detach from the current terminal",
304 c1f2901b Iustin Pop
                    default=True, action="store_false")
305 c1f2901b Iustin Pop
  parser.add_option("-d", "--debug", dest="debug",
306 c1f2901b Iustin Pop
                    help="Enable some debug messages",
307 c1f2901b Iustin Pop
                    default=False, action="store_true")
308 c1f2901b Iustin Pop
  options, args = parser.parse_args()
309 c1f2901b Iustin Pop
  return options, args
310 c1f2901b Iustin Pop
311 c1f2901b Iustin Pop
312 ffeffa1d Iustin Pop
def main():
313 ffeffa1d Iustin Pop
  """Main function"""
314 ffeffa1d Iustin Pop
315 c1f2901b Iustin Pop
  options, args = ParseOptions()
316 c1f2901b Iustin Pop
  utils.debug = options.debug
317 b74159ee Iustin Pop
  utils.no_fork = True
318 c1f2901b Iustin Pop
319 c1f2901b Iustin Pop
  CheckMaster(options.debug)
320 c1f2901b Iustin Pop
321 39dcf2ef Guido Trotter
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())
322 ffeffa1d Iustin Pop
323 c1f2901b Iustin Pop
  # become a daemon
324 c1f2901b Iustin Pop
  if options.fork:
325 c1f2901b Iustin Pop
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
326 c1f2901b Iustin Pop
                    noclose_fds=[master.fileno()])
327 c1f2901b Iustin Pop
328 ff5fac04 Iustin Pop
  logger.SetupDaemon(constants.LOG_MASTERDAEMON, debug=options.debug,
329 ff5fac04 Iustin Pop
                     stderr_logging=not options.fork)
330 3b316acb Iustin Pop
331 d4fa5c23 Iustin Pop
  logging.info("ganeti master daemon startup")
332 3b316acb Iustin Pop
333 d4fa5c23 Iustin Pop
  master.setup_queue()
334 c1f2901b Iustin Pop
  try:
335 d4fa5c23 Iustin Pop
    master.serve_forever()
336 a4af651e Iustin Pop
  finally:
337 d4fa5c23 Iustin Pop
    master.server_cleanup()
338 a4af651e Iustin Pop
339 ffeffa1d Iustin Pop
340 ffeffa1d Iustin Pop
if __name__ == "__main__":
341 ffeffa1d Iustin Pop
  main()