Statistics
| Branch: | Tag: | Revision:

root / lib / server / masterd.py @ cb4d3314

History | View | Annotate | Download (23.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

27
"""
28

    
29
# pylint: disable=C0103
30
# C0103: Invalid name ganeti-masterd
31

    
32
import grp
33
import os
34
import pwd
35
import sys
36
import socket
37
import time
38
import tempfile
39
import logging
40

    
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import daemon
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import workerpool
55
from ganeti import rpc
56
from ganeti import bootstrap
57
from ganeti import netutils
58
from ganeti import objects
59
from ganeti import query
60

    
61

    
62
CLIENT_REQUEST_WORKERS = 16
63

    
64
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
65
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
66

    
67

    
68
class ClientRequestWorker(workerpool.BaseWorker):
69
  # pylint: disable=W0221
70
  def RunTask(self, server, message, client):
71
    """Process the request.
72

73
    """
74
    client_ops = ClientOps(server)
75

    
76
    try:
77
      (method, args, version) = luxi.ParseRequest(message)
78
    except luxi.ProtocolError, err:
79
      logging.error("Protocol Error: %s", err)
80
      client.close_log()
81
      return
82

    
83
    success = False
84
    try:
85
      # Verify client's version if there was one in the request
86
      if version is not None and version != constants.LUXI_VERSION:
87
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
88
                               (constants.LUXI_VERSION, version))
89

    
90
      result = client_ops.handle_request(method, args)
91
      success = True
92
    except errors.GenericError, err:
93
      logging.exception("Unexpected exception")
94
      success = False
95
      result = errors.EncodeException(err)
96
    except:
97
      logging.exception("Unexpected exception")
98
      err = sys.exc_info()
99
      result = "Caught exception: %s" % str(err[1])
100

    
101
    try:
102
      reply = luxi.FormatResponse(success, result)
103
      client.send_message(reply)
104
      # awake the main thread so that it can write out the data.
105
      server.awaker.signal()
106
    except: # pylint: disable=W0702
107
      logging.exception("Send error")
108
      client.close_log()
109

    
110

    
111
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
112
  """Handler for master peers.
113

114
  """
115
  _MAX_UNHANDLED = 1
116

    
117
  def __init__(self, server, connected_socket, client_address, family):
118
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
119
                                                 client_address,
120
                                                 constants.LUXI_EOM,
121
                                                 family, self._MAX_UNHANDLED)
122
    self.server = server
123

    
124
  def handle_message(self, message, _):
125
    self.server.request_workers.AddTask((self.server, message, self))
126

    
127

    
128
class _MasterShutdownCheck:
129
  """Logic for master daemon shutdown.
130

131
  """
132
  #: How long to wait between checks
133
  _CHECK_INTERVAL = 5.0
134

    
135
  #: How long to wait after all jobs are done (e.g. to give clients time to
136
  #: retrieve the job status)
137
  _SHUTDOWN_LINGER = 5.0
138

    
139
  def __init__(self):
140
    """Initializes this class.
141

142
    """
143
    self._had_active_jobs = None
144
    self._linger_timeout = None
145

    
146
  def __call__(self, jq_prepare_result):
147
    """Determines if master daemon is ready for shutdown.
148

149
    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
150
    @rtype: None or number
151
    @return: None if master daemon is ready, timeout if the check must be
152
             repeated
153

154
    """
155
    if jq_prepare_result:
156
      # Check again shortly
157
      logging.info("Job queue has been notified for shutdown but is still"
158
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
159
      self._had_active_jobs = True
160
      return self._CHECK_INTERVAL
161

    
162
    if not self._had_active_jobs:
163
      # Can shut down as there were no active jobs on the first check
164
      return None
165

    
166
    # No jobs are running anymore, but maybe some clients want to collect some
167
    # information. Give them a short amount of time.
168
    if self._linger_timeout is None:
169
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
170

    
171
    remaining = self._linger_timeout.Remaining()
172

    
173
    logging.info("Job queue no longer busy; shutting down master daemon"
174
                 " in %s seconds", remaining)
175

    
176
    # TODO: Should the master daemon socket be closed at this point? Doing so
177
    # wouldn't affect existing connections.
178

    
179
    if remaining < 0:
180
      return None
181
    else:
182
      return remaining
183

    
184

    
185
class MasterServer(daemon.AsyncStreamServer):
186
  """Master Server.
187

188
  This is the main asynchronous master server. It handles connections to the
189
  master socket.
190

191
  """
192
  family = socket.AF_UNIX
193

    
194
  def __init__(self, address, uid, gid):
195
    """MasterServer constructor
196

197
    @param address: the unix socket address to bind the MasterServer to
198
    @param uid: The uid of the owner of the socket
199
    @param gid: The gid of the owner of the socket
200

201
    """
202
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
203
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
204
    os.chmod(temp_name, 0770)
205
    os.chown(temp_name, uid, gid)
206
    os.rename(temp_name, address)
207

    
208
    self.awaker = daemon.AsyncAwaker()
209

    
210
    # We'll only start threads once we've forked.
211
    self.context = None
212
    self.request_workers = None
213

    
214
    self._shutdown_check = None
215

    
216
  def handle_connection(self, connected_socket, client_address):
217
    # TODO: add connection count and limit the number of open connections to a
218
    # maximum number to avoid breaking for lack of file descriptors or memory.
219
    MasterClientHandler(self, connected_socket, client_address, self.family)
220

    
221
  def setup_queue(self):
222
    self.context = GanetiContext()
223
    self.request_workers = workerpool.WorkerPool("ClientReq",
224
                                                 CLIENT_REQUEST_WORKERS,
225
                                                 ClientRequestWorker)
226

    
227
  def WaitForShutdown(self):
228
    """Prepares server for shutdown.
229

230
    """
231
    if self._shutdown_check is None:
232
      self._shutdown_check = _MasterShutdownCheck()
233

    
234
    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
235

    
236
  def server_cleanup(self):
237
    """Cleanup the server.
238

239
    This involves shutting down the processor threads and the master
240
    socket.
241

242
    """
243
    try:
244
      self.close()
245
    finally:
246
      if self.request_workers:
247
        self.request_workers.TerminateWorkers()
248
      if self.context:
249
        self.context.jobqueue.Shutdown()
250

    
251

    
252
class ClientOps:
253
  """Class holding high-level client operations."""
254
  def __init__(self, server):
255
    self.server = server
256

    
257
  def handle_request(self, method, args): # pylint: disable=R0911
258
    queue = self.server.context.jobqueue
259

    
260
    # TODO: Parameter validation
261
    if not isinstance(args, (tuple, list)):
262
      logging.info("Received invalid arguments of type '%s'", type(args))
263
      raise ValueError("Invalid arguments type '%s'" % type(args))
264

    
265
    # TODO: Rewrite to not exit in each 'if/elif' branch
266

    
267
    if method == luxi.REQ_SUBMIT_JOB:
268
      logging.info("Received new job")
269
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
270
      return queue.SubmitJob(ops)
271

    
272
    if method == luxi.REQ_SUBMIT_MANY_JOBS:
273
      logging.info("Received multiple jobs")
274
      jobs = []
275
      for ops in args:
276
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
277
      return queue.SubmitManyJobs(jobs)
278

    
279
    elif method == luxi.REQ_CANCEL_JOB:
280
      (job_id, ) = args
281
      logging.info("Received job cancel request for %s", job_id)
282
      return queue.CancelJob(job_id)
283

    
284
    elif method == luxi.REQ_ARCHIVE_JOB:
285
      (job_id, ) = args
286
      logging.info("Received job archive request for %s", job_id)
287
      return queue.ArchiveJob(job_id)
288

    
289
    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
290
      (age, timeout) = args
291
      logging.info("Received job autoarchive request for age %s, timeout %s",
292
                   age, timeout)
293
      return queue.AutoArchiveJobs(age, timeout)
294

    
295
    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
296
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
297
      logging.info("Received job poll request for %s", job_id)
298
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
299
                                     prev_log_serial, timeout)
300

    
301
    elif method == luxi.REQ_QUERY:
302
      (what, fields, qfilter) = args
303
      req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
304

    
305
      if req.what in constants.QR_VIA_OP:
306
        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
307
                                             qfilter=req.qfilter))
308
      elif req.what == constants.QR_LOCK:
309
        if req.qfilter is not None:
310
          raise errors.OpPrereqError("Lock queries can't be filtered")
311
        return self.server.context.glm.QueryLocks(req.fields)
312
      elif req.what in constants.QR_VIA_LUXI:
313
        raise NotImplementedError
314
      else:
315
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
316
                                   errors.ECODE_INVAL)
317

    
318
      return result
319

    
320
    elif method == luxi.REQ_QUERY_FIELDS:
321
      (what, fields) = args
322
      req = objects.QueryFieldsRequest(what=what, fields=fields)
323

    
324
      try:
325
        fielddefs = query.ALL_FIELDS[req.what]
326
      except KeyError:
327
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
328
                                   errors.ECODE_INVAL)
329

    
330
      return query.QueryFields(fielddefs, req.fields)
331

    
332
    elif method == luxi.REQ_QUERY_JOBS:
333
      (job_ids, fields) = args
334
      if isinstance(job_ids, (tuple, list)) and job_ids:
335
        msg = utils.CommaJoin(job_ids)
336
      else:
337
        msg = str(job_ids)
338
      logging.info("Received job query request for %s", msg)
339
      return queue.QueryJobs(job_ids, fields)
340

    
341
    elif method == luxi.REQ_QUERY_INSTANCES:
342
      (names, fields, use_locking) = args
343
      logging.info("Received instance query request for %s", names)
344
      if use_locking:
345
        raise errors.OpPrereqError("Sync queries are not allowed",
346
                                   errors.ECODE_INVAL)
347
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
348
                                   use_locking=use_locking)
349
      return self._Query(op)
350

    
351
    elif method == luxi.REQ_QUERY_NODES:
352
      (names, fields, use_locking) = args
353
      logging.info("Received node query request for %s", names)
354
      if use_locking:
355
        raise errors.OpPrereqError("Sync queries are not allowed",
356
                                   errors.ECODE_INVAL)
357
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
358
                               use_locking=use_locking)
359
      return self._Query(op)
360

    
361
    elif method == luxi.REQ_QUERY_GROUPS:
362
      (names, fields, use_locking) = args
363
      logging.info("Received group query request for %s", names)
364
      if use_locking:
365
        raise errors.OpPrereqError("Sync queries are not allowed",
366
                                   errors.ECODE_INVAL)
367
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
368
      return self._Query(op)
369

    
370
    elif method == luxi.REQ_QUERY_EXPORTS:
371
      (nodes, use_locking) = args
372
      if use_locking:
373
        raise errors.OpPrereqError("Sync queries are not allowed",
374
                                   errors.ECODE_INVAL)
375
      logging.info("Received exports query request")
376
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
377
      return self._Query(op)
378

    
379
    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
380
      (fields, ) = args
381
      logging.info("Received config values query request for %s", fields)
382
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
383
      return self._Query(op)
384

    
385
    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
386
      logging.info("Received cluster info query request")
387
      op = opcodes.OpClusterQuery()
388
      return self._Query(op)
389

    
390
    elif method == luxi.REQ_QUERY_TAGS:
391
      (kind, name) = args
392
      logging.info("Received tags query request")
393
      op = opcodes.OpTagsGet(kind=kind, name=name)
394
      return self._Query(op)
395

    
396
    elif method == luxi.REQ_QUERY_LOCKS:
397
      (fields, sync) = args
398
      logging.info("Received locks query request")
399
      if sync:
400
        raise NotImplementedError("Synchronous queries are not implemented")
401
      return self.server.context.glm.OldStyleQueryLocks(fields)
402

    
403
    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
404
      (drain_flag, ) = args
405
      logging.info("Received queue drain flag change request to %s",
406
                   drain_flag)
407
      return queue.SetDrainFlag(drain_flag)
408

    
409
    elif method == luxi.REQ_SET_WATCHER_PAUSE:
410
      (until, ) = args
411

    
412
      if until is None:
413
        logging.info("Received request to no longer pause the watcher")
414
      else:
415
        if not isinstance(until, (int, float)):
416
          raise TypeError("Duration must be an integer or float")
417

    
418
        if until < time.time():
419
          raise errors.GenericError("Unable to set pause end time in the past")
420

    
421
        logging.info("Received request to pause the watcher until %s", until)
422

    
423
      return _SetWatcherPause(until)
424

    
425
    else:
426
      logging.info("Received invalid request '%s'", method)
427
      raise ValueError("Invalid operation '%s'" % method)
428

    
429
  def _Query(self, op):
430
    """Runs the specified opcode and returns the result.
431

432
    """
433
    # Queries don't have a job id
434
    proc = mcpu.Processor(self.server.context, None)
435

    
436
    # TODO: Executing an opcode using locks will acquire them in blocking mode.
437
    # Consider using a timeout for retries.
438
    return proc.ExecOpCode(op, None)
439

    
440

    
441
class GanetiContext(object):
442
  """Context common to all ganeti threads.
443

444
  This class creates and holds common objects shared by all threads.
445

446
  """
447
  # pylint: disable=W0212
448
  # we do want to ensure a singleton here
449
  _instance = None
450

    
451
  def __init__(self):
452
    """Constructs a new GanetiContext object.
453

454
    There should be only a GanetiContext object at any time, so this
455
    function raises an error if this is not the case.
456

457
    """
458
    assert self.__class__._instance is None, "double GanetiContext instance"
459

    
460
    # Create global configuration object
461
    self.cfg = config.ConfigWriter()
462

    
463
    # Locking manager
464
    self.glm = locking.GanetiLockManager(
465
                self.cfg.GetNodeList(),
466
                self.cfg.GetNodeGroupList(),
467
                self.cfg.GetInstanceList())
468

    
469
    self.cfg.SetContext(self)
470

    
471
    # RPC runner
472
    self.rpc = rpc.RpcRunner(self)
473

    
474
    # Job queue
475
    self.jobqueue = jqueue.JobQueue(self)
476

    
477
    # setting this also locks the class against attribute modifications
478
    self.__class__._instance = self
479

    
480
  def __setattr__(self, name, value):
481
    """Setting GanetiContext attributes is forbidden after initialization.
482

483
    """
484
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
485
    object.__setattr__(self, name, value)
486

    
487
  def AddNode(self, node, ec_id):
488
    """Adds a node to the configuration and lock manager.
489

490
    """
491
    # Add it to the configuration
492
    self.cfg.AddNode(node, ec_id)
493

    
494
    # If preseeding fails it'll not be added
495
    self.jobqueue.AddNode(node)
496

    
497
    # Add the new node to the Ganeti Lock Manager
498
    self.glm.add(locking.LEVEL_NODE, node.name)
499
    self.glm.add(locking.LEVEL_NODE_RES, node.name)
500

    
501
  def ReaddNode(self, node):
502
    """Updates a node that's already in the configuration
503

504
    """
505
    # Synchronize the queue again
506
    self.jobqueue.AddNode(node)
507

    
508
  def RemoveNode(self, name):
509
    """Removes a node from the configuration and lock manager.
510

511
    """
512
    # Remove node from configuration
513
    self.cfg.RemoveNode(name)
514

    
515
    # Notify job queue
516
    self.jobqueue.RemoveNode(name)
517

    
518
    # Remove the node from the Ganeti Lock Manager
519
    self.glm.remove(locking.LEVEL_NODE, name)
520
    self.glm.remove(locking.LEVEL_NODE_RES, name)
521

    
522

    
523
def _SetWatcherPause(until):
524
  """Creates or removes the watcher pause file.
525

526
  @type until: None or int
527
  @param until: Unix timestamp saying until when the watcher shouldn't run
528

529
  """
530
  if until is None:
531
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
532
  else:
533
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
534
                    data="%d\n" % (until, ))
535

    
536
  return until
537

    
538

    
539
@rpc.RunWithRPC
540
def CheckAgreement():
541
  """Check the agreement on who is the master.
542

543
  The function uses a very simple algorithm: we must get more positive
544
  than negative answers. Since in most of the cases we are the master,
545
  we'll use our own config file for getting the node list. In the
546
  future we could collect the current node list from our (possibly
547
  obsolete) known nodes.
548

549
  In order to account for cold-start of all nodes, we retry for up to
550
  a minute until we get a real answer as the top-voted one. If the
551
  nodes are more out-of-sync, for now manual startup of the master
552
  should be attempted.
553

554
  Note that for a even number of nodes cluster, we need at least half
555
  of the nodes (beside ourselves) to vote for us. This creates a
556
  problem on two-node clusters, since in this case we require the
557
  other node to be up too to confirm our status.
558

559
  """
560
  myself = netutils.Hostname.GetSysName()
561
  #temp instantiation of a config writer, used only to get the node list
562
  cfg = config.ConfigWriter()
563
  node_list = cfg.GetNodeList()
564
  del cfg
565
  retries = 6
566
  while retries > 0:
567
    votes = bootstrap.GatherMasterVotes(node_list)
568
    if not votes:
569
      # empty node list, this is a one node cluster
570
      return True
571
    if votes[0][0] is None:
572
      retries -= 1
573
      time.sleep(10)
574
      continue
575
    break
576
  if retries == 0:
577
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
578
                     " after multiple retries. Aborting startup")
579
    logging.critical("Use the --no-voting option if you understand what"
580
                     " effects it has on the cluster state")
581
    return False
582
  # here a real node is at the top of the list
583
  all_votes = sum(item[1] for item in votes)
584
  top_node, top_votes = votes[0]
585

    
586
  result = False
587
  if top_node != myself:
588
    logging.critical("It seems we are not the master (top-voted node"
589
                     " is %s with %d out of %d votes)", top_node, top_votes,
590
                     all_votes)
591
  elif top_votes < all_votes - top_votes:
592
    logging.critical("It seems we are not the master (%d votes for,"
593
                     " %d votes against)", top_votes, all_votes - top_votes)
594
  else:
595
    result = True
596

    
597
  return result
598

    
599

    
600
@rpc.RunWithRPC
601
def ActivateMasterIP():
602
  # activate ip
603
  cfg = config.ConfigWriter()
604
  master_params = cfg.GetMasterNetworkParameters()
605
  ems = cfg.GetUseExternalMipScript()
606
  runner = rpc.BootstrapRunner()
607
  result = runner.call_node_activate_master_ip(master_params.name,
608
                                               master_params, ems)
609

    
610
  msg = result.fail_msg
611
  if msg:
612
    logging.error("Can't activate master IP address: %s", msg)
613

    
614

    
615
def CheckMasterd(options, args):
616
  """Initial checks whether to run or exit with a failure.
617

618
  """
619
  if args: # masterd doesn't take any arguments
620
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
621
    sys.exit(constants.EXIT_FAILURE)
622

    
623
  ssconf.CheckMaster(options.debug)
624

    
625
  try:
626
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
627
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
628
  except KeyError:
629
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
630
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
631
    sys.exit(constants.EXIT_FAILURE)
632

    
633
  # Check the configuration is sane before anything else
634
  try:
635
    config.ConfigWriter()
636
  except errors.ConfigVersionMismatch, err:
637
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
638
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
639
    print >> sys.stderr,  \
640
        ("Configuration version mismatch. The current Ganeti software"
641
         " expects version %s, but the on-disk configuration file has"
642
         " version %s. This is likely the result of upgrading the"
643
         " software without running the upgrade procedure. Please contact"
644
         " your cluster administrator or complete the upgrade using the"
645
         " cfgupgrade utility, after reading the upgrade notes." %
646
         (v1, v2))
647
    sys.exit(constants.EXIT_FAILURE)
648
  except errors.ConfigurationError, err:
649
    print >> sys.stderr, \
650
        ("Configuration error while opening the configuration file: %s\n"
651
         "This might be caused by an incomplete software upgrade or"
652
         " by a corrupted configuration file. Until the problem is fixed"
653
         " the master daemon cannot start." % str(err))
654
    sys.exit(constants.EXIT_FAILURE)
655

    
656
  # If CheckMaster didn't fail we believe we are the master, but we have to
657
  # confirm with the other nodes.
658
  if options.no_voting:
659
    if not options.yes_do_it:
660
      sys.stdout.write("The 'no voting' option has been selected.\n")
661
      sys.stdout.write("This is dangerous, please confirm by"
662
                       " typing uppercase 'yes': ")
663
      sys.stdout.flush()
664

    
665
      confirmation = sys.stdin.readline().strip()
666
      if confirmation != "YES":
667
        print >> sys.stderr, "Aborting."
668
        sys.exit(constants.EXIT_FAILURE)
669

    
670
  else:
671
    # CheckAgreement uses RPC and threads, hence it needs to be run in
672
    # a separate process before we call utils.Daemonize in the current
673
    # process.
674
    if not utils.RunInSeparateProcess(CheckAgreement):
675
      sys.exit(constants.EXIT_FAILURE)
676

    
677
  # ActivateMasterIP also uses RPC/threads, so we run it again via a
678
  # separate process.
679

    
680
  # TODO: decide whether failure to activate the master IP is a fatal error
681
  utils.RunInSeparateProcess(ActivateMasterIP)
682

    
683

    
684
def PrepMasterd(options, _):
685
  """Prep master daemon function, executed with the PID file held.
686

687
  """
688
  # This is safe to do as the pid file guarantees against
689
  # concurrent execution.
690
  utils.RemoveFile(constants.MASTER_SOCKET)
691

    
692
  mainloop = daemon.Mainloop()
693
  master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid)
694
  return (mainloop, master)
695

    
696

    
697
def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
698
  """Main master daemon function, executed with the PID file held.
699

700
  """
701
  (mainloop, master) = prep_data
702
  try:
703
    rpc.Init()
704
    try:
705
      master.setup_queue()
706
      try:
707
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
708
      finally:
709
        master.server_cleanup()
710
    finally:
711
      rpc.Shutdown()
712
  finally:
713
    utils.RemoveFile(constants.MASTER_SOCKET)
714

    
715
  logging.info("Clean master daemon shutdown")
716

    
717

    
718
def Main():
719
  """Main function"""
720
  parser = OptionParser(description="Ganeti master daemon",
721
                        usage="%prog [-f] [-d]",
722
                        version="%%prog (ganeti) %s" %
723
                        constants.RELEASE_VERSION)
724
  parser.add_option("--no-voting", dest="no_voting",
725
                    help="Do not check that the nodes agree on this node"
726
                    " being the master and start the daemon unconditionally",
727
                    default=False, action="store_true")
728
  parser.add_option("--yes-do-it", dest="yes_do_it",
729
                    help="Override interactive check for --no-voting",
730
                    default=False, action="store_true")
731
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
732
                     ExecMasterd, multithreaded=True)