Fix epydoc format warnings
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python -u
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import os
31 import errno
32 import sys
33 import SocketServer
34 import time
35 import collections
36 import Queue
37 import random
38 import signal
39 import simplejson
40 import logging
41
42 from cStringIO import StringIO
43 from optparse import OptionParser
44
45 from ganeti import config
46 from ganeti import constants
47 from ganeti import mcpu
48 from ganeti import opcodes
49 from ganeti import jqueue
50 from ganeti import locking
51 from ganeti import luxi
52 from ganeti import utils
53 from ganeti import errors
54 from ganeti import ssconf
55 from ganeti import workerpool
56 from ganeti import rpc
57 from ganeti import bootstrap
58
59
60 CLIENT_REQUEST_WORKERS = 16
61
62 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
63 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
64
65
66 class ClientRequestWorker(workerpool.BaseWorker):
67   def RunTask(self, server, request, client_address):
68     """Process the request.
69
70     This is copied from the code in ThreadingMixIn.
71
72     """
73     try:
74       server.finish_request(request, client_address)
75       server.close_request(request)
76     except:
77       server.handle_error(request, client_address)
78       server.close_request(request)
79
80
81 class IOServer(SocketServer.UnixStreamServer):
82   """IO thread class.
83
84   This class takes care of initializing the other threads, setting
85   signal handlers (which are processed only in this thread), and doing
86   cleanup at shutdown.
87
88   """
89   def __init__(self, address, rqhandler):
90     """IOServer constructor
91
92     @param address: the address to bind this IOServer to
93     @param rqhandler: RequestHandler type object
94
95     """
96     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
97
98     # We'll only start threads once we've forked.
99     self.context = None
100     self.request_workers = None
101
102   def setup_queue(self):
103     self.context = GanetiContext()
104     self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
105                                                  ClientRequestWorker)
106
107   def process_request(self, request, client_address):
108     """Add task to workerpool to process request.
109
110     """
111     self.request_workers.AddTask(self, request, client_address)
112
113   def serve_forever(self):
114     """Handle one request at a time until told to quit."""
115     sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
116     try:
117       while not sighandler.called:
118         self.handle_request()
119     finally:
120       sighandler.Reset()
121
122   def server_cleanup(self):
123     """Cleanup the server.
124
125     This involves shutting down the processor threads and the master
126     socket.
127
128     """
129     try:
130       self.server_close()
131     finally:
132       if self.request_workers:
133         self.request_workers.TerminateWorkers()
134       if self.context:
135         self.context.jobqueue.Shutdown()
136
137
138 class ClientRqHandler(SocketServer.BaseRequestHandler):
139   """Client handler"""
140   EOM = '\3'
141   READ_SIZE = 4096
142
143   def setup(self):
144     self._buffer = ""
145     self._msgs = collections.deque()
146     self._ops = ClientOps(self.server)
147
148   def handle(self):
149     while True:
150       msg = self.read_message()
151       if msg is None:
152         logging.info("client closed connection")
153         break
154
155       request = simplejson.loads(msg)
156       logging.debug("request: %s", request)
157       if not isinstance(request, dict):
158         logging.error("wrong request received: %s", msg)
159         break
160
161       method = request.get(luxi.KEY_METHOD, None)
162       args = request.get(luxi.KEY_ARGS, None)
163       if method is None or args is None:
164         logging.error("no method or args in request")
165         break
166
167       success = False
168       try:
169         result = self._ops.handle_request(method, args)
170         success = True
171       except errors.GenericError, err:
172         success = False
173         result = (err.__class__.__name__, err.args)
174       except:
175         logging.error("Unexpected exception", exc_info=True)
176         err = sys.exc_info()
177         result = "Caught exception: %s" % str(err[1])
178
179       response = {
180         luxi.KEY_SUCCESS: success,
181         luxi.KEY_RESULT: result,
182         }
183       logging.debug("response: %s", response)
184       self.send_message(simplejson.dumps(response))
185
186   def read_message(self):
187     while not self._msgs:
188       data = self.request.recv(self.READ_SIZE)
189       if not data:
190         return None
191       new_msgs = (self._buffer + data).split(self.EOM)
192       self._buffer = new_msgs.pop()
193       self._msgs.extend(new_msgs)
194     return self._msgs.popleft()
195
196   def send_message(self, msg):
197     #print "sending", msg
198     self.request.sendall(msg + self.EOM)
199
200
201 class ClientOps:
202   """Class holding high-level client operations."""
203   def __init__(self, server):
204     self.server = server
205
206   def handle_request(self, method, args):
207     queue = self.server.context.jobqueue
208
209     # TODO: Parameter validation
210
211     if method == luxi.REQ_SUBMIT_JOB:
212       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
213       return queue.SubmitJob(ops)
214
215     elif method == luxi.REQ_CANCEL_JOB:
216       job_id = args
217       return queue.CancelJob(job_id)
218
219     elif method == luxi.REQ_ARCHIVE_JOB:
220       job_id = args
221       return queue.ArchiveJob(job_id)
222
223     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
224       age = args
225       return queue.AutoArchiveJobs(age)
226
227     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
228       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
229       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
230                                      prev_log_serial, timeout)
231
232     elif method == luxi.REQ_QUERY_JOBS:
233       (job_ids, fields) = args
234       return queue.QueryJobs(job_ids, fields)
235
236     elif method == luxi.REQ_QUERY_INSTANCES:
237       (names, fields) = args
238       op = opcodes.OpQueryInstances(names=names, output_fields=fields)
239       return self._Query(op)
240
241     elif method == luxi.REQ_QUERY_NODES:
242       (names, fields) = args
243       op = opcodes.OpQueryNodes(names=names, output_fields=fields)
244       return self._Query(op)
245
246     elif method == luxi.REQ_QUERY_EXPORTS:
247       nodes = args
248       op = opcodes.OpQueryExports(nodes=nodes)
249       return self._Query(op)
250
251     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
252       fields = args
253       op = opcodes.OpQueryConfigValues(output_fields=fields)
254       return self._Query(op)
255
256     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
257       drain_flag = args
258       return queue.SetDrainFlag(drain_flag)
259
260     else:
261       raise ValueError("Invalid operation")
262
263   def _DummyLog(self, *args):
264     pass
265
266   def _Query(self, op):
267     """Runs the specified opcode and returns the result.
268
269     """
270     proc = mcpu.Processor(self.server.context)
271     # TODO: Where should log messages go?
272     return proc.ExecOpCode(op, self._DummyLog, None)
273
274
275 class GanetiContext(object):
276   """Context common to all ganeti threads.
277
278   This class creates and holds common objects shared by all threads.
279
280   """
281   _instance = None
282
283   def __init__(self):
284     """Constructs a new GanetiContext object.
285
286     There should be only a GanetiContext object at any time, so this
287     function raises an error if this is not the case.
288
289     """
290     assert self.__class__._instance is None, "double GanetiContext instance"
291
292     # Create global configuration object
293     self.cfg = config.ConfigWriter()
294
295     # Locking manager
296     self.glm = locking.GanetiLockManager(
297                 self.cfg.GetNodeList(),
298                 self.cfg.GetInstanceList())
299
300     # Job queue
301     self.jobqueue = jqueue.JobQueue(self)
302
303     # setting this also locks the class against attribute modifications
304     self.__class__._instance = self
305
306   def __setattr__(self, name, value):
307     """Setting GanetiContext attributes is forbidden after initialization.
308
309     """
310     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
311     object.__setattr__(self, name, value)
312
313   def AddNode(self, node):
314     """Adds a node to the configuration and lock manager.
315
316     """
317     # Add it to the configuration
318     self.cfg.AddNode(node)
319
320     # If preseeding fails it'll not be added
321     self.jobqueue.AddNode(node)
322
323     # Add the new node to the Ganeti Lock Manager
324     self.glm.add(locking.LEVEL_NODE, node.name)
325
326   def ReaddNode(self, node):
327     """Updates a node that's already in the configuration
328
329     """
330     # Synchronize the queue again
331     self.jobqueue.AddNode(node)
332
333   def RemoveNode(self, name):
334     """Removes a node from the configuration and lock manager.
335
336     """
337     # Remove node from configuration
338     self.cfg.RemoveNode(name)
339
340     # Notify job queue
341     self.jobqueue.RemoveNode(name)
342
343     # Remove the node from the Ganeti Lock Manager
344     self.glm.remove(locking.LEVEL_NODE, name)
345
346
347 def ParseOptions():
348   """Parse the command line options.
349
350   @return: (options, args) as from OptionParser.parse_args()
351
352   """
353   parser = OptionParser(description="Ganeti master daemon",
354                         usage="%prog [-f] [-d]",
355                         version="%%prog (ganeti) %s" %
356                         constants.RELEASE_VERSION)
357
358   parser.add_option("-f", "--foreground", dest="fork",
359                     help="Don't detach from the current terminal",
360                     default=True, action="store_false")
361   parser.add_option("-d", "--debug", dest="debug",
362                     help="Enable some debug messages",
363                     default=False, action="store_true")
364   options, args = parser.parse_args()
365   return options, args
366
367
368 def CheckAgreement():
369   """Check the agreement on who is the master.
370
371   The function uses a very simple algorithm: we must get more positive
372   than negative answers. Since in most of the cases we are the master,
373   we'll use our own config file for getting the node list. In the
374   future we could collect the current node list from our (possibly
375   obsolete) known nodes.
376
377   In order to account for cold-start of all nodes, we retry for up to
378   a minute until we get a real answer as the top-voted one. If the
379   nodes are more out-of-sync, for now manual startup of the master
380   should be attempted.
381
382   Note that for a even number of nodes cluster, we need at least half
383   of the nodes (beside ourselves) to vote for us. This creates a
384   problem on two-node clusters, since in this case we require the
385   other node to be up too to confirm our status.
386
387   """
388   myself = utils.HostInfo().name
389   #temp instantiation of a config writer, used only to get the node list
390   cfg = config.ConfigWriter()
391   node_list = cfg.GetNodeList()
392   del cfg
393   retries = 6
394   while retries > 0:
395     votes = bootstrap.GatherMasterVotes(node_list)
396     if not votes:
397       # empty node list, this is a one node cluster
398       return True
399     if votes[0][0] is None:
400       retries -= 1
401       time.sleep(10)
402       continue
403     break
404   if retries == 0:
405       logging.critical("Cluster inconsistent, most of the nodes didn't answer"
406                        " after multiple retries. Aborting startup")
407       return False
408   # here a real node is at the top of the list
409   all_votes = sum(item[1] for item in votes)
410   top_node, top_votes = votes[0]
411   result = False
412   if top_node != myself:
413     logging.critical("It seems we are not the master (top-voted node"
414                      " is %s with %d out of %d votes)", top_node, top_votes,
415                      all_votes)
416   elif top_votes < all_votes - top_votes:
417     logging.critical("It seems we are not the master (%d votes for,"
418                      " %d votes against)", top_votes, all_votes - top_votes)
419   else:
420     result = True
421
422   return result
423
424
425 def main():
426   """Main function"""
427
428   options, args = ParseOptions()
429   utils.debug = options.debug
430   utils.no_fork = True
431
432   rpc.Init()
433   try:
434     ssconf.CheckMaster(options.debug)
435
436     # we believe we are the master, let's ask the other nodes...
437     if not CheckAgreement():
438       return
439
440     dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
441             (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
442            ]
443     for dir, mode in dirs:
444       try:
445         os.mkdir(dir, mode)
446       except EnvironmentError, err:
447         if err.errno != errno.EEXIST:
448           raise errors.GenericError("Cannot create needed directory"
449             " '%s': %s" % (constants.SOCKET_DIR, err))
450       if not os.path.isdir(dir):
451         raise errors.GenericError("%s is not a directory" % dir)
452
453     # This is safe to do as the pid file guarantees against
454     # concurrent execution.
455     utils.RemoveFile(constants.MASTER_SOCKET)
456
457     master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
458   finally:
459     rpc.Shutdown()
460
461   # become a daemon
462   if options.fork:
463     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
464                     noclose_fds=[master.fileno()])
465
466   utils.WritePidFile(constants.MASTERD_PID)
467   try:
468     utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
469                        stderr_logging=not options.fork)
470
471     logging.info("Ganeti master daemon startup")
472
473     rpc.Init()
474     try:
475       # activate ip
476       master_node = ssconf.SimpleConfigReader().GetMasterNode()
477       if not rpc.RpcRunner.call_node_start_master(master_node, False):
478         logging.error("Can't activate master IP address")
479
480       master.setup_queue()
481       try:
482         master.serve_forever()
483       finally:
484         master.server_cleanup()
485     finally:
486       rpc.Shutdown()
487   finally:
488     utils.RemovePidFile(constants.MASTERD_PID)
489     utils.RemoveFile(constants.MASTER_SOCKET)
490
491
492 if __name__ == "__main__":
493   main()