Abstract the timestamp formatting into cli.py
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python -u
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import sys
31 import SocketServer
32 import time
33 import collections
34 import Queue
35 import random
36 import signal
37 import simplejson
38 import logging
39
40 from cStringIO import StringIO
41 from optparse import OptionParser
42
43 from ganeti import config
44 from ganeti import constants
45 from ganeti import mcpu
46 from ganeti import opcodes
47 from ganeti import jqueue
48 from ganeti import locking
49 from ganeti import luxi
50 from ganeti import utils
51 from ganeti import errors
52 from ganeti import ssconf
53 from ganeti import logger
54 from ganeti import workerpool
55 from ganeti import rpc
56
57
58 CLIENT_REQUEST_WORKERS = 16
59
60 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
61 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
62
63
64 class ClientRequestWorker(workerpool.BaseWorker):
65   def RunTask(self, server, request, client_address):
66     """Process the request.
67
68     This is copied from the code in ThreadingMixIn.
69
70     """
71     try:
72       server.finish_request(request, client_address)
73       server.close_request(request)
74     except:
75       server.handle_error(request, client_address)
76       server.close_request(request)
77
78
79 class IOServer(SocketServer.UnixStreamServer):
80   """IO thread class.
81
82   This class takes care of initializing the other threads, setting
83   signal handlers (which are processed only in this thread), and doing
84   cleanup at shutdown.
85
86   """
87   def __init__(self, address, rqhandler):
88     """IOServer constructor
89
90     Args:
91       address: the address to bind this IOServer to
92       rqhandler: RequestHandler type object
93
94     """
95     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
96
97     # We'll only start threads once we've forked.
98     self.context = None
99     self.request_workers = None
100
101   def setup_queue(self):
102     self.context = GanetiContext()
103     self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
104                                                  ClientRequestWorker)
105
106   def process_request(self, request, client_address):
107     """Add task to workerpool to process request.
108
109     """
110     self.request_workers.AddTask(self, request, client_address)
111
112   def serve_forever(self):
113     """Handle one request at a time until told to quit."""
114     sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
115     try:
116       while not sighandler.called:
117         self.handle_request()
118     finally:
119       sighandler.Reset()
120
121   def server_cleanup(self):
122     """Cleanup the server.
123
124     This involves shutting down the processor threads and the master
125     socket.
126
127     """
128     try:
129       self.server_close()
130     finally:
131       if self.request_workers:
132         self.request_workers.TerminateWorkers()
133       if self.context:
134         self.context.jobqueue.Shutdown()
135
136
137 class ClientRqHandler(SocketServer.BaseRequestHandler):
138   """Client handler"""
139   EOM = '\3'
140   READ_SIZE = 4096
141
142   def setup(self):
143     self._buffer = ""
144     self._msgs = collections.deque()
145     self._ops = ClientOps(self.server)
146
147   def handle(self):
148     while True:
149       msg = self.read_message()
150       if msg is None:
151         logging.info("client closed connection")
152         break
153
154       request = simplejson.loads(msg)
155       logging.debug("request: %s", request)
156       if not isinstance(request, dict):
157         logging.error("wrong request received: %s", msg)
158         break
159
160       method = request.get(luxi.KEY_METHOD, None)
161       args = request.get(luxi.KEY_ARGS, None)
162       if method is None or args is None:
163         logging.error("no method or args in request")
164         break
165
166       success = False
167       try:
168         result = self._ops.handle_request(method, args)
169         success = True
170       except:
171         logging.error("Unexpected exception", exc_info=True)
172         err = sys.exc_info()
173         result = "Caught exception: %s" % str(err[1])
174
175       response = {
176         luxi.KEY_SUCCESS: success,
177         luxi.KEY_RESULT: result,
178         }
179       logging.debug("response: %s", response)
180       self.send_message(simplejson.dumps(response))
181
182   def read_message(self):
183     while not self._msgs:
184       data = self.request.recv(self.READ_SIZE)
185       if not data:
186         return None
187       new_msgs = (self._buffer + data).split(self.EOM)
188       self._buffer = new_msgs.pop()
189       self._msgs.extend(new_msgs)
190     return self._msgs.popleft()
191
192   def send_message(self, msg):
193     #print "sending", msg
194     self.request.sendall(msg + self.EOM)
195
196
197 class ClientOps:
198   """Class holding high-level client operations."""
199   def __init__(self, server):
200     self.server = server
201
202   def handle_request(self, method, args):
203     queue = self.server.context.jobqueue
204
205     # TODO: Parameter validation
206
207     if method == luxi.REQ_SUBMIT_JOB:
208       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
209       return queue.SubmitJob(ops)
210
211     elif method == luxi.REQ_CANCEL_JOB:
212       job_id = args
213       return queue.CancelJob(job_id)
214
215     elif method == luxi.REQ_ARCHIVE_JOB:
216       job_id = args
217       return queue.ArchiveJob(job_id)
218
219     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
220       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
221       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
222                                      prev_log_serial, timeout)
223
224     elif method == luxi.REQ_QUERY_JOBS:
225       (job_ids, fields) = args
226       return queue.QueryJobs(job_ids, fields)
227
228     elif method == luxi.REQ_QUERY_INSTANCES:
229       (names, fields) = args
230       op = opcodes.OpQueryInstances(names=names, output_fields=fields)
231       return self._Query(op)
232
233     elif method == luxi.REQ_QUERY_NODES:
234       (names, fields) = args
235       op = opcodes.OpQueryNodes(names=names, output_fields=fields)
236       return self._Query(op)
237
238     elif method == luxi.REQ_QUERY_EXPORTS:
239       nodes = args
240       op = opcodes.OpQueryExports(nodes=nodes)
241       return self._Query(op)
242
243     else:
244       raise ValueError("Invalid operation")
245
246   def _DummyLog(self, *args):
247     pass
248
249   def _Query(self, op):
250     """Runs the specified opcode and returns the result.
251
252     """
253     proc = mcpu.Processor(self.server.context)
254     # TODO: Where should log messages go?
255     return proc.ExecOpCode(op, self._DummyLog)
256
257
258 class GanetiContext(object):
259   """Context common to all ganeti threads.
260
261   This class creates and holds common objects shared by all threads.
262
263   """
264   _instance = None
265
266   def __init__(self):
267     """Constructs a new GanetiContext object.
268
269     There should be only a GanetiContext object at any time, so this
270     function raises an error if this is not the case.
271
272     """
273     assert self.__class__._instance is None, "double GanetiContext instance"
274
275     # Create global configuration object
276     self.cfg = config.ConfigWriter()
277
278     # Locking manager
279     self.glm = locking.GanetiLockManager(
280                 self.cfg.GetNodeList(),
281                 self.cfg.GetInstanceList())
282
283     # Job queue
284     self.jobqueue = jqueue.JobQueue(self)
285
286     # setting this also locks the class against attribute modifications
287     self.__class__._instance = self
288
289   def __setattr__(self, name, value):
290     """Setting GanetiContext attributes is forbidden after initialization.
291
292     """
293     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
294     object.__setattr__(self, name, value)
295
296   def AddNode(self, node):
297     """Adds a node to the configuration and lock manager.
298
299     """
300     # Add it to the configuration
301     self.cfg.AddNode(node)
302
303     # If preseeding fails it'll not be added
304     self.jobqueue.AddNode(node.name)
305
306     # Add the new node to the Ganeti Lock Manager
307     self.glm.add(locking.LEVEL_NODE, node.name)
308
309   def ReaddNode(self, node):
310     """Updates a node that's already in the configuration
311
312     """
313     # Synchronize the queue again
314     self.jobqueue.AddNode(node.name)
315
316   def RemoveNode(self, name):
317     """Removes a node from the configuration and lock manager.
318
319     """
320     # Remove node from configuration
321     self.cfg.RemoveNode(name)
322
323     # Notify job queue
324     self.jobqueue.RemoveNode(name)
325
326     # Remove the node from the Ganeti Lock Manager
327     self.glm.remove(locking.LEVEL_NODE, name)
328
329
330 def ParseOptions():
331   """Parse the command line options.
332
333   Returns:
334     (options, args) as from OptionParser.parse_args()
335
336   """
337   parser = OptionParser(description="Ganeti master daemon",
338                         usage="%prog [-f] [-d]",
339                         version="%%prog (ganeti) %s" %
340                         constants.RELEASE_VERSION)
341
342   parser.add_option("-f", "--foreground", dest="fork",
343                     help="Don't detach from the current terminal",
344                     default=True, action="store_false")
345   parser.add_option("-d", "--debug", dest="debug",
346                     help="Enable some debug messages",
347                     default=False, action="store_true")
348   options, args = parser.parse_args()
349   return options, args
350
351
352 def CheckAgreement():
353   """Check the agreement on who is the master.
354
355   The function uses a very simple algorithm: we must get more positive
356   than negative answers. Since in most of the cases we are the master,
357   we'll use our own config file for getting the node list. In the
358   future we could collect the current node list from our (possibly
359   obsolete) known nodes.
360
361   """
362   myself = utils.HostInfo().name
363   #temp instantiation of a config writer, used only to get the node list
364   cfg = config.ConfigWriter()
365   node_list = cfg.GetNodeList()
366   del cfg
367   try:
368     node_list.remove(myself)
369   except KeyError:
370     pass
371   if not node_list:
372     # either single node cluster, or a misconfiguration, but I won't
373     # break any other node, so I can proceed
374     return True
375   results = rpc.call_master_info(node_list)
376   if not isinstance(results, dict):
377     # this should not happen (unless internal error in rpc)
378     logging.critical("Can't complete rpc call, aborting master startup")
379     return False
380   positive = negative = 0
381   other_masters = {}
382   for node in results:
383     if not isinstance(results[node], (tuple, list)) or len(results[node]) < 3:
384       logging.warning("Can't contact node %s", node)
385       continue
386     master_node = results[node][2]
387     if master_node == myself:
388       positive += 1
389     else:
390       negative += 1
391       if not master_node in other_masters:
392         other_masters[master_node] = 0
393       other_masters[master_node] += 1
394   if positive <= negative:
395     # bad!
396     logging.critical("It seems we are not the master (%d votes for,"
397                      " %d votes against)", positive, negative)
398     if len(other_masters) > 1:
399       logging.critical("The other nodes do not agree on a single master")
400     elif other_masters:
401       # TODO: resync my files from the master
402       logging.critical("It seems the real master is %s",
403                        other_masters.keys()[0])
404     else:
405       logging.critical("Can't contact any node for data, aborting startup")
406     return False
407   return True
408
409
410 def main():
411   """Main function"""
412
413   options, args = ParseOptions()
414   utils.debug = options.debug
415   utils.no_fork = True
416
417   ssconf.CheckMaster(options.debug)
418
419   # we believe we are the master, let's ask the other nodes...
420   if not CheckAgreement():
421     return
422
423   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
424
425   # become a daemon
426   if options.fork:
427     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
428                     noclose_fds=[master.fileno()])
429
430   utils.WritePidFile(constants.MASTERD_PID)
431
432   logger.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
433                       stderr_logging=not options.fork)
434
435   logging.info("ganeti master daemon startup")
436
437   # activate ip
438   master_node = ssconf.SimpleStore().GetMasterNode()
439   if not rpc.call_node_start_master(master_node, False):
440     logging.error("Can't activate master IP address")
441
442   master.setup_queue()
443   try:
444     master.serve_forever()
445   finally:
446     master.server_cleanup()
447     utils.RemovePidFile(constants.MASTERD_PID)
448
449
450 if __name__ == "__main__":
451   main()