Statistics
| Branch: | Tag: | Revision:

root / lib / server / masterd.py @ 2237687b

History | View | Annotate | Download (21.4 kB)

1 69cf3abd Michael Hanselmann
#
2 ffeffa1d Iustin Pop
#
3 ffeffa1d Iustin Pop
4 f2af0bec Iustin Pop
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 ffeffa1d Iustin Pop
#
6 ffeffa1d Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 ffeffa1d Iustin Pop
# it under the terms of the GNU General Public License as published by
8 ffeffa1d Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 ffeffa1d Iustin Pop
# (at your option) any later version.
10 ffeffa1d Iustin Pop
#
11 ffeffa1d Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 ffeffa1d Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 ffeffa1d Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 ffeffa1d Iustin Pop
# General Public License for more details.
15 ffeffa1d Iustin Pop
#
16 ffeffa1d Iustin Pop
# You should have received a copy of the GNU General Public License
17 ffeffa1d Iustin Pop
# along with this program; if not, write to the Free Software
18 ffeffa1d Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 ffeffa1d Iustin Pop
# 02110-1301, USA.
20 ffeffa1d Iustin Pop
21 ffeffa1d Iustin Pop
22 ffeffa1d Iustin Pop
"""Master daemon program.
23 ffeffa1d Iustin Pop

24 ffeffa1d Iustin Pop
Some classes deviates from the standard style guide since the
25 ffeffa1d Iustin Pop
inheritance from parent classes requires it.
26 ffeffa1d Iustin Pop

27 ffeffa1d Iustin Pop
"""
28 ffeffa1d Iustin Pop
29 7260cfbe Iustin Pop
# pylint: disable-msg=C0103
30 7260cfbe Iustin Pop
# C0103: Invalid name ganeti-masterd
31 ffeffa1d Iustin Pop
32 bbfd0568 René Nussbaumer
import grp
33 bbfd0568 René Nussbaumer
import os
34 bbfd0568 René Nussbaumer
import pwd
35 c1f2901b Iustin Pop
import sys
36 cdd7f900 Guido Trotter
import socket
37 ffeffa1d Iustin Pop
import time
38 bbfd0568 René Nussbaumer
import tempfile
39 96cb3986 Michael Hanselmann
import logging
40 ffeffa1d Iustin Pop
41 c1f2901b Iustin Pop
from optparse import OptionParser
42 ffeffa1d Iustin Pop
43 39dcf2ef Guido Trotter
from ganeti import config
44 ffeffa1d Iustin Pop
from ganeti import constants
45 04ccf5e9 Guido Trotter
from ganeti import daemon
46 ffeffa1d Iustin Pop
from ganeti import mcpu
47 ffeffa1d Iustin Pop
from ganeti import opcodes
48 ffeffa1d Iustin Pop
from ganeti import jqueue
49 39dcf2ef Guido Trotter
from ganeti import locking
50 ffeffa1d Iustin Pop
from ganeti import luxi
51 ffeffa1d Iustin Pop
from ganeti import utils
52 c1f2901b Iustin Pop
from ganeti import errors
53 c1f2901b Iustin Pop
from ganeti import ssconf
54 23e50d39 Michael Hanselmann
from ganeti import workerpool
55 b1b6ea87 Iustin Pop
from ganeti import rpc
56 d7cdb55d Iustin Pop
from ganeti import bootstrap
57 a744b676 Manuel Franceschini
from ganeti import netutils
58 28b71a76 Michael Hanselmann
from ganeti import objects
59 24d16f76 Michael Hanselmann
from ganeti import query
60 c1f2901b Iustin Pop
61 c1f2901b Iustin Pop
62 23e50d39 Michael Hanselmann
CLIENT_REQUEST_WORKERS = 16
63 23e50d39 Michael Hanselmann
64 c1f2901b Iustin Pop
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
65 c1f2901b Iustin Pop
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
66 ffeffa1d Iustin Pop
67 ffeffa1d Iustin Pop
68 23e50d39 Michael Hanselmann
class ClientRequestWorker(workerpool.BaseWorker):
69 e0dbb89b Guido Trotter
  # pylint: disable-msg=W0221
70 7e5a6e86 Guido Trotter
  def RunTask(self, server, message, client):
71 23e50d39 Michael Hanselmann
    """Process the request.
72 23e50d39 Michael Hanselmann

73 23e50d39 Michael Hanselmann
    """
74 7e5a6e86 Guido Trotter
    client_ops = ClientOps(server)
75 7e5a6e86 Guido Trotter
76 23e50d39 Michael Hanselmann
    try:
77 e986f20c Michael Hanselmann
      (method, args, version) = luxi.ParseRequest(message)
78 7e5a6e86 Guido Trotter
    except luxi.ProtocolError, err:
79 7e5a6e86 Guido Trotter
      logging.error("Protocol Error: %s", err)
80 7e5a6e86 Guido Trotter
      client.close_log()
81 7e5a6e86 Guido Trotter
      return
82 7e5a6e86 Guido Trotter
83 7e5a6e86 Guido Trotter
    success = False
84 7e5a6e86 Guido Trotter
    try:
85 e986f20c Michael Hanselmann
      # Verify client's version if there was one in the request
86 e986f20c Michael Hanselmann
      if version is not None and version != constants.LUXI_VERSION:
87 e986f20c Michael Hanselmann
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
88 e986f20c Michael Hanselmann
                               (constants.LUXI_VERSION, version))
89 e986f20c Michael Hanselmann
90 7e5a6e86 Guido Trotter
      result = client_ops.handle_request(method, args)
91 7e5a6e86 Guido Trotter
      success = True
92 7e5a6e86 Guido Trotter
    except errors.GenericError, err:
93 7e5a6e86 Guido Trotter
      logging.exception("Unexpected exception")
94 7e5a6e86 Guido Trotter
      success = False
95 7e5a6e86 Guido Trotter
      result = errors.EncodeException(err)
96 7e5a6e86 Guido Trotter
    except:
97 7e5a6e86 Guido Trotter
      logging.exception("Unexpected exception")
98 7e5a6e86 Guido Trotter
      err = sys.exc_info()
99 7e5a6e86 Guido Trotter
      result = "Caught exception: %s" % str(err[1])
100 7e5a6e86 Guido Trotter
101 7e5a6e86 Guido Trotter
    try:
102 7e5a6e86 Guido Trotter
      reply = luxi.FormatResponse(success, result)
103 7e5a6e86 Guido Trotter
      client.send_message(reply)
104 7e5a6e86 Guido Trotter
      # awake the main thread so that it can write out the data.
105 7e5a6e86 Guido Trotter
      server.awaker.signal()
106 e0dbb89b Guido Trotter
    except: # pylint: disable-msg=W0702
107 7e5a6e86 Guido Trotter
      logging.exception("Send error")
108 7e5a6e86 Guido Trotter
      client.close_log()
109 7e5a6e86 Guido Trotter
110 7e5a6e86 Guido Trotter
111 7e5a6e86 Guido Trotter
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
112 7e5a6e86 Guido Trotter
  """Handler for master peers.
113 7e5a6e86 Guido Trotter

114 7e5a6e86 Guido Trotter
  """
115 7e5a6e86 Guido Trotter
  _MAX_UNHANDLED = 1
116 7e5a6e86 Guido Trotter
  def __init__(self, server, connected_socket, client_address, family):
117 7e5a6e86 Guido Trotter
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
118 7e5a6e86 Guido Trotter
                                                 client_address,
119 7e5a6e86 Guido Trotter
                                                 constants.LUXI_EOM,
120 7e5a6e86 Guido Trotter
                                                 family, self._MAX_UNHANDLED)
121 7e5a6e86 Guido Trotter
    self.server = server
122 7e5a6e86 Guido Trotter
123 7e5a6e86 Guido Trotter
  def handle_message(self, message, _):
124 b2e8a4d9 Michael Hanselmann
    self.server.request_workers.AddTask((self.server, message, self))
125 23e50d39 Michael Hanselmann
126 23e50d39 Michael Hanselmann
127 cdd7f900 Guido Trotter
class MasterServer(daemon.AsyncStreamServer):
128 cdd7f900 Guido Trotter
  """Master Server.
129 ffeffa1d Iustin Pop

130 cdd7f900 Guido Trotter
  This is the main asynchronous master server. It handles connections to the
131 cdd7f900 Guido Trotter
  master socket.
132 ffeffa1d Iustin Pop

133 ffeffa1d Iustin Pop
  """
134 7e5a6e86 Guido Trotter
  family = socket.AF_UNIX
135 7e5a6e86 Guido Trotter
136 7e5a6e86 Guido Trotter
  def __init__(self, mainloop, address, uid, gid):
137 cdd7f900 Guido Trotter
    """MasterServer constructor
138 ce862cd5 Guido Trotter

139 cdd7f900 Guido Trotter
    @type mainloop: ganeti.daemon.Mainloop
140 cdd7f900 Guido Trotter
    @param mainloop: Mainloop used to poll for I/O events
141 cdd7f900 Guido Trotter
    @param address: the unix socket address to bind the MasterServer to
142 bbfd0568 René Nussbaumer
    @param uid: The uid of the owner of the socket
143 bbfd0568 René Nussbaumer
    @param gid: The gid of the owner of the socket
144 ce862cd5 Guido Trotter

145 ce862cd5 Guido Trotter
    """
146 bbfd0568 René Nussbaumer
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
147 7e5a6e86 Guido Trotter
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
148 bbfd0568 René Nussbaumer
    os.chmod(temp_name, 0770)
149 bbfd0568 René Nussbaumer
    os.chown(temp_name, uid, gid)
150 bbfd0568 René Nussbaumer
    os.rename(temp_name, address)
151 bbfd0568 René Nussbaumer
152 cdd7f900 Guido Trotter
    self.mainloop = mainloop
153 7e5a6e86 Guido Trotter
    self.awaker = daemon.AsyncAwaker()
154 50a3fbb2 Michael Hanselmann
155 50a3fbb2 Michael Hanselmann
    # We'll only start threads once we've forked.
156 9113300d Michael Hanselmann
    self.context = None
157 23e50d39 Michael Hanselmann
    self.request_workers = None
158 50a3fbb2 Michael Hanselmann
159 cdd7f900 Guido Trotter
  def handle_connection(self, connected_socket, client_address):
160 7e5a6e86 Guido Trotter
    # TODO: add connection count and limit the number of open connections to a
161 7e5a6e86 Guido Trotter
    # maximum number to avoid breaking for lack of file descriptors or memory.
162 7e5a6e86 Guido Trotter
    MasterClientHandler(self, connected_socket, client_address, self.family)
163 cdd7f900 Guido Trotter
164 50a3fbb2 Michael Hanselmann
  def setup_queue(self):
165 9113300d Michael Hanselmann
    self.context = GanetiContext()
166 89e2b4d2 Michael Hanselmann
    self.request_workers = workerpool.WorkerPool("ClientReq",
167 89e2b4d2 Michael Hanselmann
                                                 CLIENT_REQUEST_WORKERS,
168 23e50d39 Michael Hanselmann
                                                 ClientRequestWorker)
169 ffeffa1d Iustin Pop
170 c1f2901b Iustin Pop
  def server_cleanup(self):
171 c1f2901b Iustin Pop
    """Cleanup the server.
172 c1f2901b Iustin Pop

173 c1f2901b Iustin Pop
    This involves shutting down the processor threads and the master
174 c1f2901b Iustin Pop
    socket.
175 c1f2901b Iustin Pop

176 c1f2901b Iustin Pop
    """
177 50a3fbb2 Michael Hanselmann
    try:
178 cdd7f900 Guido Trotter
      self.close()
179 50a3fbb2 Michael Hanselmann
    finally:
180 23e50d39 Michael Hanselmann
      if self.request_workers:
181 36088c4c Michael Hanselmann
        self.request_workers.TerminateWorkers()
182 9113300d Michael Hanselmann
      if self.context:
183 9113300d Michael Hanselmann
        self.context.jobqueue.Shutdown()
184 ffeffa1d Iustin Pop
185 ffeffa1d Iustin Pop
186 ffeffa1d Iustin Pop
class ClientOps:
187 ffeffa1d Iustin Pop
  """Class holding high-level client operations."""
188 ffeffa1d Iustin Pop
  def __init__(self, server):
189 ffeffa1d Iustin Pop
    self.server = server
190 ffeffa1d Iustin Pop
191 7260cfbe Iustin Pop
  def handle_request(self, method, args): # pylint: disable-msg=R0911
192 9113300d Michael Hanselmann
    queue = self.server.context.jobqueue
193 0bbe448c Michael Hanselmann
194 0bbe448c Michael Hanselmann
    # TODO: Parameter validation
195 0bbe448c Michael Hanselmann
196 7260cfbe Iustin Pop
    # TODO: Rewrite to not exit in each 'if/elif' branch
197 7260cfbe Iustin Pop
198 0bbe448c Michael Hanselmann
    if method == luxi.REQ_SUBMIT_JOB:
199 e566ddbd Iustin Pop
      logging.info("Received new job")
200 0bbe448c Michael Hanselmann
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
201 4c848b18 Michael Hanselmann
      return queue.SubmitJob(ops)
202 ffeffa1d Iustin Pop
203 2971c913 Iustin Pop
    if method == luxi.REQ_SUBMIT_MANY_JOBS:
204 2971c913 Iustin Pop
      logging.info("Received multiple jobs")
205 2971c913 Iustin Pop
      jobs = []
206 2971c913 Iustin Pop
      for ops in args:
207 2971c913 Iustin Pop
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
208 2971c913 Iustin Pop
      return queue.SubmitManyJobs(jobs)
209 2971c913 Iustin Pop
210 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_CANCEL_JOB:
211 3a2c7775 Michael Hanselmann
      job_id = args
212 e566ddbd Iustin Pop
      logging.info("Received job cancel request for %s", job_id)
213 0bbe448c Michael Hanselmann
      return queue.CancelJob(job_id)
214 ffeffa1d Iustin Pop
215 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_ARCHIVE_JOB:
216 3a2c7775 Michael Hanselmann
      job_id = args
217 e566ddbd Iustin Pop
      logging.info("Received job archive request for %s", job_id)
218 0bbe448c Michael Hanselmann
      return queue.ArchiveJob(job_id)
219 0bbe448c Michael Hanselmann
220 07cd723a Iustin Pop
    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
221 f8ad5591 Michael Hanselmann
      (age, timeout) = args
222 e566ddbd Iustin Pop
      logging.info("Received job autoarchive request for age %s, timeout %s",
223 e566ddbd Iustin Pop
                   age, timeout)
224 f8ad5591 Michael Hanselmann
      return queue.AutoArchiveJobs(age, timeout)
225 07cd723a Iustin Pop
226 dfe57c22 Michael Hanselmann
    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
227 5c735209 Iustin Pop
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
228 e566ddbd Iustin Pop
      logging.info("Received job poll request for %s", job_id)
229 6c5a7090 Michael Hanselmann
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
230 5c735209 Iustin Pop
                                     prev_log_serial, timeout)
231 dfe57c22 Michael Hanselmann
232 28b71a76 Michael Hanselmann
    elif method == luxi.REQ_QUERY:
233 28b71a76 Michael Hanselmann
      req = objects.QueryRequest.FromDict(args)
234 28b71a76 Michael Hanselmann
235 28b71a76 Michael Hanselmann
      if req.what in constants.QR_OP_QUERY:
236 28b71a76 Michael Hanselmann
        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
237 28b71a76 Michael Hanselmann
                                             filter=req.filter))
238 24d16f76 Michael Hanselmann
      elif req.what == constants.QR_LOCK:
239 24d16f76 Michael Hanselmann
        if req.filter is not None:
240 24d16f76 Michael Hanselmann
          raise errors.OpPrereqError("Lock queries can't be filtered")
241 24d16f76 Michael Hanselmann
        return self.server.context.glm.QueryLocks(req.fields)
242 28b71a76 Michael Hanselmann
      elif req.what in constants.QR_OP_LUXI:
243 28b71a76 Michael Hanselmann
        raise NotImplementedError
244 28b71a76 Michael Hanselmann
      else:
245 28b71a76 Michael Hanselmann
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
246 28b71a76 Michael Hanselmann
                                   errors.ECODE_INVAL)
247 28b71a76 Michael Hanselmann
248 28b71a76 Michael Hanselmann
      return result
249 28b71a76 Michael Hanselmann
250 28b71a76 Michael Hanselmann
    elif method == luxi.REQ_QUERY_FIELDS:
251 28b71a76 Michael Hanselmann
      req = objects.QueryFieldsRequest.FromDict(args)
252 28b71a76 Michael Hanselmann
253 28b71a76 Michael Hanselmann
      if req.what in constants.QR_OP_QUERY:
254 28b71a76 Michael Hanselmann
        result = self._Query(opcodes.OpQueryFields(what=req.what,
255 28b71a76 Michael Hanselmann
                                                   fields=req.fields))
256 24d16f76 Michael Hanselmann
      elif req.what == constants.QR_LOCK:
257 24d16f76 Michael Hanselmann
        return query.QueryFields(query.LOCK_FIELDS, req.fields)
258 28b71a76 Michael Hanselmann
      elif req.what in constants.QR_OP_LUXI:
259 28b71a76 Michael Hanselmann
        raise NotImplementedError
260 28b71a76 Michael Hanselmann
      else:
261 28b71a76 Michael Hanselmann
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
262 28b71a76 Michael Hanselmann
                                   errors.ECODE_INVAL)
263 28b71a76 Michael Hanselmann
264 28b71a76 Michael Hanselmann
      return result
265 28b71a76 Michael Hanselmann
266 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_QUERY_JOBS:
267 0bbe448c Michael Hanselmann
      (job_ids, fields) = args
268 e566ddbd Iustin Pop
      if isinstance(job_ids, (tuple, list)) and job_ids:
269 1f864b60 Iustin Pop
        msg = utils.CommaJoin(job_ids)
270 e566ddbd Iustin Pop
      else:
271 e566ddbd Iustin Pop
        msg = str(job_ids)
272 e566ddbd Iustin Pop
      logging.info("Received job query request for %s", msg)
273 0bbe448c Michael Hanselmann
      return queue.QueryJobs(job_ids, fields)
274 0bbe448c Michael Hanselmann
275 ee6c7b94 Michael Hanselmann
    elif method == luxi.REQ_QUERY_INSTANCES:
276 ec79568d Iustin Pop
      (names, fields, use_locking) = args
277 e566ddbd Iustin Pop
      logging.info("Received instance query request for %s", names)
278 77921a95 Iustin Pop
      if use_locking:
279 debac808 Iustin Pop
        raise errors.OpPrereqError("Sync queries are not allowed",
280 debac808 Iustin Pop
                                   errors.ECODE_INVAL)
281 f2af0bec Iustin Pop
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
282 f2af0bec Iustin Pop
                                   use_locking=use_locking)
283 ee6c7b94 Michael Hanselmann
      return self._Query(op)
284 ee6c7b94 Michael Hanselmann
285 02f7fe54 Michael Hanselmann
    elif method == luxi.REQ_QUERY_NODES:
286 ec79568d Iustin Pop
      (names, fields, use_locking) = args
287 e566ddbd Iustin Pop
      logging.info("Received node query request for %s", names)
288 77921a95 Iustin Pop
      if use_locking:
289 debac808 Iustin Pop
        raise errors.OpPrereqError("Sync queries are not allowed",
290 debac808 Iustin Pop
                                   errors.ECODE_INVAL)
291 2237687b Iustin Pop
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
292 2237687b Iustin Pop
                               use_locking=use_locking)
293 02f7fe54 Michael Hanselmann
      return self._Query(op)
294 02f7fe54 Michael Hanselmann
295 a79ef2a5 Adeodato Simo
    elif method == luxi.REQ_QUERY_GROUPS:
296 a79ef2a5 Adeodato Simo
      (names, fields, use_locking) = args
297 a79ef2a5 Adeodato Simo
      logging.info("Received group query request for %s", names)
298 a79ef2a5 Adeodato Simo
      if use_locking:
299 a79ef2a5 Adeodato Simo
        raise errors.OpPrereqError("Sync queries are not allowed",
300 a79ef2a5 Adeodato Simo
                                   errors.ECODE_INVAL)
301 d4d654bd Iustin Pop
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
302 a79ef2a5 Adeodato Simo
      return self._Query(op)
303 a79ef2a5 Adeodato Simo
304 32f93223 Michael Hanselmann
    elif method == luxi.REQ_QUERY_EXPORTS:
305 ec79568d Iustin Pop
      nodes, use_locking = args
306 77921a95 Iustin Pop
      if use_locking:
307 debac808 Iustin Pop
        raise errors.OpPrereqError("Sync queries are not allowed",
308 debac808 Iustin Pop
                                   errors.ECODE_INVAL)
309 e566ddbd Iustin Pop
      logging.info("Received exports query request")
310 7ca2d4d8 Iustin Pop
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
311 32f93223 Michael Hanselmann
      return self._Query(op)
312 32f93223 Michael Hanselmann
313 ae5849b5 Michael Hanselmann
    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
314 ae5849b5 Michael Hanselmann
      fields = args
315 e566ddbd Iustin Pop
      logging.info("Received config values query request for %s", fields)
316 2f093ea0 Iustin Pop
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
317 ae5849b5 Michael Hanselmann
      return self._Query(op)
318 ae5849b5 Michael Hanselmann
319 66baeccc Iustin Pop
    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
320 e566ddbd Iustin Pop
      logging.info("Received cluster info query request")
321 a2f7ab92 Iustin Pop
      op = opcodes.OpClusterQuery()
322 66baeccc Iustin Pop
      return self._Query(op)
323 66baeccc Iustin Pop
324 7699c3af Iustin Pop
    elif method == luxi.REQ_QUERY_TAGS:
325 7699c3af Iustin Pop
      kind, name = args
326 7699c3af Iustin Pop
      logging.info("Received tags query request")
327 7699c3af Iustin Pop
      op = opcodes.OpGetTags(kind=kind, name=name)
328 7699c3af Iustin Pop
      return self._Query(op)
329 7699c3af Iustin Pop
330 19b9ba9a Michael Hanselmann
    elif method == luxi.REQ_QUERY_LOCKS:
331 19b9ba9a Michael Hanselmann
      (fields, sync) = args
332 19b9ba9a Michael Hanselmann
      logging.info("Received locks query request")
333 24d16f76 Michael Hanselmann
      if sync:
334 24d16f76 Michael Hanselmann
        raise NotImplementedError("Synchronous queries are not implemented")
335 24d16f76 Michael Hanselmann
      return self.server.context.glm.OldStyleQueryLocks(fields)
336 19b9ba9a Michael Hanselmann
337 3ccafd0e Iustin Pop
    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
338 3ccafd0e Iustin Pop
      drain_flag = args
339 e566ddbd Iustin Pop
      logging.info("Received queue drain flag change request to %s",
340 e566ddbd Iustin Pop
                   drain_flag)
341 3ccafd0e Iustin Pop
      return queue.SetDrainFlag(drain_flag)
342 3ccafd0e Iustin Pop
343 05e50653 Michael Hanselmann
    elif method == luxi.REQ_SET_WATCHER_PAUSE:
344 05e50653 Michael Hanselmann
      (until, ) = args
345 05e50653 Michael Hanselmann
346 05e50653 Michael Hanselmann
      if until is None:
347 05e50653 Michael Hanselmann
        logging.info("Received request to no longer pause the watcher")
348 05e50653 Michael Hanselmann
      else:
349 05e50653 Michael Hanselmann
        if not isinstance(until, (int, float)):
350 05e50653 Michael Hanselmann
          raise TypeError("Duration must be an integer or float")
351 05e50653 Michael Hanselmann
352 05e50653 Michael Hanselmann
        if until < time.time():
353 05e50653 Michael Hanselmann
          raise errors.GenericError("Unable to set pause end time in the past")
354 05e50653 Michael Hanselmann
355 05e50653 Michael Hanselmann
        logging.info("Received request to pause the watcher until %s", until)
356 05e50653 Michael Hanselmann
357 05e50653 Michael Hanselmann
      return _SetWatcherPause(until)
358 05e50653 Michael Hanselmann
359 0bbe448c Michael Hanselmann
    else:
360 e566ddbd Iustin Pop
      logging.info("Received invalid request '%s'", method)
361 e566ddbd Iustin Pop
      raise ValueError("Invalid operation '%s'" % method)
362 ffeffa1d Iustin Pop
363 ee6c7b94 Michael Hanselmann
  def _Query(self, op):
364 ee6c7b94 Michael Hanselmann
    """Runs the specified opcode and returns the result.
365 ee6c7b94 Michael Hanselmann

366 ee6c7b94 Michael Hanselmann
    """
367 adfa97e3 Guido Trotter
    # Queries don't have a job id
368 adfa97e3 Guido Trotter
    proc = mcpu.Processor(self.server.context, None)
369 26d3fd2f Michael Hanselmann
370 26d3fd2f Michael Hanselmann
    # TODO: Executing an opcode using locks will acquire them in blocking mode.
371 26d3fd2f Michael Hanselmann
    # Consider using a timeout for retries.
372 031a3e57 Michael Hanselmann
    return proc.ExecOpCode(op, None)
373 ee6c7b94 Michael Hanselmann
374 ffeffa1d Iustin Pop
375 39dcf2ef Guido Trotter
class GanetiContext(object):
376 39dcf2ef Guido Trotter
  """Context common to all ganeti threads.
377 39dcf2ef Guido Trotter

378 39dcf2ef Guido Trotter
  This class creates and holds common objects shared by all threads.
379 39dcf2ef Guido Trotter

380 39dcf2ef Guido Trotter
  """
381 7260cfbe Iustin Pop
  # pylint: disable-msg=W0212
382 7260cfbe Iustin Pop
  # we do want to ensure a singleton here
383 39dcf2ef Guido Trotter
  _instance = None
384 39dcf2ef Guido Trotter
385 39dcf2ef Guido Trotter
  def __init__(self):
386 39dcf2ef Guido Trotter
    """Constructs a new GanetiContext object.
387 39dcf2ef Guido Trotter

388 39dcf2ef Guido Trotter
    There should be only a GanetiContext object at any time, so this
389 39dcf2ef Guido Trotter
    function raises an error if this is not the case.
390 39dcf2ef Guido Trotter

391 39dcf2ef Guido Trotter
    """
392 39dcf2ef Guido Trotter
    assert self.__class__._instance is None, "double GanetiContext instance"
393 39dcf2ef Guido Trotter
394 9113300d Michael Hanselmann
    # Create global configuration object
395 39dcf2ef Guido Trotter
    self.cfg = config.ConfigWriter()
396 9113300d Michael Hanselmann
397 9113300d Michael Hanselmann
    # Locking manager
398 984f7c32 Guido Trotter
    self.glm = locking.GanetiLockManager(
399 39dcf2ef Guido Trotter
                self.cfg.GetNodeList(),
400 819ca990 Guido Trotter
                self.cfg.GetNodeGroupList(),
401 39dcf2ef Guido Trotter
                self.cfg.GetInstanceList())
402 39dcf2ef Guido Trotter
403 9113300d Michael Hanselmann
    # Job queue
404 9113300d Michael Hanselmann
    self.jobqueue = jqueue.JobQueue(self)
405 9113300d Michael Hanselmann
406 39dcf2ef Guido Trotter
    # setting this also locks the class against attribute modifications
407 39dcf2ef Guido Trotter
    self.__class__._instance = self
408 39dcf2ef Guido Trotter
409 39dcf2ef Guido Trotter
  def __setattr__(self, name, value):
410 39dcf2ef Guido Trotter
    """Setting GanetiContext attributes is forbidden after initialization.
411 39dcf2ef Guido Trotter

412 39dcf2ef Guido Trotter
    """
413 39dcf2ef Guido Trotter
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
414 39dcf2ef Guido Trotter
    object.__setattr__(self, name, value)
415 39dcf2ef Guido Trotter
416 0debfb35 Guido Trotter
  def AddNode(self, node, ec_id):
417 d8470559 Michael Hanselmann
    """Adds a node to the configuration and lock manager.
418 d8470559 Michael Hanselmann

419 d8470559 Michael Hanselmann
    """
420 d8470559 Michael Hanselmann
    # Add it to the configuration
421 0debfb35 Guido Trotter
    self.cfg.AddNode(node, ec_id)
422 d8470559 Michael Hanselmann
423 c36176cc Michael Hanselmann
    # If preseeding fails it'll not be added
424 99aabbed Iustin Pop
    self.jobqueue.AddNode(node)
425 c36176cc Michael Hanselmann
426 d8470559 Michael Hanselmann
    # Add the new node to the Ganeti Lock Manager
427 d8470559 Michael Hanselmann
    self.glm.add(locking.LEVEL_NODE, node.name)
428 d8470559 Michael Hanselmann
429 d8470559 Michael Hanselmann
  def ReaddNode(self, node):
430 d8470559 Michael Hanselmann
    """Updates a node that's already in the configuration
431 d8470559 Michael Hanselmann

432 d8470559 Michael Hanselmann
    """
433 c36176cc Michael Hanselmann
    # Synchronize the queue again
434 99aabbed Iustin Pop
    self.jobqueue.AddNode(node)
435 d8470559 Michael Hanselmann
436 d8470559 Michael Hanselmann
  def RemoveNode(self, name):
437 d8470559 Michael Hanselmann
    """Removes a node from the configuration and lock manager.
438 d8470559 Michael Hanselmann

439 d8470559 Michael Hanselmann
    """
440 d8470559 Michael Hanselmann
    # Remove node from configuration
441 d8470559 Michael Hanselmann
    self.cfg.RemoveNode(name)
442 d8470559 Michael Hanselmann
443 c36176cc Michael Hanselmann
    # Notify job queue
444 c36176cc Michael Hanselmann
    self.jobqueue.RemoveNode(name)
445 c36176cc Michael Hanselmann
446 d8470559 Michael Hanselmann
    # Remove the node from the Ganeti Lock Manager
447 d8470559 Michael Hanselmann
    self.glm.remove(locking.LEVEL_NODE, name)
448 d8470559 Michael Hanselmann
449 39dcf2ef Guido Trotter
450 05e50653 Michael Hanselmann
def _SetWatcherPause(until):
451 05e50653 Michael Hanselmann
  """Creates or removes the watcher pause file.
452 05e50653 Michael Hanselmann

453 05e50653 Michael Hanselmann
  @type until: None or int
454 05e50653 Michael Hanselmann
  @param until: Unix timestamp saying until when the watcher shouldn't run
455 05e50653 Michael Hanselmann

456 05e50653 Michael Hanselmann
  """
457 05e50653 Michael Hanselmann
  if until is None:
458 05e50653 Michael Hanselmann
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
459 05e50653 Michael Hanselmann
  else:
460 05e50653 Michael Hanselmann
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
461 05e50653 Michael Hanselmann
                    data="%d\n" % (until, ))
462 05e50653 Michael Hanselmann
463 28b498cd Michael Hanselmann
  return until
464 28b498cd Michael Hanselmann
465 05e50653 Michael Hanselmann
466 e0e916fe Iustin Pop
@rpc.RunWithRPC
467 36205981 Iustin Pop
def CheckAgreement():
468 36205981 Iustin Pop
  """Check the agreement on who is the master.
469 36205981 Iustin Pop

470 36205981 Iustin Pop
  The function uses a very simple algorithm: we must get more positive
471 36205981 Iustin Pop
  than negative answers. Since in most of the cases we are the master,
472 36205981 Iustin Pop
  we'll use our own config file for getting the node list. In the
473 36205981 Iustin Pop
  future we could collect the current node list from our (possibly
474 36205981 Iustin Pop
  obsolete) known nodes.
475 36205981 Iustin Pop

476 d7cdb55d Iustin Pop
  In order to account for cold-start of all nodes, we retry for up to
477 d7cdb55d Iustin Pop
  a minute until we get a real answer as the top-voted one. If the
478 d7cdb55d Iustin Pop
  nodes are more out-of-sync, for now manual startup of the master
479 d7cdb55d Iustin Pop
  should be attempted.
480 d7cdb55d Iustin Pop

481 d7cdb55d Iustin Pop
  Note that for a even number of nodes cluster, we need at least half
482 d7cdb55d Iustin Pop
  of the nodes (beside ourselves) to vote for us. This creates a
483 d7cdb55d Iustin Pop
  problem on two-node clusters, since in this case we require the
484 d7cdb55d Iustin Pop
  other node to be up too to confirm our status.
485 d7cdb55d Iustin Pop

486 36205981 Iustin Pop
  """
487 b705c7a6 Manuel Franceschini
  myself = netutils.Hostname.GetSysName()
488 36205981 Iustin Pop
  #temp instantiation of a config writer, used only to get the node list
489 36205981 Iustin Pop
  cfg = config.ConfigWriter()
490 36205981 Iustin Pop
  node_list = cfg.GetNodeList()
491 36205981 Iustin Pop
  del cfg
492 d7cdb55d Iustin Pop
  retries = 6
493 d7cdb55d Iustin Pop
  while retries > 0:
494 d7cdb55d Iustin Pop
    votes = bootstrap.GatherMasterVotes(node_list)
495 d7cdb55d Iustin Pop
    if not votes:
496 d7cdb55d Iustin Pop
      # empty node list, this is a one node cluster
497 d7cdb55d Iustin Pop
      return True
498 d7cdb55d Iustin Pop
    if votes[0][0] is None:
499 d7cdb55d Iustin Pop
      retries -= 1
500 d7cdb55d Iustin Pop
      time.sleep(10)
501 36205981 Iustin Pop
      continue
502 d7cdb55d Iustin Pop
    break
503 d7cdb55d Iustin Pop
  if retries == 0:
504 e09fdcfa Iustin Pop
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
505 e09fdcfa Iustin Pop
                     " after multiple retries. Aborting startup")
506 d8f5a37d Iustin Pop
    logging.critical("Use the --no-voting option if you understand what"
507 d8f5a37d Iustin Pop
                     " effects it has on the cluster state")
508 e09fdcfa Iustin Pop
    return False
509 d7cdb55d Iustin Pop
  # here a real node is at the top of the list
510 d7cdb55d Iustin Pop
  all_votes = sum(item[1] for item in votes)
511 d7cdb55d Iustin Pop
  top_node, top_votes = votes[0]
512 8a20c732 Michael Hanselmann
513 d7cdb55d Iustin Pop
  result = False
514 d7cdb55d Iustin Pop
  if top_node != myself:
515 d7cdb55d Iustin Pop
    logging.critical("It seems we are not the master (top-voted node"
516 bbe19c17 Iustin Pop
                     " is %s with %d out of %d votes)", top_node, top_votes,
517 bbe19c17 Iustin Pop
                     all_votes)
518 d7cdb55d Iustin Pop
  elif top_votes < all_votes - top_votes:
519 36205981 Iustin Pop
    logging.critical("It seems we are not the master (%d votes for,"
520 d7cdb55d Iustin Pop
                     " %d votes against)", top_votes, all_votes - top_votes)
521 d7cdb55d Iustin Pop
  else:
522 d7cdb55d Iustin Pop
    result = True
523 d7cdb55d Iustin Pop
524 d7cdb55d Iustin Pop
  return result
525 36205981 Iustin Pop
526 6c948699 Michael Hanselmann
527 340f4757 Iustin Pop
@rpc.RunWithRPC
528 340f4757 Iustin Pop
def ActivateMasterIP():
529 340f4757 Iustin Pop
  # activate ip
530 340f4757 Iustin Pop
  master_node = ssconf.SimpleStore().GetMasterNode()
531 340f4757 Iustin Pop
  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
532 340f4757 Iustin Pop
  msg = result.fail_msg
533 340f4757 Iustin Pop
  if msg:
534 340f4757 Iustin Pop
    logging.error("Can't activate master IP address: %s", msg)
535 340f4757 Iustin Pop
536 340f4757 Iustin Pop
537 ed0efaa5 Michael Hanselmann
def CheckMasterd(options, args):
538 ed0efaa5 Michael Hanselmann
  """Initial checks whether to run or exit with a failure.
539 ed0efaa5 Michael Hanselmann

540 ed0efaa5 Michael Hanselmann
  """
541 f93427cd Iustin Pop
  if args: # masterd doesn't take any arguments
542 f93427cd Iustin Pop
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
543 f93427cd Iustin Pop
    sys.exit(constants.EXIT_FAILURE)
544 f93427cd Iustin Pop
545 ed0efaa5 Michael Hanselmann
  ssconf.CheckMaster(options.debug)
546 ed0efaa5 Michael Hanselmann
547 bbfd0568 René Nussbaumer
  try:
548 bbfd0568 René Nussbaumer
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
549 bbfd0568 René Nussbaumer
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
550 bbfd0568 René Nussbaumer
  except KeyError:
551 bbfd0568 René Nussbaumer
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
552 bbfd0568 René Nussbaumer
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
553 bbfd0568 René Nussbaumer
    sys.exit(constants.EXIT_FAILURE)
554 bbfd0568 René Nussbaumer
555 4b63dc7a Iustin Pop
  # Check the configuration is sane before anything else
556 4b63dc7a Iustin Pop
  try:
557 4b63dc7a Iustin Pop
    config.ConfigWriter()
558 4b63dc7a Iustin Pop
  except errors.ConfigVersionMismatch, err:
559 4b63dc7a Iustin Pop
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
560 4b63dc7a Iustin Pop
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
561 4b63dc7a Iustin Pop
    print >> sys.stderr,  \
562 4b63dc7a Iustin Pop
        ("Configuration version mismatch. The current Ganeti software"
563 4b63dc7a Iustin Pop
         " expects version %s, but the on-disk configuration file has"
564 4b63dc7a Iustin Pop
         " version %s. This is likely the result of upgrading the"
565 4b63dc7a Iustin Pop
         " software without running the upgrade procedure. Please contact"
566 4b63dc7a Iustin Pop
         " your cluster administrator or complete the upgrade using the"
567 4b63dc7a Iustin Pop
         " cfgupgrade utility, after reading the upgrade notes." %
568 4b63dc7a Iustin Pop
         (v1, v2))
569 4b63dc7a Iustin Pop
    sys.exit(constants.EXIT_FAILURE)
570 4b63dc7a Iustin Pop
  except errors.ConfigurationError, err:
571 4b63dc7a Iustin Pop
    print >> sys.stderr, \
572 4b63dc7a Iustin Pop
        ("Configuration error while opening the configuration file: %s\n"
573 4b63dc7a Iustin Pop
         "This might be caused by an incomplete software upgrade or"
574 4b63dc7a Iustin Pop
         " by a corrupted configuration file. Until the problem is fixed"
575 4b63dc7a Iustin Pop
         " the master daemon cannot start." % str(err))
576 4b63dc7a Iustin Pop
    sys.exit(constants.EXIT_FAILURE)
577 bbfd0568 René Nussbaumer
578 ed0efaa5 Michael Hanselmann
  # If CheckMaster didn't fail we believe we are the master, but we have to
579 ed0efaa5 Michael Hanselmann
  # confirm with the other nodes.
580 ed0efaa5 Michael Hanselmann
  if options.no_voting:
581 ed0efaa5 Michael Hanselmann
    if options.yes_do_it:
582 ed0efaa5 Michael Hanselmann
      return
583 ed0efaa5 Michael Hanselmann
584 ed0efaa5 Michael Hanselmann
    sys.stdout.write("The 'no voting' option has been selected.\n")
585 ed0efaa5 Michael Hanselmann
    sys.stdout.write("This is dangerous, please confirm by"
586 ed0efaa5 Michael Hanselmann
                     " typing uppercase 'yes': ")
587 ed0efaa5 Michael Hanselmann
    sys.stdout.flush()
588 ed0efaa5 Michael Hanselmann
589 ed0efaa5 Michael Hanselmann
    confirmation = sys.stdin.readline().strip()
590 ed0efaa5 Michael Hanselmann
    if confirmation != "YES":
591 7260cfbe Iustin Pop
      print >> sys.stderr, "Aborting."
592 ed0efaa5 Michael Hanselmann
      sys.exit(constants.EXIT_FAILURE)
593 ed0efaa5 Michael Hanselmann
594 ed0efaa5 Michael Hanselmann
    return
595 ed0efaa5 Michael Hanselmann
596 ed0efaa5 Michael Hanselmann
  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
597 ed0efaa5 Michael Hanselmann
  # process before we call utils.Daemonize in the current process.
598 e0e916fe Iustin Pop
  if not utils.RunInSeparateProcess(CheckAgreement):
599 ed0efaa5 Michael Hanselmann
    sys.exit(constants.EXIT_FAILURE)
600 ed0efaa5 Michael Hanselmann
601 340f4757 Iustin Pop
  # ActivateMasterIP also uses RPC/threads, so we run it again via a
602 340f4757 Iustin Pop
  # separate process.
603 340f4757 Iustin Pop
604 340f4757 Iustin Pop
  # TODO: decide whether failure to activate the master IP is a fatal error
605 340f4757 Iustin Pop
  utils.RunInSeparateProcess(ActivateMasterIP)
606 340f4757 Iustin Pop
607 ed0efaa5 Michael Hanselmann
608 3ee53f1f Iustin Pop
def PrepMasterd(options, _):
609 3ee53f1f Iustin Pop
  """Prep master daemon function, executed with the PID file held.
610 3b316acb Iustin Pop

611 04ccf5e9 Guido Trotter
  """
612 04ccf5e9 Guido Trotter
  # This is safe to do as the pid file guarantees against
613 04ccf5e9 Guido Trotter
  # concurrent execution.
614 04ccf5e9 Guido Trotter
  utils.RemoveFile(constants.MASTER_SOCKET)
615 b1b6ea87 Iustin Pop
616 cdd7f900 Guido Trotter
  mainloop = daemon.Mainloop()
617 7e5a6e86 Guido Trotter
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
618 bbfd0568 René Nussbaumer
                        options.uid, options.gid)
619 3ee53f1f Iustin Pop
  return (mainloop, master)
620 3ee53f1f Iustin Pop
621 3ee53f1f Iustin Pop
622 3ee53f1f Iustin Pop
def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
623 3ee53f1f Iustin Pop
  """Main master daemon function, executed with the PID file held.
624 3ee53f1f Iustin Pop

625 3ee53f1f Iustin Pop
  """
626 3ee53f1f Iustin Pop
  (mainloop, master) = prep_data
627 04ccf5e9 Guido Trotter
  try:
628 15486fa7 Michael Hanselmann
    rpc.Init()
629 4331f6cd Michael Hanselmann
    try:
630 15486fa7 Michael Hanselmann
      master.setup_queue()
631 15486fa7 Michael Hanselmann
      try:
632 cdd7f900 Guido Trotter
        mainloop.Run()
633 15486fa7 Michael Hanselmann
      finally:
634 15486fa7 Michael Hanselmann
        master.server_cleanup()
635 4331f6cd Michael Hanselmann
    finally:
636 15486fa7 Michael Hanselmann
      rpc.Shutdown()
637 a4af651e Iustin Pop
  finally:
638 227647ac Guido Trotter
    utils.RemoveFile(constants.MASTER_SOCKET)
639 a4af651e Iustin Pop
640 ffeffa1d Iustin Pop
641 29d91329 Michael Hanselmann
def Main():
642 04ccf5e9 Guido Trotter
  """Main function"""
643 04ccf5e9 Guido Trotter
  parser = OptionParser(description="Ganeti master daemon",
644 04ccf5e9 Guido Trotter
                        usage="%prog [-f] [-d]",
645 04ccf5e9 Guido Trotter
                        version="%%prog (ganeti) %s" %
646 04ccf5e9 Guido Trotter
                        constants.RELEASE_VERSION)
647 04ccf5e9 Guido Trotter
  parser.add_option("--no-voting", dest="no_voting",
648 04ccf5e9 Guido Trotter
                    help="Do not check that the nodes agree on this node"
649 04ccf5e9 Guido Trotter
                    " being the master and start the daemon unconditionally",
650 04ccf5e9 Guido Trotter
                    default=False, action="store_true")
651 04ccf5e9 Guido Trotter
  parser.add_option("--yes-do-it", dest="yes_do_it",
652 04ccf5e9 Guido Trotter
                    help="Override interactive check for --no-voting",
653 04ccf5e9 Guido Trotter
                    default=False, action="store_true")
654 3ee53f1f Iustin Pop
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
655 b42ea9ed Iustin Pop
                     ExecMasterd, multithreaded=True)