Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ f4ad2ef0

History | View | Annotate | Download (18.4 kB)

1 834f8b67 Iustin Pop
#!/usr/bin/python
2 ffeffa1d Iustin Pop
#
3 ffeffa1d Iustin Pop
4 ffeffa1d Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 ffeffa1d Iustin Pop
#
6 ffeffa1d Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 ffeffa1d Iustin Pop
# it under the terms of the GNU General Public License as published by
8 ffeffa1d Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 ffeffa1d Iustin Pop
# (at your option) any later version.
10 ffeffa1d Iustin Pop
#
11 ffeffa1d Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 ffeffa1d Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 ffeffa1d Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 ffeffa1d Iustin Pop
# General Public License for more details.
15 ffeffa1d Iustin Pop
#
16 ffeffa1d Iustin Pop
# You should have received a copy of the GNU General Public License
17 ffeffa1d Iustin Pop
# along with this program; if not, write to the Free Software
18 ffeffa1d Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 ffeffa1d Iustin Pop
# 02110-1301, USA.
20 ffeffa1d Iustin Pop
21 ffeffa1d Iustin Pop
22 ffeffa1d Iustin Pop
"""Master daemon program.
23 ffeffa1d Iustin Pop
24 ffeffa1d Iustin Pop
Some classes deviate from the standard style guide since the
25 ffeffa1d Iustin Pop
inheritance from parent classes requires it.
26 ffeffa1d Iustin Pop
27 ffeffa1d Iustin Pop
"""
28 ffeffa1d Iustin Pop
29 7260cfbe Iustin Pop
# pylint: disable-msg=C0103
30 7260cfbe Iustin Pop
# C0103: Invalid name ganeti-masterd
31 ffeffa1d Iustin Pop
32 d823660a Guido Trotter
import os
33 c1f2901b Iustin Pop
import sys
34 ffeffa1d Iustin Pop
import SocketServer
35 ffeffa1d Iustin Pop
import time
36 ffeffa1d Iustin Pop
import collections
37 ffeffa1d Iustin Pop
import signal
38 96cb3986 Michael Hanselmann
import logging
39 ffeffa1d Iustin Pop
40 c1f2901b Iustin Pop
from optparse import OptionParser
41 ffeffa1d Iustin Pop
42 39dcf2ef Guido Trotter
from ganeti import config
43 ffeffa1d Iustin Pop
from ganeti import constants
44 04ccf5e9 Guido Trotter
from ganeti import daemon
45 ffeffa1d Iustin Pop
from ganeti import mcpu
46 ffeffa1d Iustin Pop
from ganeti import opcodes
47 ffeffa1d Iustin Pop
from ganeti import jqueue
48 39dcf2ef Guido Trotter
from ganeti import locking
49 ffeffa1d Iustin Pop
from ganeti import luxi
50 ffeffa1d Iustin Pop
from ganeti import utils
51 c1f2901b Iustin Pop
from ganeti import errors
52 c1f2901b Iustin Pop
from ganeti import ssconf
53 23e50d39 Michael Hanselmann
from ganeti import workerpool
54 b1b6ea87 Iustin Pop
from ganeti import rpc
55 d7cdb55d Iustin Pop
from ganeti import bootstrap
56 dd36d829 Iustin Pop
from ganeti import serializer
57 c1f2901b Iustin Pop
58 c1f2901b Iustin Pop
59 23e50d39 Michael Hanselmann
# Passed as the first argument to workerpool.WorkerPool in
# IOServer.setup_queue; presumably the number of ClientRequestWorker
# threads started for serving client connections (confirm in workerpool)
CLIENT_REQUEST_WORKERS = 16
60 23e50d39 Michael Hanselmann
61 c1f2901b Iustin Pop
# Module-level alias for the exit code of the same name in ganeti.constants
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
62 c1f2901b Iustin Pop
# Module-level alias for the exit code of the same name in ganeti.constants
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
63 ffeffa1d Iustin Pop
64 ffeffa1d Iustin Pop
65 23e50d39 Michael Hanselmann
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker that serves one client connection per task.

  Tasks are queued by L{IOServer.process_request}.

  """
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Serve a single client request and close its connection.

    The logic is copied from SocketServer.ThreadingMixIn.

    @param server: server object providing C{finish_request},
        C{close_request} and C{handle_error}
    @param request: the request (connection) object to be served
    @param client_address: client address, handed through to the server
        callbacks

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except: # pylint: disable-msg=W0702
      # Report any failure through the server's error hook, then make sure
      # the connection is closed
      server.handle_error(request, client_address)
      server.close_request(request)
79 23e50d39 Michael Hanselmann
80 23e50d39 Michael Hanselmann
81 ffeffa1d Iustin Pop
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked; until setup_queue() is
    # called both attributes stay unset.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Create the shared context and the pool of request workers.

    """
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    @param request: the request (connection) object
    @param client_address: address of the connecting client

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
    """Handle one request at a time until told to quit.

    @param signal_handlers: dictionary of signal handlers; the assertion
        below expects it to be filled in by the SignalHandled decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket. Each step runs even if an earlier one raised an exception, so
    a failure while closing the socket or terminating the workers no longer
    prevents the job queue from being shut down.

    """
    try:
      self.server_close()
    finally:
      try:
        if self.request_workers:
          self.request_workers.TerminateWorkers()
      finally:
        # Must not be skipped when terminating the workers fails
        if self.context:
          self.context.jobqueue.Shutdown()
139 ffeffa1d Iustin Pop
140 ffeffa1d Iustin Pop
141 ffeffa1d Iustin Pop
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler

  Reads JSON-encoded requests terminated by L{EOM} from the connection,
  passes them to L{ClientOps} and sends back one JSON-encoded response per
  request.

  """
  # Byte marking the end of a message on the wire
  EOM = '\3'
  # Maximum number of bytes requested per recv() call
  READ_SIZE = 4096

  def setup(self):
    """Initialise the per-connection state.

    """
    # pylint: disable-msg=W0201
    # setup() is the api for initialising for this class
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    """Serve requests until the client disconnects or sends garbage.

    A request must decode to a dictionary containing both the method and
    the arguments keys; otherwise the error is logged and the loop ends
    without sending a response. Exceptions raised while executing a request
    are reported to the client as an unsuccessful response.

    """
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError:
        # Only the exception value is fetched; keeping the whole
        # sys.exc_info() tuple (including the traceback) in a local variable
        # of this frame would create a reference cycle.
        result = errors.EncodeException(sys.exc_info()[1])
      except: # pylint: disable-msg=W0702
        logging.error("Unexpected exception", exc_info=True)
        result = "Caught exception: %s" % str(sys.exc_info()[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    """Return the next complete message from the client.

    Data is read in chunks of up to L{READ_SIZE} bytes and split at L{EOM};
    a trailing incomplete message is kept in the buffer for the next read.

    @return: the message without its terminator, or None if the client
        closed the connection before another complete message arrived

    """
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      # The last element is whatever followed the final terminator (possibly
      # an empty string)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    """Send a message followed by the L{EOM} terminator.

    @param msg: the already serialized message

    """
    # socket.sendall either transmits all data or raises an exception, hence
    # no manual resend loop is needed here
    self.request.sendall(msg + self.EOM)
205 ffeffa1d Iustin Pop
206 ffeffa1d Iustin Pop
207 ffeffa1d Iustin Pop
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    """Remember the server whose context is used for all operations.

    @param server: object with a C{context} attribute (an L{IOServer} when
        created by L{ClientRqHandler})

    """
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    """Execute a single client request.

    @param method: one of the C{luxi.REQ_*} request names
    @param args: request arguments; the expected structure depends on the
        method
    @return: the result of the job queue call or of the executed query
        opcode
    @raise errors.OpPrereqError: if a query requests locking
    @raise TypeError: if the watcher pause end time is not a number
    @raise errors.GenericError: if the watcher pause end time is in the past
    @raise ValueError: if the method is unknown

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # Every branch below either returns or raises, hence plain "if"
    # statements are used instead of a long elif chain

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    if method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = [[opcodes.OpCode.LoadOpCode(state) for state in ops]
              for ops in args]
      return queue.SubmitManyJobs(jobs)

    if method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    if method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    if method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    if method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    if method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    if method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      self._RejectSyncQuery(use_locking)
      return self._Query(opcodes.OpQueryInstances(names=names,
                                                  output_fields=fields,
                                                  use_locking=use_locking))

    if method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      self._RejectSyncQuery(use_locking)
      return self._Query(opcodes.OpQueryNodes(names=names,
                                              output_fields=fields,
                                              use_locking=use_locking))

    if method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      # Unlike the other queries, this one is only logged once accepted
      self._RejectSyncQuery(use_locking)
      logging.info("Received exports query request")
      return self._Query(opcodes.OpQueryExports(nodes=nodes,
                                                use_locking=use_locking))

    if method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      return self._Query(opcodes.OpQueryConfigValues(output_fields=fields))

    if method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      return self._Query(opcodes.OpQueryClusterInfo())

    if method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    if method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
        return _SetWatcherPause(until)

      if not isinstance(until, (int, float)):
        raise TypeError("Duration must be an integer or float")

      if until < time.time():
        raise errors.GenericError("Unable to set pause end time in the past")

      logging.info("Received request to pause the watcher until %s", until)
      return _SetWatcherPause(until)

    logging.info("Received invalid request '%s'", method)
    raise ValueError("Invalid operation '%s'" % method)

  def _RejectSyncQuery(self, use_locking):
    """Refuse queries which ask for locking.

    @param use_locking: the locking flag received from the client
    @raise errors.OpPrereqError: if the flag evaluates to true

    """
    if use_locking:
      raise errors.OpPrereqError("Sync queries are not allowed",
                                 errors.ECODE_INVAL)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    @param op: the query opcode to execute

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)
335 ee6c7b94 Michael Hanselmann
336 ffeffa1d Iustin Pop
337 39dcf2ef Guido Trotter
class GanetiContext(object):
  """Context common to all ganeti threads.

  The context owns the configuration, the lock manager and the job queue,
  which are shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    Only one GanetiContext may exist at any time; constructing a second one
    fails the assertion below.

    """
    cls = self.__class__
    assert cls._instance is None, "double GanetiContext instance"

    # Global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager, initialised with the currently known nodes and
    # instances
    node_names = self.cfg.GetNodeList()
    instance_names = self.cfg.GetInstanceList()
    self.glm = locking.GanetiLockManager(node_names, instance_names)

    # Job queue; it receives this context
    self.jobqueue = jqueue.JobQueue(self)

    # From here on __setattr__ refuses any further attribute changes
    cls._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    @param name: attribute name
    @param value: new attribute value

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    @param node: the node object; its C{name} attribute is used for the lock
    @param ec_id: passed through to the configuration's C{AddNode}

    """
    # Configuration goes first
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Finally make the node known to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    @param node: the node object

    """
    # Only the job queue needs to be synchronized again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    @param name: name of the node to remove

    """
    # Drop the node from the configuration
    self.cfg.RemoveNode(name)

    # Tell the job queue about it
    self.jobqueue.RemoveNode(name)

    # And release its entry in the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
409 d8470559 Michael Hanselmann
410 39dcf2ef Guido Trotter
411 05e50653 Michael Hanselmann
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None, int or float
  @param until: Unix timestamp saying until when the watcher shouldn't run;
      None removes the pause file. The value is written with "%d"
      formatting, i.e. as an integer.
  @return: the unmodified C{until} value

  """
  pause_file = constants.WATCHER_PAUSEFILE

  if until is not None:
    utils.WriteFile(pause_file, data="%d\n" % (until, ))
  else:
    utils.RemoveFile(pause_file)

  return until
425 28b498cd Michael Hanselmann
426 05e50653 Michael Hanselmann
427 36205981 Iustin Pop
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a cluster with an even number of nodes, we need at least
  half of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: bool
  @return: True if this node is confirmed as master (or no votes were
      gathered at all), False otherwise

  """
  myself = utils.HostInfo().name
  # The config writer is only needed temporarily, for the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg

  # Six attempts, ten seconds apart
  for _ in range(6):
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      # a real node is at the top of the list
      break
    time.sleep(10)
  else:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False

  (top_node, top_votes) = votes[0]
  all_votes = sum(item[1] for item in votes)
  votes_against = all_votes - top_votes

  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
    return False

  if top_votes < votes_against:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, votes_against)
    return False

  return True
483 36205981 Iustin Pop
484 6c948699 Michael Hanselmann
485 ed0efaa5 Michael Hanselmann
def CheckAgreementWithRpc():
  """Run L{CheckAgreement} with the RPC layer initialised.

  The RPC layer is shut down again afterwards, also if the check raises an
  exception.

  @return: the result of L{CheckAgreement}

  """
  rpc.Init()
  try:
    agreed = CheckAgreement()
  finally:
    rpc.Shutdown()
  return agreed
491 ffeffa1d Iustin Pop
492 c1f2901b Iustin Pop
493 8a20c732 Michael Hanselmann
def _RunInSeparateProcess(fn):
494 8a20c732 Michael Hanselmann
  """Runs a function in a separate process.
495 8a20c732 Michael Hanselmann
496 8a20c732 Michael Hanselmann
  Note: Only boolean return values are supported.
497 8a20c732 Michael Hanselmann
498 8a20c732 Michael Hanselmann
  @type fn: callable
499 8a20c732 Michael Hanselmann
  @param fn: Function to be called
500 8a20c732 Michael Hanselmann
  @rtype: bool
501 8a20c732 Michael Hanselmann
502 8a20c732 Michael Hanselmann
  """
503 8a20c732 Michael Hanselmann
  pid = os.fork()
504 8a20c732 Michael Hanselmann
  if pid == 0:
505 8a20c732 Michael Hanselmann
    # Child process
506 8a20c732 Michael Hanselmann
    try:
507 8a20c732 Michael Hanselmann
      # Call function
508 8a20c732 Michael Hanselmann
      result = int(bool(fn()))
509 8a20c732 Michael Hanselmann
      assert result in (0, 1)
510 7260cfbe Iustin Pop
    except: # pylint: disable-msg=W0702
511 8a20c732 Michael Hanselmann
      logging.exception("Error while calling function in separate process")
512 8a20c732 Michael Hanselmann
      # 0 and 1 are reserved for the return value
513 8a20c732 Michael Hanselmann
      result = 33
514 8a20c732 Michael Hanselmann
515 7260cfbe Iustin Pop
    os._exit(result) # pylint: disable-msg=W0212
516 8a20c732 Michael Hanselmann
517 8a20c732 Michael Hanselmann
  # Parent process
518 8a20c732 Michael Hanselmann
519 8a20c732 Michael Hanselmann
  # Avoid zombies and check exit code
520 8a20c732 Michael Hanselmann
  (_, status) = os.waitpid(pid, 0)
521 8a20c732 Michael Hanselmann
522 8a20c732 Michael Hanselmann
  if os.WIFSIGNALED(status):
523 8a20c732 Michael Hanselmann
    signum = os.WTERMSIG(status)
524 8a20c732 Michael Hanselmann
    exitcode = None
525 8a20c732 Michael Hanselmann
  else:
526 8a20c732 Michael Hanselmann
    signum = None
527 8a20c732 Michael Hanselmann
    exitcode = os.WEXITSTATUS(status)
528 8a20c732 Michael Hanselmann
529 8a20c732 Michael Hanselmann
  if not (exitcode in (0, 1) and signum is None):
530 8a20c732 Michael Hanselmann
    logging.error("Child program failed (code=%s, signal=%s)",
531 8a20c732 Michael Hanselmann
                  exitcode, signum)
532 8a20c732 Michael Hanselmann
    sys.exit(constants.EXIT_FAILURE)
533 8a20c732 Michael Hanselmann
534 8a20c732 Michael Hanselmann
  return bool(exitcode)
535 8a20c732 Michael Hanselmann
536 8a20c732 Michael Hanselmann
537 ed0efaa5 Michael Hanselmann
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  @param options: parsed command line options; C{debug}, C{no_voting} and
      C{yes_do_it} are used
  @param args: remaining command line arguments (not used here)

  """
  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if not options.no_voting:
    # CheckAgreement uses RPC and threads, hence it needs to be run in a
    # separate process before we call utils.Daemonize in the current process.
    if not _RunInSeparateProcess(CheckAgreementWithRpc):
      sys.exit(constants.EXIT_FAILURE)
    return

  if options.yes_do_it:
    # Confirmation was already given on the command line
    return

  sys.stdout.write("The 'no voting' option has been selected.\n")
  sys.stdout.write("This is dangerous, please confirm by"
                   " typing uppercase 'yes': ")
  sys.stdout.flush()

  answer = sys.stdin.readline().strip()
  if answer != "YES":
    sys.stderr.write("Aborting.\n")
    sys.exit(constants.EXIT_FAILURE)
565 ed0efaa5 Michael Hanselmann
566 ed0efaa5 Michael Hanselmann
567 6c948699 Michael Hanselmann
def ExecMasterd(options, args):
  """Main master daemon function, executed with the PID file held.

  Creates the master socket server, initialises RPC, asks the master node to
  start its master role and then serves client requests until
  C{serve_forever} returns. The server, the RPC layer and the socket file
  are cleaned up on the way out.

  @param options: parsed command line options (not used here)
  @param args: remaining command line arguments (not used here)

  """
  socket_path = constants.MASTER_SOCKET

  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(socket_path)

  master = IOServer(socket_path, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      start_result = rpc.RpcRunner.call_node_start_master(master_node,
                                                          False, False)
      fail_msg = start_result.fail_msg
      if fail_msg:
        # Not fatal, the daemon keeps starting
        logging.error("Can't activate master IP address: %s", fail_msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(socket_path)
595 a4af651e Iustin Pop
596 ffeffa1d Iustin Pop
597 04ccf5e9 Guido Trotter
def main():
  """Main function

  Builds the command line parser and hands control over to
  C{daemon.GenericMain}, together with the directories to pass along and
  the check and exec callbacks.

  """
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_text)

  parser.add_option("--no-voting", dest="no_voting",
                    action="store_true", default=False,
                    help=("Do not check that the nodes agree on this node"
                          " being the master and start the daemon"
                          " unconditionally"))
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    action="store_true", default=False,
                    help="Override interactive check for --no-voting")

  # (directory, mode) pairs handed to GenericMain
  dirs = [
    (constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
    (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
    ]

  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)
615 6c948699 Michael Hanselmann
616 04ccf5e9 Guido Trotter
617 ffeffa1d Iustin Pop
# Only start the daemon when executed as a script
if __name__ == "__main__":
  main()