4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Master daemon program.
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
40 from cStringIO import StringIO
42 from ganeti import constants
43 from ganeti import mcpu
44 from ganeti import opcodes
45 from ganeti import jqueue
46 from ganeti import luxi
47 from ganeti import utils
50 class IOServer(SocketServer.UnixStreamServer):
53 This class takes care of initializing the other threads, setting
54 signal handlers (which are processed only in this thread), and doing
58 QUEUE_PROCESSOR_SIZE = 1
60 def __init__(self, address, rqhandler):
61 SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
63 self.queue = jqueue.QueueManager()
65 for i in range(self.QUEUE_PROCESSOR_SIZE):
66 self.processors.append(threading.Thread(target=PoolWorker,
67 args=(i, self.queue.new_queue)))
68 for t in self.processors:
70 signal.signal(signal.SIGINT, self.handle_sigint)
72 def process_request_thread(self, request, client_address):
73 """Process the request.
75 This is copied from the code in ThreadingMixIn.
79 self.finish_request(request, client_address)
80 self.close_request(request)
82 self.handle_error(request, client_address)
83 self.close_request(request)
85 def process_request(self, request, client_address):
86 """Start a new thread to process the request.
88 This is copied from the coode in ThreadingMixIn.
91 t = threading.Thread(target=self.process_request_thread,
92 args=(request, client_address))
95 def handle_sigint(self, signum, frame):
96 print "received %s in %s" % (signum, frame)
99 for i in range(self.QUEUE_PROCESSOR_SIZE):
100 self.queue.new_queue.put(None)
102 def serve_forever(self):
103 """Handle one request at a time until told to quit."""
104 while not self.do_quit:
105 self.handle_request()
108 class ClientRqHandler(SocketServer.BaseRequestHandler):
115 self._msgs = collections.deque()
116 self._ops = ClientOps(self.server)
120 msg = self.read_message()
122 print "client closed connection"
124 request = simplejson.loads(msg)
125 if not isinstance(request, dict):
126 print "wrong request received: %s" % msg
128 method = request.get('request', None)
129 data = request.get('data', None)
130 if method is None or data is None:
131 print "no method or data in request"
133 print "request:", method, data
134 result = self._ops.handle_request(method, data)
135 print "result:", result
136 self.send_message(simplejson.dumps({'success': True, 'result': result}))
138 def read_message(self):
139 while not self._msgs:
140 data = self.request.recv(self.READ_SIZE)
143 new_msgs = (self._buffer + data).split(self.EOM)
144 self._buffer = new_msgs.pop()
145 self._msgs.extend(new_msgs)
146 return self._msgs.popleft()
148 def send_message(self, msg):
149 #print "sending", msg
150 self.request.sendall(msg + self.EOM)
154 """Class holding high-level client operations."""
155 def __init__(self, server):
160 if self._cpu is None:
161 self._cpu = mcpu.Processor(lambda x: None)
164 def handle_request(self, operation, args):
165 print operation, args
166 if operation == "submit":
167 return self.put(args)
168 elif operation == "query":
169 path = args["object"]
170 if path == "instances":
171 return self.query(args)
173 raise ValueError("Invalid operation")
176 job = luxi.UnserializeJob(args)
177 rid = self.server.queue.put(job)
180 def query(self, args):
181 path = args["object"]
182 fields = args["fields"]
183 names = args["names"]
184 if path == "instances":
185 opclass = opcodes.OpQueryInstances
187 raise ValueError("Invalid object %s" % path)
189 op = opclass(output_fields = fields, names=names)
191 result = cpu.ExecOpCode(op)
194 def query_job(self, rid):
196 job = self.server.queue.query(rid)
200 def JobRunner(proc, job):
203 This functions processes a single job in the context of given
207 job.SetStatus(opcodes.Job.STATUS_RUNNING)
208 for op in job.data.op_list:
210 job.SetStatus(opcodes.Job.STATUS_FINISHED, result=opcodes.Job.RESULT_OK)
213 def PoolWorker(worker_id, incoming_queue):
214 """A worker thread function.
216 This is the actual processor of a single thread of Job execution.
220 print "worker %s sleeping" % worker_id
221 item = incoming_queue.get(True)
224 print "worker %s processing job %s" % (worker_id, item.data.job_id)
227 proc = mcpu.Processor(feedback=lambda x: None)
229 JobRunner(proc, item)
230 except errors.GenericError, err:
231 print "ganeti exception %s" % err
235 print "worker %s finish job %s" % (worker_id, item.data.job_id)
236 print "worker %s exiting" % worker_id
242 master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
243 master.serve_forever()
246 if __name__ == "__main__":