Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ d7cdb55d

History | View | Annotate | Download (12.9 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import time
33
import collections
34
import Queue
35
import random
36
import signal
37
import simplejson
38
import logging
39

    
40
from cStringIO import StringIO
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import mcpu
46
from ganeti import opcodes
47
from ganeti import jqueue
48
from ganeti import locking
49
from ganeti import luxi
50
from ganeti import utils
51
from ganeti import errors
52
from ganeti import ssconf
53
from ganeti import logger
54
from ganeti import workerpool
55
from ganeti import rpc
56
from ganeti import bootstrap
57

    
58

    
59
CLIENT_REQUEST_WORKERS = 16
60

    
61
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
62
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
63

    
64

    
65
class ClientRequestWorker(workerpool.BaseWorker):
  """Pool worker that serves one client connection per task."""

  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    Args:
      server: the IOServer that accepted the connection
      request: the connected client socket
      client_address: address of the connecting client

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      # Bare except mirrors SocketServer.ThreadingMixIn: any handler
      # failure is reported and the connection closed regardless.
      server.handle_error(request, client_address)
      server.close_request(request)
78

    
79

    
80
class IOServer(SocketServer.UnixStreamServer):
  """Main I/O server.

  Accepts connections on the master socket and dispatches each one to
  the client worker pool; installs the termination signal handlers and
  tears the threads and socket down again at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    Args:
      address: the address to bind this IOServer to
      rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # Thread-backed members are created only after daemonizing, via
    # setup_queue(); until then they stay unset.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Create the shared context and the client worker pool."""
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(
      CLIENT_REQUEST_WORKERS, ClientRequestWorker)

  def process_request(self, request, client_address):
    """Hand an accepted connection over to the worker pool.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Accept and dispatch requests until SIGINT/SIGTERM arrives."""
    handler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not handler.called:
        self.handle_request()
    finally:
      handler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    Closes the master socket first and then, even if that fails, stops
    the worker threads and the job queue — but only if they were ever
    started (see setup_queue).

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
136

    
137

    
138
class ClientRqHandler(SocketServer.BaseRequestHandler):
139
  """Client handler"""
140
  EOM = '\3'
141
  READ_SIZE = 4096
142

    
143
  def setup(self):
144
    self._buffer = ""
145
    self._msgs = collections.deque()
146
    self._ops = ClientOps(self.server)
147

    
148
  def handle(self):
149
    while True:
150
      msg = self.read_message()
151
      if msg is None:
152
        logging.info("client closed connection")
153
        break
154

    
155
      request = simplejson.loads(msg)
156
      logging.debug("request: %s", request)
157
      if not isinstance(request, dict):
158
        logging.error("wrong request received: %s", msg)
159
        break
160

    
161
      method = request.get(luxi.KEY_METHOD, None)
162
      args = request.get(luxi.KEY_ARGS, None)
163
      if method is None or args is None:
164
        logging.error("no method or args in request")
165
        break
166

    
167
      success = False
168
      try:
169
        result = self._ops.handle_request(method, args)
170
        success = True
171
      except errors.GenericError, err:
172
        success = False
173
        result = (err.__class__.__name__, err.args)
174
      except:
175
        logging.error("Unexpected exception", exc_info=True)
176
        err = sys.exc_info()
177
        result = "Caught exception: %s" % str(err[1])
178

    
179
      response = {
180
        luxi.KEY_SUCCESS: success,
181
        luxi.KEY_RESULT: result,
182
        }
183
      logging.debug("response: %s", response)
184
      self.send_message(simplejson.dumps(response))
185

    
186
  def read_message(self):
187
    while not self._msgs:
188
      data = self.request.recv(self.READ_SIZE)
189
      if not data:
190
        return None
191
      new_msgs = (self._buffer + data).split(self.EOM)
192
      self._buffer = new_msgs.pop()
193
      self._msgs.extend(new_msgs)
194
    return self._msgs.popleft()
195

    
196
  def send_message(self, msg):
197
    #print "sending", msg
198
    self.request.sendall(msg + self.EOM)
199

    
200

    
201
class ClientOps:
  """Class holding high-level client operations."""

  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    """Dispatch one luxi request to the matching queue or query call."""
    job_queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      return job_queue.SubmitJob([opcodes.OpCode.LoadOpCode(state)
                                  for state in args])

    elif method == luxi.REQ_CANCEL_JOB:
      return job_queue.CancelJob(args)

    elif method == luxi.REQ_ARCHIVE_JOB:
      return job_queue.ArchiveJob(args)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      return job_queue.AutoArchiveJobs(args)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      return job_queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                         prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      return job_queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields) = args
      return self._Query(opcodes.OpQueryInstances(names=names,
                                                  output_fields=fields))

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields) = args
      return self._Query(opcodes.OpQueryNodes(names=names,
                                              output_fields=fields))

    elif method == luxi.REQ_QUERY_EXPORTS:
      return self._Query(opcodes.OpQueryExports(nodes=args))

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      return self._Query(opcodes.OpQueryConfigValues(output_fields=args))

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      return job_queue.SetDrainFlag(args)

    else:
      raise ValueError("Invalid operation")

  def _DummyLog(self, *args):
    """No-op feedback callback for _Query."""
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
273

    
274

    
275
class GanetiContext(object):
  """Context common to all ganeti threads.

  Singleton holding the objects shared by every thread: the cluster
  configuration, the lock manager and the job queue.

  """
  # The one and only instance, set at the end of __init__
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager, seeded with the current node and instance lists
    self.glm = locking.GanetiLockManager(self.cfg.GetNodeList(),
                                         self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node.name)

    # Make the new node known to the lock manager as well
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node.name)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Drop the node from the lock manager as well
    self.glm.remove(locking.LEVEL_NODE, name)
345

    
346

    
347
def ParseOptions():
  """Build the command line parser and parse the arguments.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=("%%prog (ganeti) %s" %
                                 constants.RELEASE_VERSION))

  parser.add_option("-f", "--foreground", dest="fork",
                    action="store_false", default=True,
                    help="Don't detach from the current terminal")
  parser.add_option("-d", "--debug", dest="debug",
                    action="store_true", default=False,
                    help="Enable some debug messages")
  return parser.parse_args()
367

    
368

    
369
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for an even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  Returns:
    True if we believe we're the master and enough nodes agree,
    False otherwise

  """
  myself = utils.HostInfo().name
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while True:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      # a real node is at the top of the list
      break
    retries -= 1
    if retries <= 0:
      logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                       " after multiple retries. Aborting startup")
      return False
    # Original code also slept after the *last* failed round, delaying
    # the abort by 10 needless seconds; only sleep between rounds.
    time.sleep(10)

  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s)", top_node)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
423

    
424

    
425
def main():
  """Main function"""

  # "args" is unused: the daemon takes no positional arguments
  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  # Bail out early if we're not running on the master node
  ssconf.CheckMaster(options.debug)

  # we believe we are the master, let's ask the other nodes...
  if not CheckAgreement():
    return

  # The listening socket must be created BEFORE daemonizing so its fd
  # can be preserved across the fork (see noclose_fds below)
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                    noclose_fds=[master.fileno()])

  utils.WritePidFile(constants.MASTERD_PID)

  # When staying in the foreground (-f), also log to stderr
  logger.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                      stderr_logging=not options.fork)

  logging.info("ganeti master daemon startup")

  # activate ip
  master_node = ssconf.SimpleConfigReader().GetMasterNode()
  if not rpc.RpcRunner.call_node_start_master(master_node, False):
    # best-effort: startup continues even if the IP can't be brought up
    logging.error("Can't activate master IP address")

  # Worker threads must only be started AFTER the fork above
  master.setup_queue()
  try:
    master.serve_forever()
  finally:
    # Always tear down threads/socket and drop the pid file on exit
    master.server_cleanup()
    utils.RemovePidFile(constants.MASTERD_PID)
463

    
464

    
465
# Script entry point
if __name__ == "__main__":
  main()