Cluster: add nicparams, and update them on upgrade
[ganeti-local] / daemons / ganeti-masterd
1 #!/usr/bin/python -u
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Master daemon program.
23
24 Some classes deviates from the standard style guide since the
25 inheritance from parent classes requires it.
26
27 """
28
29
30 import os
31 import errno
32 import sys
33 import SocketServer
34 import time
35 import collections
36 import Queue
37 import random
38 import signal
39 import logging
40
41 from cStringIO import StringIO
42 from optparse import OptionParser
43
44 from ganeti import config
45 from ganeti import constants
46 from ganeti import mcpu
47 from ganeti import opcodes
48 from ganeti import jqueue
49 from ganeti import locking
50 from ganeti import luxi
51 from ganeti import utils
52 from ganeti import errors
53 from ganeti import ssconf
54 from ganeti import workerpool
55 from ganeti import rpc
56 from ganeti import bootstrap
57 from ganeti import serializer
58
59
60 CLIENT_REQUEST_WORKERS = 16
61
62 EXIT_NOTMASTER = constants.EXIT_NOTMASTER
63 EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
64
65
66 class ClientRequestWorker(workerpool.BaseWorker):
67   def RunTask(self, server, request, client_address):
68     """Process the request.
69
70     This is copied from the code in ThreadingMixIn.
71
72     """
73     try:
74       server.finish_request(request, client_address)
75       server.close_request(request)
76     except:
77       server.handle_error(request, client_address)
78       server.close_request(request)
79
80
81 class IOServer(SocketServer.UnixStreamServer):
82   """IO thread class.
83
84   This class takes care of initializing the other threads, setting
85   signal handlers (which are processed only in this thread), and doing
86   cleanup at shutdown.
87
88   """
89   def __init__(self, address, rqhandler):
90     """IOServer constructor
91
92     @param address: the address to bind this IOServer to
93     @param rqhandler: RequestHandler type object
94
95     """
96     SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
97
98     # We'll only start threads once we've forked.
99     self.context = None
100     self.request_workers = None
101
102   def setup_queue(self):
103     self.context = GanetiContext()
104     self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
105                                                  ClientRequestWorker)
106
107   def process_request(self, request, client_address):
108     """Add task to workerpool to process request.
109
110     """
111     self.request_workers.AddTask(self, request, client_address)
112
113   def serve_forever(self):
114     """Handle one request at a time until told to quit."""
115     sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
116     try:
117       while not sighandler.called:
118         self.handle_request()
119     finally:
120       sighandler.Reset()
121
122   def server_cleanup(self):
123     """Cleanup the server.
124
125     This involves shutting down the processor threads and the master
126     socket.
127
128     """
129     try:
130       self.server_close()
131     finally:
132       if self.request_workers:
133         self.request_workers.TerminateWorkers()
134       if self.context:
135         self.context.jobqueue.Shutdown()
136
137
138 class ClientRqHandler(SocketServer.BaseRequestHandler):
139   """Client handler"""
140   EOM = '\3'
141   READ_SIZE = 4096
142
143   def setup(self):
144     self._buffer = ""
145     self._msgs = collections.deque()
146     self._ops = ClientOps(self.server)
147
148   def handle(self):
149     while True:
150       msg = self.read_message()
151       if msg is None:
152         logging.debug("client closed connection")
153         break
154
155       request = serializer.LoadJson(msg)
156       logging.debug("request: %s", request)
157       if not isinstance(request, dict):
158         logging.error("wrong request received: %s", msg)
159         break
160
161       method = request.get(luxi.KEY_METHOD, None)
162       args = request.get(luxi.KEY_ARGS, None)
163       if method is None or args is None:
164         logging.error("no method or args in request")
165         break
166
167       success = False
168       try:
169         result = self._ops.handle_request(method, args)
170         success = True
171       except errors.GenericError, err:
172         success = False
173         result = (err.__class__.__name__, err.args)
174       except:
175         logging.error("Unexpected exception", exc_info=True)
176         err = sys.exc_info()
177         result = "Caught exception: %s" % str(err[1])
178
179       response = {
180         luxi.KEY_SUCCESS: success,
181         luxi.KEY_RESULT: result,
182         }
183       logging.debug("response: %s", response)
184       self.send_message(serializer.DumpJson(response))
185
186   def read_message(self):
187     while not self._msgs:
188       data = self.request.recv(self.READ_SIZE)
189       if not data:
190         return None
191       new_msgs = (self._buffer + data).split(self.EOM)
192       self._buffer = new_msgs.pop()
193       self._msgs.extend(new_msgs)
194     return self._msgs.popleft()
195
196   def send_message(self, msg):
197     #print "sending", msg
198     self.request.sendall(msg + self.EOM)
199
200
201 class ClientOps:
202   """Class holding high-level client operations."""
203   def __init__(self, server):
204     self.server = server
205
206   def handle_request(self, method, args):
207     queue = self.server.context.jobqueue
208
209     # TODO: Parameter validation
210
211     if method == luxi.REQ_SUBMIT_JOB:
212       logging.info("Received new job")
213       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
214       return queue.SubmitJob(ops)
215
216     if method == luxi.REQ_SUBMIT_MANY_JOBS:
217       logging.info("Received multiple jobs")
218       jobs = []
219       for ops in args:
220         jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
221       return queue.SubmitManyJobs(jobs)
222
223     elif method == luxi.REQ_CANCEL_JOB:
224       job_id = args
225       logging.info("Received job cancel request for %s", job_id)
226       return queue.CancelJob(job_id)
227
228     elif method == luxi.REQ_ARCHIVE_JOB:
229       job_id = args
230       logging.info("Received job archive request for %s", job_id)
231       return queue.ArchiveJob(job_id)
232
233     elif method == luxi.REQ_AUTOARCHIVE_JOBS:
234       (age, timeout) = args
235       logging.info("Received job autoarchive request for age %s, timeout %s",
236                    age, timeout)
237       return queue.AutoArchiveJobs(age, timeout)
238
239     elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
240       (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
241       logging.info("Received job poll request for %s", job_id)
242       return queue.WaitForJobChanges(job_id, fields, prev_job_info,
243                                      prev_log_serial, timeout)
244
245     elif method == luxi.REQ_QUERY_JOBS:
246       (job_ids, fields) = args
247       if isinstance(job_ids, (tuple, list)) and job_ids:
248         msg = ", ".join(job_ids)
249       else:
250         msg = str(job_ids)
251       logging.info("Received job query request for %s", msg)
252       return queue.QueryJobs(job_ids, fields)
253
254     elif method == luxi.REQ_QUERY_INSTANCES:
255       (names, fields, use_locking) = args
256       logging.info("Received instance query request for %s", names)
257       if use_locking:
258         raise errors.OpPrereqError("Sync queries are not allowed")
259       op = opcodes.OpQueryInstances(names=names, output_fields=fields,
260                                     use_locking=use_locking)
261       return self._Query(op)
262
263     elif method == luxi.REQ_QUERY_NODES:
264       (names, fields, use_locking) = args
265       logging.info("Received node query request for %s", names)
266       if use_locking:
267         raise errors.OpPrereqError("Sync queries are not allowed")
268       op = opcodes.OpQueryNodes(names=names, output_fields=fields,
269                                 use_locking=use_locking)
270       return self._Query(op)
271
272     elif method == luxi.REQ_QUERY_EXPORTS:
273       nodes, use_locking = args
274       if use_locking:
275         raise errors.OpPrereqError("Sync queries are not allowed")
276       logging.info("Received exports query request")
277       op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
278       return self._Query(op)
279
280     elif method == luxi.REQ_QUERY_CONFIG_VALUES:
281       fields = args
282       logging.info("Received config values query request for %s", fields)
283       op = opcodes.OpQueryConfigValues(output_fields=fields)
284       return self._Query(op)
285
286     elif method == luxi.REQ_QUERY_CLUSTER_INFO:
287       logging.info("Received cluster info query request")
288       op = opcodes.OpQueryClusterInfo()
289       return self._Query(op)
290
291     elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
292       drain_flag = args
293       logging.info("Received queue drain flag change request to %s",
294                    drain_flag)
295       return queue.SetDrainFlag(drain_flag)
296
297     else:
298       logging.info("Received invalid request '%s'", method)
299       raise ValueError("Invalid operation '%s'" % method)
300
301   def _DummyLog(self, *args):
302     pass
303
304   def _Query(self, op):
305     """Runs the specified opcode and returns the result.
306
307     """
308     proc = mcpu.Processor(self.server.context)
309     # TODO: Where should log messages go?
310     return proc.ExecOpCode(op, self._DummyLog, None)
311
312
313 class GanetiContext(object):
314   """Context common to all ganeti threads.
315
316   This class creates and holds common objects shared by all threads.
317
318   """
319   _instance = None
320
321   def __init__(self):
322     """Constructs a new GanetiContext object.
323
324     There should be only a GanetiContext object at any time, so this
325     function raises an error if this is not the case.
326
327     """
328     assert self.__class__._instance is None, "double GanetiContext instance"
329
330     # Create global configuration object
331     self.cfg = config.ConfigWriter()
332
333     # Locking manager
334     self.glm = locking.GanetiLockManager(
335                 self.cfg.GetNodeList(),
336                 self.cfg.GetInstanceList())
337
338     # Job queue
339     self.jobqueue = jqueue.JobQueue(self)
340
341     # setting this also locks the class against attribute modifications
342     self.__class__._instance = self
343
344   def __setattr__(self, name, value):
345     """Setting GanetiContext attributes is forbidden after initialization.
346
347     """
348     assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
349     object.__setattr__(self, name, value)
350
351   def AddNode(self, node):
352     """Adds a node to the configuration and lock manager.
353
354     """
355     # Add it to the configuration
356     self.cfg.AddNode(node)
357
358     # If preseeding fails it'll not be added
359     self.jobqueue.AddNode(node)
360
361     # Add the new node to the Ganeti Lock Manager
362     self.glm.add(locking.LEVEL_NODE, node.name)
363
364   def ReaddNode(self, node):
365     """Updates a node that's already in the configuration
366
367     """
368     # Synchronize the queue again
369     self.jobqueue.AddNode(node)
370
371   def RemoveNode(self, name):
372     """Removes a node from the configuration and lock manager.
373
374     """
375     # Remove node from configuration
376     self.cfg.RemoveNode(name)
377
378     # Notify job queue
379     self.jobqueue.RemoveNode(name)
380
381     # Remove the node from the Ganeti Lock Manager
382     self.glm.remove(locking.LEVEL_NODE, name)
383
384
385 def ParseOptions():
386   """Parse the command line options.
387
388   @return: (options, args) as from OptionParser.parse_args()
389
390   """
391   parser = OptionParser(description="Ganeti master daemon",
392                         usage="%prog [-f] [-d]",
393                         version="%%prog (ganeti) %s" %
394                         constants.RELEASE_VERSION)
395
396   parser.add_option("-f", "--foreground", dest="fork",
397                     help="Don't detach from the current terminal",
398                     default=True, action="store_false")
399   parser.add_option("-d", "--debug", dest="debug",
400                     help="Enable some debug messages",
401                     default=False, action="store_true")
402   parser.add_option("--no-voting", dest="no_voting",
403                     help="Do not check that the nodes agree on this node"
404                     " being the master and start the daemon unconditionally",
405                     default=False, action="store_true")
406   options, args = parser.parse_args()
407   return options, args
408
409
410 def CheckAgreement():
411   """Check the agreement on who is the master.
412
413   The function uses a very simple algorithm: we must get more positive
414   than negative answers. Since in most of the cases we are the master,
415   we'll use our own config file for getting the node list. In the
416   future we could collect the current node list from our (possibly
417   obsolete) known nodes.
418
419   In order to account for cold-start of all nodes, we retry for up to
420   a minute until we get a real answer as the top-voted one. If the
421   nodes are more out-of-sync, for now manual startup of the master
422   should be attempted.
423
424   Note that for a even number of nodes cluster, we need at least half
425   of the nodes (beside ourselves) to vote for us. This creates a
426   problem on two-node clusters, since in this case we require the
427   other node to be up too to confirm our status.
428
429   """
430   myself = utils.HostInfo().name
431   #temp instantiation of a config writer, used only to get the node list
432   cfg = config.ConfigWriter()
433   node_list = cfg.GetNodeList()
434   del cfg
435   retries = 6
436   while retries > 0:
437     votes = bootstrap.GatherMasterVotes(node_list)
438     if not votes:
439       # empty node list, this is a one node cluster
440       return True
441     if votes[0][0] is None:
442       retries -= 1
443       time.sleep(10)
444       continue
445     break
446   if retries == 0:
447     logging.critical("Cluster inconsistent, most of the nodes didn't answer"
448                      " after multiple retries. Aborting startup")
449     return False
450   # here a real node is at the top of the list
451   all_votes = sum(item[1] for item in votes)
452   top_node, top_votes = votes[0]
453   result = False
454   if top_node != myself:
455     logging.critical("It seems we are not the master (top-voted node"
456                      " is %s with %d out of %d votes)", top_node, top_votes,
457                      all_votes)
458   elif top_votes < all_votes - top_votes:
459     logging.critical("It seems we are not the master (%d votes for,"
460                      " %d votes against)", top_votes, all_votes - top_votes)
461   else:
462     result = True
463
464   return result
465
466
467 def main():
468   """Main function"""
469
470   options, args = ParseOptions()
471   utils.debug = options.debug
472   utils.no_fork = True
473
474   if options.fork:
475     utils.CloseFDs()
476
477   rpc.Init()
478   try:
479     ssconf.CheckMaster(options.debug)
480
481     # we believe we are the master, let's ask the other nodes...
482     if options.no_voting:
483       sys.stdout.write("The 'no voting' option has been selected.\n")
484       sys.stdout.write("This is dangerous, please confirm by"
485                        " typing uppercase 'yes': ")
486       sys.stdout.flush()
487       confirmation = sys.stdin.readline().strip()
488       if confirmation != "YES":
489         print "Aborting."
490         return
491     else:
492       if not CheckAgreement():
493         return
494
495     dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
496             (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
497            ]
498     utils.EnsureDirs(dirs)
499
500     # This is safe to do as the pid file guarantees against
501     # concurrent execution.
502     utils.RemoveFile(constants.MASTER_SOCKET)
503
504     master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
505   finally:
506     rpc.Shutdown()
507
508   # become a daemon
509   if options.fork:
510     utils.Daemonize(logfile=constants.LOG_MASTERDAEMON)
511
512   utils.WritePidFile(constants.MASTERD_PID)
513   try:
514     utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
515                        stderr_logging=not options.fork, multithreaded=True)
516
517     logging.info("Ganeti master daemon startup")
518
519     rpc.Init()
520     try:
521       # activate ip
522       master_node = ssconf.SimpleConfigReader().GetMasterNode()
523       if not rpc.RpcRunner.call_node_start_master(master_node, False):
524         logging.error("Can't activate master IP address")
525
526       master.setup_queue()
527       try:
528         master.serve_forever()
529       finally:
530         master.server_cleanup()
531     finally:
532       rpc.Shutdown()
533   finally:
534     utils.RemovePidFile(constants.MASTERD_PID)
535     utils.RemoveFile(constants.MASTER_SOCKET)
536
537
538 if __name__ == "__main__":
539   main()