#

# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import grp
import os
import os
import pwd
import pwd
import sys
import sys
import socket
import socket
import time
import time
import tempfile
import tempfile
import logging
import logging
40 ffeffa1d Iustin Pop
from optparse import OptionParser
from optparse import OptionParser
42 ffeffa1d Iustin Pop
from ganeti import config
from ganeti import config
from ganeti import constants
from ganeti import constants
from ganeti import daemon
from ganeti import daemon
from ganeti import mcpu
from ganeti import mcpu
from ganeti import opcodes
from ganeti import opcodes
from ganeti import jqueue
from ganeti import jqueue
from ganeti import locking
from ganeti import locking
from ganeti import luxi
from ganeti import luxi
from ganeti import utils
from ganeti import utils
from ganeti import errors
from ganeti import errors
from ganeti import ssconf
from ganeti import ssconf
from ganeti import workerpool
from ganeti import workerpool
from ganeti import rpc
from ganeti import rpc
from ganeti import bootstrap
from ganeti import bootstrap
from ganeti import netutils
from ganeti import netutils
from ganeti import objects
from ganeti import objects
from ganeti import query
from ganeti import query
60 c1f2901b Iustin Pop
61 c1f2901b Iustin Pop
62 23e50d39 Michael Hanselmann
63 23e50d39 Michael Hanselmann
64 c1f2901b Iustin Pop
65 c1f2901b Iustin Pop
66 ffeffa1d Iustin Pop
67 ffeffa1d Iustin Pop
68 23e50d39 Michael Hanselmann
class ClientRequestWorker(workerpool.BaseWorker):
# pylint: disable=W0221
  # pylint: disable=W0221
70 7e5a6e86 Guido Trotter
  def RunTask(self, server, message, client):
"""Process the request.

    """
    """Process the request.
72 23e50d39 Michael Hanselmann

73 23e50d39 Michael Hanselmann
client_ops = ClientOps(server)
    client_ops = ClientOps(server)
75 7e5a6e86 Guido Trotter
76 23e50d39 Michael Hanselmann
77 e986f20c Michael Hanselmann
      (method, args, version) = luxi.ParseRequest(message)
except luxi.ProtocolError, err:
    except luxi.ProtocolError, err:
79 7e5a6e86 Guido Trotter
      logging.error("Protocol Error: %s", err)
80 7e5a6e86 Guido Trotter
81 7e5a6e86 Guido Trotter
82 7e5a6e86 Guido Trotter
success = False
    success = False
84 7e5a6e86 Guido Trotter
85 e986f20c Michael Hanselmann
      # Verify client's version if there was one in the request
86 e986f20c Michael Hanselmann
      if version is not None and version != constants.LUXI_VERSION:
87 e986f20c Michael Hanselmann
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
(constants.LUXI_VERSION, version))
                               (constants.LUXI_VERSION, version))
89 e986f20c Michael Hanselmann
90 7e5a6e86 Guido Trotter
      result = client_ops.handle_request(method, args)
success = True
      success = True
except errors.GenericError, err:
    except errors.GenericError, err:
93 7e5a6e86 Guido Trotter
      logging.exception("Unexpected exception")
success = False
      success = False
result = errors.EncodeException(err)
      result = errors.EncodeException(err)
96 7e5a6e86 Guido Trotter
97 7e5a6e86 Guido Trotter
      logging.exception("Unexpected exception")
err = sys.exc_info()
      err = sys.exc_info()
99 7e5a6e86 Guido Trotter
      result = "Caught exception: %s" % str(err[1])
100 7e5a6e86 Guido Trotter
101 7e5a6e86 Guido Trotter
102 7e5a6e86 Guido Trotter
      reply = luxi.FormatResponse(success, result)
103 7e5a6e86 Guido Trotter
104 7e5a6e86 Guido Trotter
      # awake the main thread so that it can write out the data.
105 7e5a6e86 Guido Trotter
except: # pylint: disable=W0702
    except: # pylint: disable=W0702
logging.exception("Send error")
      logging.exception("Send error")
108 7e5a6e86 Guido Trotter
109 7e5a6e86 Guido Trotter
110 7e5a6e86 Guido Trotter
111 7e5a6e86 Guido Trotter
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  #: Passed to the parent constructor together with the socket family
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    """Initializes the handler for one client connection.

    @param server: the server owning this connection; its request worker
        pool receives the incoming messages
    @param connected_socket: the already connected client socket
    @param client_address: the address of the client
    @param family: the socket family

    """
    # NOTE(review): the "client_address" and "constants.LUXI_EOM" arguments
    # were lost in extraction and are restored from context; confirm against
    # the signature of daemon.AsyncTerminatedMessageStream.__init__.
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    """Queues a received message for processing by a request worker.

    The task tuple matches the arguments of L{ClientRequestWorker.RunTask}.

    """
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  Instances are callable and keep state between calls: whether the job
  queue was ever seen busy, and the linger timeout started once it became
  idle.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      # NOTE(review): the logging call prefix was lost in extraction; the
      # "info" level is assumed here and for the message further down.
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining


class MasterServer(daemon.AsyncStreamServer):
186 cdd7f900 Guido Trotter
  """Master Server.
187 ffeffa1d Iustin Pop

188 cdd7f900 Guido Trotter
  This is the main asynchronous master server. It handles connections to the
189 cdd7f900 Guido Trotter
  master socket.
190 ffeffa1d Iustin Pop

191 ffeffa1d Iustin Pop
family = socket.AF_UNIX
  family = socket.AF_UNIX
193 7e5a6e86 Guido Trotter
def __init__(self, address, uid, gid):
  def __init__(self, address, uid, gid):
195 cdd7f900 Guido Trotter
    """MasterServer constructor
196 ce862cd5 Guido Trotter

197 cdd7f900 Guido Trotter
    @param address: the unix socket address to bind the MasterServer to
198 bbfd0568 René Nussbaumer
    @param uid: The uid of the owner of the socket
199 bbfd0568 René Nussbaumer
    @param gid: The gid of the owner of the socket
200 ce862cd5 Guido Trotter

201 ce862cd5 Guido Trotter
202 bbfd0568 René Nussbaumer
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
203 7e5a6e86 Guido Trotter
    daemon.AsyncStreamServer.__init__(self,, temp_name)
os.chmod(temp_name, 0770)
    os.chmod(temp_name, 0770)
os.chown(temp_name, uid, gid)
    os.chown(temp_name, uid, gid)
os.rename(temp_name, address)
    os.rename(temp_name, address)
207 bbfd0568 René Nussbaumer
self.awaker = daemon.AsyncAwaker()
    self.awaker = daemon.AsyncAwaker()
209 50a3fbb2 Michael Hanselmann
# We'll only start threads once we've forked.
    # We'll only start threads once we've forked.
self.context = None
    self.context = None
self.request_workers = None
    self.request_workers = None
213 50a3fbb2 Michael Hanselmann
self._shutdown_check = None
    self._shutdown_check = None
215 5483fd73 Michael Hanselmann
216 cdd7f900 Guido Trotter
  def handle_connection(self, connected_socket, client_address):
217 7e5a6e86 Guido Trotter
    # TODO: add connection count and limit the number of open connections to a
218 7e5a6e86 Guido Trotter
    # maximum number to avoid breaking for lack of file descriptors or memory.
219 7e5a6e86 Guido Trotter
    MasterClientHandler(self, connected_socket, client_address,
220 cdd7f900 Guido Trotter
def setup_queue(self):
  def setup_queue(self):
self.context = GanetiContext()
    self.context = GanetiContext()
223 89e2b4d2 Michael Hanselmann
    self.request_workers = workerpool.WorkerPool("ClientReq",
224 89e2b4d2 Michael Hanselmann
225 23e50d39 Michael Hanselmann
226 ffeffa1d Iustin Pop
def WaitForShutdown(self):
  def WaitForShutdown(self):
"""Prepares server for shutdown.

    """
    """Prepares server for shutdown.
229 5483fd73 Michael Hanselmann

230 5483fd73 Michael Hanselmann
if self._shutdown_check is None:
    if self._shutdown_check is None:
self._shutdown_check = _MasterShutdownCheck()
      self._shutdown_check = _MasterShutdownCheck()
233 5483fd73 Michael Hanselmann
234 5483fd73 Michael Hanselmann
    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
235 5483fd73 Michael Hanselmann
def server_cleanup(self):
  def server_cleanup(self):
237 c1f2901b Iustin Pop
    """Cleanup the server.
238 c1f2901b Iustin Pop

239 c1f2901b Iustin Pop
    This involves shutting down the processor threads and the master
240 c1f2901b Iustin Pop
241 c1f2901b Iustin Pop

242 c1f2901b Iustin Pop
243 50a3fbb2 Michael Hanselmann
244 cdd7f900 Guido Trotter
245 50a3fbb2 Michael Hanselmann
if self.request_workers:
      if self.request_workers:
247 36088c4c Michael Hanselmann
if self.context:
      if self.context:
249 9113300d Michael Hanselmann
250 ffeffa1d Iustin Pop
251 ffeffa1d Iustin Pop
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    """Remembers the server whose context is used to serve requests.

    """
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    """Dispatches a client request to the job queue or to a query opcode.

    @param method: the request method, one of the C{luxi.REQ_*} values
    @param args: the request arguments; their layout depends on the method
    @return: the result of the requested operation
    @raise ValueError: for arguments of the wrong type or an unknown method

    """
    # NOTE(review): all "logging.info(" prefixes in this method, the
    # argument type check below, and the "errors.ECODE_INVAL" /
    # "use_locking" continuation arguments were lost in extraction and are
    # restored from context; confirm against the upstream file.
    queue = self.server.context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args
      req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)

      if req.what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
                                             qfilter=req.qfilter))
      elif req.what == constants.QR_LOCK:
        if req.qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered")
        return self.server.context.glm.QueryLocks(req.fields)
      elif req.what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    # NOTE(review): the lock manager arguments and the SetContext call were
    # lost in extraction and are restored from context; confirm against
    # locking.GanetiLockManager and config.ConfigWriter.
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetNodeGroupList(),
                self.cfg.GetInstanceList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    # NOTE(review): the statements following the comments in AddNode,
    # ReaddNode and RemoveNode were partly lost in extraction and are
    # restored to mirror the surviving "self.glm.remove" calls.
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)
    self.glm.add(locking.LEVEL_NODE_RES, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
    self.glm.remove(locking.LEVEL_NODE_RES, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run
  @return: the C{until} value that was passed in

  """
  # NOTE(review): both branches were lost in extraction apart from the
  # "data=" argument; the file helpers and the pause file constant are
  # restored from context - confirm against utils and constants.
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: boolean
  @return: True if this node is confirmed as master, False otherwise

  """
  myself = netutils.Hostname.GetSysName()
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      # NOTE(review): the rest of the loop body was lost in extraction; a
      # ten second pause matches the "up to a minute" over six retries
      # stated in the docstring - confirm against the upstream file.
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


def ActivateMasterIP():
  """Activate the master IP address through a bootstrap RPC call.

  The master network parameters and the value of
  C{GetUseExternalMipScript()} are read from a freshly instantiated
  configuration writer and handed to the C{node_activate_master_ip} RPC.
  A failure reported in the RPC result is logged as an error; it is not
  propagated to the caller.

  """
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  # NOTE(review): the first argument was missing in the reviewed copy (the
  # call read "(,"). It is presumed to be the master node's name taken from
  # the master network parameters - confirm against version control.
  result = runner.call_node_activate_master_ip(master_params.name,
                                               master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)
606 340f4757 Iustin Pop
607 340f4757 Iustin Pop
608 ed0efaa5 Michael Hanselmann
def CheckMasterd(options, args):
609 ed0efaa5 Michael Hanselmann
  """Initial checks whether to run or exit with a failure.
610 ed0efaa5 Michael Hanselmann

611 ed0efaa5 Michael Hanselmann
612 f93427cd Iustin Pop
  if args: # masterd doesn't take any arguments
613 f93427cd Iustin Pop
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
614 f93427cd Iustin Pop
615 f93427cd Iustin Pop
616 ed0efaa5 Michael Hanselmann
617 ed0efaa5 Michael Hanselmann
618 bbfd0568 René Nussbaumer
619 bbfd0568 René Nussbaumer
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
620 bbfd0568 René Nussbaumer
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
621 bbfd0568 René Nussbaumer
  except KeyError:
622 bbfd0568 René Nussbaumer
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
623 bbfd0568 René Nussbaumer
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
624 bbfd0568 René Nussbaumer
625 bbfd0568 René Nussbaumer
626 4b63dc7a Iustin Pop
  # Check the configuration is sane before anything else
627 4b63dc7a Iustin Pop
628 4b63dc7a Iustin Pop
629 4b63dc7a Iustin Pop
  except errors.ConfigVersionMismatch, err:
630 4b63dc7a Iustin Pop
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
631 4b63dc7a Iustin Pop
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
632 4b63dc7a Iustin Pop
    print >> sys.stderr,  \
633 4b63dc7a Iustin Pop
        ("Configuration version mismatch. The current Ganeti software"
634 4b63dc7a Iustin Pop
         " expects version %s, but the on-disk configuration file has"
635 4b63dc7a Iustin Pop
         " version %s. This is likely the result of upgrading the"
636 4b63dc7a Iustin Pop
         " software without running the upgrade procedure. Please contact"
637 4b63dc7a Iustin Pop
         " your cluster administrator or complete the upgrade using the"
638 4b63dc7a Iustin Pop
         " cfgupgrade utility, after reading the upgrade notes." %
639 4b63dc7a Iustin Pop
         (v1, v2))
640 4b63dc7a Iustin Pop
641 4b63dc7a Iustin Pop
  except errors.ConfigurationError, err:
642 4b63dc7a Iustin Pop
    print >> sys.stderr, \
643 4b63dc7a Iustin Pop
        ("Configuration error while opening the configuration file: %s\n"
644 4b63dc7a Iustin Pop
         "This might be caused by an incomplete software upgrade or"
645 4b63dc7a Iustin Pop
         " by a corrupted configuration file. Until the problem is fixed"
646 4b63dc7a Iustin Pop
         " the master daemon cannot start." % str(err))
647 4b63dc7a Iustin Pop
648 bbfd0568 René Nussbaumer
649 ed0efaa5 Michael Hanselmann
  # If CheckMaster didn't fail we believe we are the master, but we have to
650 ed0efaa5 Michael Hanselmann
  # confirm with the other nodes.
651 ed0efaa5 Michael Hanselmann
  if options.no_voting:
652 675e2bf5 Iustin Pop
    if not options.yes_do_it:
653 675e2bf5 Iustin Pop
      sys.stdout.write("The 'no voting' option has been selected.\n")
654 675e2bf5 Iustin Pop
      sys.stdout.write("This is dangerous, please confirm by"
655 675e2bf5 Iustin Pop
                       " typing uppercase 'yes': ")
656 675e2bf5 Iustin Pop
657 ed0efaa5 Michael Hanselmann
658 675e2bf5 Iustin Pop
      confirmation = sys.stdin.readline().strip()
659 675e2bf5 Iustin Pop
      if confirmation != "YES":
660 675e2bf5 Iustin Pop
        print >> sys.stderr, "Aborting."
661 675e2bf5 Iustin Pop
662 ed0efaa5 Michael Hanselmann
663 675e2bf5 Iustin Pop
664 675e2bf5 Iustin Pop
    # CheckAgreement uses RPC and threads, hence it needs to be run in
665 675e2bf5 Iustin Pop
    # a separate process before we call utils.Daemonize in the current
666 675e2bf5 Iustin Pop
    # process.
667 675e2bf5 Iustin Pop
    if not utils.RunInSeparateProcess(CheckAgreement):
668 ed0efaa5 Michael Hanselmann
669 ed0efaa5 Michael Hanselmann
670 340f4757 Iustin Pop
  # ActivateMasterIP also uses RPC/threads, so we run it again via a
671 340f4757 Iustin Pop
  # separate process.
672 340f4757 Iustin Pop
673 340f4757 Iustin Pop
  # TODO: decide whether failure to activate the master IP is a fatal error
674 340f4757 Iustin Pop
675 340f4757 Iustin Pop
676 ed0efaa5 Michael Hanselmann
677 3ee53f1f Iustin Pop
def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  @param options: parsed command line options; C{uid} and C{gid} are
      passed on to the master server
  @param _: unused
  @rtype: tuple
  @return: the C{(mainloop, master)} pair built here

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  # NOTE(review): the statement the comment above introduces was missing in
  # the reviewed copy; removing a leftover socket file is presumed - confirm
  # against version control.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)
688 3ee53f1f Iustin Pop
689 3ee53f1f Iustin Pop
690 b459a848 Andrea Spadaccini
def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
691 3ee53f1f Iustin Pop
  """Main master daemon function, executed with the PID file held.
692 3ee53f1f Iustin Pop

693 3ee53f1f Iustin Pop
694 3ee53f1f Iustin Pop
  (mainloop, master) = prep_data
695 04ccf5e9 Guido Trotter
696 15486fa7 Michael Hanselmann
697 4331f6cd Michael Hanselmann
698 15486fa7 Michael Hanselmann
699 15486fa7 Michael Hanselmann
700 5483fd73 Michael Hanselmann
701 15486fa7 Michael Hanselmann
702 15486fa7 Michael Hanselmann
703 4331f6cd Michael Hanselmann
704 15486fa7 Michael Hanselmann
705 a4af651e Iustin Pop
706 227647ac Guido Trotter
707 a4af651e Iustin Pop
708 5483fd73 Michael Hanselmann"Clean master daemon shutdown")
709 5483fd73 Michael Hanselmann
710 ffeffa1d Iustin Pop
711 29d91329 Michael Hanselmann
def Main():
  """Main function.

  Builds the command line parser (adding the C{--no-voting} and
  C{--yes-do-it} options) and hands control to C{daemon.GenericMain}
  together with the check, prepare and exec callbacks of this module.

  """
  # NOTE(review): the right-hand operand of the version format expression
  # was missing in the reviewed copy; constants.RELEASE_VERSION is presumed -
  # confirm against version control.
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)