Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 0debfb35

History | View | Annotate | Download (18 kB)

1 834f8b67 Iustin Pop
#!/usr/bin/python
2 ffeffa1d Iustin Pop
#
3 ffeffa1d Iustin Pop
4 ffeffa1d Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 ffeffa1d Iustin Pop
#
6 ffeffa1d Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 ffeffa1d Iustin Pop
# it under the terms of the GNU General Public License as published by
8 ffeffa1d Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 ffeffa1d Iustin Pop
# (at your option) any later version.
10 ffeffa1d Iustin Pop
#
11 ffeffa1d Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 ffeffa1d Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 ffeffa1d Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 ffeffa1d Iustin Pop
# General Public License for more details.
15 ffeffa1d Iustin Pop
#
16 ffeffa1d Iustin Pop
# You should have received a copy of the GNU General Public License
17 ffeffa1d Iustin Pop
# along with this program; if not, write to the Free Software
18 ffeffa1d Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 ffeffa1d Iustin Pop
# 02110-1301, USA.
20 ffeffa1d Iustin Pop
21 ffeffa1d Iustin Pop
22 ffeffa1d Iustin Pop
"""Master daemon program.
23 ffeffa1d Iustin Pop
24 ffeffa1d Iustin Pop
Some classes deviates from the standard style guide since the
25 ffeffa1d Iustin Pop
inheritance from parent classes requires it.
26 ffeffa1d Iustin Pop
27 ffeffa1d Iustin Pop
"""
28 ffeffa1d Iustin Pop
29 ffeffa1d Iustin Pop
30 d823660a Guido Trotter
import os
31 c1f2901b Iustin Pop
import sys
32 ffeffa1d Iustin Pop
import SocketServer
33 ffeffa1d Iustin Pop
import time
34 ffeffa1d Iustin Pop
import collections
35 ffeffa1d Iustin Pop
import signal
36 96cb3986 Michael Hanselmann
import logging
37 ffeffa1d Iustin Pop
38 c1f2901b Iustin Pop
from optparse import OptionParser
39 ffeffa1d Iustin Pop
40 39dcf2ef Guido Trotter
from ganeti import config
41 ffeffa1d Iustin Pop
from ganeti import constants
42 04ccf5e9 Guido Trotter
from ganeti import daemon
43 ffeffa1d Iustin Pop
from ganeti import mcpu
44 ffeffa1d Iustin Pop
from ganeti import opcodes
45 ffeffa1d Iustin Pop
from ganeti import jqueue
46 39dcf2ef Guido Trotter
from ganeti import locking
47 ffeffa1d Iustin Pop
from ganeti import luxi
48 ffeffa1d Iustin Pop
from ganeti import utils
49 c1f2901b Iustin Pop
from ganeti import errors
50 c1f2901b Iustin Pop
from ganeti import ssconf
51 23e50d39 Michael Hanselmann
from ganeti import workerpool
52 b1b6ea87 Iustin Pop
from ganeti import rpc
53 d7cdb55d Iustin Pop
from ganeti import bootstrap
54 dd36d829 Iustin Pop
from ganeti import serializer
55 c1f2901b Iustin Pop
56 c1f2901b Iustin Pop
57 23e50d39 Michael Hanselmann
CLIENT_REQUEST_WORKERS = 16
58 23e50d39 Michael Hanselmann
59 c1f2901b Iustin Pop
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
60 c1f2901b Iustin Pop
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
61 ffeffa1d Iustin Pop
62 ffeffa1d Iustin Pop
63 23e50d39 Michael Hanselmann
class ClientRequestWorker(workerpool.BaseWorker):
64 23e50d39 Michael Hanselmann
  def RunTask(self, server, request, client_address):
65 23e50d39 Michael Hanselmann
    """Process the request.
66 23e50d39 Michael Hanselmann
67 23e50d39 Michael Hanselmann
    This is copied from the code in ThreadingMixIn.
68 23e50d39 Michael Hanselmann
69 23e50d39 Michael Hanselmann
    """
70 23e50d39 Michael Hanselmann
    try:
71 23e50d39 Michael Hanselmann
      server.finish_request(request, client_address)
72 23e50d39 Michael Hanselmann
      server.close_request(request)
73 23e50d39 Michael Hanselmann
    except:
74 23e50d39 Michael Hanselmann
      server.handle_error(request, client_address)
75 23e50d39 Michael Hanselmann
      server.close_request(request)
76 23e50d39 Michael Hanselmann
77 23e50d39 Michael Hanselmann
78 ffeffa1d Iustin Pop
class IOServer(SocketServer.UnixStreamServer):
79 ffeffa1d Iustin Pop
  """IO thread class.
80 ffeffa1d Iustin Pop
81 ffeffa1d Iustin Pop
  This class takes care of initializing the other threads, setting
82 ffeffa1d Iustin Pop
  signal handlers (which are processed only in this thread), and doing
83 ffeffa1d Iustin Pop
  cleanup at shutdown.
84 ffeffa1d Iustin Pop
85 ffeffa1d Iustin Pop
  """
86 9113300d Michael Hanselmann
  def __init__(self, address, rqhandler):
87 ce862cd5 Guido Trotter
    """IOServer constructor
88 ce862cd5 Guido Trotter
89 c41eea6e Iustin Pop
    @param address: the address to bind this IOServer to
90 c41eea6e Iustin Pop
    @param rqhandler: RequestHandler type object
91 ce862cd5 Guido Trotter
92 ce862cd5 Guido Trotter
    """
93 ffeffa1d Iustin Pop
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
94 50a3fbb2 Michael Hanselmann
95 50a3fbb2 Michael Hanselmann
    # We'll only start threads once we've forked.
96 9113300d Michael Hanselmann
    self.context = None
97 23e50d39 Michael Hanselmann
    self.request_workers = None
98 50a3fbb2 Michael Hanselmann
99 50a3fbb2 Michael Hanselmann
  def setup_queue(self):
100 9113300d Michael Hanselmann
    self.context = GanetiContext()
101 23e50d39 Michael Hanselmann
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
102 23e50d39 Michael Hanselmann
                                                 ClientRequestWorker)
103 ffeffa1d Iustin Pop
104 ffeffa1d Iustin Pop
  def process_request(self, request, client_address):
105 23e50d39 Michael Hanselmann
    """Add task to workerpool to process request.
106 ffeffa1d Iustin Pop
107 ffeffa1d Iustin Pop
    """
108 23e50d39 Michael Hanselmann
    self.request_workers.AddTask(self, request, client_address)
109 ffeffa1d Iustin Pop
110 6b5e5018 Guido Trotter
  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
111 6b5e5018 Guido Trotter
  def serve_forever(self, signal_handlers=None):
112 ffeffa1d Iustin Pop
    """Handle one request at a time until told to quit."""
113 6b5e5018 Guido Trotter
    assert isinstance(signal_handlers, dict) and \
114 6b5e5018 Guido Trotter
           len(signal_handlers) > 0, \
115 6b5e5018 Guido Trotter
           "Broken SignalHandled decorator"
116 6b5e5018 Guido Trotter
    # Since we use SignalHandled only once, the resulting dict will map all
117 6b5e5018 Guido Trotter
    # signals to the same handler. We'll just use the first one.
118 6b5e5018 Guido Trotter
    sighandler = signal_handlers.values()[0]
119 6b5e5018 Guido Trotter
    while not sighandler.called:
120 6b5e5018 Guido Trotter
      self.handle_request()
121 c1f2901b Iustin Pop
122 c1f2901b Iustin Pop
  def server_cleanup(self):
123 c1f2901b Iustin Pop
    """Cleanup the server.
124 c1f2901b Iustin Pop
125 c1f2901b Iustin Pop
    This involves shutting down the processor threads and the master
126 c1f2901b Iustin Pop
    socket.
127 c1f2901b Iustin Pop
128 c1f2901b Iustin Pop
    """
129 50a3fbb2 Michael Hanselmann
    try:
130 50a3fbb2 Michael Hanselmann
      self.server_close()
131 50a3fbb2 Michael Hanselmann
    finally:
132 23e50d39 Michael Hanselmann
      if self.request_workers:
133 36088c4c Michael Hanselmann
        self.request_workers.TerminateWorkers()
134 9113300d Michael Hanselmann
      if self.context:
135 9113300d Michael Hanselmann
        self.context.jobqueue.Shutdown()
136 ffeffa1d Iustin Pop
137 ffeffa1d Iustin Pop
138 ffeffa1d Iustin Pop
class ClientRqHandler(SocketServer.BaseRequestHandler):
139 ffeffa1d Iustin Pop
  """Client handler"""
140 ffeffa1d Iustin Pop
  EOM = '\3'
141 ffeffa1d Iustin Pop
  READ_SIZE = 4096
142 ffeffa1d Iustin Pop
143 ffeffa1d Iustin Pop
  def setup(self):
144 ffeffa1d Iustin Pop
    self._buffer = ""
145 ffeffa1d Iustin Pop
    self._msgs = collections.deque()
146 ffeffa1d Iustin Pop
    self._ops = ClientOps(self.server)
147 ffeffa1d Iustin Pop
148 ffeffa1d Iustin Pop
  def handle(self):
149 ffeffa1d Iustin Pop
    while True:
150 ffeffa1d Iustin Pop
      msg = self.read_message()
151 ffeffa1d Iustin Pop
      if msg is None:
152 d21d09d6 Iustin Pop
        logging.debug("client closed connection")
153 ffeffa1d Iustin Pop
        break
154 3d8548c4 Michael Hanselmann
155 dd36d829 Iustin Pop
      request = serializer.LoadJson(msg)
156 3d8548c4 Michael Hanselmann
      logging.debug("request: %s", request)
157 ffeffa1d Iustin Pop
      if not isinstance(request, dict):
158 3d8548c4 Michael Hanselmann
        logging.error("wrong request received: %s", msg)
159 ffeffa1d Iustin Pop
        break
160 3d8548c4 Michael Hanselmann
161 3d8548c4 Michael Hanselmann
      method = request.get(luxi.KEY_METHOD, None)
162 3d8548c4 Michael Hanselmann
      args = request.get(luxi.KEY_ARGS, None)
163 3d8548c4 Michael Hanselmann
      if method is None or args is None:
164 3d8548c4 Michael Hanselmann
        logging.error("no method or args in request")
165 ffeffa1d Iustin Pop
        break
166 3d8548c4 Michael Hanselmann
167 3d8548c4 Michael Hanselmann
      success = False
168 3d8548c4 Michael Hanselmann
      try:
169 3d8548c4 Michael Hanselmann
        result = self._ops.handle_request(method, args)
170 3d8548c4 Michael Hanselmann
        success = True
171 6797ec29 Iustin Pop
      except errors.GenericError, err:
172 6797ec29 Iustin Pop
        success = False
173 6956e9cd Iustin Pop
        result = errors.EncodeException(err)
174 3d8548c4 Michael Hanselmann
      except:
175 3d8548c4 Michael Hanselmann
        logging.error("Unexpected exception", exc_info=True)
176 3d8548c4 Michael Hanselmann
        err = sys.exc_info()
177 3d8548c4 Michael Hanselmann
        result = "Caught exception: %s" % str(err[1])
178 3d8548c4 Michael Hanselmann
179 3d8548c4 Michael Hanselmann
      response = {
180 3d8548c4 Michael Hanselmann
        luxi.KEY_SUCCESS: success,
181 3d8548c4 Michael Hanselmann
        luxi.KEY_RESULT: result,
182 3d8548c4 Michael Hanselmann
        }
183 3d8548c4 Michael Hanselmann
      logging.debug("response: %s", response)
184 dd36d829 Iustin Pop
      self.send_message(serializer.DumpJson(response))
185 ffeffa1d Iustin Pop
186 ffeffa1d Iustin Pop
  def read_message(self):
187 ffeffa1d Iustin Pop
    while not self._msgs:
188 ffeffa1d Iustin Pop
      data = self.request.recv(self.READ_SIZE)
189 ffeffa1d Iustin Pop
      if not data:
190 ffeffa1d Iustin Pop
        return None
191 ffeffa1d Iustin Pop
      new_msgs = (self._buffer + data).split(self.EOM)
192 ffeffa1d Iustin Pop
      self._buffer = new_msgs.pop()
193 ffeffa1d Iustin Pop
      self._msgs.extend(new_msgs)
194 ffeffa1d Iustin Pop
    return self._msgs.popleft()
195 ffeffa1d Iustin Pop
196 ffeffa1d Iustin Pop
  def send_message(self, msg):
197 ffeffa1d Iustin Pop
    #print "sending", msg
198 6096ee13 Michael Hanselmann
    # TODO: sendall is not guaranteed to send everything
199 ffeffa1d Iustin Pop
    self.request.sendall(msg + self.EOM)
200 ffeffa1d Iustin Pop
201 ffeffa1d Iustin Pop
202 ffeffa1d Iustin Pop
class ClientOps:
203 ffeffa1d Iustin Pop
  """Class holding high-level client operations."""
204 ffeffa1d Iustin Pop
  def __init__(self, server):
205 ffeffa1d Iustin Pop
    self.server = server
206 ffeffa1d Iustin Pop
207 0bbe448c Michael Hanselmann
  def handle_request(self, method, args):
208 9113300d Michael Hanselmann
    queue = self.server.context.jobqueue
209 0bbe448c Michael Hanselmann
210 0bbe448c Michael Hanselmann
    # TODO: Parameter validation
211 0bbe448c Michael Hanselmann
212 0bbe448c Michael Hanselmann
    if method == luxi.REQ_SUBMIT_JOB:
213 e566ddbd Iustin Pop
      logging.info("Received new job")
214 0bbe448c Michael Hanselmann
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
215 4c848b18 Michael Hanselmann
      return queue.SubmitJob(ops)
216 ffeffa1d Iustin Pop
217 2971c913 Iustin Pop
    if method == luxi.REQ_SUBMIT_MANY_JOBS:
218 2971c913 Iustin Pop
      logging.info("Received multiple jobs")
219 2971c913 Iustin Pop
      jobs = []
220 2971c913 Iustin Pop
      for ops in args:
221 2971c913 Iustin Pop
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
222 2971c913 Iustin Pop
      return queue.SubmitManyJobs(jobs)
223 2971c913 Iustin Pop
224 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_CANCEL_JOB:
225 3a2c7775 Michael Hanselmann
      job_id = args
226 e566ddbd Iustin Pop
      logging.info("Received job cancel request for %s", job_id)
227 0bbe448c Michael Hanselmann
      return queue.CancelJob(job_id)
228 ffeffa1d Iustin Pop
229 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_ARCHIVE_JOB:
230 3a2c7775 Michael Hanselmann
      job_id = args
231 e566ddbd Iustin Pop
      logging.info("Received job archive request for %s", job_id)
232 0bbe448c Michael Hanselmann
      return queue.ArchiveJob(job_id)
233 0bbe448c Michael Hanselmann
234 07cd723a Iustin Pop
    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
235 f8ad5591 Michael Hanselmann
      (age, timeout) = args
236 e566ddbd Iustin Pop
      logging.info("Received job autoarchive request for age %s, timeout %s",
237 e566ddbd Iustin Pop
                   age, timeout)
238 f8ad5591 Michael Hanselmann
      return queue.AutoArchiveJobs(age, timeout)
239 07cd723a Iustin Pop
240 dfe57c22 Michael Hanselmann
    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
241 5c735209 Iustin Pop
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
242 e566ddbd Iustin Pop
      logging.info("Received job poll request for %s", job_id)
243 6c5a7090 Michael Hanselmann
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
244 5c735209 Iustin Pop
                                     prev_log_serial, timeout)
245 dfe57c22 Michael Hanselmann
246 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_QUERY_JOBS:
247 0bbe448c Michael Hanselmann
      (job_ids, fields) = args
248 e566ddbd Iustin Pop
      if isinstance(job_ids, (tuple, list)) and job_ids:
249 e566ddbd Iustin Pop
        msg = ", ".join(job_ids)
250 e566ddbd Iustin Pop
      else:
251 e566ddbd Iustin Pop
        msg = str(job_ids)
252 e566ddbd Iustin Pop
      logging.info("Received job query request for %s", msg)
253 0bbe448c Michael Hanselmann
      return queue.QueryJobs(job_ids, fields)
254 0bbe448c Michael Hanselmann
255 ee6c7b94 Michael Hanselmann
    elif method == luxi.REQ_QUERY_INSTANCES:
256 ec79568d Iustin Pop
      (names, fields, use_locking) = args
257 e566ddbd Iustin Pop
      logging.info("Received instance query request for %s", names)
258 77921a95 Iustin Pop
      if use_locking:
259 debac808 Iustin Pop
        raise errors.OpPrereqError("Sync queries are not allowed",
260 debac808 Iustin Pop
                                   errors.ECODE_INVAL)
261 ec79568d Iustin Pop
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
262 ec79568d Iustin Pop
                                    use_locking=use_locking)
263 ee6c7b94 Michael Hanselmann
      return self._Query(op)
264 ee6c7b94 Michael Hanselmann
265 02f7fe54 Michael Hanselmann
    elif method == luxi.REQ_QUERY_NODES:
266 ec79568d Iustin Pop
      (names, fields, use_locking) = args
267 e566ddbd Iustin Pop
      logging.info("Received node query request for %s", names)
268 77921a95 Iustin Pop
      if use_locking:
269 debac808 Iustin Pop
        raise errors.OpPrereqError("Sync queries are not allowed",
270 debac808 Iustin Pop
                                   errors.ECODE_INVAL)
271 ec79568d Iustin Pop
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
272 ec79568d Iustin Pop
                                use_locking=use_locking)
273 02f7fe54 Michael Hanselmann
      return self._Query(op)
274 02f7fe54 Michael Hanselmann
275 32f93223 Michael Hanselmann
    elif method == luxi.REQ_QUERY_EXPORTS:
276 ec79568d Iustin Pop
      nodes, use_locking = args
277 77921a95 Iustin Pop
      if use_locking:
278 debac808 Iustin Pop
        raise errors.OpPrereqError("Sync queries are not allowed",
279 debac808 Iustin Pop
                                   errors.ECODE_INVAL)
280 e566ddbd Iustin Pop
      logging.info("Received exports query request")
281 ec79568d Iustin Pop
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
282 32f93223 Michael Hanselmann
      return self._Query(op)
283 32f93223 Michael Hanselmann
284 ae5849b5 Michael Hanselmann
    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
285 ae5849b5 Michael Hanselmann
      fields = args
286 e566ddbd Iustin Pop
      logging.info("Received config values query request for %s", fields)
287 ae5849b5 Michael Hanselmann
      op = opcodes.OpQueryConfigValues(output_fields=fields)
288 ae5849b5 Michael Hanselmann
      return self._Query(op)
289 ae5849b5 Michael Hanselmann
290 66baeccc Iustin Pop
    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
291 e566ddbd Iustin Pop
      logging.info("Received cluster info query request")
292 66baeccc Iustin Pop
      op = opcodes.OpQueryClusterInfo()
293 66baeccc Iustin Pop
      return self._Query(op)
294 66baeccc Iustin Pop
295 3ccafd0e Iustin Pop
    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
296 3ccafd0e Iustin Pop
      drain_flag = args
297 e566ddbd Iustin Pop
      logging.info("Received queue drain flag change request to %s",
298 e566ddbd Iustin Pop
                   drain_flag)
299 3ccafd0e Iustin Pop
      return queue.SetDrainFlag(drain_flag)
300 3ccafd0e Iustin Pop
301 05e50653 Michael Hanselmann
    elif method == luxi.REQ_SET_WATCHER_PAUSE:
302 05e50653 Michael Hanselmann
      (until, ) = args
303 05e50653 Michael Hanselmann
304 05e50653 Michael Hanselmann
      if until is None:
305 05e50653 Michael Hanselmann
        logging.info("Received request to no longer pause the watcher")
306 05e50653 Michael Hanselmann
      else:
307 05e50653 Michael Hanselmann
        if not isinstance(until, (int, float)):
308 05e50653 Michael Hanselmann
          raise TypeError("Duration must be an integer or float")
309 05e50653 Michael Hanselmann
310 05e50653 Michael Hanselmann
        if until < time.time():
311 05e50653 Michael Hanselmann
          raise errors.GenericError("Unable to set pause end time in the past")
312 05e50653 Michael Hanselmann
313 05e50653 Michael Hanselmann
        logging.info("Received request to pause the watcher until %s", until)
314 05e50653 Michael Hanselmann
315 05e50653 Michael Hanselmann
      return _SetWatcherPause(until)
316 05e50653 Michael Hanselmann
317 0bbe448c Michael Hanselmann
    else:
318 e566ddbd Iustin Pop
      logging.info("Received invalid request '%s'", method)
319 e566ddbd Iustin Pop
      raise ValueError("Invalid operation '%s'" % method)
320 ffeffa1d Iustin Pop
321 ee6c7b94 Michael Hanselmann
  def _Query(self, op):
322 ee6c7b94 Michael Hanselmann
    """Runs the specified opcode and returns the result.
323 ee6c7b94 Michael Hanselmann
324 ee6c7b94 Michael Hanselmann
    """
325 adfa97e3 Guido Trotter
    # Queries don't have a job id
326 adfa97e3 Guido Trotter
    proc = mcpu.Processor(self.server.context, None)
327 031a3e57 Michael Hanselmann
    return proc.ExecOpCode(op, None)
328 ee6c7b94 Michael Hanselmann
329 ffeffa1d Iustin Pop
330 39dcf2ef Guido Trotter
class GanetiContext(object):
331 39dcf2ef Guido Trotter
  """Context common to all ganeti threads.
332 39dcf2ef Guido Trotter
333 39dcf2ef Guido Trotter
  This class creates and holds common objects shared by all threads.
334 39dcf2ef Guido Trotter
335 39dcf2ef Guido Trotter
  """
336 39dcf2ef Guido Trotter
  _instance = None
337 39dcf2ef Guido Trotter
338 39dcf2ef Guido Trotter
  def __init__(self):
339 39dcf2ef Guido Trotter
    """Constructs a new GanetiContext object.
340 39dcf2ef Guido Trotter
341 39dcf2ef Guido Trotter
    There should be only a GanetiContext object at any time, so this
342 39dcf2ef Guido Trotter
    function raises an error if this is not the case.
343 39dcf2ef Guido Trotter
344 39dcf2ef Guido Trotter
    """
345 39dcf2ef Guido Trotter
    assert self.__class__._instance is None, "double GanetiContext instance"
346 39dcf2ef Guido Trotter
347 9113300d Michael Hanselmann
    # Create global configuration object
348 39dcf2ef Guido Trotter
    self.cfg = config.ConfigWriter()
349 9113300d Michael Hanselmann
350 9113300d Michael Hanselmann
    # Locking manager
351 984f7c32 Guido Trotter
    self.glm = locking.GanetiLockManager(
352 39dcf2ef Guido Trotter
                self.cfg.GetNodeList(),
353 39dcf2ef Guido Trotter
                self.cfg.GetInstanceList())
354 39dcf2ef Guido Trotter
355 9113300d Michael Hanselmann
    # Job queue
356 9113300d Michael Hanselmann
    self.jobqueue = jqueue.JobQueue(self)
357 9113300d Michael Hanselmann
358 39dcf2ef Guido Trotter
    # setting this also locks the class against attribute modifications
359 39dcf2ef Guido Trotter
    self.__class__._instance = self
360 39dcf2ef Guido Trotter
361 39dcf2ef Guido Trotter
  def __setattr__(self, name, value):
362 39dcf2ef Guido Trotter
    """Setting GanetiContext attributes is forbidden after initialization.
363 39dcf2ef Guido Trotter
364 39dcf2ef Guido Trotter
    """
365 39dcf2ef Guido Trotter
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
366 39dcf2ef Guido Trotter
    object.__setattr__(self, name, value)
367 39dcf2ef Guido Trotter
368 0debfb35 Guido Trotter
  def AddNode(self, node, ec_id):
369 d8470559 Michael Hanselmann
    """Adds a node to the configuration and lock manager.
370 d8470559 Michael Hanselmann
371 d8470559 Michael Hanselmann
    """
372 d8470559 Michael Hanselmann
    # Add it to the configuration
373 0debfb35 Guido Trotter
    self.cfg.AddNode(node, ec_id)
374 d8470559 Michael Hanselmann
375 c36176cc Michael Hanselmann
    # If preseeding fails it'll not be added
376 99aabbed Iustin Pop
    self.jobqueue.AddNode(node)
377 c36176cc Michael Hanselmann
378 d8470559 Michael Hanselmann
    # Add the new node to the Ganeti Lock Manager
379 d8470559 Michael Hanselmann
    self.glm.add(locking.LEVEL_NODE, node.name)
380 d8470559 Michael Hanselmann
381 d8470559 Michael Hanselmann
  def ReaddNode(self, node):
382 d8470559 Michael Hanselmann
    """Updates a node that's already in the configuration
383 d8470559 Michael Hanselmann
384 d8470559 Michael Hanselmann
    """
385 c36176cc Michael Hanselmann
    # Synchronize the queue again
386 99aabbed Iustin Pop
    self.jobqueue.AddNode(node)
387 d8470559 Michael Hanselmann
388 d8470559 Michael Hanselmann
  def RemoveNode(self, name):
389 d8470559 Michael Hanselmann
    """Removes a node from the configuration and lock manager.
390 d8470559 Michael Hanselmann
391 d8470559 Michael Hanselmann
    """
392 d8470559 Michael Hanselmann
    # Remove node from configuration
393 d8470559 Michael Hanselmann
    self.cfg.RemoveNode(name)
394 d8470559 Michael Hanselmann
395 c36176cc Michael Hanselmann
    # Notify job queue
396 c36176cc Michael Hanselmann
    self.jobqueue.RemoveNode(name)
397 c36176cc Michael Hanselmann
398 d8470559 Michael Hanselmann
    # Remove the node from the Ganeti Lock Manager
399 d8470559 Michael Hanselmann
    self.glm.remove(locking.LEVEL_NODE, name)
400 d8470559 Michael Hanselmann
401 39dcf2ef Guido Trotter
402 05e50653 Michael Hanselmann
def _SetWatcherPause(until):
403 05e50653 Michael Hanselmann
  """Creates or removes the watcher pause file.
404 05e50653 Michael Hanselmann
405 05e50653 Michael Hanselmann
  @type until: None or int
406 05e50653 Michael Hanselmann
  @param until: Unix timestamp saying until when the watcher shouldn't run
407 05e50653 Michael Hanselmann
408 05e50653 Michael Hanselmann
  """
409 05e50653 Michael Hanselmann
  if until is None:
410 05e50653 Michael Hanselmann
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
411 05e50653 Michael Hanselmann
  else:
412 05e50653 Michael Hanselmann
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
413 05e50653 Michael Hanselmann
                    data="%d\n" % (until, ))
414 05e50653 Michael Hanselmann
415 28b498cd Michael Hanselmann
  return until
416 28b498cd Michael Hanselmann
417 05e50653 Michael Hanselmann
418 36205981 Iustin Pop
def CheckAgreement():
419 36205981 Iustin Pop
  """Check the agreement on who is the master.
420 36205981 Iustin Pop
421 36205981 Iustin Pop
  The function uses a very simple algorithm: we must get more positive
422 36205981 Iustin Pop
  than negative answers. Since in most of the cases we are the master,
423 36205981 Iustin Pop
  we'll use our own config file for getting the node list. In the
424 36205981 Iustin Pop
  future we could collect the current node list from our (possibly
425 36205981 Iustin Pop
  obsolete) known nodes.
426 36205981 Iustin Pop
427 d7cdb55d Iustin Pop
  In order to account for cold-start of all nodes, we retry for up to
428 d7cdb55d Iustin Pop
  a minute until we get a real answer as the top-voted one. If the
429 d7cdb55d Iustin Pop
  nodes are more out-of-sync, for now manual startup of the master
430 d7cdb55d Iustin Pop
  should be attempted.
431 d7cdb55d Iustin Pop
432 d7cdb55d Iustin Pop
  Note that for a even number of nodes cluster, we need at least half
433 d7cdb55d Iustin Pop
  of the nodes (beside ourselves) to vote for us. This creates a
434 d7cdb55d Iustin Pop
  problem on two-node clusters, since in this case we require the
435 d7cdb55d Iustin Pop
  other node to be up too to confirm our status.
436 d7cdb55d Iustin Pop
437 36205981 Iustin Pop
  """
438 36205981 Iustin Pop
  myself = utils.HostInfo().name
439 36205981 Iustin Pop
  #temp instantiation of a config writer, used only to get the node list
440 36205981 Iustin Pop
  cfg = config.ConfigWriter()
441 36205981 Iustin Pop
  node_list = cfg.GetNodeList()
442 36205981 Iustin Pop
  del cfg
443 d7cdb55d Iustin Pop
  retries = 6
444 d7cdb55d Iustin Pop
  while retries > 0:
445 d7cdb55d Iustin Pop
    votes = bootstrap.GatherMasterVotes(node_list)
446 d7cdb55d Iustin Pop
    if not votes:
447 d7cdb55d Iustin Pop
      # empty node list, this is a one node cluster
448 d7cdb55d Iustin Pop
      return True
449 d7cdb55d Iustin Pop
    if votes[0][0] is None:
450 d7cdb55d Iustin Pop
      retries -= 1
451 d7cdb55d Iustin Pop
      time.sleep(10)
452 36205981 Iustin Pop
      continue
453 d7cdb55d Iustin Pop
    break
454 d7cdb55d Iustin Pop
  if retries == 0:
455 e09fdcfa Iustin Pop
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
456 e09fdcfa Iustin Pop
                     " after multiple retries. Aborting startup")
457 e09fdcfa Iustin Pop
    return False
458 d7cdb55d Iustin Pop
  # here a real node is at the top of the list
459 d7cdb55d Iustin Pop
  all_votes = sum(item[1] for item in votes)
460 d7cdb55d Iustin Pop
  top_node, top_votes = votes[0]
461 8a20c732 Michael Hanselmann
462 d7cdb55d Iustin Pop
  result = False
463 d7cdb55d Iustin Pop
  if top_node != myself:
464 d7cdb55d Iustin Pop
    logging.critical("It seems we are not the master (top-voted node"
465 bbe19c17 Iustin Pop
                     " is %s with %d out of %d votes)", top_node, top_votes,
466 bbe19c17 Iustin Pop
                     all_votes)
467 d7cdb55d Iustin Pop
  elif top_votes < all_votes - top_votes:
468 36205981 Iustin Pop
    logging.critical("It seems we are not the master (%d votes for,"
469 d7cdb55d Iustin Pop
                     " %d votes against)", top_votes, all_votes - top_votes)
470 d7cdb55d Iustin Pop
  else:
471 d7cdb55d Iustin Pop
    result = True
472 d7cdb55d Iustin Pop
473 d7cdb55d Iustin Pop
  return result
474 36205981 Iustin Pop
475 6c948699 Michael Hanselmann
476 ed0efaa5 Michael Hanselmann
def CheckAgreementWithRpc():
477 4331f6cd Michael Hanselmann
  rpc.Init()
478 4331f6cd Michael Hanselmann
  try:
479 ed0efaa5 Michael Hanselmann
    return CheckAgreement()
480 4331f6cd Michael Hanselmann
  finally:
481 4331f6cd Michael Hanselmann
    rpc.Shutdown()
482 ffeffa1d Iustin Pop
483 c1f2901b Iustin Pop
484 8a20c732 Michael Hanselmann
def _RunInSeparateProcess(fn):
485 8a20c732 Michael Hanselmann
  """Runs a function in a separate process.
486 8a20c732 Michael Hanselmann
487 8a20c732 Michael Hanselmann
  Note: Only boolean return values are supported.
488 8a20c732 Michael Hanselmann
489 8a20c732 Michael Hanselmann
  @type fn: callable
490 8a20c732 Michael Hanselmann
  @param fn: Function to be called
491 8a20c732 Michael Hanselmann
  @rtype: bool
492 8a20c732 Michael Hanselmann
493 8a20c732 Michael Hanselmann
  """
494 8a20c732 Michael Hanselmann
  pid = os.fork()
495 8a20c732 Michael Hanselmann
  if pid == 0:
496 8a20c732 Michael Hanselmann
    # Child process
497 8a20c732 Michael Hanselmann
    try:
498 8a20c732 Michael Hanselmann
      # Call function
499 8a20c732 Michael Hanselmann
      result = int(bool(fn()))
500 8a20c732 Michael Hanselmann
      assert result in (0, 1)
501 8a20c732 Michael Hanselmann
    except:
502 8a20c732 Michael Hanselmann
      logging.exception("Error while calling function in separate process")
503 8a20c732 Michael Hanselmann
      # 0 and 1 are reserved for the return value
504 8a20c732 Michael Hanselmann
      result = 33
505 8a20c732 Michael Hanselmann
506 8a20c732 Michael Hanselmann
    os._exit(result)
507 8a20c732 Michael Hanselmann
508 8a20c732 Michael Hanselmann
  # Parent process
509 8a20c732 Michael Hanselmann
510 8a20c732 Michael Hanselmann
  # Avoid zombies and check exit code
511 8a20c732 Michael Hanselmann
  (_, status) = os.waitpid(pid, 0)
512 8a20c732 Michael Hanselmann
513 8a20c732 Michael Hanselmann
  if os.WIFSIGNALED(status):
514 8a20c732 Michael Hanselmann
    signum = os.WTERMSIG(status)
515 8a20c732 Michael Hanselmann
    exitcode = None
516 8a20c732 Michael Hanselmann
  else:
517 8a20c732 Michael Hanselmann
    signum = None
518 8a20c732 Michael Hanselmann
    exitcode = os.WEXITSTATUS(status)
519 8a20c732 Michael Hanselmann
520 8a20c732 Michael Hanselmann
  if not (exitcode in (0, 1) and signum is None):
521 8a20c732 Michael Hanselmann
    logging.error("Child program failed (code=%s, signal=%s)",
522 8a20c732 Michael Hanselmann
                  exitcode, signum)
523 8a20c732 Michael Hanselmann
    sys.exit(constants.EXIT_FAILURE)
524 8a20c732 Michael Hanselmann
525 8a20c732 Michael Hanselmann
  return bool(exitcode)
526 8a20c732 Michael Hanselmann
527 8a20c732 Michael Hanselmann
528 ed0efaa5 Michael Hanselmann
def CheckMasterd(options, args):
529 ed0efaa5 Michael Hanselmann
  """Initial checks whether to run or exit with a failure.
530 ed0efaa5 Michael Hanselmann
531 ed0efaa5 Michael Hanselmann
  """
532 ed0efaa5 Michael Hanselmann
  ssconf.CheckMaster(options.debug)
533 ed0efaa5 Michael Hanselmann
534 ed0efaa5 Michael Hanselmann
  # If CheckMaster didn't fail we believe we are the master, but we have to
535 ed0efaa5 Michael Hanselmann
  # confirm with the other nodes.
536 ed0efaa5 Michael Hanselmann
  if options.no_voting:
537 ed0efaa5 Michael Hanselmann
    if options.yes_do_it:
538 ed0efaa5 Michael Hanselmann
      return
539 ed0efaa5 Michael Hanselmann
540 ed0efaa5 Michael Hanselmann
    sys.stdout.write("The 'no voting' option has been selected.\n")
541 ed0efaa5 Michael Hanselmann
    sys.stdout.write("This is dangerous, please confirm by"
542 ed0efaa5 Michael Hanselmann
                     " typing uppercase 'yes': ")
543 ed0efaa5 Michael Hanselmann
    sys.stdout.flush()
544 ed0efaa5 Michael Hanselmann
545 ed0efaa5 Michael Hanselmann
    confirmation = sys.stdin.readline().strip()
546 ed0efaa5 Michael Hanselmann
    if confirmation != "YES":
547 ed0efaa5 Michael Hanselmann
      print >>sys.stderr, "Aborting."
548 ed0efaa5 Michael Hanselmann
      sys.exit(constants.EXIT_FAILURE)
549 ed0efaa5 Michael Hanselmann
550 ed0efaa5 Michael Hanselmann
    return
551 ed0efaa5 Michael Hanselmann
552 ed0efaa5 Michael Hanselmann
  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
553 ed0efaa5 Michael Hanselmann
  # process before we call utils.Daemonize in the current process.
554 ed0efaa5 Michael Hanselmann
  if not _RunInSeparateProcess(CheckAgreementWithRpc):
555 ed0efaa5 Michael Hanselmann
    sys.exit(constants.EXIT_FAILURE)
556 ed0efaa5 Michael Hanselmann
557 ed0efaa5 Michael Hanselmann
558 6c948699 Michael Hanselmann
def ExecMasterd (options, args):
559 6c948699 Michael Hanselmann
  """Main master daemon function, executed with the PID file held.
560 3b316acb Iustin Pop
561 04ccf5e9 Guido Trotter
  """
562 04ccf5e9 Guido Trotter
  # This is safe to do as the pid file guarantees against
563 04ccf5e9 Guido Trotter
  # concurrent execution.
564 04ccf5e9 Guido Trotter
  utils.RemoveFile(constants.MASTER_SOCKET)
565 b1b6ea87 Iustin Pop
566 04ccf5e9 Guido Trotter
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
567 04ccf5e9 Guido Trotter
  try:
568 15486fa7 Michael Hanselmann
    rpc.Init()
569 4331f6cd Michael Hanselmann
    try:
570 15486fa7 Michael Hanselmann
      # activate ip
571 b2890442 Guido Trotter
      master_node = ssconf.SimpleStore().GetMasterNode()
572 3583908a Guido Trotter
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
573 3cebe102 Michael Hanselmann
      msg = result.fail_msg
574 b726aff0 Iustin Pop
      if msg:
575 b726aff0 Iustin Pop
        logging.error("Can't activate master IP address: %s", msg)
576 15486fa7 Michael Hanselmann
577 15486fa7 Michael Hanselmann
      master.setup_queue()
578 15486fa7 Michael Hanselmann
      try:
579 15486fa7 Michael Hanselmann
        master.serve_forever()
580 15486fa7 Michael Hanselmann
      finally:
581 15486fa7 Michael Hanselmann
        master.server_cleanup()
582 4331f6cd Michael Hanselmann
    finally:
583 15486fa7 Michael Hanselmann
      rpc.Shutdown()
584 a4af651e Iustin Pop
  finally:
585 227647ac Guido Trotter
    utils.RemoveFile(constants.MASTER_SOCKET)
586 a4af651e Iustin Pop
587 ffeffa1d Iustin Pop
588 04ccf5e9 Guido Trotter
def main():
589 04ccf5e9 Guido Trotter
  """Main function"""
590 04ccf5e9 Guido Trotter
  parser = OptionParser(description="Ganeti master daemon",
591 04ccf5e9 Guido Trotter
                        usage="%prog [-f] [-d]",
592 04ccf5e9 Guido Trotter
                        version="%%prog (ganeti) %s" %
593 04ccf5e9 Guido Trotter
                        constants.RELEASE_VERSION)
594 04ccf5e9 Guido Trotter
  parser.add_option("--no-voting", dest="no_voting",
595 04ccf5e9 Guido Trotter
                    help="Do not check that the nodes agree on this node"
596 04ccf5e9 Guido Trotter
                    " being the master and start the daemon unconditionally",
597 04ccf5e9 Guido Trotter
                    default=False, action="store_true")
598 04ccf5e9 Guido Trotter
  parser.add_option("--yes-do-it", dest="yes_do_it",
599 04ccf5e9 Guido Trotter
                    help="Override interactive check for --no-voting",
600 04ccf5e9 Guido Trotter
                    default=False, action="store_true")
601 04ccf5e9 Guido Trotter
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
602 04ccf5e9 Guido Trotter
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
603 04ccf5e9 Guido Trotter
         ]
604 04ccf5e9 Guido Trotter
  daemon.GenericMain(constants.MASTERD, parser, dirs,
605 6c948699 Michael Hanselmann
                     CheckMasterd, ExecMasterd)
606 6c948699 Michael Hanselmann
607 04ccf5e9 Guido Trotter
608 ffeffa1d Iustin Pop
if __name__ == "__main__":
609 ffeffa1d Iustin Pop
  main()