#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide, since inheritance
from the parent classes requires it.

"""


import os
import errno
import sys
import SocketServer
import time
import collections
import Queue
import random
import signal
import logging

from cStringIO import StringIO
from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      server.handle_error(request, client_address)
      server.close_request(request)


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)
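
  # Unlike the SocketServer default, which handles each connection in
  # the accepting thread, process_request() above only queues the
  # connection; one of the CLIENT_REQUEST_WORKERS pool threads picks it
  # up and runs ClientRequestWorker.RunTask on it.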

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not sighandler.called:
        self.handle_request()
    finally:
      sighandler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096
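
  # The wire format handled below: each message is one JSON document
  # followed by the EOM byte '\3'. A request names a method and its
  # arguments; the reply reports success and a result. The key strings
  # shown here are illustrative only (the canonical values live in the
  # luxi module as KEY_METHOD, KEY_ARGS, KEY_SUCCESS and KEY_RESULT):
  #
  #   >>> '{"method": "QueryClusterInfo", "args": []}\x03'
  #   <<< '{"success": true, "result": {...}}\x03'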

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = (err.__class__.__name__, err.args)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        # Connection closed by the peer
        return None
      # Split the buffered data on EOM; the last element is either an
      # empty string or the start of a not-yet-complete message and is
      # kept in the buffer
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
    # Note: sendall() either transmits the whole message or raises an
    # exception; partial sends are not reported back to the caller
    self.request.sendall(msg + self.EOM)

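
# A minimal client-side sketch of the protocol served by ClientRqHandler,
# for illustration only: nothing in the daemon calls this helper. It
# assumes a daemon listening on constants.MASTER_SOCKET and reuses this
# module's serializer and luxi key constants; the real client
# implementation lives in the luxi module.
def _ExampleClientCall(method, args):
  """Sends one framed request to the master socket, returns the reply.

  """
  import socket
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  try:
    sock.connect(constants.MASTER_SOCKET)
    request = {luxi.KEY_METHOD: method, luxi.KEY_ARGS: args}
    sock.sendall(serializer.DumpJson(request) + ClientRqHandler.EOM)
    # Accumulate data until the end-of-message byte arrives
    buf = ""
    while ClientRqHandler.EOM not in buf:
      data = sock.recv(4096)
      if not data:
        break
      buf += data
    return serializer.LoadJson(buf.split(ClientRqHandler.EOM)[0])
  finally:
    sock.close()
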
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = ", ".join(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _DummyLog(self, *args):
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if that is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)
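    # Once __init__ has finished, self.__class__._instance is no longer
    # None, so any later assignment such as "context.cfg = ..." trips
    # the assertion above: the instance is frozen after construction.
    # (Assignments inside __init__ still work because _instance is None
    # at that point, and "self.__class__._instance = self" sets a class
    # attribute directly, bypassing this method.)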

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding the node fails, it will not be added to the job queue
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")

  options, args = parser.parse_args()
  return options, args
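
# Typical invocations (illustrative): plain "ganeti-masterd" detaches
# and becomes a daemon; "ganeti-masterd -f -d" stays in the foreground
# and enables debug messages; "ganeti-masterd --no-voting --yes-do-it"
# skips both the master-agreement vote and the interactive confirmation.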


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync than that, manual startup of the master
  should be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since it means the other
  node must also be up for us to confirm our status.

  """
  myself = utils.HostInfo().name
  # Temporary instantiation of a config writer, used only to get the
  # node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one-node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


def main():
  """Main function"""

  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  if options.fork:
    utils.CloseFDs()

  rpc.Init()
  try:
    ssconf.CheckMaster(options.debug)

    # we believe we are the master, let's ask the other nodes...
    if options.no_voting and not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()
      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print "Aborting."
        return
    elif not options.no_voting:
      if not CheckAgreement():
        return

    dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
            (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
           ]
    utils.EnsureDirs(dirs)

    # This is safe to do as the pid file guarantees against
    # concurrent execution.
    utils.RemoveFile(constants.MASTER_SOCKET)

    master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  finally:
    rpc.Shutdown()

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON)

  utils.WritePidFile(constants.MASTERD_PID)
  try:
    utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                       stderr_logging=not options.fork, multithreaded=True)

    logging.info("Ganeti master daemon startup")

    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      if not rpc.RpcRunner.call_node_start_master(master_node, False):
        logging.error("Can't activate master IP address")

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemovePidFile(constants.MASTERD_PID)
    utils.RemoveFile(constants.MASTER_SOCKET)


if __name__ == "__main__":
  main()