Initial tests with ganeti-masterd
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import SocketServer
31 import threading
32 import time
33 import collections
34 import Queue
35 import random
36 import signal
37 import simplejson
38
39
40 from cStringIO import StringIO
41
42 from ganeti import constants
43 from ganeti import mcpu
44 from ganeti import opcodes
45 from ganeti import jqueue
46 from ganeti import luxi
47 from ganeti import utils
48
49
50 class IOServer(SocketServer.UnixStreamServer):
51   """IO thread class.
52
53   This class takes care of initializing the other threads, setting
54   signal handlers (which are processed only in this thread), and doing
55   cleanup at shutdown.
56
57   """
58   QUEUE_PROCESSOR_SIZE = 1
59
60   def __init__(self, address, rqhandler):
61     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
62     self.do_quit = False
63     self.queue = jqueue.QueueManager()
64     self.processors = []
65     for i in range(self.QUEUE_PROCESSOR_SIZE):
66       self.processors.append(threading.Thread(target=PoolWorker,
67                                               args=(i, self.queue.new_queue)))
68     for t in self.processors:
69       t.start()
70     signal.signal(signal.SIGINT, self.handle_sigint)
71
72   def process_request_thread(self, request, client_address):
73     """Process the request.
74
75     This is copied from the code in ThreadingMixIn.
76
77     """
78     try:
79       self.finish_request(request, client_address)
80       self.close_request(request)
81     except:
82       self.handle_error(request, client_address)
83       self.close_request(request)
84
85   def process_request(self, request, client_address):
86     """Start a new thread to process the request.
87
88     This is copied from the coode in ThreadingMixIn.
89
90     """
91     t = threading.Thread(target=self.process_request_thread,
92                          args=(request, client_address))
93     t.start()
94
95   def handle_sigint(self, signum, frame):
96     print "received %s in %s" % (signum, frame)
97     self.do_quit = True
98     self.server_close()
99     for i in range(self.QUEUE_PROCESSOR_SIZE):
100       self.queue.new_queue.put(None)
101
102   def serve_forever(self):
103     """Handle one request at a time until told to quit."""
104     while not self.do_quit:
105       self.handle_request()
106
107
108 class ClientRqHandler(SocketServer.BaseRequestHandler):
109   """Client handler"""
110   EOM = '\3'
111   READ_SIZE = 4096
112
113   def setup(self):
114     self._buffer = ""
115     self._msgs = collections.deque()
116     self._ops = ClientOps(self.server)
117
118   def handle(self):
119     while True:
120       msg = self.read_message()
121       if msg is None:
122         print "client closed connection"
123         break
124       request = simplejson.loads(msg)
125       if not isinstance(request, dict):
126         print "wrong request received: %s" % msg
127         break
128       method = request.get('request', None)
129       data = request.get('data', None)
130       if method is None or data is None:
131         print "no method or data in request"
132         break
133       print "request:", method, data
134       result = self._ops.handle_request(method, data)
135       print "result:", result
136       self.send_message(simplejson.dumps({'success': True, 'result': result}))
137
138   def read_message(self):
139     while not self._msgs:
140       data = self.request.recv(self.READ_SIZE)
141       if not data:
142         return None
143       new_msgs = (self._buffer + data).split(self.EOM)
144       self._buffer = new_msgs.pop()
145       self._msgs.extend(new_msgs)
146     return self._msgs.popleft()
147
148   def send_message(self, msg):
149     #print "sending", msg
150     self.request.sendall(msg + self.EOM)
151
152
153 class ClientOps:
154   """Class holding high-level client operations."""
155   def __init__(self, server):
156     self.server = server
157     self._cpu = None
158
159   def _getcpu(self):
160     if self._cpu is None:
161       self._cpu = mcpu.Processor(lambda x: None)
162     return self._cpu
163
164   def handle_request(self, operation, args):
165     print operation, args
166     if operation == "submit":
167       return self.put(args)
168     elif operation == "query":
169       path = args["object"]
170       if path == "instances":
171         return self.query(args)
172     else:
173       raise ValueError("Invalid operation")
174
175   def put(self, args):
176     job = luxi.UnserializeJob(args)
177     rid = self.server.queue.put(job)
178     return rid
179
180   def query(self, args):
181     path = args["object"]
182     fields = args["fields"]
183     names = args["names"]
184     if path == "instances":
185       opclass = opcodes.OpQueryInstances
186     else:
187       raise ValueError("Invalid object %s" % path)
188
189     op = opclass(output_fields = fields, names=names)
190     cpu = self._getcpu()
191     result = cpu.ExecOpCode(op)
192     return result
193
194   def query_job(self, rid):
195     rid = int(data)
196     job = self.server.queue.query(rid)
197     return job
198
199
200 def JobRunner(proc, job):
201   """Job executor.
202
203   This functions processes a single job in the context of given
204   processor instance.
205
206   """
207   job.SetStatus(opcodes.Job.STATUS_RUNNING)
208   for op in job.data.op_list:
209     proc.ExecOpCode(op)
210   job.SetStatus(opcodes.Job.STATUS_FINISHED, result=opcodes.Job.RESULT_OK)
211
212
213 def PoolWorker(worker_id, incoming_queue):
214   """A worker thread function.
215
216   This is the actual processor of a single thread of Job execution.
217
218   """
219   while True:
220     print "worker %s sleeping" % worker_id
221     item = incoming_queue.get(True)
222     if item is None:
223       break
224     print "worker %s processing job %s" % (worker_id, item.data.job_id)
225     utils.Lock('cmd')
226     try:
227       proc = mcpu.Processor(feedback=lambda x: None)
228       try:
229         JobRunner(proc, item)
230       except errors.GenericError, err:
231         print "ganeti exception %s" % err
232     finally:
233       utils.Unlock('cmd')
234       utils.LockCleanup()
235     print "worker %s finish job %s" % (worker_id, item.data.job_id)
236   print "worker %s exiting" % worker_id
237
238
239 def main():
240   """Main function"""
241
242   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
243   master.serve_forever()
244
245
246 if __name__ == "__main__":
247   main()