Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ b74159ee

History | View | Annotate | Download (9.8 kB)

1 685ee993 Iustin Pop
#!/usr/bin/python -u
2 ffeffa1d Iustin Pop
#
3 ffeffa1d Iustin Pop
4 ffeffa1d Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 ffeffa1d Iustin Pop
#
6 ffeffa1d Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 ffeffa1d Iustin Pop
# it under the terms of the GNU General Public License as published by
8 ffeffa1d Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 ffeffa1d Iustin Pop
# (at your option) any later version.
10 ffeffa1d Iustin Pop
#
11 ffeffa1d Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 ffeffa1d Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 ffeffa1d Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 ffeffa1d Iustin Pop
# General Public License for more details.
15 ffeffa1d Iustin Pop
#
16 ffeffa1d Iustin Pop
# You should have received a copy of the GNU General Public License
17 ffeffa1d Iustin Pop
# along with this program; if not, write to the Free Software
18 ffeffa1d Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 ffeffa1d Iustin Pop
# 02110-1301, USA.
20 ffeffa1d Iustin Pop
21 ffeffa1d Iustin Pop
22 ffeffa1d Iustin Pop
"""Master daemon program.
23 ffeffa1d Iustin Pop
24 ffeffa1d Iustin Pop
Some classes deviates from the standard style guide since the
25 ffeffa1d Iustin Pop
inheritance from parent classes requires it.
26 ffeffa1d Iustin Pop
27 ffeffa1d Iustin Pop
"""
28 ffeffa1d Iustin Pop
29 ffeffa1d Iustin Pop
30 c1f2901b Iustin Pop
import sys
31 ffeffa1d Iustin Pop
import SocketServer
32 ffeffa1d Iustin Pop
import threading
33 ffeffa1d Iustin Pop
import time
34 ffeffa1d Iustin Pop
import collections
35 ffeffa1d Iustin Pop
import Queue
36 ffeffa1d Iustin Pop
import random
37 ffeffa1d Iustin Pop
import signal
38 ffeffa1d Iustin Pop
import simplejson
39 ffeffa1d Iustin Pop
40 ffeffa1d Iustin Pop
41 ffeffa1d Iustin Pop
from cStringIO import StringIO
42 c1f2901b Iustin Pop
from optparse import OptionParser
43 ffeffa1d Iustin Pop
44 ffeffa1d Iustin Pop
from ganeti import constants
45 ffeffa1d Iustin Pop
from ganeti import mcpu
46 ffeffa1d Iustin Pop
from ganeti import opcodes
47 ffeffa1d Iustin Pop
from ganeti import jqueue
48 ffeffa1d Iustin Pop
from ganeti import luxi
49 ffeffa1d Iustin Pop
from ganeti import utils
50 c1f2901b Iustin Pop
from ganeti import errors
51 c1f2901b Iustin Pop
from ganeti import ssconf
52 c1f2901b Iustin Pop
53 c1f2901b Iustin Pop
54 c1f2901b Iustin Pop
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
55 c1f2901b Iustin Pop
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
56 ffeffa1d Iustin Pop
57 ffeffa1d Iustin Pop
58 ffeffa1d Iustin Pop
class IOServer(SocketServer.UnixStreamServer):
59 ffeffa1d Iustin Pop
  """IO thread class.
60 ffeffa1d Iustin Pop
61 ffeffa1d Iustin Pop
  This class takes care of initializing the other threads, setting
62 ffeffa1d Iustin Pop
  signal handlers (which are processed only in this thread), and doing
63 ffeffa1d Iustin Pop
  cleanup at shutdown.
64 ffeffa1d Iustin Pop
65 ffeffa1d Iustin Pop
  """
66 ffeffa1d Iustin Pop
  QUEUE_PROCESSOR_SIZE = 1
67 ffeffa1d Iustin Pop
68 ffeffa1d Iustin Pop
  def __init__(self, address, rqhandler):
69 ffeffa1d Iustin Pop
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
70 ffeffa1d Iustin Pop
    self.do_quit = False
71 ffeffa1d Iustin Pop
    self.queue = jqueue.QueueManager()
72 ffeffa1d Iustin Pop
    self.processors = []
73 c1f2901b Iustin Pop
    signal.signal(signal.SIGINT, self.handle_quit_signals)
74 c1f2901b Iustin Pop
    signal.signal(signal.SIGTERM, self.handle_quit_signals)
75 c1f2901b Iustin Pop
76 c1f2901b Iustin Pop
  def setup_processors(self):
77 c1f2901b Iustin Pop
    """Spawn the processors threads.
78 c1f2901b Iustin Pop
79 c1f2901b Iustin Pop
    This initializes the queue and the thread processors. It is done
80 c1f2901b Iustin Pop
    separately from the constructor because we want the clone()
81 c1f2901b Iustin Pop
    syscalls to happen after the daemonize part.
82 c1f2901b Iustin Pop
83 c1f2901b Iustin Pop
    """
84 ffeffa1d Iustin Pop
    for i in range(self.QUEUE_PROCESSOR_SIZE):
85 ffeffa1d Iustin Pop
      self.processors.append(threading.Thread(target=PoolWorker,
86 ffeffa1d Iustin Pop
                                              args=(i, self.queue.new_queue)))
87 ffeffa1d Iustin Pop
    for t in self.processors:
88 ffeffa1d Iustin Pop
      t.start()
89 ffeffa1d Iustin Pop
90 ffeffa1d Iustin Pop
  def process_request_thread(self, request, client_address):
91 ffeffa1d Iustin Pop
    """Process the request.
92 ffeffa1d Iustin Pop
93 ffeffa1d Iustin Pop
    This is copied from the code in ThreadingMixIn.
94 ffeffa1d Iustin Pop
95 ffeffa1d Iustin Pop
    """
96 ffeffa1d Iustin Pop
    try:
97 ffeffa1d Iustin Pop
      self.finish_request(request, client_address)
98 ffeffa1d Iustin Pop
      self.close_request(request)
99 ffeffa1d Iustin Pop
    except:
100 ffeffa1d Iustin Pop
      self.handle_error(request, client_address)
101 ffeffa1d Iustin Pop
      self.close_request(request)
102 ffeffa1d Iustin Pop
103 ffeffa1d Iustin Pop
  def process_request(self, request, client_address):
104 ffeffa1d Iustin Pop
    """Start a new thread to process the request.
105 ffeffa1d Iustin Pop
106 ffeffa1d Iustin Pop
    This is copied from the coode in ThreadingMixIn.
107 ffeffa1d Iustin Pop
108 ffeffa1d Iustin Pop
    """
109 ffeffa1d Iustin Pop
    t = threading.Thread(target=self.process_request_thread,
110 ffeffa1d Iustin Pop
                         args=(request, client_address))
111 ffeffa1d Iustin Pop
    t.start()
112 ffeffa1d Iustin Pop
113 c1f2901b Iustin Pop
  def handle_quit_signals(self, signum, frame):
114 ffeffa1d Iustin Pop
    print "received %s in %s" % (signum, frame)
115 ffeffa1d Iustin Pop
    self.do_quit = True
116 ffeffa1d Iustin Pop
117 ffeffa1d Iustin Pop
  def serve_forever(self):
118 ffeffa1d Iustin Pop
    """Handle one request at a time until told to quit."""
119 ffeffa1d Iustin Pop
    while not self.do_quit:
120 ffeffa1d Iustin Pop
      self.handle_request()
121 c1f2901b Iustin Pop
      print "served request, quit=%s" % (self.do_quit)
122 c1f2901b Iustin Pop
123 c1f2901b Iustin Pop
  def server_cleanup(self):
124 c1f2901b Iustin Pop
    """Cleanup the server.
125 c1f2901b Iustin Pop
126 c1f2901b Iustin Pop
    This involves shutting down the processor threads and the master
127 c1f2901b Iustin Pop
    socket.
128 c1f2901b Iustin Pop
129 c1f2901b Iustin Pop
    """
130 c1f2901b Iustin Pop
    self.server_close()
131 c1f2901b Iustin Pop
    utils.RemoveFile(constants.MASTER_SOCKET)
132 c1f2901b Iustin Pop
    for i in range(self.QUEUE_PROCESSOR_SIZE):
133 c1f2901b Iustin Pop
      self.queue.new_queue.put(None)
134 c1f2901b Iustin Pop
    for idx, t in enumerate(self.processors):
135 c1f2901b Iustin Pop
      print "waiting for processor thread %s..." % idx
136 c1f2901b Iustin Pop
      t.join()
137 c1f2901b Iustin Pop
    print "done threads"
138 ffeffa1d Iustin Pop
139 ffeffa1d Iustin Pop
140 ffeffa1d Iustin Pop
class ClientRqHandler(SocketServer.BaseRequestHandler):
141 ffeffa1d Iustin Pop
  """Client handler"""
142 ffeffa1d Iustin Pop
  EOM = '\3'
143 ffeffa1d Iustin Pop
  READ_SIZE = 4096
144 ffeffa1d Iustin Pop
145 ffeffa1d Iustin Pop
  def setup(self):
146 ffeffa1d Iustin Pop
    self._buffer = ""
147 ffeffa1d Iustin Pop
    self._msgs = collections.deque()
148 ffeffa1d Iustin Pop
    self._ops = ClientOps(self.server)
149 ffeffa1d Iustin Pop
150 ffeffa1d Iustin Pop
  def handle(self):
151 ffeffa1d Iustin Pop
    while True:
152 ffeffa1d Iustin Pop
      msg = self.read_message()
153 ffeffa1d Iustin Pop
      if msg is None:
154 ffeffa1d Iustin Pop
        print "client closed connection"
155 ffeffa1d Iustin Pop
        break
156 ffeffa1d Iustin Pop
      request = simplejson.loads(msg)
157 ffeffa1d Iustin Pop
      if not isinstance(request, dict):
158 ffeffa1d Iustin Pop
        print "wrong request received: %s" % msg
159 ffeffa1d Iustin Pop
        break
160 ffeffa1d Iustin Pop
      method = request.get('request', None)
161 ffeffa1d Iustin Pop
      data = request.get('data', None)
162 ffeffa1d Iustin Pop
      if method is None or data is None:
163 ffeffa1d Iustin Pop
        print "no method or data in request"
164 ffeffa1d Iustin Pop
        break
165 ffeffa1d Iustin Pop
      print "request:", method, data
166 ffeffa1d Iustin Pop
      result = self._ops.handle_request(method, data)
167 ffeffa1d Iustin Pop
      print "result:", result
168 ffeffa1d Iustin Pop
      self.send_message(simplejson.dumps({'success': True, 'result': result}))
169 ffeffa1d Iustin Pop
170 ffeffa1d Iustin Pop
  def read_message(self):
171 ffeffa1d Iustin Pop
    while not self._msgs:
172 ffeffa1d Iustin Pop
      data = self.request.recv(self.READ_SIZE)
173 ffeffa1d Iustin Pop
      if not data:
174 ffeffa1d Iustin Pop
        return None
175 ffeffa1d Iustin Pop
      new_msgs = (self._buffer + data).split(self.EOM)
176 ffeffa1d Iustin Pop
      self._buffer = new_msgs.pop()
177 ffeffa1d Iustin Pop
      self._msgs.extend(new_msgs)
178 ffeffa1d Iustin Pop
    return self._msgs.popleft()
179 ffeffa1d Iustin Pop
180 ffeffa1d Iustin Pop
  def send_message(self, msg):
181 ffeffa1d Iustin Pop
    #print "sending", msg
182 ffeffa1d Iustin Pop
    self.request.sendall(msg + self.EOM)
183 ffeffa1d Iustin Pop
184 ffeffa1d Iustin Pop
185 ffeffa1d Iustin Pop
class ClientOps:
186 ffeffa1d Iustin Pop
  """Class holding high-level client operations."""
187 ffeffa1d Iustin Pop
  def __init__(self, server):
188 ffeffa1d Iustin Pop
    self.server = server
189 ffeffa1d Iustin Pop
    self._cpu = None
190 ffeffa1d Iustin Pop
191 ffeffa1d Iustin Pop
  def _getcpu(self):
192 ffeffa1d Iustin Pop
    if self._cpu is None:
193 ffeffa1d Iustin Pop
      self._cpu = mcpu.Processor(lambda x: None)
194 ffeffa1d Iustin Pop
    return self._cpu
195 ffeffa1d Iustin Pop
196 ffeffa1d Iustin Pop
  def handle_request(self, operation, args):
197 ffeffa1d Iustin Pop
    print operation, args
198 ffeffa1d Iustin Pop
    if operation == "submit":
199 ffeffa1d Iustin Pop
      return self.put(args)
200 ffeffa1d Iustin Pop
    elif operation == "query":
201 7a1ecaed Iustin Pop
      return self.query(args)
202 ffeffa1d Iustin Pop
    else:
203 ffeffa1d Iustin Pop
      raise ValueError("Invalid operation")
204 ffeffa1d Iustin Pop
205 ffeffa1d Iustin Pop
  def put(self, args):
206 ffeffa1d Iustin Pop
    job = luxi.UnserializeJob(args)
207 ffeffa1d Iustin Pop
    rid = self.server.queue.put(job)
208 ffeffa1d Iustin Pop
    return rid
209 ffeffa1d Iustin Pop
210 ffeffa1d Iustin Pop
  def query(self, args):
211 ffeffa1d Iustin Pop
    path = args["object"]
212 ffeffa1d Iustin Pop
    fields = args["fields"]
213 ffeffa1d Iustin Pop
    names = args["names"]
214 ffeffa1d Iustin Pop
    if path == "instances":
215 ffeffa1d Iustin Pop
      opclass = opcodes.OpQueryInstances
216 7a1ecaed Iustin Pop
    elif path == "jobs":
217 7a1ecaed Iustin Pop
      # early exit because job query-ing is special (not via opcodes)
218 7a1ecaed Iustin Pop
      return self.query_jobs(fields, names)
219 ffeffa1d Iustin Pop
    else:
220 ffeffa1d Iustin Pop
      raise ValueError("Invalid object %s" % path)
221 ffeffa1d Iustin Pop
222 ffeffa1d Iustin Pop
    op = opclass(output_fields = fields, names=names)
223 ffeffa1d Iustin Pop
    cpu = self._getcpu()
224 ffeffa1d Iustin Pop
    result = cpu.ExecOpCode(op)
225 ffeffa1d Iustin Pop
    return result
226 ffeffa1d Iustin Pop
227 7a1ecaed Iustin Pop
  def query_jobs(self, fields, names):
228 7a1ecaed Iustin Pop
    return self.server.queue.query_jobs(fields, names)
229 ffeffa1d Iustin Pop
230 ffeffa1d Iustin Pop
231 ffeffa1d Iustin Pop
def JobRunner(proc, job):
232 ffeffa1d Iustin Pop
  """Job executor.
233 ffeffa1d Iustin Pop
234 ffeffa1d Iustin Pop
  This functions processes a single job in the context of given
235 ffeffa1d Iustin Pop
  processor instance.
236 ffeffa1d Iustin Pop
237 ffeffa1d Iustin Pop
  """
238 ffeffa1d Iustin Pop
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
239 35049ff2 Iustin Pop
  fail = False
240 35049ff2 Iustin Pop
  for idx, op in enumerate(job.data.op_list):
241 35049ff2 Iustin Pop
    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
242 35049ff2 Iustin Pop
    try:
243 35049ff2 Iustin Pop
      job.data.op_result[idx] = proc.ExecOpCode(op)
244 35049ff2 Iustin Pop
      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
245 35049ff2 Iustin Pop
    except (errors.OpPrereqError, errors.OpExecError), err:
246 35049ff2 Iustin Pop
      fail = True
247 35049ff2 Iustin Pop
      job.data.op_result[idx] = str(err)
248 35049ff2 Iustin Pop
      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
249 35049ff2 Iustin Pop
  if fail:
250 35049ff2 Iustin Pop
    job.SetStatus(opcodes.Job.STATUS_FAIL)
251 35049ff2 Iustin Pop
  else:
252 35049ff2 Iustin Pop
    job.SetStatus(opcodes.Job.STATUS_SUCCESS)
253 ffeffa1d Iustin Pop
254 ffeffa1d Iustin Pop
255 ffeffa1d Iustin Pop
def PoolWorker(worker_id, incoming_queue):
256 ffeffa1d Iustin Pop
  """A worker thread function.
257 ffeffa1d Iustin Pop
258 ffeffa1d Iustin Pop
  This is the actual processor of a single thread of Job execution.
259 ffeffa1d Iustin Pop
260 ffeffa1d Iustin Pop
  """
261 ffeffa1d Iustin Pop
  while True:
262 ffeffa1d Iustin Pop
    print "worker %s sleeping" % worker_id
263 ffeffa1d Iustin Pop
    item = incoming_queue.get(True)
264 ffeffa1d Iustin Pop
    if item is None:
265 ffeffa1d Iustin Pop
      break
266 ffeffa1d Iustin Pop
    print "worker %s processing job %s" % (worker_id, item.data.job_id)
267 685ee993 Iustin Pop
    #utils.Lock('cmd')
268 ffeffa1d Iustin Pop
    try:
269 ffeffa1d Iustin Pop
      proc = mcpu.Processor(feedback=lambda x: None)
270 ffeffa1d Iustin Pop
      try:
271 ffeffa1d Iustin Pop
        JobRunner(proc, item)
272 ffeffa1d Iustin Pop
      except errors.GenericError, err:
273 ffeffa1d Iustin Pop
        print "ganeti exception %s" % err
274 ffeffa1d Iustin Pop
    finally:
275 685ee993 Iustin Pop
      #utils.Unlock('cmd')
276 685ee993 Iustin Pop
      #utils.LockCleanup()
277 685ee993 Iustin Pop
      pass
278 ffeffa1d Iustin Pop
    print "worker %s finish job %s" % (worker_id, item.data.job_id)
279 ffeffa1d Iustin Pop
  print "worker %s exiting" % worker_id
280 ffeffa1d Iustin Pop
281 ffeffa1d Iustin Pop
282 c1f2901b Iustin Pop
def CheckMaster(debug):
283 c1f2901b Iustin Pop
  """Checks the node setup.
284 c1f2901b Iustin Pop
285 c1f2901b Iustin Pop
  If this is the master, the function will return. Otherwise it will
286 c1f2901b Iustin Pop
  exit with an exit code based on the node status.
287 c1f2901b Iustin Pop
288 c1f2901b Iustin Pop
  """
289 c1f2901b Iustin Pop
  try:
290 c1f2901b Iustin Pop
    ss = ssconf.SimpleStore()
291 c1f2901b Iustin Pop
    master_name = ss.GetMasterNode()
292 c1f2901b Iustin Pop
  except errors.ConfigurationError, err:
293 c1f2901b Iustin Pop
    print "Cluster configuration incomplete: '%s'" % str(err)
294 c1f2901b Iustin Pop
    sys.exit(EXIT_NODESETUP_ERROR)
295 c1f2901b Iustin Pop
296 c1f2901b Iustin Pop
  try:
297 c1f2901b Iustin Pop
    myself = utils.HostInfo()
298 c1f2901b Iustin Pop
  except errors.ResolverError, err:
299 c1f2901b Iustin Pop
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
300 c1f2901b Iustin Pop
    sys.exit(EXIT_NODESETUP_ERROR)
301 c1f2901b Iustin Pop
302 c1f2901b Iustin Pop
  if myself.name != master_name:
303 c1f2901b Iustin Pop
    if debug:
304 c1f2901b Iustin Pop
      sys.stderr.write("Not master, exiting.\n")
305 c1f2901b Iustin Pop
    sys.exit(EXIT_NOTMASTER)
306 c1f2901b Iustin Pop
307 c1f2901b Iustin Pop
308 c1f2901b Iustin Pop
def ParseOptions():
309 c1f2901b Iustin Pop
  """Parse the command line options.
310 c1f2901b Iustin Pop
311 c1f2901b Iustin Pop
  Returns:
312 c1f2901b Iustin Pop
    (options, args) as from OptionParser.parse_args()
313 c1f2901b Iustin Pop
314 c1f2901b Iustin Pop
  """
315 c1f2901b Iustin Pop
  parser = OptionParser(description="Ganeti master daemon",
316 c1f2901b Iustin Pop
                        usage="%prog [-f] [-d]",
317 c1f2901b Iustin Pop
                        version="%%prog (ganeti) %s" %
318 c1f2901b Iustin Pop
                        constants.RELEASE_VERSION)
319 c1f2901b Iustin Pop
320 c1f2901b Iustin Pop
  parser.add_option("-f", "--foreground", dest="fork",
321 c1f2901b Iustin Pop
                    help="Don't detach from the current terminal",
322 c1f2901b Iustin Pop
                    default=True, action="store_false")
323 c1f2901b Iustin Pop
  parser.add_option("-d", "--debug", dest="debug",
324 c1f2901b Iustin Pop
                    help="Enable some debug messages",
325 c1f2901b Iustin Pop
                    default=False, action="store_true")
326 c1f2901b Iustin Pop
  options, args = parser.parse_args()
327 c1f2901b Iustin Pop
  return options, args
328 c1f2901b Iustin Pop
329 c1f2901b Iustin Pop
330 ffeffa1d Iustin Pop
def main():
331 ffeffa1d Iustin Pop
  """Main function"""
332 ffeffa1d Iustin Pop
333 c1f2901b Iustin Pop
  options, args = ParseOptions()
334 c1f2901b Iustin Pop
  utils.debug = options.debug
335 b74159ee Iustin Pop
  utils.no_fork = True
336 c1f2901b Iustin Pop
337 c1f2901b Iustin Pop
  CheckMaster(options.debug)
338 c1f2901b Iustin Pop
339 ffeffa1d Iustin Pop
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
340 ffeffa1d Iustin Pop
341 c1f2901b Iustin Pop
  # become a daemon
342 c1f2901b Iustin Pop
  if options.fork:
343 c1f2901b Iustin Pop
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
344 c1f2901b Iustin Pop
                    noclose_fds=[master.fileno()])
345 c1f2901b Iustin Pop
346 c1f2901b Iustin Pop
  try:
347 a4af651e Iustin Pop
    utils.Lock('cmd', debug=options.debug)
348 a4af651e Iustin Pop
  except errors.LockError, err:
349 a4af651e Iustin Pop
    print >> sys.stderr, str(err)
350 c1f2901b Iustin Pop
    master.server_cleanup()
351 a4af651e Iustin Pop
    return
352 a4af651e Iustin Pop
353 a4af651e Iustin Pop
  try:
354 a4af651e Iustin Pop
    master.setup_processors()
355 a4af651e Iustin Pop
    try:
356 a4af651e Iustin Pop
      master.serve_forever()
357 a4af651e Iustin Pop
    finally:
358 a4af651e Iustin Pop
      master.server_cleanup()
359 a4af651e Iustin Pop
  finally:
360 a4af651e Iustin Pop
    utils.Unlock('cmd')
361 a4af651e Iustin Pop
    utils.LockCleanup()
362 a4af651e Iustin Pop
363 ffeffa1d Iustin Pop
364 ffeffa1d Iustin Pop
if __name__ == "__main__":
365 ffeffa1d Iustin Pop
  main()