Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ d8f5a37d

History | View | Annotate | Download (18 kB)

1 834f8b67 Iustin Pop
#!/usr/bin/python
2 ffeffa1d Iustin Pop
#
3 ffeffa1d Iustin Pop
4 ffeffa1d Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 ffeffa1d Iustin Pop
#
6 ffeffa1d Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 ffeffa1d Iustin Pop
# it under the terms of the GNU General Public License as published by
8 ffeffa1d Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 ffeffa1d Iustin Pop
# (at your option) any later version.
10 ffeffa1d Iustin Pop
#
11 ffeffa1d Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 ffeffa1d Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 ffeffa1d Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 ffeffa1d Iustin Pop
# General Public License for more details.
15 ffeffa1d Iustin Pop
#
16 ffeffa1d Iustin Pop
# You should have received a copy of the GNU General Public License
17 ffeffa1d Iustin Pop
# along with this program; if not, write to the Free Software
18 ffeffa1d Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 ffeffa1d Iustin Pop
# 02110-1301, USA.
20 ffeffa1d Iustin Pop
21 ffeffa1d Iustin Pop
22 ffeffa1d Iustin Pop
"""Master daemon program.
23 ffeffa1d Iustin Pop
24 ffeffa1d Iustin Pop
Some classes deviate from the standard style guide since the
25 ffeffa1d Iustin Pop
inheritance from parent classes requires it.
26 ffeffa1d Iustin Pop
27 ffeffa1d Iustin Pop
"""
28 ffeffa1d Iustin Pop
29 7260cfbe Iustin Pop
# pylint: disable-msg=C0103
30 7260cfbe Iustin Pop
# C0103: Invalid name ganeti-masterd
31 ffeffa1d Iustin Pop
32 c1f2901b Iustin Pop
import sys
33 ffeffa1d Iustin Pop
import SocketServer
34 ffeffa1d Iustin Pop
import time
35 ffeffa1d Iustin Pop
import collections
36 ffeffa1d Iustin Pop
import signal
37 96cb3986 Michael Hanselmann
import logging
38 ffeffa1d Iustin Pop
39 c1f2901b Iustin Pop
from optparse import OptionParser
40 ffeffa1d Iustin Pop
41 39dcf2ef Guido Trotter
from ganeti import config
42 ffeffa1d Iustin Pop
from ganeti import constants
43 04ccf5e9 Guido Trotter
from ganeti import daemon
44 ffeffa1d Iustin Pop
from ganeti import mcpu
45 ffeffa1d Iustin Pop
from ganeti import opcodes
46 ffeffa1d Iustin Pop
from ganeti import jqueue
47 39dcf2ef Guido Trotter
from ganeti import locking
48 ffeffa1d Iustin Pop
from ganeti import luxi
49 ffeffa1d Iustin Pop
from ganeti import utils
50 c1f2901b Iustin Pop
from ganeti import errors
51 c1f2901b Iustin Pop
from ganeti import ssconf
52 23e50d39 Michael Hanselmann
from ganeti import workerpool
53 b1b6ea87 Iustin Pop
from ganeti import rpc
54 d7cdb55d Iustin Pop
from ganeti import bootstrap
55 dd36d829 Iustin Pop
from ganeti import serializer
56 c1f2901b Iustin Pop
57 c1f2901b Iustin Pop
58 23e50d39 Michael Hanselmann
# Passed as the second argument to workerpool.WorkerPool when the
# "ClientReq" pool is created in IOServer.setup_queue; presumably the number
# of worker threads serving client requests (confirm against workerpool).
CLIENT_REQUEST_WORKERS = 16
59 23e50d39 Michael Hanselmann
60 c1f2901b Iustin Pop
# Module-level alias of constants.EXIT_NOTMASTER
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
61 c1f2901b Iustin Pop
# Module-level alias of constants.EXIT_NODESETUP_ERROR
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
62 ffeffa1d Iustin Pop
63 ffeffa1d Iustin Pop
64 23e50d39 Michael Hanselmann
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker which serves one accepted client connection per task.

  """
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Serves a single client connection on behalf of the server.

    The logic mirrors what ThreadingMixIn does for a request thread.

    @param server: server object providing C{finish_request},
        C{close_request} and C{handle_error}
    @param request: the accepted connection
    @param client_address: address of the peer, as passed in by the server

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except: # pylint: disable-msg=W0702
      # Anything going wrong (including a failure while closing) is handed
      # to the server's own error hook
      server.handle_error(request, client_address)
    else:
      # Request served and closed, nothing left to do
      return

    # Only reached after handle_error returned normally; close the request
    # for the error case
    server.close_request(request)
78 23e50d39 Michael Hanselmann
79 23e50d39 Michael Hanselmann
80 ffeffa1d Iustin Pop
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Creates the shared context and the client request worker pool.

    Until this is called both C{self.context} and C{self.request_workers}
    are C{None}.

    """
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
    """Handle one request at a time until told to quit.

    @param signal_handlers: dictionary filled in by the
        L{utils.SignalHandled} decorator; must not be empty

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket. Each step is attempted even if a previous one raised an
    exception, so a failure while terminating the workers can no longer
    prevent the job queue from being shut down.

    """
    try:
      self.server_close()
    finally:
      try:
        if self.request_workers:
          self.request_workers.TerminateWorkers()
      finally:
        if self.context:
          self.context.jobqueue.Shutdown()
139 ffeffa1d Iustin Pop
140 ffeffa1d Iustin Pop
141 ffeffa1d Iustin Pop
class ClientRqHandler(SocketServer.BaseRequestHandler):
142 ffeffa1d Iustin Pop
  """Client handler"""
143 ffeffa1d Iustin Pop
  EOM = '\3'
144 ffeffa1d Iustin Pop
  READ_SIZE = 4096
145 ffeffa1d Iustin Pop
146 ffeffa1d Iustin Pop
  def setup(self):
147 7260cfbe Iustin Pop
    # pylint: disable-msg=W0201
148 7260cfbe Iustin Pop
    # setup() is the api for initialising for this class
149 ffeffa1d Iustin Pop
    self._buffer = ""
150 ffeffa1d Iustin Pop
    self._msgs = collections.deque()
151 ffeffa1d Iustin Pop
    self._ops = ClientOps(self.server)
152 ffeffa1d Iustin Pop
153 ffeffa1d Iustin Pop
  def handle(self):
154 ffeffa1d Iustin Pop
    while True:
155 ffeffa1d Iustin Pop
      msg = self.read_message()
156 ffeffa1d Iustin Pop
      if msg is None:
157 d21d09d6 Iustin Pop
        logging.debug("client closed connection")
158 ffeffa1d Iustin Pop
        break
159 3d8548c4 Michael Hanselmann
160 dd36d829 Iustin Pop
      request = serializer.LoadJson(msg)
161 3d8548c4 Michael Hanselmann
      logging.debug("request: %s", request)
162 ffeffa1d Iustin Pop
      if not isinstance(request, dict):
163 3d8548c4 Michael Hanselmann
        logging.error("wrong request received: %s", msg)
164 ffeffa1d Iustin Pop
        break
165 3d8548c4 Michael Hanselmann
166 3d8548c4 Michael Hanselmann
      method = request.get(luxi.KEY_METHOD, None)
167 3d8548c4 Michael Hanselmann
      args = request.get(luxi.KEY_ARGS, None)
168 3d8548c4 Michael Hanselmann
      if method is None or args is None:
169 3d8548c4 Michael Hanselmann
        logging.error("no method or args in request")
170 ffeffa1d Iustin Pop
        break
171 3d8548c4 Michael Hanselmann
172 3d8548c4 Michael Hanselmann
      success = False
173 3d8548c4 Michael Hanselmann
      try:
174 3d8548c4 Michael Hanselmann
        result = self._ops.handle_request(method, args)
175 3d8548c4 Michael Hanselmann
        success = True
176 6797ec29 Iustin Pop
      except errors.GenericError, err:
177 6797ec29 Iustin Pop
        success = False
178 6956e9cd Iustin Pop
        result = errors.EncodeException(err)
179 3d8548c4 Michael Hanselmann
      except:
180 3d8548c4 Michael Hanselmann
        logging.error("Unexpected exception", exc_info=True)
181 3d8548c4 Michael Hanselmann
        err = sys.exc_info()
182 3d8548c4 Michael Hanselmann
        result = "Caught exception: %s" % str(err[1])
183 3d8548c4 Michael Hanselmann
184 3d8548c4 Michael Hanselmann
      response = {
185 3d8548c4 Michael Hanselmann
        luxi.KEY_SUCCESS: success,
186 3d8548c4 Michael Hanselmann
        luxi.KEY_RESULT: result,
187 3d8548c4 Michael Hanselmann
        }
188 3d8548c4 Michael Hanselmann
      logging.debug("response: %s", response)
189 dd36d829 Iustin Pop
      self.send_message(serializer.DumpJson(response))
190 ffeffa1d Iustin Pop
191 ffeffa1d Iustin Pop
  def read_message(self):
192 ffeffa1d Iustin Pop
    while not self._msgs:
193 ffeffa1d Iustin Pop
      data = self.request.recv(self.READ_SIZE)
194 ffeffa1d Iustin Pop
      if not data:
195 ffeffa1d Iustin Pop
        return None
196 ffeffa1d Iustin Pop
      new_msgs = (self._buffer + data).split(self.EOM)
197 ffeffa1d Iustin Pop
      self._buffer = new_msgs.pop()
198 ffeffa1d Iustin Pop
      self._msgs.extend(new_msgs)
199 ffeffa1d Iustin Pop
    return self._msgs.popleft()
200 ffeffa1d Iustin Pop
201 ffeffa1d Iustin Pop
  def send_message(self, msg):
202 ffeffa1d Iustin Pop
    #print "sending", msg
203 6096ee13 Michael Hanselmann
    # TODO: sendall is not guaranteed to send everything
204 ffeffa1d Iustin Pop
    self.request.sendall(msg + self.EOM)
205 ffeffa1d Iustin Pop
206 ffeffa1d Iustin Pop
207 ffeffa1d Iustin Pop
class ClientOps:
  """Class holding high-level client operations.

  Requests are either forwarded to the job queue found in the server's
  context or turned into an opcode which is executed directly through
  L{_Query}.

  """
  def __init__(self, server):
    """Remembers the server on whose behalf requests are served.

    @param server: object providing a C{context} attribute; C{context}
        must be set by the time requests are handled

    """
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    """Executes a single client request.

    @param method: the requested operation, one of the C{luxi.REQ_*} values
    @param args: arguments of the operation; the expected structure depends
        on the method
    @return: the result of the job queue call or of the executed opcode
    @raise errors.OpPrereqError: if a query requests locking
    @raise errors.GenericError: if the watcher pause end time lies in the
        past
    @raise TypeError: if the watcher pause end time is not a number
    @raise ValueError: if the method is not known

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = [[opcodes.OpCode.LoadOpCode(state) for state in ops]
              for ops in args]
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      # A non-empty sequence of IDs is logged as a comma-separated list,
      # anything else in its string form
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      self._RejectSyncQuery(use_locking)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      self._RejectSyncQuery(use_locking)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      # Logged before the locking check, like the other query requests
      logging.info("Received exports query request")
      self._RejectSyncQuery(use_locking)
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _RejectSyncQuery(self, use_locking):
    """Refuses queries which ask for locking.

    @param use_locking: the locking flag received from the client
    @raise errors.OpPrereqError: if C{use_locking} evaluates to true

    """
    if use_locking:
      raise errors.OpPrereqError("Sync queries are not allowed",
                                 errors.ECODE_INVAL)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    @param op: the opcode to execute
    @return: the value returned by the processor's C{ExecOpCode}

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)
341 ee6c7b94 Michael Hanselmann
342 ffeffa1d Iustin Pop
343 39dcf2ef Guido Trotter
class GanetiContext(object):
  """Holder of the objects shared by all ganeti threads.

  The configuration (C{cfg}), the lock manager (C{glm}) and the job queue
  (C{jobqueue}) are created once and kept here.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Builds the one and only GanetiContext object.

    An assertion fails if an instance has already been registered on the
    class.

    """
    cls = self.__class__
    assert cls._instance is None, "double GanetiContext instance"

    # Global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager, seeded with the currently configured nodes and
    # instances
    nodes = self.cfg.GetNodeList()
    instances = self.cfg.GetInstanceList()
    self.glm = locking.GanetiLockManager(nodes, instances)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # Registering the instance must come last: from here on __setattr__
    # refuses any further attribute assignment
    cls._instance = self

  def __setattr__(self, name, value):
    """Allows attribute assignment only while no instance is registered.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    super(GanetiContext, self).__setattr__(name, value)

  def AddNode(self, node, ec_id):
    """Registers a new node with configuration, job queue and lock manager.

    @param node: the node object; its C{name} is used for the lock
    @param ec_id: passed through to the configuration's C{AddNode}

    """
    # Configuration first
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Finally make the lock manager aware of the new node
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Handles a node which is already part of the configuration.

    @param node: the node object

    """
    # Only the job queue needs to be synchronized again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Drops a node from configuration, job queue and lock manager.

    @param name: the name of the node to remove

    """
    # Configuration first
    self.cfg.RemoveNode(name)

    # Then tell the job queue
    self.jobqueue.RemoveNode(name)

    # Finally release the node's lock from the lock manager
    self.glm.remove(locking.LEVEL_NODE, name)
415 d8470559 Michael Hanselmann
416 39dcf2ef Guido Trotter
417 05e50653 Michael Hanselmann
def _SetWatcherPause(until):
  """Writes or deletes the file which pauses the watcher.

  @type until: None or number
  @param until: Unix timestamp until which the watcher shouldn't run;
      C{None} removes the pause file
  @return: the value passed in as C{until}

  """
  pause_file = constants.WATCHER_PAUSEFILE

  if until is not None:
    # The timestamp is stored as an integer followed by a newline
    utils.WriteFile(pause_file, data="%d\n" % (until, ))
  else:
    utils.RemoveFile(pause_file)

  return until
431 28b498cd Michael Hanselmann
432 05e50653 Michael Hanselmann
433 36205981 Iustin Pop
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we make up to six
  attempts, ten seconds apart, until we get a real answer as the
  top-voted one. If the nodes are more out-of-sync, for now manual
  startup of the master should be attempted.

  Note that for a cluster with an even number of nodes, we need at least
  half of the nodes (besides ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: boolean
  @return: True if this node should act as the master, False otherwise

  """
  myself = utils.HostInfo().name
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      # a real node is the top-voted answer
      break
    retries -= 1
    if retries > 0:
      # Only wait when another attempt follows; sleeping after the final
      # failure would just delay the error message
      time.sleep(10)
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
491 36205981 Iustin Pop
492 6c948699 Michael Hanselmann
493 ed0efaa5 Michael Hanselmann
def CheckAgreementWithRpc():
  """Runs L{CheckAgreement} with the RPC layer initialized.

  The RPC layer is shut down again in all cases.

  @return: the result of L{CheckAgreement}

  """
  rpc.Init()
  try:
    agreed = CheckAgreement()
  finally:
    rpc.Shutdown()
  return agreed
499 ffeffa1d Iustin Pop
500 c1f2901b Iustin Pop
501 ed0efaa5 Michael Hanselmann
def CheckMasterd(options, args):
  """Decides at startup whether the daemon may run.

  The process exits with C{constants.EXIT_FAILURE} if arguments were
  given, if the user doesn't confirm the 'no voting' mode or if the
  agreement check fails.

  @param options: parsed command line options; C{debug}, C{no_voting} and
      C{yes_do_it} are used
  @param args: remaining command line arguments, must be empty

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if not options.no_voting:
    # CheckAgreement uses RPC and threads, hence it needs to be run in a
    # separate process before we call utils.Daemonize in the current process.
    agreed = utils.RunInSeparateProcess(CheckAgreementWithRpc)
    if not agreed:
      sys.exit(constants.EXIT_FAILURE)
    return

  # Voting was disabled; unless overridden, ask for confirmation
  if options.yes_do_it:
    return

  sys.stdout.write("The 'no voting' option has been selected.\n")
  sys.stdout.write("This is dangerous, please confirm by"
                   " typing uppercase 'yes': ")
  sys.stdout.flush()

  answer = sys.stdin.readline().strip()
  if answer != "YES":
    print >> sys.stderr, "Aborting."
    sys.exit(constants.EXIT_FAILURE)
533 ed0efaa5 Michael Hanselmann
534 ed0efaa5 Michael Hanselmann
535 2d54e29c Iustin Pop
def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Runs the master daemon itself; called with the PID file held.

  @param options: parsed command line options (unused)
  @param args: remaining command line arguments (unused)

  """
  socket_path = constants.MASTER_SOCKET

  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(socket_path)

  server = IOServer(socket_path, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_name = ssconf.SimpleStore().GetMasterNode()
      start_result = rpc.RpcRunner.call_node_start_master(master_name,
                                                          False, False)
      failure = start_result.fail_msg
      if failure:
        # A failure is only logged, the daemon keeps starting up
        logging.error("Can't activate master IP address: %s", failure)

      server.setup_queue()
      try:
        server.serve_forever()
      finally:
        server.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(socket_path)
563 a4af651e Iustin Pop
564 ffeffa1d Iustin Pop
565 04ccf5e9 Guido Trotter
def main():
  """Parses the command line and hands over to the generic daemon code.

  """
  # Directories, with their modes, handed to daemon.GenericMain
  dirs = [
    (constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
    (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
    ]

  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_text)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")

  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)


if __name__ == "__main__":
  main()