Statistics
| Branch: | Tag: | Revision:

root / lib / server / masterd.py @ b2acdbdc

History | View | Annotate | Download (21.7 kB)

1 69cf3abd Michael Hanselmann
#
2 ffeffa1d Iustin Pop
#
3 ffeffa1d Iustin Pop
4 f2af0bec Iustin Pop
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 ffeffa1d Iustin Pop
#
6 ffeffa1d Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 ffeffa1d Iustin Pop
# it under the terms of the GNU General Public License as published by
8 ffeffa1d Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 ffeffa1d Iustin Pop
# (at your option) any later version.
10 ffeffa1d Iustin Pop
#
11 ffeffa1d Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 ffeffa1d Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 ffeffa1d Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 ffeffa1d Iustin Pop
# General Public License for more details.
15 ffeffa1d Iustin Pop
#
16 ffeffa1d Iustin Pop
# You should have received a copy of the GNU General Public License
17 ffeffa1d Iustin Pop
# along with this program; if not, write to the Free Software
18 ffeffa1d Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 ffeffa1d Iustin Pop
# 02110-1301, USA.
20 ffeffa1d Iustin Pop
21 ffeffa1d Iustin Pop
22 ffeffa1d Iustin Pop
"""Master daemon program.
23 ffeffa1d Iustin Pop

24 ffeffa1d Iustin Pop
Some classes deviates from the standard style guide since the
25 ffeffa1d Iustin Pop
inheritance from parent classes requires it.
26 ffeffa1d Iustin Pop

27 ffeffa1d Iustin Pop
"""
28 ffeffa1d Iustin Pop
29 b459a848 Andrea Spadaccini
# pylint: disable=C0103
30 7260cfbe Iustin Pop
# C0103: Invalid name ganeti-masterd
31 ffeffa1d Iustin Pop
32 bbfd0568 René Nussbaumer
import grp
33 bbfd0568 René Nussbaumer
import os
34 bbfd0568 René Nussbaumer
import pwd
35 c1f2901b Iustin Pop
import sys
36 cdd7f900 Guido Trotter
import socket
37 ffeffa1d Iustin Pop
import time
38 bbfd0568 René Nussbaumer
import tempfile
39 96cb3986 Michael Hanselmann
import logging
40 ffeffa1d Iustin Pop
41 c1f2901b Iustin Pop
from optparse import OptionParser
42 ffeffa1d Iustin Pop
43 39dcf2ef Guido Trotter
from ganeti import config
44 ffeffa1d Iustin Pop
from ganeti import constants
45 04ccf5e9 Guido Trotter
from ganeti import daemon
46 ffeffa1d Iustin Pop
from ganeti import mcpu
47 ffeffa1d Iustin Pop
from ganeti import opcodes
48 ffeffa1d Iustin Pop
from ganeti import jqueue
49 39dcf2ef Guido Trotter
from ganeti import locking
50 ffeffa1d Iustin Pop
from ganeti import luxi
51 ffeffa1d Iustin Pop
from ganeti import utils
52 c1f2901b Iustin Pop
from ganeti import errors
53 c1f2901b Iustin Pop
from ganeti import ssconf
54 23e50d39 Michael Hanselmann
from ganeti import workerpool
55 b1b6ea87 Iustin Pop
from ganeti import rpc
56 d7cdb55d Iustin Pop
from ganeti import bootstrap
57 a744b676 Manuel Franceschini
from ganeti import netutils
58 28b71a76 Michael Hanselmann
from ganeti import objects
59 24d16f76 Michael Hanselmann
from ganeti import query
60 c1f2901b Iustin Pop
61 c1f2901b Iustin Pop
62 23e50d39 Michael Hanselmann
CLIENT_REQUEST_WORKERS = 16
63 23e50d39 Michael Hanselmann
64 c1f2901b Iustin Pop
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
65 c1f2901b Iustin Pop
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
66 ffeffa1d Iustin Pop
67 ffeffa1d Iustin Pop
68 23e50d39 Michael Hanselmann
class ClientRequestWorker(workerpool.BaseWorker):
69 b459a848 Andrea Spadaccini
  # pylint: disable=W0221
70 7e5a6e86 Guido Trotter
  def RunTask(self, server, message, client):
71 23e50d39 Michael Hanselmann
    """Process the request.
72 23e50d39 Michael Hanselmann

73 23e50d39 Michael Hanselmann
    """
74 7e5a6e86 Guido Trotter
    client_ops = ClientOps(server)
75 7e5a6e86 Guido Trotter
76 23e50d39 Michael Hanselmann
    try:
77 e986f20c Michael Hanselmann
      (method, args, version) = luxi.ParseRequest(message)
78 7e5a6e86 Guido Trotter
    except luxi.ProtocolError, err:
79 7e5a6e86 Guido Trotter
      logging.error("Protocol Error: %s", err)
80 7e5a6e86 Guido Trotter
      client.close_log()
81 7e5a6e86 Guido Trotter
      return
82 7e5a6e86 Guido Trotter
83 7e5a6e86 Guido Trotter
    success = False
84 7e5a6e86 Guido Trotter
    try:
85 e986f20c Michael Hanselmann
      # Verify client's version if there was one in the request
86 e986f20c Michael Hanselmann
      if version is not None and version != constants.LUXI_VERSION:
87 e986f20c Michael Hanselmann
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
88 e986f20c Michael Hanselmann
                               (constants.LUXI_VERSION, version))
89 e986f20c Michael Hanselmann
90 7e5a6e86 Guido Trotter
      result = client_ops.handle_request(method, args)
91 7e5a6e86 Guido Trotter
      success = True
92 7e5a6e86 Guido Trotter
    except errors.GenericError, err:
93 7e5a6e86 Guido Trotter
      logging.exception("Unexpected exception")
94 7e5a6e86 Guido Trotter
      success = False
95 7e5a6e86 Guido Trotter
      result = errors.EncodeException(err)
96 7e5a6e86 Guido Trotter
    except:
97 7e5a6e86 Guido Trotter
      logging.exception("Unexpected exception")
98 7e5a6e86 Guido Trotter
      err = sys.exc_info()
99 7e5a6e86 Guido Trotter
      result = "Caught exception: %s" % str(err[1])
100 7e5a6e86 Guido Trotter
101 7e5a6e86 Guido Trotter
    try:
102 7e5a6e86 Guido Trotter
      reply = luxi.FormatResponse(success, result)
103 7e5a6e86 Guido Trotter
      client.send_message(reply)
104 7e5a6e86 Guido Trotter
      # awake the main thread so that it can write out the data.
105 7e5a6e86 Guido Trotter
      server.awaker.signal()
106 b459a848 Andrea Spadaccini
    except: # pylint: disable=W0702
107 7e5a6e86 Guido Trotter
      logging.exception("Send error")
108 7e5a6e86 Guido Trotter
      client.close_log()
109 7e5a6e86 Guido Trotter
110 7e5a6e86 Guido Trotter
111 7e5a6e86 Guido Trotter
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
112 7e5a6e86 Guido Trotter
  """Handler for master peers.
113 7e5a6e86 Guido Trotter

114 7e5a6e86 Guido Trotter
  """
115 7e5a6e86 Guido Trotter
  _MAX_UNHANDLED = 1
116 e687ec01 Michael Hanselmann
117 7e5a6e86 Guido Trotter
  def __init__(self, server, connected_socket, client_address, family):
118 7e5a6e86 Guido Trotter
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
119 7e5a6e86 Guido Trotter
                                                 client_address,
120 7e5a6e86 Guido Trotter
                                                 constants.LUXI_EOM,
121 7e5a6e86 Guido Trotter
                                                 family, self._MAX_UNHANDLED)
122 7e5a6e86 Guido Trotter
    self.server = server
123 7e5a6e86 Guido Trotter
124 7e5a6e86 Guido Trotter
  def handle_message(self, message, _):
125 b2e8a4d9 Michael Hanselmann
    self.server.request_workers.AddTask((self.server, message, self))
126 23e50d39 Michael Hanselmann
127 23e50d39 Michael Hanselmann
128 cdd7f900 Guido Trotter
class MasterServer(daemon.AsyncStreamServer):
129 cdd7f900 Guido Trotter
  """Master Server.
130 ffeffa1d Iustin Pop

131 cdd7f900 Guido Trotter
  This is the main asynchronous master server. It handles connections to the
132 cdd7f900 Guido Trotter
  master socket.
133 ffeffa1d Iustin Pop

134 ffeffa1d Iustin Pop
  """
135 7e5a6e86 Guido Trotter
  family = socket.AF_UNIX
136 7e5a6e86 Guido Trotter
137 7e5a6e86 Guido Trotter
  def __init__(self, mainloop, address, uid, gid):
138 cdd7f900 Guido Trotter
    """MasterServer constructor
139 ce862cd5 Guido Trotter

140 cdd7f900 Guido Trotter
    @type mainloop: ganeti.daemon.Mainloop
141 cdd7f900 Guido Trotter
    @param mainloop: Mainloop used to poll for I/O events
142 cdd7f900 Guido Trotter
    @param address: the unix socket address to bind the MasterServer to
143 bbfd0568 René Nussbaumer
    @param uid: The uid of the owner of the socket
144 bbfd0568 René Nussbaumer
    @param gid: The gid of the owner of the socket
145 ce862cd5 Guido Trotter

146 ce862cd5 Guido Trotter
    """
147 bbfd0568 René Nussbaumer
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
148 7e5a6e86 Guido Trotter
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
149 bbfd0568 René Nussbaumer
    os.chmod(temp_name, 0770)
150 bbfd0568 René Nussbaumer
    os.chown(temp_name, uid, gid)
151 bbfd0568 René Nussbaumer
    os.rename(temp_name, address)
152 bbfd0568 René Nussbaumer
153 cdd7f900 Guido Trotter
    self.mainloop = mainloop
154 7e5a6e86 Guido Trotter
    self.awaker = daemon.AsyncAwaker()
155 50a3fbb2 Michael Hanselmann
156 50a3fbb2 Michael Hanselmann
    # We'll only start threads once we've forked.
157 9113300d Michael Hanselmann
    self.context = None
158 23e50d39 Michael Hanselmann
    self.request_workers = None
159 50a3fbb2 Michael Hanselmann
160 cdd7f900 Guido Trotter
  def handle_connection(self, connected_socket, client_address):
161 7e5a6e86 Guido Trotter
    # TODO: add connection count and limit the number of open connections to a
162 7e5a6e86 Guido Trotter
    # maximum number to avoid breaking for lack of file descriptors or memory.
163 7e5a6e86 Guido Trotter
    MasterClientHandler(self, connected_socket, client_address, self.family)
164 cdd7f900 Guido Trotter
165 50a3fbb2 Michael Hanselmann
  def setup_queue(self):
166 9113300d Michael Hanselmann
    self.context = GanetiContext()
167 89e2b4d2 Michael Hanselmann
    self.request_workers = workerpool.WorkerPool("ClientReq",
168 89e2b4d2 Michael Hanselmann
                                                 CLIENT_REQUEST_WORKERS,
169 23e50d39 Michael Hanselmann
                                                 ClientRequestWorker)
170 ffeffa1d Iustin Pop
171 c1f2901b Iustin Pop
  def server_cleanup(self):
172 c1f2901b Iustin Pop
    """Cleanup the server.
173 c1f2901b Iustin Pop

174 c1f2901b Iustin Pop
    This involves shutting down the processor threads and the master
175 c1f2901b Iustin Pop
    socket.
176 c1f2901b Iustin Pop

177 c1f2901b Iustin Pop
    """
178 50a3fbb2 Michael Hanselmann
    try:
179 cdd7f900 Guido Trotter
      self.close()
180 50a3fbb2 Michael Hanselmann
    finally:
181 23e50d39 Michael Hanselmann
      if self.request_workers:
182 36088c4c Michael Hanselmann
        self.request_workers.TerminateWorkers()
183 9113300d Michael Hanselmann
      if self.context:
184 9113300d Michael Hanselmann
        self.context.jobqueue.Shutdown()
185 ffeffa1d Iustin Pop
186 ffeffa1d Iustin Pop
187 ffeffa1d Iustin Pop
class ClientOps:
188 ffeffa1d Iustin Pop
  """Class holding high-level client operations."""
189 ffeffa1d Iustin Pop
  def __init__(self, server):
190 ffeffa1d Iustin Pop
    self.server = server
191 ffeffa1d Iustin Pop
192 b459a848 Andrea Spadaccini
  def handle_request(self, method, args): # pylint: disable=R0911
193 9113300d Michael Hanselmann
    queue = self.server.context.jobqueue
194 0bbe448c Michael Hanselmann
195 0bbe448c Michael Hanselmann
    # TODO: Parameter validation
196 a629ecb9 Iustin Pop
    if not isinstance(args, (tuple, list)):
197 a629ecb9 Iustin Pop
      logging.info("Received invalid arguments of type '%s'", type(args))
198 a629ecb9 Iustin Pop
      raise ValueError("Invalid arguments type '%s'" % type(args))
199 0bbe448c Michael Hanselmann
200 7260cfbe Iustin Pop
    # TODO: Rewrite to not exit in each 'if/elif' branch
201 7260cfbe Iustin Pop
202 0bbe448c Michael Hanselmann
    if method == luxi.REQ_SUBMIT_JOB:
203 e566ddbd Iustin Pop
      logging.info("Received new job")
204 0bbe448c Michael Hanselmann
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
205 4c848b18 Michael Hanselmann
      return queue.SubmitJob(ops)
206 ffeffa1d Iustin Pop
207 2971c913 Iustin Pop
    if method == luxi.REQ_SUBMIT_MANY_JOBS:
208 2971c913 Iustin Pop
      logging.info("Received multiple jobs")
209 2971c913 Iustin Pop
      jobs = []
210 2971c913 Iustin Pop
      for ops in args:
211 2971c913 Iustin Pop
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
212 2971c913 Iustin Pop
      return queue.SubmitManyJobs(jobs)
213 2971c913 Iustin Pop
214 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_CANCEL_JOB:
215 a629ecb9 Iustin Pop
      (job_id, ) = args
216 e566ddbd Iustin Pop
      logging.info("Received job cancel request for %s", job_id)
217 0bbe448c Michael Hanselmann
      return queue.CancelJob(job_id)
218 ffeffa1d Iustin Pop
219 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_ARCHIVE_JOB:
220 a629ecb9 Iustin Pop
      (job_id, ) = args
221 e566ddbd Iustin Pop
      logging.info("Received job archive request for %s", job_id)
222 0bbe448c Michael Hanselmann
      return queue.ArchiveJob(job_id)
223 0bbe448c Michael Hanselmann
224 07cd723a Iustin Pop
    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
225 f8ad5591 Michael Hanselmann
      (age, timeout) = args
226 e566ddbd Iustin Pop
      logging.info("Received job autoarchive request for age %s, timeout %s",
227 e566ddbd Iustin Pop
                   age, timeout)
228 f8ad5591 Michael Hanselmann
      return queue.AutoArchiveJobs(age, timeout)
229 07cd723a Iustin Pop
230 dfe57c22 Michael Hanselmann
    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
231 5c735209 Iustin Pop
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
232 e566ddbd Iustin Pop
      logging.info("Received job poll request for %s", job_id)
233 6c5a7090 Michael Hanselmann
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
234 5c735209 Iustin Pop
                                     prev_log_serial, timeout)
235 dfe57c22 Michael Hanselmann
236 28b71a76 Michael Hanselmann
    elif method == luxi.REQ_QUERY:
237 a629ecb9 Iustin Pop
      (what, fields, qfilter) = args
238 a629ecb9 Iustin Pop
      req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
239 28b71a76 Michael Hanselmann
240 abd66bf8 Michael Hanselmann
      if req.what in constants.QR_VIA_OP:
241 28b71a76 Michael Hanselmann
        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
242 2e5c33db Iustin Pop
                                             qfilter=req.qfilter))
243 24d16f76 Michael Hanselmann
      elif req.what == constants.QR_LOCK:
244 2e5c33db Iustin Pop
        if req.qfilter is not None:
245 24d16f76 Michael Hanselmann
          raise errors.OpPrereqError("Lock queries can't be filtered")
246 24d16f76 Michael Hanselmann
        return self.server.context.glm.QueryLocks(req.fields)
247 abd66bf8 Michael Hanselmann
      elif req.what in constants.QR_VIA_LUXI:
248 28b71a76 Michael Hanselmann
        raise NotImplementedError
249 28b71a76 Michael Hanselmann
      else:
250 28b71a76 Michael Hanselmann
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
251 28b71a76 Michael Hanselmann
                                   errors.ECODE_INVAL)
252 28b71a76 Michael Hanselmann
253 28b71a76 Michael Hanselmann
      return result
254 28b71a76 Michael Hanselmann
255 28b71a76 Michael Hanselmann
    elif method == luxi.REQ_QUERY_FIELDS:
256 a629ecb9 Iustin Pop
      (what, fields) = args
257 a629ecb9 Iustin Pop
      req = objects.QueryFieldsRequest(what=what, fields=fields)
258 28b71a76 Michael Hanselmann
259 c1391810 Michael Hanselmann
      try:
260 c1391810 Michael Hanselmann
        fielddefs = query.ALL_FIELDS[req.what]
261 c1391810 Michael Hanselmann
      except KeyError:
262 28b71a76 Michael Hanselmann
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
263 28b71a76 Michael Hanselmann
                                   errors.ECODE_INVAL)
264 28b71a76 Michael Hanselmann
265 c1391810 Michael Hanselmann
      return query.QueryFields(fielddefs, req.fields)
266 28b71a76 Michael Hanselmann
267 0bbe448c Michael Hanselmann
    elif method == luxi.REQ_QUERY_JOBS:
268 0bbe448c Michael Hanselmann
      (job_ids, fields) = args
269 e566ddbd Iustin Pop
      if isinstance(job_ids, (tuple, list)) and job_ids:
270 1f864b60 Iustin Pop
        msg = utils.CommaJoin(job_ids)
271 e566ddbd Iustin Pop
      else:
272 e566ddbd Iustin Pop
        msg = str(job_ids)
273 e566ddbd Iustin Pop
      logging.info("Received job query request for %s", msg)
274 0bbe448c Michael Hanselmann
      return queue.QueryJobs(job_ids, fields)
275 0bbe448c Michael Hanselmann
276 ee6c7b94 Michael Hanselmann
    elif method == luxi.REQ_QUERY_INSTANCES:
277 ec79568d Iustin Pop
      (names, fields, use_locking) = args
278 e566ddbd Iustin Pop
      logging.info("Received instance query request for %s", names)
279 77921a95 Iustin Pop
      if use_locking:
280 debac808 Iustin Pop
        raise errors.OpPrereqError("Sync queries are not allowed",
281 debac808 Iustin Pop
                                   errors.ECODE_INVAL)
282 f2af0bec Iustin Pop
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
283 f2af0bec Iustin Pop
                                   use_locking=use_locking)
284 ee6c7b94 Michael Hanselmann
      return self._Query(op)
285 ee6c7b94 Michael Hanselmann
286 02f7fe54 Michael Hanselmann
    elif method == luxi.REQ_QUERY_NODES:
287 ec79568d Iustin Pop
      (names, fields, use_locking) = args
288 e566ddbd Iustin Pop
      logging.info("Received node query request for %s", names)
289 77921a95 Iustin Pop
      if use_locking:
290 debac808 Iustin Pop
        raise errors.OpPrereqError("Sync queries are not allowed",
291 debac808 Iustin Pop
                                   errors.ECODE_INVAL)
292 2237687b Iustin Pop
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
293 2237687b Iustin Pop
                               use_locking=use_locking)
294 02f7fe54 Michael Hanselmann
      return self._Query(op)
295 02f7fe54 Michael Hanselmann
296 a79ef2a5 Adeodato Simo
    elif method == luxi.REQ_QUERY_GROUPS:
297 a79ef2a5 Adeodato Simo
      (names, fields, use_locking) = args
298 a79ef2a5 Adeodato Simo
      logging.info("Received group query request for %s", names)
299 a79ef2a5 Adeodato Simo
      if use_locking:
300 a79ef2a5 Adeodato Simo
        raise errors.OpPrereqError("Sync queries are not allowed",
301 a79ef2a5 Adeodato Simo
                                   errors.ECODE_INVAL)
302 d4d654bd Iustin Pop
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
303 a79ef2a5 Adeodato Simo
      return self._Query(op)
304 a79ef2a5 Adeodato Simo
305 32f93223 Michael Hanselmann
    elif method == luxi.REQ_QUERY_EXPORTS:
306 a629ecb9 Iustin Pop
      (nodes, use_locking) = args
307 77921a95 Iustin Pop
      if use_locking:
308 debac808 Iustin Pop
        raise errors.OpPrereqError("Sync queries are not allowed",
309 debac808 Iustin Pop
                                   errors.ECODE_INVAL)
310 e566ddbd Iustin Pop
      logging.info("Received exports query request")
311 7ca2d4d8 Iustin Pop
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
312 32f93223 Michael Hanselmann
      return self._Query(op)
313 32f93223 Michael Hanselmann
314 ae5849b5 Michael Hanselmann
    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
315 a629ecb9 Iustin Pop
      (fields, ) = args
316 e566ddbd Iustin Pop
      logging.info("Received config values query request for %s", fields)
317 2f093ea0 Iustin Pop
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
318 ae5849b5 Michael Hanselmann
      return self._Query(op)
319 ae5849b5 Michael Hanselmann
320 66baeccc Iustin Pop
    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
321 e566ddbd Iustin Pop
      logging.info("Received cluster info query request")
322 a2f7ab92 Iustin Pop
      op = opcodes.OpClusterQuery()
323 66baeccc Iustin Pop
      return self._Query(op)
324 66baeccc Iustin Pop
325 7699c3af Iustin Pop
    elif method == luxi.REQ_QUERY_TAGS:
326 a629ecb9 Iustin Pop
      (kind, name) = args
327 7699c3af Iustin Pop
      logging.info("Received tags query request")
328 c6afb1ca Iustin Pop
      op = opcodes.OpTagsGet(kind=kind, name=name)
329 7699c3af Iustin Pop
      return self._Query(op)
330 7699c3af Iustin Pop
331 19b9ba9a Michael Hanselmann
    elif method == luxi.REQ_QUERY_LOCKS:
332 19b9ba9a Michael Hanselmann
      (fields, sync) = args
333 19b9ba9a Michael Hanselmann
      logging.info("Received locks query request")
334 24d16f76 Michael Hanselmann
      if sync:
335 24d16f76 Michael Hanselmann
        raise NotImplementedError("Synchronous queries are not implemented")
336 24d16f76 Michael Hanselmann
      return self.server.context.glm.OldStyleQueryLocks(fields)
337 19b9ba9a Michael Hanselmann
338 3ccafd0e Iustin Pop
    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
339 a629ecb9 Iustin Pop
      (drain_flag, ) = args
340 e566ddbd Iustin Pop
      logging.info("Received queue drain flag change request to %s",
341 e566ddbd Iustin Pop
                   drain_flag)
342 3ccafd0e Iustin Pop
      return queue.SetDrainFlag(drain_flag)
343 3ccafd0e Iustin Pop
344 05e50653 Michael Hanselmann
    elif method == luxi.REQ_SET_WATCHER_PAUSE:
345 05e50653 Michael Hanselmann
      (until, ) = args
346 05e50653 Michael Hanselmann
347 05e50653 Michael Hanselmann
      if until is None:
348 05e50653 Michael Hanselmann
        logging.info("Received request to no longer pause the watcher")
349 05e50653 Michael Hanselmann
      else:
350 05e50653 Michael Hanselmann
        if not isinstance(until, (int, float)):
351 05e50653 Michael Hanselmann
          raise TypeError("Duration must be an integer or float")
352 05e50653 Michael Hanselmann
353 05e50653 Michael Hanselmann
        if until < time.time():
354 05e50653 Michael Hanselmann
          raise errors.GenericError("Unable to set pause end time in the past")
355 05e50653 Michael Hanselmann
356 05e50653 Michael Hanselmann
        logging.info("Received request to pause the watcher until %s", until)
357 05e50653 Michael Hanselmann
358 05e50653 Michael Hanselmann
      return _SetWatcherPause(until)
359 05e50653 Michael Hanselmann
360 0bbe448c Michael Hanselmann
    else:
361 e566ddbd Iustin Pop
      logging.info("Received invalid request '%s'", method)
362 e566ddbd Iustin Pop
      raise ValueError("Invalid operation '%s'" % method)
363 ffeffa1d Iustin Pop
364 ee6c7b94 Michael Hanselmann
  def _Query(self, op):
365 ee6c7b94 Michael Hanselmann
    """Runs the specified opcode and returns the result.
366 ee6c7b94 Michael Hanselmann

367 ee6c7b94 Michael Hanselmann
    """
368 adfa97e3 Guido Trotter
    # Queries don't have a job id
369 adfa97e3 Guido Trotter
    proc = mcpu.Processor(self.server.context, None)
370 26d3fd2f Michael Hanselmann
371 26d3fd2f Michael Hanselmann
    # TODO: Executing an opcode using locks will acquire them in blocking mode.
372 26d3fd2f Michael Hanselmann
    # Consider using a timeout for retries.
373 031a3e57 Michael Hanselmann
    return proc.ExecOpCode(op, None)
374 ee6c7b94 Michael Hanselmann
375 ffeffa1d Iustin Pop
376 39dcf2ef Guido Trotter
class GanetiContext(object):
377 39dcf2ef Guido Trotter
  """Context common to all ganeti threads.
378 39dcf2ef Guido Trotter

379 39dcf2ef Guido Trotter
  This class creates and holds common objects shared by all threads.
380 39dcf2ef Guido Trotter

381 39dcf2ef Guido Trotter
  """
382 b459a848 Andrea Spadaccini
  # pylint: disable=W0212
383 7260cfbe Iustin Pop
  # we do want to ensure a singleton here
384 39dcf2ef Guido Trotter
  _instance = None
385 39dcf2ef Guido Trotter
386 39dcf2ef Guido Trotter
  def __init__(self):
387 39dcf2ef Guido Trotter
    """Constructs a new GanetiContext object.
388 39dcf2ef Guido Trotter

389 39dcf2ef Guido Trotter
    There should be only a GanetiContext object at any time, so this
390 39dcf2ef Guido Trotter
    function raises an error if this is not the case.
391 39dcf2ef Guido Trotter

392 39dcf2ef Guido Trotter
    """
393 39dcf2ef Guido Trotter
    assert self.__class__._instance is None, "double GanetiContext instance"
394 39dcf2ef Guido Trotter
395 9113300d Michael Hanselmann
    # Create global configuration object
396 39dcf2ef Guido Trotter
    self.cfg = config.ConfigWriter()
397 9113300d Michael Hanselmann
398 9113300d Michael Hanselmann
    # Locking manager
399 984f7c32 Guido Trotter
    self.glm = locking.GanetiLockManager(
400 39dcf2ef Guido Trotter
                self.cfg.GetNodeList(),
401 819ca990 Guido Trotter
                self.cfg.GetNodeGroupList(),
402 39dcf2ef Guido Trotter
                self.cfg.GetInstanceList())
403 39dcf2ef Guido Trotter
404 b2acdbdc Michael Hanselmann
    self.cfg.SetContext(self)
405 b2acdbdc Michael Hanselmann
406 9113300d Michael Hanselmann
    # Job queue
407 9113300d Michael Hanselmann
    self.jobqueue = jqueue.JobQueue(self)
408 9113300d Michael Hanselmann
409 87b3cb26 Michael Hanselmann
    # RPC runner
410 87b3cb26 Michael Hanselmann
    self.rpc = rpc.RpcRunner(self)
411 87b3cb26 Michael Hanselmann
412 39dcf2ef Guido Trotter
    # setting this also locks the class against attribute modifications
413 39dcf2ef Guido Trotter
    self.__class__._instance = self
414 39dcf2ef Guido Trotter
415 39dcf2ef Guido Trotter
  def __setattr__(self, name, value):
416 39dcf2ef Guido Trotter
    """Setting GanetiContext attributes is forbidden after initialization.
417 39dcf2ef Guido Trotter

418 39dcf2ef Guido Trotter
    """
419 39dcf2ef Guido Trotter
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
420 39dcf2ef Guido Trotter
    object.__setattr__(self, name, value)
421 39dcf2ef Guido Trotter
422 0debfb35 Guido Trotter
  def AddNode(self, node, ec_id):
423 d8470559 Michael Hanselmann
    """Adds a node to the configuration and lock manager.
424 d8470559 Michael Hanselmann

425 d8470559 Michael Hanselmann
    """
426 d8470559 Michael Hanselmann
    # Add it to the configuration
427 0debfb35 Guido Trotter
    self.cfg.AddNode(node, ec_id)
428 d8470559 Michael Hanselmann
429 c36176cc Michael Hanselmann
    # If preseeding fails it'll not be added
430 99aabbed Iustin Pop
    self.jobqueue.AddNode(node)
431 c36176cc Michael Hanselmann
432 d8470559 Michael Hanselmann
    # Add the new node to the Ganeti Lock Manager
433 d8470559 Michael Hanselmann
    self.glm.add(locking.LEVEL_NODE, node.name)
434 4e070776 Michael Hanselmann
    self.glm.add(locking.LEVEL_NODE_RES, node.name)
435 d8470559 Michael Hanselmann
436 d8470559 Michael Hanselmann
  def ReaddNode(self, node):
437 d8470559 Michael Hanselmann
    """Updates a node that's already in the configuration
438 d8470559 Michael Hanselmann

439 d8470559 Michael Hanselmann
    """
440 c36176cc Michael Hanselmann
    # Synchronize the queue again
441 99aabbed Iustin Pop
    self.jobqueue.AddNode(node)
442 d8470559 Michael Hanselmann
443 d8470559 Michael Hanselmann
  def RemoveNode(self, name):
444 d8470559 Michael Hanselmann
    """Removes a node from the configuration and lock manager.
445 d8470559 Michael Hanselmann

446 d8470559 Michael Hanselmann
    """
447 d8470559 Michael Hanselmann
    # Remove node from configuration
448 d8470559 Michael Hanselmann
    self.cfg.RemoveNode(name)
449 d8470559 Michael Hanselmann
450 c36176cc Michael Hanselmann
    # Notify job queue
451 c36176cc Michael Hanselmann
    self.jobqueue.RemoveNode(name)
452 c36176cc Michael Hanselmann
453 d8470559 Michael Hanselmann
    # Remove the node from the Ganeti Lock Manager
454 d8470559 Michael Hanselmann
    self.glm.remove(locking.LEVEL_NODE, name)
455 4e070776 Michael Hanselmann
    self.glm.remove(locking.LEVEL_NODE_RES, name)
456 d8470559 Michael Hanselmann
457 39dcf2ef Guido Trotter
458 05e50653 Michael Hanselmann
def _SetWatcherPause(until):
459 05e50653 Michael Hanselmann
  """Creates or removes the watcher pause file.
460 05e50653 Michael Hanselmann

461 05e50653 Michael Hanselmann
  @type until: None or int
462 05e50653 Michael Hanselmann
  @param until: Unix timestamp saying until when the watcher shouldn't run
463 05e50653 Michael Hanselmann

464 05e50653 Michael Hanselmann
  """
465 05e50653 Michael Hanselmann
  if until is None:
466 05e50653 Michael Hanselmann
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
467 05e50653 Michael Hanselmann
  else:
468 05e50653 Michael Hanselmann
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
469 05e50653 Michael Hanselmann
                    data="%d\n" % (until, ))
470 05e50653 Michael Hanselmann
471 28b498cd Michael Hanselmann
  return until
472 28b498cd Michael Hanselmann
473 05e50653 Michael Hanselmann
474 e0e916fe Iustin Pop
@rpc.RunWithRPC
475 36205981 Iustin Pop
def CheckAgreement():
476 36205981 Iustin Pop
  """Check the agreement on who is the master.
477 36205981 Iustin Pop

478 36205981 Iustin Pop
  The function uses a very simple algorithm: we must get more positive
479 36205981 Iustin Pop
  than negative answers. Since in most of the cases we are the master,
480 36205981 Iustin Pop
  we'll use our own config file for getting the node list. In the
481 36205981 Iustin Pop
  future we could collect the current node list from our (possibly
482 36205981 Iustin Pop
  obsolete) known nodes.
483 36205981 Iustin Pop

484 d7cdb55d Iustin Pop
  In order to account for cold-start of all nodes, we retry for up to
485 d7cdb55d Iustin Pop
  a minute until we get a real answer as the top-voted one. If the
486 d7cdb55d Iustin Pop
  nodes are more out-of-sync, for now manual startup of the master
487 d7cdb55d Iustin Pop
  should be attempted.
488 d7cdb55d Iustin Pop

489 d7cdb55d Iustin Pop
  Note that for a even number of nodes cluster, we need at least half
490 d7cdb55d Iustin Pop
  of the nodes (beside ourselves) to vote for us. This creates a
491 d7cdb55d Iustin Pop
  problem on two-node clusters, since in this case we require the
492 d7cdb55d Iustin Pop
  other node to be up too to confirm our status.
493 d7cdb55d Iustin Pop

494 36205981 Iustin Pop
  """
495 b705c7a6 Manuel Franceschini
  myself = netutils.Hostname.GetSysName()
496 36205981 Iustin Pop
  #temp instantiation of a config writer, used only to get the node list
497 36205981 Iustin Pop
  cfg = config.ConfigWriter()
498 36205981 Iustin Pop
  node_list = cfg.GetNodeList()
499 36205981 Iustin Pop
  del cfg
500 d7cdb55d Iustin Pop
  retries = 6
501 d7cdb55d Iustin Pop
  while retries > 0:
502 d7cdb55d Iustin Pop
    votes = bootstrap.GatherMasterVotes(node_list)
503 d7cdb55d Iustin Pop
    if not votes:
504 d7cdb55d Iustin Pop
      # empty node list, this is a one node cluster
505 d7cdb55d Iustin Pop
      return True
506 d7cdb55d Iustin Pop
    if votes[0][0] is None:
507 d7cdb55d Iustin Pop
      retries -= 1
508 d7cdb55d Iustin Pop
      time.sleep(10)
509 36205981 Iustin Pop
      continue
510 d7cdb55d Iustin Pop
    break
511 d7cdb55d Iustin Pop
  if retries == 0:
512 e09fdcfa Iustin Pop
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
513 e09fdcfa Iustin Pop
                     " after multiple retries. Aborting startup")
514 d8f5a37d Iustin Pop
    logging.critical("Use the --no-voting option if you understand what"
515 d8f5a37d Iustin Pop
                     " effects it has on the cluster state")
516 e09fdcfa Iustin Pop
    return False
517 d7cdb55d Iustin Pop
  # here a real node is at the top of the list
518 d7cdb55d Iustin Pop
  all_votes = sum(item[1] for item in votes)
519 d7cdb55d Iustin Pop
  top_node, top_votes = votes[0]
520 8a20c732 Michael Hanselmann
521 d7cdb55d Iustin Pop
  result = False
522 d7cdb55d Iustin Pop
  if top_node != myself:
523 d7cdb55d Iustin Pop
    logging.critical("It seems we are not the master (top-voted node"
524 bbe19c17 Iustin Pop
                     " is %s with %d out of %d votes)", top_node, top_votes,
525 bbe19c17 Iustin Pop
                     all_votes)
526 d7cdb55d Iustin Pop
  elif top_votes < all_votes - top_votes:
527 36205981 Iustin Pop
    logging.critical("It seems we are not the master (%d votes for,"
528 d7cdb55d Iustin Pop
                     " %d votes against)", top_votes, all_votes - top_votes)
529 d7cdb55d Iustin Pop
  else:
530 d7cdb55d Iustin Pop
    result = True
531 d7cdb55d Iustin Pop
532 d7cdb55d Iustin Pop
  return result
533 36205981 Iustin Pop
534 6c948699 Michael Hanselmann
535 340f4757 Iustin Pop
@rpc.RunWithRPC
536 340f4757 Iustin Pop
def ActivateMasterIP():
537 340f4757 Iustin Pop
  # activate ip
538 8da2bd43 Andrea Spadaccini
  cfg = config.ConfigWriter()
539 f9d20654 Andrea Spadaccini
  master_params = cfg.GetMasterNetworkParameters()
540 8da2bd43 Andrea Spadaccini
  runner = rpc.BootstrapRunner()
541 f9d20654 Andrea Spadaccini
  result = runner.call_node_activate_master_ip(master_params.name,
542 c79198a0 Andrea Spadaccini
                                               master_params)
543 8da2bd43 Andrea Spadaccini
544 340f4757 Iustin Pop
  msg = result.fail_msg
545 340f4757 Iustin Pop
  if msg:
546 340f4757 Iustin Pop
    logging.error("Can't activate master IP address: %s", msg)
547 340f4757 Iustin Pop
548 340f4757 Iustin Pop
549 ed0efaa5 Michael Hanselmann
def CheckMasterd(options, args):
550 ed0efaa5 Michael Hanselmann
  """Initial checks whether to run or exit with a failure.
551 ed0efaa5 Michael Hanselmann

552 ed0efaa5 Michael Hanselmann
  """
553 f93427cd Iustin Pop
  if args: # masterd doesn't take any arguments
554 f93427cd Iustin Pop
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
555 f93427cd Iustin Pop
    sys.exit(constants.EXIT_FAILURE)
556 f93427cd Iustin Pop
557 ed0efaa5 Michael Hanselmann
  ssconf.CheckMaster(options.debug)
558 ed0efaa5 Michael Hanselmann
559 bbfd0568 René Nussbaumer
  try:
560 bbfd0568 René Nussbaumer
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
561 bbfd0568 René Nussbaumer
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
562 bbfd0568 René Nussbaumer
  except KeyError:
563 bbfd0568 René Nussbaumer
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
564 bbfd0568 René Nussbaumer
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
565 bbfd0568 René Nussbaumer
    sys.exit(constants.EXIT_FAILURE)
566 bbfd0568 René Nussbaumer
567 4b63dc7a Iustin Pop
  # Check the configuration is sane before anything else
568 4b63dc7a Iustin Pop
  try:
569 4b63dc7a Iustin Pop
    config.ConfigWriter()
570 4b63dc7a Iustin Pop
  except errors.ConfigVersionMismatch, err:
571 4b63dc7a Iustin Pop
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
572 4b63dc7a Iustin Pop
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
573 4b63dc7a Iustin Pop
    print >> sys.stderr,  \
574 4b63dc7a Iustin Pop
        ("Configuration version mismatch. The current Ganeti software"
575 4b63dc7a Iustin Pop
         " expects version %s, but the on-disk configuration file has"
576 4b63dc7a Iustin Pop
         " version %s. This is likely the result of upgrading the"
577 4b63dc7a Iustin Pop
         " software without running the upgrade procedure. Please contact"
578 4b63dc7a Iustin Pop
         " your cluster administrator or complete the upgrade using the"
579 4b63dc7a Iustin Pop
         " cfgupgrade utility, after reading the upgrade notes." %
580 4b63dc7a Iustin Pop
         (v1, v2))
581 4b63dc7a Iustin Pop
    sys.exit(constants.EXIT_FAILURE)
582 4b63dc7a Iustin Pop
  except errors.ConfigurationError, err:
583 4b63dc7a Iustin Pop
    print >> sys.stderr, \
584 4b63dc7a Iustin Pop
        ("Configuration error while opening the configuration file: %s\n"
585 4b63dc7a Iustin Pop
         "This might be caused by an incomplete software upgrade or"
586 4b63dc7a Iustin Pop
         " by a corrupted configuration file. Until the problem is fixed"
587 4b63dc7a Iustin Pop
         " the master daemon cannot start." % str(err))
588 4b63dc7a Iustin Pop
    sys.exit(constants.EXIT_FAILURE)
589 bbfd0568 René Nussbaumer
590 ed0efaa5 Michael Hanselmann
  # If CheckMaster didn't fail we believe we are the master, but we have to
591 ed0efaa5 Michael Hanselmann
  # confirm with the other nodes.
592 ed0efaa5 Michael Hanselmann
  if options.no_voting:
593 675e2bf5 Iustin Pop
    if not options.yes_do_it:
594 675e2bf5 Iustin Pop
      sys.stdout.write("The 'no voting' option has been selected.\n")
595 675e2bf5 Iustin Pop
      sys.stdout.write("This is dangerous, please confirm by"
596 675e2bf5 Iustin Pop
                       " typing uppercase 'yes': ")
597 675e2bf5 Iustin Pop
      sys.stdout.flush()
598 ed0efaa5 Michael Hanselmann
599 675e2bf5 Iustin Pop
      confirmation = sys.stdin.readline().strip()
600 675e2bf5 Iustin Pop
      if confirmation != "YES":
601 675e2bf5 Iustin Pop
        print >> sys.stderr, "Aborting."
602 675e2bf5 Iustin Pop
        sys.exit(constants.EXIT_FAILURE)
603 ed0efaa5 Michael Hanselmann
604 675e2bf5 Iustin Pop
  else:
605 675e2bf5 Iustin Pop
    # CheckAgreement uses RPC and threads, hence it needs to be run in
606 675e2bf5 Iustin Pop
    # a separate process before we call utils.Daemonize in the current
607 675e2bf5 Iustin Pop
    # process.
608 675e2bf5 Iustin Pop
    if not utils.RunInSeparateProcess(CheckAgreement):
609 ed0efaa5 Michael Hanselmann
      sys.exit(constants.EXIT_FAILURE)
610 ed0efaa5 Michael Hanselmann
611 340f4757 Iustin Pop
  # ActivateMasterIP also uses RPC/threads, so we run it again via a
612 340f4757 Iustin Pop
  # separate process.
613 340f4757 Iustin Pop
614 340f4757 Iustin Pop
  # TODO: decide whether failure to activate the master IP is a fatal error
615 340f4757 Iustin Pop
  utils.RunInSeparateProcess(ActivateMasterIP)
616 340f4757 Iustin Pop
617 ed0efaa5 Michael Hanselmann
618 3ee53f1f Iustin Pop
def PrepMasterd(options, _):
619 3ee53f1f Iustin Pop
  """Prep master daemon function, executed with the PID file held.
620 3b316acb Iustin Pop

621 04ccf5e9 Guido Trotter
  """
622 04ccf5e9 Guido Trotter
  # This is safe to do as the pid file guarantees against
623 04ccf5e9 Guido Trotter
  # concurrent execution.
624 04ccf5e9 Guido Trotter
  utils.RemoveFile(constants.MASTER_SOCKET)
625 b1b6ea87 Iustin Pop
626 cdd7f900 Guido Trotter
  mainloop = daemon.Mainloop()
627 7e5a6e86 Guido Trotter
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
628 bbfd0568 René Nussbaumer
                        options.uid, options.gid)
629 3ee53f1f Iustin Pop
  return (mainloop, master)
630 3ee53f1f Iustin Pop
631 3ee53f1f Iustin Pop
632 b459a848 Andrea Spadaccini
def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
633 3ee53f1f Iustin Pop
  """Main master daemon function, executed with the PID file held.
634 3ee53f1f Iustin Pop

635 3ee53f1f Iustin Pop
  """
636 3ee53f1f Iustin Pop
  (mainloop, master) = prep_data
637 04ccf5e9 Guido Trotter
  try:
638 15486fa7 Michael Hanselmann
    rpc.Init()
639 4331f6cd Michael Hanselmann
    try:
640 15486fa7 Michael Hanselmann
      master.setup_queue()
641 15486fa7 Michael Hanselmann
      try:
642 cdd7f900 Guido Trotter
        mainloop.Run()
643 15486fa7 Michael Hanselmann
      finally:
644 15486fa7 Michael Hanselmann
        master.server_cleanup()
645 4331f6cd Michael Hanselmann
    finally:
646 15486fa7 Michael Hanselmann
      rpc.Shutdown()
647 a4af651e Iustin Pop
  finally:
648 227647ac Guido Trotter
    utils.RemoveFile(constants.MASTER_SOCKET)
649 a4af651e Iustin Pop
650 ffeffa1d Iustin Pop
651 29d91329 Michael Hanselmann
def Main():
652 04ccf5e9 Guido Trotter
  """Main function"""
653 04ccf5e9 Guido Trotter
  parser = OptionParser(description="Ganeti master daemon",
654 04ccf5e9 Guido Trotter
                        usage="%prog [-f] [-d]",
655 04ccf5e9 Guido Trotter
                        version="%%prog (ganeti) %s" %
656 04ccf5e9 Guido Trotter
                        constants.RELEASE_VERSION)
657 04ccf5e9 Guido Trotter
  parser.add_option("--no-voting", dest="no_voting",
658 04ccf5e9 Guido Trotter
                    help="Do not check that the nodes agree on this node"
659 04ccf5e9 Guido Trotter
                    " being the master and start the daemon unconditionally",
660 04ccf5e9 Guido Trotter
                    default=False, action="store_true")
661 04ccf5e9 Guido Trotter
  parser.add_option("--yes-do-it", dest="yes_do_it",
662 04ccf5e9 Guido Trotter
                    help="Override interactive check for --no-voting",
663 04ccf5e9 Guido Trotter
                    default=False, action="store_true")
664 3ee53f1f Iustin Pop
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
665 b42ea9ed Iustin Pop
                     ExecMasterd, multithreaded=True)