#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""


import os
import sys
import SocketServer
import time
import collections
import signal
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


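# Client connections are handled by a fixed-size pool of
# CLIENT_REQUEST_WORKERS workers instead of the thread-per-connection
# model of SocketServer.ThreadingMixIn.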
class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      server.handle_error(request, client_address)
      server.close_request(request)


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor.

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None):
    """Handle one request at a time until told to quit."""
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


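# Requests and responses on the master socket are JSON documents separated
# by an EOM (end-of-message) byte, '\3'; ClientRqHandler splits the byte
# stream back into messages and dispatches them to ClientOps.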
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = errors.EncodeException(err)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    # Buffer incoming data until at least one complete (EOM-terminated)
    # message is available; a trailing partial message stays in the buffer.
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    # sendall() either sends all the data or raises an exception
    self.request.sendall(msg + self.EOM)


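# Each decoded request carries a method name and its arguments
# (luxi.KEY_METHOD / luxi.KEY_ARGS); ClientOps maps the method names to
# job queue operations and synchronous queries.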
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails, the node won't be added to the job queue
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync than that, manual startup of the master
  should be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # Temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  # Six retries with a ten-second wait give the other nodes up to a minute
  # to come up
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


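# CheckAgreement queries the other nodes over RPC, so the RPC layer must
# be initialized first and shut down again afterwards, even on failure.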
def CheckAgreementWithRpc():
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()


def _RunInSeparateProcess(fn):
  """Runs a function in a separate process.

  Note: Only boolean return values are supported.

  @type fn: callable
  @param fn: Function to be called
  @rtype: bool

  """
  pid = os.fork()
  if pid == 0:
    # Child process
    try:
      # Call function
      result = int(bool(fn()))
      assert result in (0, 1)
    except:
      logging.exception("Error while calling function in separate process")
      # 0 and 1 are reserved for the return value
      result = 33

    os._exit(result)

  # Parent process

  # Avoid zombies and check exit code
  (_, status) = os.waitpid(pid, 0)

  if os.WIFSIGNALED(status):
    signum = os.WTERMSIG(status)
    exitcode = None
  else:
    signum = None
    exitcode = os.WEXITSTATUS(status)

  if not (exitcode in (0, 1) and signum is None):
    logging.error("Child program failed (code=%s, signal=%s)",
                  exitcode, signum)
    sys.exit(constants.EXIT_FAILURE)

  return bool(exitcode)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >>sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not _RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)


def ExecMasterd(options, args):
  """Main master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # Activate the master IP address
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.fail_msg
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd,
                     multithreaded=True)


if __name__ == "__main__":
  main()