Convert rpc module to RpcRunner
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python -u
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import sys
31 import SocketServer
32 import time
33 import collections
34 import Queue
35 import random
36 import signal
37 import simplejson
38 import logging
39
40 from cStringIO import StringIO
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import locking
49 from ganeti import luxi
50 from ganeti import utils
51 from ganeti import errors
52 from ganeti import ssconf
53 from ganeti import logger
54 from ganeti import workerpool
55 from ganeti import rpc
56
57
58 CLIENT_REQUEST_WORKERS = 16
59
60 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
61 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
62
63
64 class ClientRequestWorker(workerpool.BaseWorker):
65   def RunTask(self, server, request, client_address):
66     """Process the request.
67
68     This is copied from the code in ThreadingMixIn.
69
70     """
71     try:
72       server.finish_request(request, client_address)
73       server.close_request(request)
74     except:
75       server.handle_error(request, client_address)
76       server.close_request(request)
77
78
79 class IOServer(SocketServer.UnixStreamServer):
80   """IO thread class.
81
82   This class takes care of initializing the other threads, setting
83   signal handlers (which are processed only in this thread), and doing
84   cleanup at shutdown.
85
86   """
87   def __init__(self, address, rqhandler):
88     """IOServer constructor
89
90     Args:
91       address: the address to bind this IOServer to
92       rqhandler: RequestHandler type object
93
94     """
95     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
96
97     # We'll only start threads once we've forked.
98     self.context = None
99     self.request_workers = None
100
101   def setup_queue(self):
102     self.context = GanetiContext()
103     self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
104                                                  ClientRequestWorker)
105
106   def process_request(self, request, client_address):
107     """Add task to workerpool to process request.
108
109     """
110     self.request_workers.AddTask(self, request, client_address)
111
112   def serve_forever(self):
113     """Handle one request at a time until told to quit."""
114     sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
115     try:
116       while not sighandler.called:
117         self.handle_request()
118     finally:
119       sighandler.Reset()
120
121   def server_cleanup(self):
122     """Cleanup the server.
123
124     This involves shutting down the processor threads and the master
125     socket.
126
127     """
128     try:
129       self.server_close()
130     finally:
131       if self.request_workers:
132         self.request_workers.TerminateWorkers()
133       if self.context:
134         self.context.jobqueue.Shutdown()
135
136
137 class ClientRqHandler(SocketServer.BaseRequestHandler):
138   """Client handler"""
139   EOM = '\3'
140   READ_SIZE = 4096
141
142   def setup(self):
143     self._buffer = ""
144     self._msgs = collections.deque()
145     self._ops = ClientOps(self.server)
146
147   def handle(self):
148     while True:
149       msg = self.read_message()
150       if msg is None:
151         logging.info("client closed connection")
152         break
153
154       request = simplejson.loads(msg)
155       logging.debug("request: %s", request)
156       if not isinstance(request, dict):
157         logging.error("wrong request received: %s", msg)
158         break
159
160       method = request.get(luxi.KEY_METHOD, None)
161       args = request.get(luxi.KEY_ARGS, None)
162       if method is None or args is None:
163         logging.error("no method or args in request")
164         break
165
166       success = False
167       try:
168         result = self._ops.handle_request(method, args)
169         success = True
170       except:
171         logging.error("Unexpected exception", exc_info=True)
172         err = sys.exc_info()
173         result = "Caught exception: %s" % str(err[1])
174
175       response = {
176         luxi.KEY_SUCCESS: success,
177         luxi.KEY_RESULT: result,
178         }
179       logging.debug("response: %s", response)
180       self.send_message(simplejson.dumps(response))
181
182   def read_message(self):
183     while not self._msgs:
184       data = self.request.recv(self.READ_SIZE)
185       if not data:
186         return None
187       new_msgs = (self._buffer + data).split(self.EOM)
188       self._buffer = new_msgs.pop()
189       self._msgs.extend(new_msgs)
190     return self._msgs.popleft()
191
192   def send_message(self, msg):
193     #print "sending", msg
194     self.request.sendall(msg + self.EOM)
195
196
197 class ClientOps:
198   """Class holding high-level client operations."""
199   def __init__(self, server):
200     self.server = server
201
202   def handle_request(self, method, args):
203     queue = self.server.context.jobqueue
204
205     # TODO: Parameter validation
206
207     if method == luxi.REQ_SUBMIT_JOB:
208       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
209       return queue.SubmitJob(ops)
210
211     elif method == luxi.REQ_CANCEL_JOB:
212       job_id = args
213       return queue.CancelJob(job_id)
214
215     elif method == luxi.REQ_ARCHIVE_JOB:
216       job_id = args
217       return queue.ArchiveJob(job_id)
218
219     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
220       age = args
221       return queue.AutoArchiveJobs(age)
222
223     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
224       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
225       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
226                                      prev_log_serial, timeout)
227
228     elif method == luxi.REQ_QUERY_JOBS:
229       (job_ids, fields) = args
230       return queue.QueryJobs(job_ids, fields)
231
232     elif method == luxi.REQ_QUERY_INSTANCES:
233       (names, fields) = args
234       op = opcodes.OpQueryInstances(names=names, output_fields=fields)
235       return self._Query(op)
236
237     elif method == luxi.REQ_QUERY_NODES:
238       (names, fields) = args
239       op = opcodes.OpQueryNodes(names=names, output_fields=fields)
240       return self._Query(op)
241
242     elif method == luxi.REQ_QUERY_EXPORTS:
243       nodes = args
244       op = opcodes.OpQueryExports(nodes=nodes)
245       return self._Query(op)
246
247     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
248       fields = args
249       op = opcodes.OpQueryConfigValues(output_fields=fields)
250       return self._Query(op)
251
252     else:
253       raise ValueError("Invalid operation")
254
255   def _DummyLog(self, *args):
256     pass
257
258   def _Query(self, op):
259     """Runs the specified opcode and returns the result.
260
261     """
262     proc = mcpu.Processor(self.server.context)
263     # TODO: Where should log messages go?
264     return proc.ExecOpCode(op, self._DummyLog, None)
265
266
267 class GanetiContext(object):
268   """Context common to all ganeti threads.
269
270   This class creates and holds common objects shared by all threads.
271
272   """
273   _instance = None
274
275   def __init__(self):
276     """Constructs a new GanetiContext object.
277
278     There should be only a GanetiContext object at any time, so this
279     function raises an error if this is not the case.
280
281     """
282     assert self.__class__._instance is None, "double GanetiContext instance"
283
284     # Create global configuration object
285     self.cfg = config.ConfigWriter()
286
287     # Locking manager
288     self.glm = locking.GanetiLockManager(
289                 self.cfg.GetNodeList(),
290                 self.cfg.GetInstanceList())
291
292     # Job queue
293     self.jobqueue = jqueue.JobQueue(self)
294
295     # setting this also locks the class against attribute modifications
296     self.__class__._instance = self
297
298   def __setattr__(self, name, value):
299     """Setting GanetiContext attributes is forbidden after initialization.
300
301     """
302     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
303     object.__setattr__(self, name, value)
304
305   def AddNode(self, node):
306     """Adds a node to the configuration and lock manager.
307
308     """
309     # Add it to the configuration
310     self.cfg.AddNode(node)
311
312     # If preseeding fails it'll not be added
313     self.jobqueue.AddNode(node.name)
314
315     # Add the new node to the Ganeti Lock Manager
316     self.glm.add(locking.LEVEL_NODE, node.name)
317
318   def ReaddNode(self, node):
319     """Updates a node that's already in the configuration
320
321     """
322     # Synchronize the queue again
323     self.jobqueue.AddNode(node.name)
324
325   def RemoveNode(self, name):
326     """Removes a node from the configuration and lock manager.
327
328     """
329     # Remove node from configuration
330     self.cfg.RemoveNode(name)
331
332     # Notify job queue
333     self.jobqueue.RemoveNode(name)
334
335     # Remove the node from the Ganeti Lock Manager
336     self.glm.remove(locking.LEVEL_NODE, name)
337
338
339 def ParseOptions():
340   """Parse the command line options.
341
342   Returns:
343     (options, args) as from OptionParser.parse_args()
344
345   """
346   parser = OptionParser(description="Ganeti master daemon",
347                         usage="%prog [-f] [-d]",
348                         version="%%prog (ganeti) %s" %
349                         constants.RELEASE_VERSION)
350
351   parser.add_option("-f", "--foreground", dest="fork",
352                     help="Don't detach from the current terminal",
353                     default=True, action="store_false")
354   parser.add_option("-d", "--debug", dest="debug",
355                     help="Enable some debug messages",
356                     default=False, action="store_true")
357   options, args = parser.parse_args()
358   return options, args
359
360
361 def CheckAgreement():
362   """Check the agreement on who is the master.
363
364   The function uses a very simple algorithm: we must get more positive
365   than negative answers. Since in most of the cases we are the master,
366   we'll use our own config file for getting the node list. In the
367   future we could collect the current node list from our (possibly
368   obsolete) known nodes.
369
370   """
371   myself = utils.HostInfo().name
372   #temp instantiation of a config writer, used only to get the node list
373   cfg = config.ConfigWriter()
374   node_list = cfg.GetNodeList()
375   del cfg
376   try:
377     node_list.remove(myself)
378   except KeyError:
379     pass
380   if not node_list:
381     # either single node cluster, or a misconfiguration, but I won't
382     # break any other node, so I can proceed
383     return True
384   results = rpc.RpcRunner.call_master_info(node_list)
385   if not isinstance(results, dict):
386     # this should not happen (unless internal error in rpc)
387     logging.critical("Can't complete rpc call, aborting master startup")
388     return False
389   positive = negative = 0
390   other_masters = {}
391   for node in results:
392     if not isinstance(results[node], (tuple, list)) or len(results[node]) < 3:
393       logging.warning("Can't contact node %s", node)
394       continue
395     master_node = results[node][2]
396     if master_node == myself:
397       positive += 1
398     else:
399       negative += 1
400       if not master_node in other_masters:
401         other_masters[master_node] = 0
402       other_masters[master_node] += 1
403   if positive <= negative:
404     # bad!
405     logging.critical("It seems we are not the master (%d votes for,"
406                      " %d votes against)", positive, negative)
407     if len(other_masters) > 1:
408       logging.critical("The other nodes do not agree on a single master")
409     elif other_masters:
410       # TODO: resync my files from the master
411       logging.critical("It seems the real master is %s",
412                        other_masters.keys()[0])
413     else:
414       logging.critical("Can't contact any node for data, aborting startup")
415     return False
416   return True
417
418
419 def main():
420   """Main function"""
421
422   options, args = ParseOptions()
423   utils.debug = options.debug
424   utils.no_fork = True
425
426   ssconf.CheckMaster(options.debug)
427
428   # we believe we are the master, let's ask the other nodes...
429   if not CheckAgreement():
430     return
431
432   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
433
434   # become a daemon
435   if options.fork:
436     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
437                     noclose_fds=[master.fileno()])
438
439   utils.WritePidFile(constants.MASTERD_PID)
440
441   logger.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
442                       stderr_logging=not options.fork)
443
444   logging.info("ganeti master daemon startup")
445
446   # activate ip
447   master_node = ssconf.SimpleConfigReader().GetMasterNode()
448   if not rpc.RpcRunner.call_node_start_master(master_node, False):
449     logging.error("Can't activate master IP address")
450
451   master.setup_queue()
452   try:
453     master.serve_forever()
454   finally:
455     master.server_cleanup()
456     utils.RemovePidFile(constants.MASTERD_PID)
457
458
459 if __name__ == "__main__":
460   main()