Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ ffeffa1d

History | View | Annotate | Download (6.5 kB)

1 ffeffa1d Iustin Pop
#!/usr/bin/python
2 ffeffa1d Iustin Pop
#
3 ffeffa1d Iustin Pop
4 ffeffa1d Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 ffeffa1d Iustin Pop
#
6 ffeffa1d Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 ffeffa1d Iustin Pop
# it under the terms of the GNU General Public License as published by
8 ffeffa1d Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 ffeffa1d Iustin Pop
# (at your option) any later version.
10 ffeffa1d Iustin Pop
#
11 ffeffa1d Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 ffeffa1d Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 ffeffa1d Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 ffeffa1d Iustin Pop
# General Public License for more details.
15 ffeffa1d Iustin Pop
#
16 ffeffa1d Iustin Pop
# You should have received a copy of the GNU General Public License
17 ffeffa1d Iustin Pop
# along with this program; if not, write to the Free Software
18 ffeffa1d Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 ffeffa1d Iustin Pop
# 02110-1301, USA.
20 ffeffa1d Iustin Pop
21 ffeffa1d Iustin Pop
22 ffeffa1d Iustin Pop
"""Master daemon program.
23 ffeffa1d Iustin Pop
24 ffeffa1d Iustin Pop
Some classes deviates from the standard style guide since the
25 ffeffa1d Iustin Pop
inheritance from parent classes requires it.
26 ffeffa1d Iustin Pop
27 ffeffa1d Iustin Pop
"""
28 ffeffa1d Iustin Pop
29 ffeffa1d Iustin Pop
30 ffeffa1d Iustin Pop
import SocketServer
31 ffeffa1d Iustin Pop
import threading
32 ffeffa1d Iustin Pop
import time
33 ffeffa1d Iustin Pop
import collections
34 ffeffa1d Iustin Pop
import Queue
35 ffeffa1d Iustin Pop
import random
36 ffeffa1d Iustin Pop
import signal
37 ffeffa1d Iustin Pop
import simplejson
38 ffeffa1d Iustin Pop
39 ffeffa1d Iustin Pop
40 ffeffa1d Iustin Pop
from cStringIO import StringIO
41 ffeffa1d Iustin Pop
42 ffeffa1d Iustin Pop
from ganeti import constants
43 ffeffa1d Iustin Pop
from ganeti import mcpu
44 ffeffa1d Iustin Pop
from ganeti import opcodes
45 ffeffa1d Iustin Pop
from ganeti import jqueue
46 ffeffa1d Iustin Pop
from ganeti import luxi
47 ffeffa1d Iustin Pop
from ganeti import utils
48 ffeffa1d Iustin Pop
49 ffeffa1d Iustin Pop
50 ffeffa1d Iustin Pop
class IOServer(SocketServer.UnixStreamServer):
51 ffeffa1d Iustin Pop
  """IO thread class.
52 ffeffa1d Iustin Pop
53 ffeffa1d Iustin Pop
  This class takes care of initializing the other threads, setting
54 ffeffa1d Iustin Pop
  signal handlers (which are processed only in this thread), and doing
55 ffeffa1d Iustin Pop
  cleanup at shutdown.
56 ffeffa1d Iustin Pop
57 ffeffa1d Iustin Pop
  """
58 ffeffa1d Iustin Pop
  QUEUE_PROCESSOR_SIZE = 1
59 ffeffa1d Iustin Pop
60 ffeffa1d Iustin Pop
  def __init__(self, address, rqhandler):
61 ffeffa1d Iustin Pop
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
62 ffeffa1d Iustin Pop
    self.do_quit = False
63 ffeffa1d Iustin Pop
    self.queue = jqueue.QueueManager()
64 ffeffa1d Iustin Pop
    self.processors = []
65 ffeffa1d Iustin Pop
    for i in range(self.QUEUE_PROCESSOR_SIZE):
66 ffeffa1d Iustin Pop
      self.processors.append(threading.Thread(target=PoolWorker,
67 ffeffa1d Iustin Pop
                                              args=(i, self.queue.new_queue)))
68 ffeffa1d Iustin Pop
    for t in self.processors:
69 ffeffa1d Iustin Pop
      t.start()
70 ffeffa1d Iustin Pop
    signal.signal(signal.SIGINT, self.handle_sigint)
71 ffeffa1d Iustin Pop
72 ffeffa1d Iustin Pop
  def process_request_thread(self, request, client_address):
73 ffeffa1d Iustin Pop
    """Process the request.
74 ffeffa1d Iustin Pop
75 ffeffa1d Iustin Pop
    This is copied from the code in ThreadingMixIn.
76 ffeffa1d Iustin Pop
77 ffeffa1d Iustin Pop
    """
78 ffeffa1d Iustin Pop
    try:
79 ffeffa1d Iustin Pop
      self.finish_request(request, client_address)
80 ffeffa1d Iustin Pop
      self.close_request(request)
81 ffeffa1d Iustin Pop
    except:
82 ffeffa1d Iustin Pop
      self.handle_error(request, client_address)
83 ffeffa1d Iustin Pop
      self.close_request(request)
84 ffeffa1d Iustin Pop
85 ffeffa1d Iustin Pop
  def process_request(self, request, client_address):
86 ffeffa1d Iustin Pop
    """Start a new thread to process the request.
87 ffeffa1d Iustin Pop
88 ffeffa1d Iustin Pop
    This is copied from the coode in ThreadingMixIn.
89 ffeffa1d Iustin Pop
90 ffeffa1d Iustin Pop
    """
91 ffeffa1d Iustin Pop
    t = threading.Thread(target=self.process_request_thread,
92 ffeffa1d Iustin Pop
                         args=(request, client_address))
93 ffeffa1d Iustin Pop
    t.start()
94 ffeffa1d Iustin Pop
95 ffeffa1d Iustin Pop
  def handle_sigint(self, signum, frame):
96 ffeffa1d Iustin Pop
    print "received %s in %s" % (signum, frame)
97 ffeffa1d Iustin Pop
    self.do_quit = True
98 ffeffa1d Iustin Pop
    self.server_close()
99 ffeffa1d Iustin Pop
    for i in range(self.QUEUE_PROCESSOR_SIZE):
100 ffeffa1d Iustin Pop
      self.queue.new_queue.put(None)
101 ffeffa1d Iustin Pop
102 ffeffa1d Iustin Pop
  def serve_forever(self):
103 ffeffa1d Iustin Pop
    """Handle one request at a time until told to quit."""
104 ffeffa1d Iustin Pop
    while not self.do_quit:
105 ffeffa1d Iustin Pop
      self.handle_request()
106 ffeffa1d Iustin Pop
107 ffeffa1d Iustin Pop
108 ffeffa1d Iustin Pop
class ClientRqHandler(SocketServer.BaseRequestHandler):
109 ffeffa1d Iustin Pop
  """Client handler"""
110 ffeffa1d Iustin Pop
  EOM = '\3'
111 ffeffa1d Iustin Pop
  READ_SIZE = 4096
112 ffeffa1d Iustin Pop
113 ffeffa1d Iustin Pop
  def setup(self):
114 ffeffa1d Iustin Pop
    self._buffer = ""
115 ffeffa1d Iustin Pop
    self._msgs = collections.deque()
116 ffeffa1d Iustin Pop
    self._ops = ClientOps(self.server)
117 ffeffa1d Iustin Pop
118 ffeffa1d Iustin Pop
  def handle(self):
119 ffeffa1d Iustin Pop
    while True:
120 ffeffa1d Iustin Pop
      msg = self.read_message()
121 ffeffa1d Iustin Pop
      if msg is None:
122 ffeffa1d Iustin Pop
        print "client closed connection"
123 ffeffa1d Iustin Pop
        break
124 ffeffa1d Iustin Pop
      request = simplejson.loads(msg)
125 ffeffa1d Iustin Pop
      if not isinstance(request, dict):
126 ffeffa1d Iustin Pop
        print "wrong request received: %s" % msg
127 ffeffa1d Iustin Pop
        break
128 ffeffa1d Iustin Pop
      method = request.get('request', None)
129 ffeffa1d Iustin Pop
      data = request.get('data', None)
130 ffeffa1d Iustin Pop
      if method is None or data is None:
131 ffeffa1d Iustin Pop
        print "no method or data in request"
132 ffeffa1d Iustin Pop
        break
133 ffeffa1d Iustin Pop
      print "request:", method, data
134 ffeffa1d Iustin Pop
      result = self._ops.handle_request(method, data)
135 ffeffa1d Iustin Pop
      print "result:", result
136 ffeffa1d Iustin Pop
      self.send_message(simplejson.dumps({'success': True, 'result': result}))
137 ffeffa1d Iustin Pop
138 ffeffa1d Iustin Pop
  def read_message(self):
139 ffeffa1d Iustin Pop
    while not self._msgs:
140 ffeffa1d Iustin Pop
      data = self.request.recv(self.READ_SIZE)
141 ffeffa1d Iustin Pop
      if not data:
142 ffeffa1d Iustin Pop
        return None
143 ffeffa1d Iustin Pop
      new_msgs = (self._buffer + data).split(self.EOM)
144 ffeffa1d Iustin Pop
      self._buffer = new_msgs.pop()
145 ffeffa1d Iustin Pop
      self._msgs.extend(new_msgs)
146 ffeffa1d Iustin Pop
    return self._msgs.popleft()
147 ffeffa1d Iustin Pop
148 ffeffa1d Iustin Pop
  def send_message(self, msg):
149 ffeffa1d Iustin Pop
    #print "sending", msg
150 ffeffa1d Iustin Pop
    self.request.sendall(msg + self.EOM)
151 ffeffa1d Iustin Pop
152 ffeffa1d Iustin Pop
153 ffeffa1d Iustin Pop
class ClientOps:
154 ffeffa1d Iustin Pop
  """Class holding high-level client operations."""
155 ffeffa1d Iustin Pop
  def __init__(self, server):
156 ffeffa1d Iustin Pop
    self.server = server
157 ffeffa1d Iustin Pop
    self._cpu = None
158 ffeffa1d Iustin Pop
159 ffeffa1d Iustin Pop
  def _getcpu(self):
160 ffeffa1d Iustin Pop
    if self._cpu is None:
161 ffeffa1d Iustin Pop
      self._cpu = mcpu.Processor(lambda x: None)
162 ffeffa1d Iustin Pop
    return self._cpu
163 ffeffa1d Iustin Pop
164 ffeffa1d Iustin Pop
  def handle_request(self, operation, args):
165 ffeffa1d Iustin Pop
    print operation, args
166 ffeffa1d Iustin Pop
    if operation == "submit":
167 ffeffa1d Iustin Pop
      return self.put(args)
168 ffeffa1d Iustin Pop
    elif operation == "query":
169 ffeffa1d Iustin Pop
      path = args["object"]
170 ffeffa1d Iustin Pop
      if path == "instances":
171 ffeffa1d Iustin Pop
        return self.query(args)
172 ffeffa1d Iustin Pop
    else:
173 ffeffa1d Iustin Pop
      raise ValueError("Invalid operation")
174 ffeffa1d Iustin Pop
175 ffeffa1d Iustin Pop
  def put(self, args):
176 ffeffa1d Iustin Pop
    job = luxi.UnserializeJob(args)
177 ffeffa1d Iustin Pop
    rid = self.server.queue.put(job)
178 ffeffa1d Iustin Pop
    return rid
179 ffeffa1d Iustin Pop
180 ffeffa1d Iustin Pop
  def query(self, args):
181 ffeffa1d Iustin Pop
    path = args["object"]
182 ffeffa1d Iustin Pop
    fields = args["fields"]
183 ffeffa1d Iustin Pop
    names = args["names"]
184 ffeffa1d Iustin Pop
    if path == "instances":
185 ffeffa1d Iustin Pop
      opclass = opcodes.OpQueryInstances
186 ffeffa1d Iustin Pop
    else:
187 ffeffa1d Iustin Pop
      raise ValueError("Invalid object %s" % path)
188 ffeffa1d Iustin Pop
189 ffeffa1d Iustin Pop
    op = opclass(output_fields = fields, names=names)
190 ffeffa1d Iustin Pop
    cpu = self._getcpu()
191 ffeffa1d Iustin Pop
    result = cpu.ExecOpCode(op)
192 ffeffa1d Iustin Pop
    return result
193 ffeffa1d Iustin Pop
194 ffeffa1d Iustin Pop
  def query_job(self, rid):
195 ffeffa1d Iustin Pop
    rid = int(data)
196 ffeffa1d Iustin Pop
    job = self.server.queue.query(rid)
197 ffeffa1d Iustin Pop
    return job
198 ffeffa1d Iustin Pop
199 ffeffa1d Iustin Pop
200 ffeffa1d Iustin Pop
def JobRunner(proc, job):
201 ffeffa1d Iustin Pop
  """Job executor.
202 ffeffa1d Iustin Pop
203 ffeffa1d Iustin Pop
  This functions processes a single job in the context of given
204 ffeffa1d Iustin Pop
  processor instance.
205 ffeffa1d Iustin Pop
206 ffeffa1d Iustin Pop
  """
207 ffeffa1d Iustin Pop
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
208 ffeffa1d Iustin Pop
  for op in job.data.op_list:
209 ffeffa1d Iustin Pop
    proc.ExecOpCode(op)
210 ffeffa1d Iustin Pop
  job.SetStatus(opcodes.Job.STATUS_FINISHED, result=opcodes.Job.RESULT_OK)
211 ffeffa1d Iustin Pop
212 ffeffa1d Iustin Pop
213 ffeffa1d Iustin Pop
def PoolWorker(worker_id, incoming_queue):
214 ffeffa1d Iustin Pop
  """A worker thread function.
215 ffeffa1d Iustin Pop
216 ffeffa1d Iustin Pop
  This is the actual processor of a single thread of Job execution.
217 ffeffa1d Iustin Pop
218 ffeffa1d Iustin Pop
  """
219 ffeffa1d Iustin Pop
  while True:
220 ffeffa1d Iustin Pop
    print "worker %s sleeping" % worker_id
221 ffeffa1d Iustin Pop
    item = incoming_queue.get(True)
222 ffeffa1d Iustin Pop
    if item is None:
223 ffeffa1d Iustin Pop
      break
224 ffeffa1d Iustin Pop
    print "worker %s processing job %s" % (worker_id, item.data.job_id)
225 ffeffa1d Iustin Pop
    utils.Lock('cmd')
226 ffeffa1d Iustin Pop
    try:
227 ffeffa1d Iustin Pop
      proc = mcpu.Processor(feedback=lambda x: None)
228 ffeffa1d Iustin Pop
      try:
229 ffeffa1d Iustin Pop
        JobRunner(proc, item)
230 ffeffa1d Iustin Pop
      except errors.GenericError, err:
231 ffeffa1d Iustin Pop
        print "ganeti exception %s" % err
232 ffeffa1d Iustin Pop
    finally:
233 ffeffa1d Iustin Pop
      utils.Unlock('cmd')
234 ffeffa1d Iustin Pop
      utils.LockCleanup()
235 ffeffa1d Iustin Pop
    print "worker %s finish job %s" % (worker_id, item.data.job_id)
236 ffeffa1d Iustin Pop
  print "worker %s exiting" % worker_id
237 ffeffa1d Iustin Pop
238 ffeffa1d Iustin Pop
239 ffeffa1d Iustin Pop
def main():
240 ffeffa1d Iustin Pop
  """Main function"""
241 ffeffa1d Iustin Pop
242 ffeffa1d Iustin Pop
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
243 ffeffa1d Iustin Pop
  master.serve_forever()
244 ffeffa1d Iustin Pop
245 ffeffa1d Iustin Pop
246 ffeffa1d Iustin Pop
if __name__ == "__main__":
247 ffeffa1d Iustin Pop
  main()