#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""


import os
import sys
import SocketServer
import time
import collections
import signal
import logging

from cStringIO import StringIO
from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      server.handle_error(request, client_address)
      server.close_request(request)


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not sighandler.called:
        self.handle_request()
    finally:
      sighandler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
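
# Typical lifecycle of the server (an illustrative sketch; this mirrors
# what main() below actually does):
#
#   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
#   # ...daemonize/fork...
#   master.setup_queue()       # start threads only after forking
#   try:
#     master.serve_forever()   # runs until SIGINT/SIGTERM
#   finally:
#     master.server_cleanup()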


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = errors.EncodeException(err)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
    # Note: socket.sendall either transmits all data or raises an exception
    self.request.sendall(msg + self.EOM)
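
# Wire format (an illustrative sketch): each message on the master socket
# is a JSON document terminated by the EOM byte '\3'. Assuming typical
# values for luxi.KEY_METHOD/KEY_ARGS/KEY_SUCCESS/KEY_RESULT, an exchange
# would look roughly like:
#
#   client -> '{"method": "QueryClusterInfo", "args": []}\x03'
#   server -> '{"success": true, "result": {...}}\x03'
#
# read_message() above buffers partial reads until it sees EOM, so a
# single recv() may yield zero, one or several complete messages.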


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = ", ".join(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _DummyLog(self, *args):
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
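
# Client-side view (a minimal sketch, assuming the luxi.Client wrapper
# around the socket protocol above):
#
#   from ganeti import luxi, opcodes
#   cl = luxi.Client()
#   job_id = cl.SubmitJob([opcodes.OpQueryClusterInfo()])
#
# Such a call arrives here as REQ_SUBMIT_JOB with the serialized opcode
# states as arguments and is handed straight to the job queue.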


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
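
# Note on the singleton enforcement (illustrative): once __init__ binds
# _instance, the assertion in __setattr__ rejects any further rebinding
# (when assertions are enabled, i.e. Python is not run with -O):
#
#   ctx = GanetiContext()
#   GanetiContext()    # AssertionError: double GanetiContext instance
#   ctx.cfg = None     # AssertionError: Attempt to modify Ganeti Context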


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")

  options, args = parser.parse_args()
  return options, args
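
# Example invocations (using only the flags defined above):
#
#   ganeti-masterd                            # daemonize, vote for mastership
#   ganeti-masterd -f -d                      # foreground with debug logging
#   ganeti-masterd --no-voting --yes-do-it    # skip the agreement check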


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for a cold start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we require
  the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # Temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
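
# Worked example (illustrative): suppose we are node1 and the collected
# votes are [("node1", 3), ("node2", 1)]. Then all_votes is 4 and
# top_votes is 3; since 3 < 4 - 3 is false, we conclude we are the
# master. With votes [("node1", 2), ("node2", 2), ("node3", 1)] we may
# still be top-voted, but 2 < 5 - 2 holds, so we refuse to start.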


def main():
  """Main function"""

  options, args = ParseOptions()
  utils.no_fork = True

  if options.fork:
    utils.CloseFDs()

  rpc.Init()
  try:
    ssconf.CheckMaster(options.debug)

    # we believe we are the master, let's ask the other nodes...
    if options.no_voting and not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()
      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print "Aborting."
        return
    elif not options.no_voting:
      if not CheckAgreement():
        return

    dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
            (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
           ]
    utils.EnsureDirs(dirs)

    # This is safe to do as the pid file guarantees against
    # concurrent execution.
    utils.RemoveFile(constants.MASTER_SOCKET)

    master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  finally:
    rpc.Shutdown()

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON)

  utils.WritePidFile(constants.MASTERD_PID)
  try:
    utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                       stderr_logging=not options.fork, multithreaded=True)

    logging.info("Ganeti master daemon startup")

    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      if not rpc.RpcRunner.call_node_start_master(master_node, False, False):
        logging.error("Can't activate master IP address")

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemovePidFile(constants.MASTERD_PID)
    utils.RemoveFile(constants.MASTER_SOCKET)


if __name__ == "__main__":
  main()