root / daemons / ganeti-masterd @ ffeffa1d
History | View | Annotate | Download (6.5 kB)
1 | ffeffa1d | Iustin Pop | #!/usr/bin/python |
---|---|---|---|
2 | ffeffa1d | Iustin Pop | # |
3 | ffeffa1d | Iustin Pop | |
4 | ffeffa1d | Iustin Pop | # Copyright (C) 2006, 2007 Google Inc. |
5 | ffeffa1d | Iustin Pop | # |
6 | ffeffa1d | Iustin Pop | # This program is free software; you can redistribute it and/or modify |
7 | ffeffa1d | Iustin Pop | # it under the terms of the GNU General Public License as published by |
8 | ffeffa1d | Iustin Pop | # the Free Software Foundation; either version 2 of the License, or |
9 | ffeffa1d | Iustin Pop | # (at your option) any later version. |
10 | ffeffa1d | Iustin Pop | # |
11 | ffeffa1d | Iustin Pop | # This program is distributed in the hope that it will be useful, but |
12 | ffeffa1d | Iustin Pop | # WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | ffeffa1d | Iustin Pop | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
14 | ffeffa1d | Iustin Pop | # General Public License for more details. |
15 | ffeffa1d | Iustin Pop | # |
16 | ffeffa1d | Iustin Pop | # You should have received a copy of the GNU General Public License |
17 | ffeffa1d | Iustin Pop | # along with this program; if not, write to the Free Software |
18 | ffeffa1d | Iustin Pop | # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
19 | ffeffa1d | Iustin Pop | # 02110-1301, USA. |
20 | ffeffa1d | Iustin Pop | |
21 | ffeffa1d | Iustin Pop | |
22 | ffeffa1d | Iustin Pop | """Master daemon program. |
23 | ffeffa1d | Iustin Pop | |
24 | ffeffa1d | Iustin Pop | Some classes deviates from the standard style guide since the |
25 | ffeffa1d | Iustin Pop | inheritance from parent classes requires it. |
26 | ffeffa1d | Iustin Pop | |
27 | ffeffa1d | Iustin Pop | """ |
28 | ffeffa1d | Iustin Pop | |
29 | ffeffa1d | Iustin Pop | |
30 | ffeffa1d | Iustin Pop | import SocketServer |
31 | ffeffa1d | Iustin Pop | import threading |
32 | ffeffa1d | Iustin Pop | import time |
33 | ffeffa1d | Iustin Pop | import collections |
34 | ffeffa1d | Iustin Pop | import Queue |
35 | ffeffa1d | Iustin Pop | import random |
36 | ffeffa1d | Iustin Pop | import signal |
37 | ffeffa1d | Iustin Pop | import simplejson |
38 | ffeffa1d | Iustin Pop | |
39 | ffeffa1d | Iustin Pop | |
40 | ffeffa1d | Iustin Pop | from cStringIO import StringIO |
41 | ffeffa1d | Iustin Pop | |
42 | ffeffa1d | Iustin Pop | from ganeti import constants |
43 | ffeffa1d | Iustin Pop | from ganeti import mcpu |
44 | ffeffa1d | Iustin Pop | from ganeti import opcodes |
45 | ffeffa1d | Iustin Pop | from ganeti import jqueue |
46 | ffeffa1d | Iustin Pop | from ganeti import luxi |
47 | ffeffa1d | Iustin Pop | from ganeti import utils |
48 | ffeffa1d | Iustin Pop | |
49 | ffeffa1d | Iustin Pop | |
50 | ffeffa1d | Iustin Pop | class IOServer(SocketServer.UnixStreamServer): |
51 | ffeffa1d | Iustin Pop | """IO thread class. |
52 | ffeffa1d | Iustin Pop | |
53 | ffeffa1d | Iustin Pop | This class takes care of initializing the other threads, setting |
54 | ffeffa1d | Iustin Pop | signal handlers (which are processed only in this thread), and doing |
55 | ffeffa1d | Iustin Pop | cleanup at shutdown. |
56 | ffeffa1d | Iustin Pop | |
57 | ffeffa1d | Iustin Pop | """ |
58 | ffeffa1d | Iustin Pop | QUEUE_PROCESSOR_SIZE = 1 |
59 | ffeffa1d | Iustin Pop | |
60 | ffeffa1d | Iustin Pop | def __init__(self, address, rqhandler): |
61 | ffeffa1d | Iustin Pop | SocketServer.UnixStreamServer.__init__(self, address, rqhandler) |
62 | ffeffa1d | Iustin Pop | self.do_quit = False |
63 | ffeffa1d | Iustin Pop | self.queue = jqueue.QueueManager() |
64 | ffeffa1d | Iustin Pop | self.processors = [] |
65 | ffeffa1d | Iustin Pop | for i in range(self.QUEUE_PROCESSOR_SIZE): |
66 | ffeffa1d | Iustin Pop | self.processors.append(threading.Thread(target=PoolWorker, |
67 | ffeffa1d | Iustin Pop | args=(i, self.queue.new_queue))) |
68 | ffeffa1d | Iustin Pop | for t in self.processors: |
69 | ffeffa1d | Iustin Pop | t.start() |
70 | ffeffa1d | Iustin Pop | signal.signal(signal.SIGINT, self.handle_sigint) |
71 | ffeffa1d | Iustin Pop | |
72 | ffeffa1d | Iustin Pop | def process_request_thread(self, request, client_address): |
73 | ffeffa1d | Iustin Pop | """Process the request. |
74 | ffeffa1d | Iustin Pop | |
75 | ffeffa1d | Iustin Pop | This is copied from the code in ThreadingMixIn. |
76 | ffeffa1d | Iustin Pop | |
77 | ffeffa1d | Iustin Pop | """ |
78 | ffeffa1d | Iustin Pop | try: |
79 | ffeffa1d | Iustin Pop | self.finish_request(request, client_address) |
80 | ffeffa1d | Iustin Pop | self.close_request(request) |
81 | ffeffa1d | Iustin Pop | except: |
82 | ffeffa1d | Iustin Pop | self.handle_error(request, client_address) |
83 | ffeffa1d | Iustin Pop | self.close_request(request) |
84 | ffeffa1d | Iustin Pop | |
85 | ffeffa1d | Iustin Pop | def process_request(self, request, client_address): |
86 | ffeffa1d | Iustin Pop | """Start a new thread to process the request. |
87 | ffeffa1d | Iustin Pop | |
88 | ffeffa1d | Iustin Pop | This is copied from the coode in ThreadingMixIn. |
89 | ffeffa1d | Iustin Pop | |
90 | ffeffa1d | Iustin Pop | """ |
91 | ffeffa1d | Iustin Pop | t = threading.Thread(target=self.process_request_thread, |
92 | ffeffa1d | Iustin Pop | args=(request, client_address)) |
93 | ffeffa1d | Iustin Pop | t.start() |
94 | ffeffa1d | Iustin Pop | |
95 | ffeffa1d | Iustin Pop | def handle_sigint(self, signum, frame): |
96 | ffeffa1d | Iustin Pop | print "received %s in %s" % (signum, frame) |
97 | ffeffa1d | Iustin Pop | self.do_quit = True |
98 | ffeffa1d | Iustin Pop | self.server_close() |
99 | ffeffa1d | Iustin Pop | for i in range(self.QUEUE_PROCESSOR_SIZE): |
100 | ffeffa1d | Iustin Pop | self.queue.new_queue.put(None) |
101 | ffeffa1d | Iustin Pop | |
102 | ffeffa1d | Iustin Pop | def serve_forever(self): |
103 | ffeffa1d | Iustin Pop | """Handle one request at a time until told to quit.""" |
104 | ffeffa1d | Iustin Pop | while not self.do_quit: |
105 | ffeffa1d | Iustin Pop | self.handle_request() |
106 | ffeffa1d | Iustin Pop | |
107 | ffeffa1d | Iustin Pop | |
108 | ffeffa1d | Iustin Pop | class ClientRqHandler(SocketServer.BaseRequestHandler): |
109 | ffeffa1d | Iustin Pop | """Client handler""" |
110 | ffeffa1d | Iustin Pop | EOM = '\3' |
111 | ffeffa1d | Iustin Pop | READ_SIZE = 4096 |
112 | ffeffa1d | Iustin Pop | |
113 | ffeffa1d | Iustin Pop | def setup(self): |
114 | ffeffa1d | Iustin Pop | self._buffer = "" |
115 | ffeffa1d | Iustin Pop | self._msgs = collections.deque() |
116 | ffeffa1d | Iustin Pop | self._ops = ClientOps(self.server) |
117 | ffeffa1d | Iustin Pop | |
118 | ffeffa1d | Iustin Pop | def handle(self): |
119 | ffeffa1d | Iustin Pop | while True: |
120 | ffeffa1d | Iustin Pop | msg = self.read_message() |
121 | ffeffa1d | Iustin Pop | if msg is None: |
122 | ffeffa1d | Iustin Pop | print "client closed connection" |
123 | ffeffa1d | Iustin Pop | break |
124 | ffeffa1d | Iustin Pop | request = simplejson.loads(msg) |
125 | ffeffa1d | Iustin Pop | if not isinstance(request, dict): |
126 | ffeffa1d | Iustin Pop | print "wrong request received: %s" % msg |
127 | ffeffa1d | Iustin Pop | break |
128 | ffeffa1d | Iustin Pop | method = request.get('request', None) |
129 | ffeffa1d | Iustin Pop | data = request.get('data', None) |
130 | ffeffa1d | Iustin Pop | if method is None or data is None: |
131 | ffeffa1d | Iustin Pop | print "no method or data in request" |
132 | ffeffa1d | Iustin Pop | break |
133 | ffeffa1d | Iustin Pop | print "request:", method, data |
134 | ffeffa1d | Iustin Pop | result = self._ops.handle_request(method, data) |
135 | ffeffa1d | Iustin Pop | print "result:", result |
136 | ffeffa1d | Iustin Pop | self.send_message(simplejson.dumps({'success': True, 'result': result})) |
137 | ffeffa1d | Iustin Pop | |
138 | ffeffa1d | Iustin Pop | def read_message(self): |
139 | ffeffa1d | Iustin Pop | while not self._msgs: |
140 | ffeffa1d | Iustin Pop | data = self.request.recv(self.READ_SIZE) |
141 | ffeffa1d | Iustin Pop | if not data: |
142 | ffeffa1d | Iustin Pop | return None |
143 | ffeffa1d | Iustin Pop | new_msgs = (self._buffer + data).split(self.EOM) |
144 | ffeffa1d | Iustin Pop | self._buffer = new_msgs.pop() |
145 | ffeffa1d | Iustin Pop | self._msgs.extend(new_msgs) |
146 | ffeffa1d | Iustin Pop | return self._msgs.popleft() |
147 | ffeffa1d | Iustin Pop | |
148 | ffeffa1d | Iustin Pop | def send_message(self, msg): |
149 | ffeffa1d | Iustin Pop | #print "sending", msg |
150 | ffeffa1d | Iustin Pop | self.request.sendall(msg + self.EOM) |
151 | ffeffa1d | Iustin Pop | |
152 | ffeffa1d | Iustin Pop | |
153 | ffeffa1d | Iustin Pop | class ClientOps: |
154 | ffeffa1d | Iustin Pop | """Class holding high-level client operations.""" |
155 | ffeffa1d | Iustin Pop | def __init__(self, server): |
156 | ffeffa1d | Iustin Pop | self.server = server |
157 | ffeffa1d | Iustin Pop | self._cpu = None |
158 | ffeffa1d | Iustin Pop | |
159 | ffeffa1d | Iustin Pop | def _getcpu(self): |
160 | ffeffa1d | Iustin Pop | if self._cpu is None: |
161 | ffeffa1d | Iustin Pop | self._cpu = mcpu.Processor(lambda x: None) |
162 | ffeffa1d | Iustin Pop | return self._cpu |
163 | ffeffa1d | Iustin Pop | |
164 | ffeffa1d | Iustin Pop | def handle_request(self, operation, args): |
165 | ffeffa1d | Iustin Pop | print operation, args |
166 | ffeffa1d | Iustin Pop | if operation == "submit": |
167 | ffeffa1d | Iustin Pop | return self.put(args) |
168 | ffeffa1d | Iustin Pop | elif operation == "query": |
169 | ffeffa1d | Iustin Pop | path = args["object"] |
170 | ffeffa1d | Iustin Pop | if path == "instances": |
171 | ffeffa1d | Iustin Pop | return self.query(args) |
172 | ffeffa1d | Iustin Pop | else: |
173 | ffeffa1d | Iustin Pop | raise ValueError("Invalid operation") |
174 | ffeffa1d | Iustin Pop | |
175 | ffeffa1d | Iustin Pop | def put(self, args): |
176 | ffeffa1d | Iustin Pop | job = luxi.UnserializeJob(args) |
177 | ffeffa1d | Iustin Pop | rid = self.server.queue.put(job) |
178 | ffeffa1d | Iustin Pop | return rid |
179 | ffeffa1d | Iustin Pop | |
180 | ffeffa1d | Iustin Pop | def query(self, args): |
181 | ffeffa1d | Iustin Pop | path = args["object"] |
182 | ffeffa1d | Iustin Pop | fields = args["fields"] |
183 | ffeffa1d | Iustin Pop | names = args["names"] |
184 | ffeffa1d | Iustin Pop | if path == "instances": |
185 | ffeffa1d | Iustin Pop | opclass = opcodes.OpQueryInstances |
186 | ffeffa1d | Iustin Pop | else: |
187 | ffeffa1d | Iustin Pop | raise ValueError("Invalid object %s" % path) |
188 | ffeffa1d | Iustin Pop | |
189 | ffeffa1d | Iustin Pop | op = opclass(output_fields = fields, names=names) |
190 | ffeffa1d | Iustin Pop | cpu = self._getcpu() |
191 | ffeffa1d | Iustin Pop | result = cpu.ExecOpCode(op) |
192 | ffeffa1d | Iustin Pop | return result |
193 | ffeffa1d | Iustin Pop | |
194 | ffeffa1d | Iustin Pop | def query_job(self, rid): |
195 | ffeffa1d | Iustin Pop | rid = int(data) |
196 | ffeffa1d | Iustin Pop | job = self.server.queue.query(rid) |
197 | ffeffa1d | Iustin Pop | return job |
198 | ffeffa1d | Iustin Pop | |
199 | ffeffa1d | Iustin Pop | |
200 | ffeffa1d | Iustin Pop | def JobRunner(proc, job): |
201 | ffeffa1d | Iustin Pop | """Job executor. |
202 | ffeffa1d | Iustin Pop | |
203 | ffeffa1d | Iustin Pop | This functions processes a single job in the context of given |
204 | ffeffa1d | Iustin Pop | processor instance. |
205 | ffeffa1d | Iustin Pop | |
206 | ffeffa1d | Iustin Pop | """ |
207 | ffeffa1d | Iustin Pop | job.SetStatus(opcodes.Job.STATUS_RUNNING) |
208 | ffeffa1d | Iustin Pop | for op in job.data.op_list: |
209 | ffeffa1d | Iustin Pop | proc.ExecOpCode(op) |
210 | ffeffa1d | Iustin Pop | job.SetStatus(opcodes.Job.STATUS_FINISHED, result=opcodes.Job.RESULT_OK) |
211 | ffeffa1d | Iustin Pop | |
212 | ffeffa1d | Iustin Pop | |
213 | ffeffa1d | Iustin Pop | def PoolWorker(worker_id, incoming_queue): |
214 | ffeffa1d | Iustin Pop | """A worker thread function. |
215 | ffeffa1d | Iustin Pop | |
216 | ffeffa1d | Iustin Pop | This is the actual processor of a single thread of Job execution. |
217 | ffeffa1d | Iustin Pop | |
218 | ffeffa1d | Iustin Pop | """ |
219 | ffeffa1d | Iustin Pop | while True: |
220 | ffeffa1d | Iustin Pop | print "worker %s sleeping" % worker_id |
221 | ffeffa1d | Iustin Pop | item = incoming_queue.get(True) |
222 | ffeffa1d | Iustin Pop | if item is None: |
223 | ffeffa1d | Iustin Pop | break |
224 | ffeffa1d | Iustin Pop | print "worker %s processing job %s" % (worker_id, item.data.job_id) |
225 | ffeffa1d | Iustin Pop | utils.Lock('cmd') |
226 | ffeffa1d | Iustin Pop | try: |
227 | ffeffa1d | Iustin Pop | proc = mcpu.Processor(feedback=lambda x: None) |
228 | ffeffa1d | Iustin Pop | try: |
229 | ffeffa1d | Iustin Pop | JobRunner(proc, item) |
230 | ffeffa1d | Iustin Pop | except errors.GenericError, err: |
231 | ffeffa1d | Iustin Pop | print "ganeti exception %s" % err |
232 | ffeffa1d | Iustin Pop | finally: |
233 | ffeffa1d | Iustin Pop | utils.Unlock('cmd') |
234 | ffeffa1d | Iustin Pop | utils.LockCleanup() |
235 | ffeffa1d | Iustin Pop | print "worker %s finish job %s" % (worker_id, item.data.job_id) |
236 | ffeffa1d | Iustin Pop | print "worker %s exiting" % worker_id |
237 | ffeffa1d | Iustin Pop | |
238 | ffeffa1d | Iustin Pop | |
239 | ffeffa1d | Iustin Pop | def main(): |
240 | ffeffa1d | Iustin Pop | """Main function""" |
241 | ffeffa1d | Iustin Pop | |
242 | ffeffa1d | Iustin Pop | master = IOServer(constants.MASTER_SOCKET, ClientRqHandler) |
243 | ffeffa1d | Iustin Pop | master.serve_forever() |
244 | ffeffa1d | Iustin Pop | |
245 | ffeffa1d | Iustin Pop | |
246 | ffeffa1d | Iustin Pop | if __name__ == "__main__": |
247 | ffeffa1d | Iustin Pop | main() |