Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 9113300d

History | View | Annotate | Download (9.4 kB)

1 685ee993 Iustin Pop
#!/usr/bin/python -u
2 ffeffa1d Iustin Pop
#
3 ffeffa1d Iustin Pop
4 ffeffa1d Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 ffeffa1d Iustin Pop
#
6 ffeffa1d Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 ffeffa1d Iustin Pop
# it under the terms of the GNU General Public License as published by
8 ffeffa1d Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 ffeffa1d Iustin Pop
# (at your option) any later version.
10 ffeffa1d Iustin Pop
#
11 ffeffa1d Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 ffeffa1d Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 ffeffa1d Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 ffeffa1d Iustin Pop
# General Public License for more details.
15 ffeffa1d Iustin Pop
#
16 ffeffa1d Iustin Pop
# You should have received a copy of the GNU General Public License
17 ffeffa1d Iustin Pop
# along with this program; if not, write to the Free Software
18 ffeffa1d Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 ffeffa1d Iustin Pop
# 02110-1301, USA.
20 ffeffa1d Iustin Pop
21 ffeffa1d Iustin Pop
22 ffeffa1d Iustin Pop
"""Master daemon program.
23 ffeffa1d Iustin Pop
24 ffeffa1d Iustin Pop
Some classes deviates from the standard style guide since the
25 ffeffa1d Iustin Pop
inheritance from parent classes requires it.
26 ffeffa1d Iustin Pop
27 ffeffa1d Iustin Pop
"""
28 ffeffa1d Iustin Pop
29 ffeffa1d Iustin Pop
30 c1f2901b Iustin Pop
import sys
31 ffeffa1d Iustin Pop
import SocketServer
32 ffeffa1d Iustin Pop
import time
33 ffeffa1d Iustin Pop
import collections
34 ffeffa1d Iustin Pop
import Queue
35 ffeffa1d Iustin Pop
import random
36 ffeffa1d Iustin Pop
import signal
37 ffeffa1d Iustin Pop
import simplejson
38 96cb3986 Michael Hanselmann
import logging
39 ffeffa1d Iustin Pop
40 ffeffa1d Iustin Pop
from cStringIO import StringIO
41 c1f2901b Iustin Pop
from optparse import OptionParser
42 ffeffa1d Iustin Pop
43 39dcf2ef Guido Trotter
from ganeti import config
44 ffeffa1d Iustin Pop
from ganeti import constants
45 ffeffa1d Iustin Pop
from ganeti import mcpu
46 ffeffa1d Iustin Pop
from ganeti import opcodes
47 ffeffa1d Iustin Pop
from ganeti import jqueue
48 39dcf2ef Guido Trotter
from ganeti import locking
49 ffeffa1d Iustin Pop
from ganeti import luxi
50 ffeffa1d Iustin Pop
from ganeti import utils
51 c1f2901b Iustin Pop
from ganeti import errors
52 c1f2901b Iustin Pop
from ganeti import ssconf
53 96cb3986 Michael Hanselmann
from ganeti import logger
54 23e50d39 Michael Hanselmann
from ganeti import workerpool
55 b1b6ea87 Iustin Pop
from ganeti import rpc
56 c1f2901b Iustin Pop
57 c1f2901b Iustin Pop
58 23e50d39 Michael Hanselmann
CLIENT_REQUEST_WORKERS = 16
59 23e50d39 Michael Hanselmann
60 c1f2901b Iustin Pop
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
61 c1f2901b Iustin Pop
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
62 ffeffa1d Iustin Pop
63 ffeffa1d Iustin Pop
64 23e50d39 Michael Hanselmann
class ClientRequestWorker(workerpool.BaseWorker):
65 23e50d39 Michael Hanselmann
  def RunTask(self, server, request, client_address):
66 23e50d39 Michael Hanselmann
    """Process the request.
67 23e50d39 Michael Hanselmann
68 23e50d39 Michael Hanselmann
    This is copied from the code in ThreadingMixIn.
69 23e50d39 Michael Hanselmann
70 23e50d39 Michael Hanselmann
    """
71 23e50d39 Michael Hanselmann
    try:
72 23e50d39 Michael Hanselmann
      server.finish_request(request, client_address)
73 23e50d39 Michael Hanselmann
      server.close_request(request)
74 23e50d39 Michael Hanselmann
    except:
75 23e50d39 Michael Hanselmann
      server.handle_error(request, client_address)
76 23e50d39 Michael Hanselmann
      server.close_request(request)
77 23e50d39 Michael Hanselmann
78 23e50d39 Michael Hanselmann
79 ffeffa1d Iustin Pop
class IOServer(SocketServer.UnixStreamServer):
80 ffeffa1d Iustin Pop
  """IO thread class.
81 ffeffa1d Iustin Pop
82 ffeffa1d Iustin Pop
  This class takes care of initializing the other threads, setting
83 ffeffa1d Iustin Pop
  signal handlers (which are processed only in this thread), and doing
84 ffeffa1d Iustin Pop
  cleanup at shutdown.
85 ffeffa1d Iustin Pop
86 ffeffa1d Iustin Pop
  """
87 9113300d Michael Hanselmann
  def __init__(self, address, rqhandler):
88 ce862cd5 Guido Trotter
    """IOServer constructor
89 ce862cd5 Guido Trotter
90 ce862cd5 Guido Trotter
    Args:
91 ce862cd5 Guido Trotter
      address: the address to bind this IOServer to
92 ce862cd5 Guido Trotter
      rqhandler: RequestHandler type object
93 ce862cd5 Guido Trotter
94 ce862cd5 Guido Trotter
    """
95 ffeffa1d Iustin Pop
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
96 50a3fbb2 Michael Hanselmann
97 50a3fbb2 Michael Hanselmann
    # We'll only start threads once we've forked.
98 9113300d Michael Hanselmann
    self.context = None
99 23e50d39 Michael Hanselmann
    self.request_workers = None
100 50a3fbb2 Michael Hanselmann
101 50a3fbb2 Michael Hanselmann
  def setup_queue(self):
102 9113300d Michael Hanselmann
    self.context = GanetiContext()
103 23e50d39 Michael Hanselmann
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
104 23e50d39 Michael Hanselmann
                                                 ClientRequestWorker)
105 ffeffa1d Iustin Pop
106 ffeffa1d Iustin Pop
  def process_request(self, request, client_address):
107 23e50d39 Michael Hanselmann
    """Add task to workerpool to process request.
108 ffeffa1d Iustin Pop
109 ffeffa1d Iustin Pop
    """
110 23e50d39 Michael Hanselmann
    self.request_workers.AddTask(self, request, client_address)
111 ffeffa1d Iustin Pop
112 ffeffa1d Iustin Pop
  def serve_forever(self):
113 ffeffa1d Iustin Pop
    """Handle one request at a time until told to quit."""
114 610bc9ee Michael Hanselmann
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
115 610bc9ee Michael Hanselmann
    try:
116 610bc9ee Michael Hanselmann
      while not sighandler.called:
117 610bc9ee Michael Hanselmann
        self.handle_request()
118 610bc9ee Michael Hanselmann
    finally:
119 610bc9ee Michael Hanselmann
      sighandler.Reset()
120 c1f2901b Iustin Pop
121 c1f2901b Iustin Pop
  def server_cleanup(self):
122 c1f2901b Iustin Pop
    """Cleanup the server.
123 c1f2901b Iustin Pop
124 c1f2901b Iustin Pop
    This involves shutting down the processor threads and the master
125 c1f2901b Iustin Pop
    socket.
126 c1f2901b Iustin Pop
127 c1f2901b Iustin Pop
    """
128 50a3fbb2 Michael Hanselmann
    try:
129 50a3fbb2 Michael Hanselmann
      self.server_close()
130 50a3fbb2 Michael Hanselmann
      utils.RemoveFile(constants.MASTER_SOCKET)
131 50a3fbb2 Michael Hanselmann
    finally:
132 23e50d39 Michael Hanselmann
      if self.request_workers:
133 36088c4c Michael Hanselmann
        self.request_workers.TerminateWorkers()
134 9113300d Michael Hanselmann
      if self.context:
135 9113300d Michael Hanselmann
        self.context.jobqueue.Shutdown()
136 ffeffa1d Iustin Pop
137 ffeffa1d Iustin Pop
138 ffeffa1d Iustin Pop
class ClientRqHandler(SocketServer.BaseRequestHandler):
139 ffeffa1d Iustin Pop
  """Client handler"""
140 ffeffa1d Iustin Pop
  EOM = '\3'
141 ffeffa1d Iustin Pop
  READ_SIZE = 4096
142 ffeffa1d Iustin Pop
143 ffeffa1d Iustin Pop
  def setup(self):
144 ffeffa1d Iustin Pop
    self._buffer = ""
145 ffeffa1d Iustin Pop
    self._msgs = collections.deque()
146 ffeffa1d Iustin Pop
    self._ops = ClientOps(self.server)
147 ffeffa1d Iustin Pop
148 ffeffa1d Iustin Pop
  def handle(self):
149 ffeffa1d Iustin Pop
    while True:
150 ffeffa1d Iustin Pop
      msg = self.read_message()
151 ffeffa1d Iustin Pop
      if msg is None:
152 3d8548c4 Michael Hanselmann
        logging.info("client closed connection")
153 ffeffa1d Iustin Pop
        break
154 3d8548c4 Michael Hanselmann
155 ffeffa1d Iustin Pop
      request = simplejson.loads(msg)
156 3d8548c4 Michael Hanselmann
      logging.debug("request: %s", request)
157 ffeffa1d Iustin Pop
      if not isinstance(request, dict):
158 3d8548c4 Michael Hanselmann
        logging.error("wrong request received: %s", msg)
159 ffeffa1d Iustin Pop
        break
160 3d8548c4 Michael Hanselmann
161 3d8548c4 Michael Hanselmann
      method = request.get(luxi.KEY_METHOD, None)
162 3d8548c4 Michael Hanselmann
      args = request.get(luxi.KEY_ARGS, None)
163 3d8548c4 Michael Hanselmann
      if method is None or args is None:
164 3d8548c4 Michael Hanselmann
        logging.error("no method or args in request")
165 ffeffa1d Iustin Pop
        break
166 3d8548c4 Michael Hanselmann
167 3d8548c4 Michael Hanselmann
      success = False
168 3d8548c4 Michael Hanselmann
      try:
169 3d8548c4 Michael Hanselmann
        result = self._ops.handle_request(method, args)
170 3d8548c4 Michael Hanselmann
        success = True
171 3d8548c4 Michael Hanselmann
      except:
172 3d8548c4 Michael Hanselmann
        logging.error("Unexpected exception", exc_info=True)
173 3d8548c4 Michael Hanselmann
        err = sys.exc_info()
174 3d8548c4 Michael Hanselmann
        result = "Caught exception: %s" % str(err[1])
175 3d8548c4 Michael Hanselmann
176 3d8548c4 Michael Hanselmann
      response = {
177 3d8548c4 Michael Hanselmann
        luxi.KEY_SUCCESS: success,
178 3d8548c4 Michael Hanselmann
        luxi.KEY_RESULT: result,
179 3d8548c4 Michael Hanselmann
        }
180 3d8548c4 Michael Hanselmann
      logging.debug("response: %s", response)
181 3d8548c4 Michael Hanselmann
      self.send_message(simplejson.dumps(response))
182 ffeffa1d Iustin Pop
183 ffeffa1d Iustin Pop
  def read_message(self):
184 ffeffa1d Iustin Pop
    while not self._msgs:
185 ffeffa1d Iustin Pop
      data = self.request.recv(self.READ_SIZE)
186 ffeffa1d Iustin Pop
      if not data:
187 ffeffa1d Iustin Pop
        return None
188 ffeffa1d Iustin Pop
      new_msgs = (self._buffer + data).split(self.EOM)
189 ffeffa1d Iustin Pop
      self._buffer = new_msgs.pop()
190 ffeffa1d Iustin Pop
      self._msgs.extend(new_msgs)
191 ffeffa1d Iustin Pop
    return self._msgs.popleft()
192 ffeffa1d Iustin Pop
193 ffeffa1d Iustin Pop
  def send_message(self, msg):
194 ffeffa1d Iustin Pop
    #print "sending", msg
195 ffeffa1d Iustin Pop
    self.request.sendall(msg + self.EOM)
196 ffeffa1d Iustin Pop
197 ffeffa1d Iustin Pop
198 ffeffa1d Iustin Pop
class ClientOps:
199 ffeffa1d Iustin Pop
  """Class holding high-level client operations."""
200 ffeffa1d Iustin Pop
  def __init__(self, server):
201 ffeffa1d Iustin Pop
    self.server = server
202 ffeffa1d Iustin Pop
203 0bbe448c Michael Hanselmann
  def handle_request(self, method, args):
204 9113300d Michael Hanselmann
    queue = self.server.context.jobqueue
205 0bbe448c Michael Hanselmann
206 0bbe448c Michael Hanselmann
    # TODO: Parameter validation
207 0bbe448c Michael Hanselmann
208 0bbe448c Michael Hanselmann
    if method == luxi.REQ_SUBMIT_JOB:
209 0bbe448c Michael Hanselmann
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
210 c3f0a12f Iustin Pop
      # we need to compute the node list here, since from now on all
211 c3f0a12f Iustin Pop
      # operations require locks on the queue or the storage, and we
212 c3f0a12f Iustin Pop
      # shouldn't get another lock
213 c3f0a12f Iustin Pop
      node_list = self.server.context.cfg.GetNodeList()
214 c3f0a12f Iustin Pop
      return queue.SubmitJob(ops, node_list)
215 ffeffa1d Iustin Pop
216 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_CANCEL_JOB:
217 3a2c7775 Michael Hanselmann
      job_id = args
218 0bbe448c Michael Hanselmann
      return queue.CancelJob(job_id)
219 ffeffa1d Iustin Pop
220 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_ARCHIVE_JOB:
221 3a2c7775 Michael Hanselmann
      job_id = args
222 0bbe448c Michael Hanselmann
      return queue.ArchiveJob(job_id)
223 0bbe448c Michael Hanselmann
224 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_QUERY_JOBS:
225 0bbe448c Michael Hanselmann
      (job_ids, fields) = args
226 0bbe448c Michael Hanselmann
      return queue.QueryJobs(job_ids, fields)
227 0bbe448c Michael Hanselmann
228 ee6c7b94 Michael Hanselmann
    elif method == luxi.REQ_QUERY_INSTANCES:
229 ee6c7b94 Michael Hanselmann
      (names, fields) = args
230 ee6c7b94 Michael Hanselmann
      op = opcodes.OpQueryInstances(names=names, output_fields=fields)
231 ee6c7b94 Michael Hanselmann
      return self._Query(op)
232 ee6c7b94 Michael Hanselmann
233 02f7fe54 Michael Hanselmann
    elif method == luxi.REQ_QUERY_NODES:
234 02f7fe54 Michael Hanselmann
      (names, fields) = args
235 02f7fe54 Michael Hanselmann
      op = opcodes.OpQueryNodes(names=names, output_fields=fields)
236 02f7fe54 Michael Hanselmann
      return self._Query(op)
237 02f7fe54 Michael Hanselmann
238 0bbe448c Michael Hanselmann
    else:
239 0bbe448c Michael Hanselmann
      raise ValueError("Invalid operation")
240 ffeffa1d Iustin Pop
241 ee6c7b94 Michael Hanselmann
  def _DummyLog(self, *args):
242 ee6c7b94 Michael Hanselmann
    pass
243 ee6c7b94 Michael Hanselmann
244 ee6c7b94 Michael Hanselmann
  def _Query(self, op):
245 ee6c7b94 Michael Hanselmann
    """Runs the specified opcode and returns the result.
246 ee6c7b94 Michael Hanselmann
247 ee6c7b94 Michael Hanselmann
    """
248 ee6c7b94 Michael Hanselmann
    proc = mcpu.Processor(self.server.context)
249 ee6c7b94 Michael Hanselmann
    # TODO: Where should log messages go?
250 ee6c7b94 Michael Hanselmann
    return proc.ExecOpCode(op, self._DummyLog)
251 ee6c7b94 Michael Hanselmann
252 ffeffa1d Iustin Pop
253 39dcf2ef Guido Trotter
class GanetiContext(object):
254 39dcf2ef Guido Trotter
  """Context common to all ganeti threads.
255 39dcf2ef Guido Trotter
256 39dcf2ef Guido Trotter
  This class creates and holds common objects shared by all threads.
257 39dcf2ef Guido Trotter
258 39dcf2ef Guido Trotter
  """
259 39dcf2ef Guido Trotter
  _instance = None
260 39dcf2ef Guido Trotter
261 39dcf2ef Guido Trotter
  def __init__(self):
262 39dcf2ef Guido Trotter
    """Constructs a new GanetiContext object.
263 39dcf2ef Guido Trotter
264 39dcf2ef Guido Trotter
    There should be only a GanetiContext object at any time, so this
265 39dcf2ef Guido Trotter
    function raises an error if this is not the case.
266 39dcf2ef Guido Trotter
267 39dcf2ef Guido Trotter
    """
268 39dcf2ef Guido Trotter
    assert self.__class__._instance is None, "double GanetiContext instance"
269 39dcf2ef Guido Trotter
270 9113300d Michael Hanselmann
    # Create global configuration object
271 39dcf2ef Guido Trotter
    self.cfg = config.ConfigWriter()
272 9113300d Michael Hanselmann
273 9113300d Michael Hanselmann
    # Locking manager
274 984f7c32 Guido Trotter
    self.glm = locking.GanetiLockManager(
275 39dcf2ef Guido Trotter
                self.cfg.GetNodeList(),
276 39dcf2ef Guido Trotter
                self.cfg.GetInstanceList())
277 39dcf2ef Guido Trotter
278 9113300d Michael Hanselmann
    # Job queue
279 9113300d Michael Hanselmann
    self.jobqueue = jqueue.JobQueue(self)
280 9113300d Michael Hanselmann
281 39dcf2ef Guido Trotter
    # setting this also locks the class against attribute modifications
282 39dcf2ef Guido Trotter
    self.__class__._instance = self
283 39dcf2ef Guido Trotter
284 39dcf2ef Guido Trotter
  def __setattr__(self, name, value):
285 39dcf2ef Guido Trotter
    """Setting GanetiContext attributes is forbidden after initialization.
286 39dcf2ef Guido Trotter
287 39dcf2ef Guido Trotter
    """
288 39dcf2ef Guido Trotter
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
289 39dcf2ef Guido Trotter
    object.__setattr__(self, name, value)
290 39dcf2ef Guido Trotter
291 39dcf2ef Guido Trotter
292 c1f2901b Iustin Pop
def ParseOptions():
293 c1f2901b Iustin Pop
  """Parse the command line options.
294 c1f2901b Iustin Pop
295 c1f2901b Iustin Pop
  Returns:
296 c1f2901b Iustin Pop
    (options, args) as from OptionParser.parse_args()
297 c1f2901b Iustin Pop
298 c1f2901b Iustin Pop
  """
299 c1f2901b Iustin Pop
  parser = OptionParser(description="Ganeti master daemon",
300 c1f2901b Iustin Pop
                        usage="%prog [-f] [-d]",
301 c1f2901b Iustin Pop
                        version="%%prog (ganeti) %s" %
302 c1f2901b Iustin Pop
                        constants.RELEASE_VERSION)
303 c1f2901b Iustin Pop
304 c1f2901b Iustin Pop
  parser.add_option("-f", "--foreground", dest="fork",
305 c1f2901b Iustin Pop
                    help="Don't detach from the current terminal",
306 c1f2901b Iustin Pop
                    default=True, action="store_false")
307 c1f2901b Iustin Pop
  parser.add_option("-d", "--debug", dest="debug",
308 c1f2901b Iustin Pop
                    help="Enable some debug messages",
309 c1f2901b Iustin Pop
                    default=False, action="store_true")
310 c1f2901b Iustin Pop
  options, args = parser.parse_args()
311 c1f2901b Iustin Pop
  return options, args
312 c1f2901b Iustin Pop
313 c1f2901b Iustin Pop
314 ffeffa1d Iustin Pop
def main():
315 ffeffa1d Iustin Pop
  """Main function"""
316 ffeffa1d Iustin Pop
317 c1f2901b Iustin Pop
  options, args = ParseOptions()
318 c1f2901b Iustin Pop
  utils.debug = options.debug
319 b74159ee Iustin Pop
  utils.no_fork = True
320 c1f2901b Iustin Pop
321 5675cd1f Iustin Pop
  ssconf.CheckMaster(options.debug)
322 c1f2901b Iustin Pop
323 9113300d Michael Hanselmann
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
324 ffeffa1d Iustin Pop
325 c1f2901b Iustin Pop
  # become a daemon
326 c1f2901b Iustin Pop
  if options.fork:
327 c1f2901b Iustin Pop
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
328 c1f2901b Iustin Pop
                    noclose_fds=[master.fileno()])
329 c1f2901b Iustin Pop
330 99e88451 Iustin Pop
  utils.WritePidFile(constants.MASTERD_PID)
331 8feda3ad Guido Trotter
332 59f187eb Iustin Pop
  logger.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
333 59f187eb Iustin Pop
                      stderr_logging=not options.fork)
334 3b316acb Iustin Pop
335 d4fa5c23 Iustin Pop
  logging.info("ganeti master daemon startup")
336 3b316acb Iustin Pop
337 b1b6ea87 Iustin Pop
  # activate ip
338 b1b6ea87 Iustin Pop
  master_node = ssconf.SimpleStore().GetMasterNode()
339 b1b6ea87 Iustin Pop
  if not rpc.call_node_start_master(master_node, False):
340 b1b6ea87 Iustin Pop
    logging.error("Can't activate master IP address")
341 b1b6ea87 Iustin Pop
342 d4fa5c23 Iustin Pop
  master.setup_queue()
343 c1f2901b Iustin Pop
  try:
344 d4fa5c23 Iustin Pop
    master.serve_forever()
345 a4af651e Iustin Pop
  finally:
346 d4fa5c23 Iustin Pop
    master.server_cleanup()
347 99e88451 Iustin Pop
    utils.RemovePidFile(constants.MASTERD_PID)
348 a4af651e Iustin Pop
349 ffeffa1d Iustin Pop
350 ffeffa1d Iustin Pop
if __name__ == "__main__":
351 ffeffa1d Iustin Pop
  main()