Statistics
| Branch: | Tag: | Revision:

root / lib / server / masterd.py @ 09037780

History | View | Annotate | Download (23.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

24
Some classes deviate from the standard style guide since the
25
inheritance from parent classes requires it.
26

27
"""
28

    
29
# pylint: disable=C0103
30
# C0103: Invalid name ganeti-masterd
31

    
32
import grp
33
import os
34
import pwd
35
import sys
36
import socket
37
import time
38
import tempfile
39
import logging
40

    
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import daemon
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import workerpool
55
from ganeti import rpc
56
from ganeti import bootstrap
57
from ganeti import netutils
58
from ganeti import objects
59
from ganeti import query
60

    
61

    
62
# Value handed to workerpool.WorkerPool when the "ClientReq" pool is created
# (presumably the number of worker threads -- confirm against workerpool)
CLIENT_REQUEST_WORKERS = 16
63

    
64
# Module-level alias for constants.EXIT_NOTMASTER
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
65
# Module-level alias for constants.EXIT_NODESETUP_ERROR
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
66

    
67

    
68
class ClientRequestWorker(workerpool.BaseWorker):
69
  # pylint: disable=W0221
70
  def RunTask(self, server, message, client):
71
    """Process the request.
72

73
    """
74
    client_ops = ClientOps(server)
75

    
76
    try:
77
      (method, args, version) = luxi.ParseRequest(message)
78
    except luxi.ProtocolError, err:
79
      logging.error("Protocol Error: %s", err)
80
      client.close_log()
81
      return
82

    
83
    success = False
84
    try:
85
      # Verify client's version if there was one in the request
86
      if version is not None and version != constants.LUXI_VERSION:
87
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
88
                               (constants.LUXI_VERSION, version))
89

    
90
      result = client_ops.handle_request(method, args)
91
      success = True
92
    except errors.GenericError, err:
93
      logging.exception("Unexpected exception")
94
      success = False
95
      result = errors.EncodeException(err)
96
    except:
97
      logging.exception("Unexpected exception")
98
      err = sys.exc_info()
99
      result = "Caught exception: %s" % str(err[1])
100

    
101
    try:
102
      reply = luxi.FormatResponse(success, result)
103
      client.send_message(reply)
104
      # awake the main thread so that it can write out the data.
105
      server.awaker.signal()
106
    except: # pylint: disable=W0702
107
      logging.exception("Send error")
108
      client.close_log()
109

    
110

    
111
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  Messages received on the connection are queued as tasks on the server's
  request worker pool.

  """
  # Passed to the base class constructor as its last argument
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    """Initializes the handler for one client connection.

    @param server: server object owning this connection
    @param connected_socket: the connected client socket
    @param client_address: the address of the client
    @param family: the socket family

    """
    base = daemon.AsyncTerminatedMessageStream
    base.__init__(self, connected_socket, client_address, constants.LUXI_EOM,
                  family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    """Queues a received message for processing by a request worker.

    @param message: the message received from the client

    """
    srv = self.server
    task = (srv, message, self)
    srv.request_workers.AddTask(task)
126

    
127

    
128
class _MasterShutdownCheck:
129
  """Logic for master daemon shutdown.
130

131
  """
132
  #: How long to wait between checks
133
  _CHECK_INTERVAL = 5.0
134

    
135
  #: How long to wait after all jobs are done (e.g. to give clients time to
136
  #: retrieve the job status)
137
  _SHUTDOWN_LINGER = 5.0
138

    
139
  def __init__(self):
140
    """Initializes this class.
141

142
    """
143
    self._had_active_jobs = None
144
    self._linger_timeout = None
145

    
146
  def __call__(self, jq_prepare_result):
147
    """Determines if master daemon is ready for shutdown.
148

149
    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
150
    @rtype: None or number
151
    @return: None if master daemon is ready, timeout if the check must be
152
             repeated
153

154
    """
155
    if jq_prepare_result:
156
      # Check again shortly
157
      logging.info("Job queue has been notified for shutdown but is still"
158
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
159
      self._had_active_jobs = True
160
      return self._CHECK_INTERVAL
161

    
162
    if not self._had_active_jobs:
163
      # Can shut down as there were no active jobs on the first check
164
      return None
165

    
166
    # No jobs are running anymore, but maybe some clients want to collect some
167
    # information. Give them a short amount of time.
168
    if self._linger_timeout is None:
169
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
170

    
171
    remaining = self._linger_timeout.Remaining()
172

    
173
    logging.info("Job queue no longer busy; shutting down master daemon"
174
                 " in %s seconds", remaining)
175

    
176
    # TODO: Should the master daemon socket be closed at this point? Doing so
177
    # wouldn't affect existing connections.
178

    
179
    if remaining < 0:
180
      return None
181
    else:
182
      return remaining
183

    
184

    
185
class MasterServer(daemon.AsyncStreamServer):
186
  """Master Server.
187

188
  This is the main asynchronous master server. It handles connections to the
189
  master socket.
190

191
  """
192
  family = socket.AF_UNIX
193

    
194
  def __init__(self, address, uid, gid):
195
    """MasterServer constructor
196

197
    @param address: the unix socket address to bind the MasterServer to
198
    @param uid: The uid of the owner of the socket
199
    @param gid: The gid of the owner of the socket
200

201
    """
202
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
203
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
204
    os.chmod(temp_name, 0770)
205
    os.chown(temp_name, uid, gid)
206
    os.rename(temp_name, address)
207

    
208
    self.awaker = daemon.AsyncAwaker()
209

    
210
    # We'll only start threads once we've forked.
211
    self.context = None
212
    self.request_workers = None
213

    
214
    self._shutdown_check = None
215

    
216
  def handle_connection(self, connected_socket, client_address):
217
    # TODO: add connection count and limit the number of open connections to a
218
    # maximum number to avoid breaking for lack of file descriptors or memory.
219
    MasterClientHandler(self, connected_socket, client_address, self.family)
220

    
221
  def setup_queue(self):
222
    self.context = GanetiContext()
223
    self.request_workers = workerpool.WorkerPool("ClientReq",
224
                                                 CLIENT_REQUEST_WORKERS,
225
                                                 ClientRequestWorker)
226

    
227
  def WaitForShutdown(self):
228
    """Prepares server for shutdown.
229

230
    """
231
    if self._shutdown_check is None:
232
      self._shutdown_check = _MasterShutdownCheck()
233

    
234
    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
235

    
236
  def server_cleanup(self):
237
    """Cleanup the server.
238

239
    This involves shutting down the processor threads and the master
240
    socket.
241

242
    """
243
    try:
244
      self.close()
245
    finally:
246
      if self.request_workers:
247
        self.request_workers.TerminateWorkers()
248
      if self.context:
249
        self.context.jobqueue.Shutdown()
250

    
251

    
252
class ClientOps:
  """Class holding high-level client operations.

  @ivar server: server object whose C{context} (job queue, lock manager) is
      used for serving the requests

  """
  def __init__(self, server):
    self.server = server

  @staticmethod
  def _RejectLocking(use_locking):
    """Refuses queries which request locking.

    @param use_locking: the locking flag received from the client
    @raise errors.OpPrereqError: if C{use_locking} is true

    """
    if use_locking:
      raise errors.OpPrereqError("Sync queries are not allowed",
                                 errors.ECODE_INVAL)

  def handle_request(self, method, args): # pylint: disable=R0911
    """Dispatches a LUXI request to the matching operation.

    @param method: the requested operation (one of the C{luxi.REQ_*} values)
    @type args: tuple or list
    @param args: arguments of the request; their number and meaning depend on
        C{method}
    @return: the result of the requested operation
    @raise ValueError: if C{args} is neither tuple nor list or if C{method}
        is not a known operation
    @raise errors.OpPrereqError: for queries requesting locking, for filtered
        lock queries and for unknown resource types
    @raise TypeError: if the watcher pause end time is not a number
    @raise errors.GenericError: if the watcher pause end time is in the past

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args
      req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)

      if req.what in constants.QR_VIA_OP:
        return self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
                                           qfilter=req.qfilter))
      elif req.what == constants.QR_LOCK:
        if req.qfilter is not None:
          # Pass an error code like all other prerequisite errors raised here
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return self.server.context.glm.QueryLocks(req.fields)
      elif req.what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      self._RejectLocking(use_locking)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      self._RejectLocking(use_locking)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      self._RejectLocking(use_locking)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      # Unlike the other queries this one is checked before being logged
      self._RejectLocking(use_locking)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    @param op: the opcode to execute
    @return: the result of executing the opcode

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)
432

    
433

    
434
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  @ivar cfg: the configuration object (L{config.ConfigWriter})
  @ivar glm: the lock manager (L{locking.GanetiLockManager})
  @ivar rpc: the RPC runner (L{rpc.RpcRunner})
  @ivar jobqueue: the job queue (L{jqueue.JobQueue})

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    cls = self.__class__
    assert cls._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    node_names = self.cfg.GetNodeList()
    group_names = self.cfg.GetNodeGroupList()
    instance_names = self.cfg.GetInstanceList()
    self.glm = locking.GanetiLockManager(node_names, group_names,
                                         instance_names)

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    cls._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    initialized = self.__class__._instance is not None
    assert not initialized, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    @param node: the node object to add
    @param ec_id: passed on to the configuration's C{AddNode}

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
      self.glm.add(level, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    @param node: the node object to synchronize the job queue with

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    @param name: the name of the node to remove

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
      self.glm.remove(level, name)
514

    
515

    
516
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run;
      None removes the pause file
  @return: the value passed as C{until}

  """
  pause_file = constants.WATCHER_PAUSEFILE

  if until is not None:
    # The file holds the timestamp formatted as a decimal integer
    utils.WriteFile(pause_file, data="%d\n" % (until, ))
  else:
    utils.RemoveFile(pause_file)

  return until
530

    
531

    
532
@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: boolean
  @return: True if this node may act as master, False otherwise

  """
  myself = netutils.Hostname.GetSysName()
  # the config writer is only needed temporarily, for getting the node list
  node_list = config.ConfigWriter().GetNodeList()

  # Up to six attempts, sleeping ten seconds after each inconclusive one
  for _ in range(6):
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      # a real node is at the top of the list
      break
    time.sleep(10)
  else:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False

  all_votes = sum(item[1] for item in votes)
  (top_node, top_votes) = votes[0]

  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
    return False

  votes_against = all_votes - top_votes
  if top_votes < votes_against:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, votes_against)
    return False

  return True
591

    
592

    
593
@rpc.RunWithRPC
def ActivateMasterIP():
  """Requests activation of the master IP address.

  The master network parameters and the external master IP script setting
  are read from the configuration and sent via the
  C{node_activate_master_ip} RPC call. A failure is only logged.

  """
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  use_external_script = cfg.GetUseExternalMipScript()

  result = rpc.BootstrapRunner().call_node_activate_master_ip(
    master_params.name, master_params, use_external_script)

  if result.fail_msg:
    logging.error("Can't activate master IP address: %s", result.fail_msg)
606

    
607

    
608
def CheckMasterd(options, args):
609
  """Initial checks whether to run or exit with a failure.
610

611
  """
612
  if args: # masterd doesn't take any arguments
613
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
614
    sys.exit(constants.EXIT_FAILURE)
615

    
616
  ssconf.CheckMaster(options.debug)
617

    
618
  try:
619
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
620
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
621
  except KeyError:
622
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
623
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
624
    sys.exit(constants.EXIT_FAILURE)
625

    
626
  # Check the configuration is sane before anything else
627
  try:
628
    config.ConfigWriter()
629
  except errors.ConfigVersionMismatch, err:
630
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
631
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
632
    print >> sys.stderr,  \
633
        ("Configuration version mismatch. The current Ganeti software"
634
         " expects version %s, but the on-disk configuration file has"
635
         " version %s. This is likely the result of upgrading the"
636
         " software without running the upgrade procedure. Please contact"
637
         " your cluster administrator or complete the upgrade using the"
638
         " cfgupgrade utility, after reading the upgrade notes." %
639
         (v1, v2))
640
    sys.exit(constants.EXIT_FAILURE)
641
  except errors.ConfigurationError, err:
642
    print >> sys.stderr, \
643
        ("Configuration error while opening the configuration file: %s\n"
644
         "This might be caused by an incomplete software upgrade or"
645
         " by a corrupted configuration file. Until the problem is fixed"
646
         " the master daemon cannot start." % str(err))
647
    sys.exit(constants.EXIT_FAILURE)
648

    
649
  # If CheckMaster didn't fail we believe we are the master, but we have to
650
  # confirm with the other nodes.
651
  if options.no_voting:
652
    if not options.yes_do_it:
653
      sys.stdout.write("The 'no voting' option has been selected.\n")
654
      sys.stdout.write("This is dangerous, please confirm by"
655
                       " typing uppercase 'yes': ")
656
      sys.stdout.flush()
657

    
658
      confirmation = sys.stdin.readline().strip()
659
      if confirmation != "YES":
660
        print >> sys.stderr, "Aborting."
661
        sys.exit(constants.EXIT_FAILURE)
662

    
663
  else:
664
    # CheckAgreement uses RPC and threads, hence it needs to be run in
665
    # a separate process before we call utils.Daemonize in the current
666
    # process.
667
    if not utils.RunInSeparateProcess(CheckAgreement):
668
      sys.exit(constants.EXIT_FAILURE)
669

    
670
  # ActivateMasterIP also uses RPC/threads, so we run it again via a
671
  # separate process.
672

    
673
  # TODO: decide whether failure to activate the master IP is a fatal error
674
  utils.RunInSeparateProcess(ActivateMasterIP)
675

    
676

    
677
def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  @param options: parsed command line options; C{uid} and C{gid} are used
      as owner of the master socket
  @rtype: tuple
  @return: the main loop and the master server object

  """
  socket_path = constants.MASTER_SOCKET

  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(socket_path)

  mainloop = daemon.Mainloop()
  return (mainloop, MasterServer(socket_path, options.uid, options.gid))
688

    
689

    
690
def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  @param options: unused
  @param args: unused
  @type prep_data: tuple
  @param prep_data: main loop and master server, in the form returned by
      L{PrepMasterd}

  """
  (loop, server) = prep_data

  # Each step is undone in reverse order, even if a later step fails
  try:
    rpc.Init()
    try:
      server.setup_queue()
      try:
        loop.Run(shutdown_wait_fn=server.WaitForShutdown)
      finally:
        server.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")
709

    
710

    
711
def Main():
  """Main function

  Builds the command line parser and hands control to
  L{daemon.GenericMain}.

  """
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_text)

  parser.add_option("--no-voting", dest="no_voting",
                    action="store_true", default=False,
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    action="store_true", default=False,
                    help="Override interactive check for --no-voting")

  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)