Statistics
| Branch: | Tag: | Revision:

root / lib / server / masterd.py @ e07f7f7a

History | View | Annotate | Download (23.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

24
Some classes deviate from the standard style guide since the
25
inheritance from parent classes requires it.
26

27
"""
28

    
29
# pylint: disable=C0103
30
# C0103: Invalid name ganeti-masterd
31

    
32
import grp
33
import os
34
import pwd
35
import sys
36
import socket
37
import time
38
import tempfile
39
import logging
40

    
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import daemon
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import workerpool
55
from ganeti import rpc
56
from ganeti import bootstrap
57
from ganeti import netutils
58
from ganeti import objects
59
from ganeti import query
60

    
61

    
62
# Passed as the size argument of the "ClientReq" worker pool created in
# MasterServer.setup_queue (presumably the number of worker threads).
CLIENT_REQUEST_WORKERS = 16
63

    
64
# Exit codes re-exported from the constants module under local names.
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
65
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
66

    
67

    
68
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker which decodes one LUXI request, runs it and sends the reply.

  """
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    @param server: server object handed to L{ClientOps}; its C{awaker} is
        signalled once the reply has been handed to the client object
    @param message: raw request, decoded with C{luxi.ParseRequest}
    @param client: connection object used for sending the reply; it is
        closed via C{close_log} on protocol or send errors

    """
    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError:
      logging.error("Protocol Error: %s", sys.exc_info()[1])
      client.close_log()
      return

    handler = ClientOps(server)

    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      outcome = (True, handler.handle_request(method, args))
    except errors.GenericError:
      # Known error classes are encoded so that they can be sent back
      failure = sys.exc_info()[1]
      logging.exception("Unexpected exception")
      outcome = (False, errors.EncodeException(failure))
    except: # pylint: disable=W0702
      # Anything else is reported to the client as plain text
      failure = sys.exc_info()[1]
      logging.exception("Unexpected exception")
      outcome = (False, "Caught exception: %s" % str(failure))

    try:
      client.send_message(luxi.FormatResponse(*outcome))
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()
109

    
110

    
111
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Per-connection handler for peers talking to the master socket.

  Each received message is queued on the server's request worker pool.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    """Sets up the message stream and remembers the owning server.

    @param server: server object whose C{request_workers} pool receives
        the messages
    @param connected_socket: socket of the accepted connection
    @param client_address: address of the peer
    @param family: socket address family

    """
    stream_cls = daemon.AsyncTerminatedMessageStream
    stream_cls.__init__(self, connected_socket, client_address,
                        constants.LUXI_EOM, family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    """Queues a received message as a task for the request workers.

    """
    task = (self.server, message, self)
    self.server.request_workers.AddTask(task)
126

    
127

    
128
class _MasterShutdownCheck:
  """Stateful check deciding when the master daemon may shut down.

  Instances are called repeatedly with the result of the job queue's
  shutdown preparation and remember whether jobs were ever seen running.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Starts with no jobs seen and no linger timeout running.

    """
    self._linger_timeout = None
    self._had_active_jobs = None

  def __call__(self, jq_prepare_result):
    """Determines if master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Queue is still busy; remember that and ask to be called again
      self._had_active_jobs = True
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      return self._CHECK_INTERVAL

    if self._had_active_jobs:
      # No jobs are running anymore, but maybe some clients want to collect
      # some information. Give them a short amount of time.
      if self._linger_timeout is None:
        self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER,
                                                    True)

      remaining = self._linger_timeout.Remaining()

      logging.info("Job queue no longer busy; shutting down master daemon"
                   " in %s seconds", remaining)

      # TODO: Should the master daemon socket be closed at this point? Doing
      # so wouldn't affect existing connections.

      if remaining < 0:
        return None
      return remaining

    # Can shut down as there were no active jobs on the first check
    return None
183

    
184

    
185
class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor

    The server is first set up on a temporary name in the same directory,
    which is then given mode 0770 and the requested ownership and finally
    renamed to C{address}.

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # Imported locally as it's only needed for the permission bits below
    import stat

    # NOTE: tempfile.mktemp only generates a name and creates nothing, hence
    # another process could create that path first. It's kept because the
    # path handed to the parent constructor must not exist yet.
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    # rwx for owner and group, nothing for others (octal 770)
    os.chmod(temp_name, stat.S_IRWXU | stat.S_IRWXG)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    """Creates a L{MasterClientHandler} for a newly accepted connection.

    """
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    """Creates the shared context and the client request worker pool.

    """
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    @return: result of the L{_MasterShutdownCheck} instance, i.e. None if
        the daemon can shut down or a timeout after which to check again

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      # Nested so that the job queue is shut down even if terminating the
      # request workers fails
      try:
        if self.request_workers:
          self.request_workers.TerminateWorkers()
      finally:
        if self.context:
          self.context.jobqueue.Shutdown()
250

    
251

    
252
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    """Remembers the server whose context is used for serving requests.

    @param server: object providing a C{context} attribute

    """
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    """Executes a single LUXI request.

    @param method: request identifier, compared against the C{luxi.REQ_*}
        values
    @param args: request arguments; must be a tuple or list with as many
        elements as the request expects
    @return: result of the requested operation
    @raise ValueError: if C{args} is not a tuple/list, has the wrong number
        of elements or C{method} is not known
    @raise TypeError: if the watcher pause end time is not a number
    @raise errors.OpPrereqError: for sync queries, filtered lock queries
        and unknown resource types

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    # Every branch below either returns or raises

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    if method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = [[opcodes.OpCode.LoadOpCode(state) for state in ops]
              for ops in args]
      return queue.SubmitManyJobs(jobs)

    if method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    if method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    if method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    if method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    if method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args
      req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
      return self._QueryResource(req)

    if method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    if method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    if method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      self._RejectSyncQuery(use_locking)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    if method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      self._RejectSyncQuery(use_locking)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    if method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      self._RejectSyncQuery(use_locking)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    if method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      self._RejectSyncQuery(use_locking)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    if method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    if method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    if method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name)
      return self._Query(op)

    if method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    if method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    logging.info("Received invalid request '%s'", method)
    raise ValueError("Invalid operation '%s'" % method)

  @staticmethod
  def _RejectSyncQuery(use_locking):
    """Raises L{errors.OpPrereqError} if a locking (sync) query is requested.

    """
    if use_locking:
      raise errors.OpPrereqError("Sync queries are not allowed",
                                 errors.ECODE_INVAL)

  def _QueryResource(self, req):
    """Answers a generic query request depending on the resource type.

    @param req: request object with C{what}, C{fields} and C{qfilter}
    @return: the query result
    @raise errors.OpPrereqError: for filtered lock queries and unknown
        resource types
    @raise NotImplementedError: for resources listed in
        C{constants.QR_VIA_LUXI} which have no handler here

    """
    context = self.server.context

    if req.what in constants.QR_VIA_OP:
      return self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
                                         qfilter=req.qfilter))

    if req.what == constants.QR_LOCK:
      if req.qfilter is not None:
        # Error code given explicitly, like for all other OpPrereqError
        # instances raised in this class
        raise errors.OpPrereqError("Lock queries can't be filtered",
                                   errors.ECODE_INVAL)
      return context.glm.QueryLocks(req.fields)

    if req.what == constants.QR_JOB:
      return context.jobqueue.QueryJobs(req.fields, req.qfilter)

    if req.what in constants.QR_VIA_LUXI:
      raise NotImplementedError

    raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                               errors.ECODE_INVAL)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    @param op: opcode to execute
    @return: whatever the processor returns for the opcode

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)
435

    
436

    
437
class GanetiContext(object):
  """Holder of the objects shared between all ganeti threads.

  Only a single instance may exist. Once its construction has finished,
  attributes can no longer be set on it.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    Creates the configuration, the lock manager, the RPC runner and the job
    queue, in this order. An assertion fails if another instance has already
    been registered.

    """
    cls = self.__class__
    assert cls._instance is None, "double GanetiContext instance"

    # Create global configuration object
    cfg = config.ConfigWriter()
    self.cfg = cfg

    # Locking manager
    self.glm = locking.GanetiLockManager(cfg.GetNodeList(),
                                         cfg.GetNodeGroupList(),
                                         cfg.GetInstanceList())

    cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    cls._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    registered = self.__class__._instance
    assert registered is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration, job queue and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
      self.glm.add(level, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration, job queue and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    for level in (locking.LEVEL_NODE, locking.LEVEL_NODE_RES):
      self.glm.remove(level, name)
517

    
518

    
519
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run;
      it is written to the file as an integer. None removes the file.
  @return: the C{until} value as passed in

  """
  pause_file = constants.WATCHER_PAUSEFILE

  if until is not None:
    utils.WriteFile(pause_file, data="%d\n" % (until, ))
  else:
    utils.RemoveFile(pause_file)

  return until
533

    
534

    
535
@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The algorithm is very simple: we must get at least as many votes for us
  as against us. Since in most of the cases we are the master, our own
  config file is used for getting the node list. In the future the current
  node list could be collected from our (possibly obsolete) known nodes.

  In order to account for cold-start of all nodes, the vote is repeated
  (six attempts with ten seconds of sleep after each inconclusive one)
  until a real answer is the top-voted one. If the nodes are more
  out-of-sync, for now manual startup of the master should be attempted.

  Note that for an even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  @rtype: boolean
  @return: True if this node is confirmed as master (or no votes were
      gathered at all), False otherwise

  """
  myself = netutils.Hostname.GetSysName()
  # temporary config writer, used only to get the node list
  node_list = config.ConfigWriter().GetNodeList()

  for _ in range(6):
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      # a real node is at the top of the list
      break
    time.sleep(10)
  else:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False

  all_votes = sum(item[1] for item in votes)
  (top_node, top_votes) = votes[0]
  votes_against = all_votes - top_votes

  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
    return False

  if top_votes < votes_against:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, votes_against)
    return False

  return True
594

    
595

    
596
@rpc.RunWithRPC
def ActivateMasterIP():
  """Requests activation of the master IP address on the master node.

  The master network parameters are read from the configuration. A failure
  reported by the RPC call is only logged.

  """
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  use_external_script = cfg.GetUseExternalMipScript()

  activate_fn = rpc.BootstrapRunner().call_node_activate_master_ip
  result = activate_fn(master_params.name, master_params,
                       use_external_script)

  failure = result.fail_msg
  if failure:
    logging.error("Can't activate master IP address: %s", failure)
609

    
610

    
611
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  Besides checking, this stores the uid and gid to be used for the master
  socket in C{options.uid} and C{options.gid}.

  @param options: parsed command line options; C{debug}, C{no_voting} and
      C{yes_do_it} are read
  @param args: remaining command line arguments, which must be empty

  """
  def _Abort(text):
    """Writes a line to stderr and exits with the failure code.

    """
    sys.stderr.write(text + "\n")
    sys.exit(constants.EXIT_FAILURE)

  if args: # masterd doesn't take any arguments
    _Abort("Usage: %s [-f] [-d]" % sys.argv[0])

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    _Abort("User or group not existing on system: %s:%s" %
           (constants.MASTERD_USER, constants.DAEMONS_GROUP))

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch:
    mismatch = sys.exc_info()[1]
    v1 = "%s.%s.%s" % constants.SplitVersion(mismatch.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(mismatch.args[1])
    _Abort("Configuration version mismatch. The current Ganeti software"
           " expects version %s, but the on-disk configuration file has"
           " version %s. This is likely the result of upgrading the"
           " software without running the upgrade procedure. Please contact"
           " your cluster administrator or complete the upgrade using the"
           " cfgupgrade utility, after reading the upgrade notes." %
           (v1, v2))
  except errors.ConfigurationError:
    _Abort("Configuration error while opening the configuration file: %s\n"
           "This might be caused by an incomplete software upgrade or"
           " by a corrupted configuration file. Until the problem is fixed"
           " the master daemon cannot start." % str(sys.exc_info()[1]))

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if not options.no_voting:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)
  elif not options.yes_do_it:
    # Voting was disabled without the override, ask interactively
    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    if sys.stdin.readline().strip() != "YES":
      _Abort("Aborting.")

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)
678

    
679

    
680
def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  @param options: parsed command line options; C{uid} and C{gid} are used
      as owner of the master socket
  @rtype: tuple
  @return: the main loop and the master server, in this order

  """
  socket_path = constants.MASTER_SOCKET

  # Removing a leftover socket is safe as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(socket_path)

  mainloop = daemon.Mainloop()
  return (mainloop, MasterServer(socket_path, options.uid, options.gid))
691

    
692

    
693
def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  Cleanup happens in reverse order of setup, no matter how the main loop
  ends: server cleanup, RPC shutdown and finally removal of the master
  socket.

  @param prep_data: the (mainloop, master server) tuple as returned by
      L{PrepMasterd}

  """
  (loop, server) = prep_data
  socket_path = constants.MASTER_SOCKET

  try:
    rpc.Init()
    try:
      server.setup_queue()
      try:
        loop.Run(shutdown_wait_fn=server.WaitForShutdown)
      finally:
        server.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(socket_path)

  logging.info("Clean master daemon shutdown")
712

    
713

    
714
def Main():
  """Main function

  Builds the command line parser for the master daemon and hands control
  over to C{daemon.GenericMain}.

  """
  version_text = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_text)

  # Both options are simple flags which are off by default
  flags = [
    ("--no-voting", "no_voting",
     "Do not check that the nodes agree on this node"
     " being the master and start the daemon unconditionally"),
    ("--yes-do-it", "yes_do_it",
     "Override interactive check for --no-voting"),
    ]
  for (flag, dest, help_text) in flags:
    parser.add_option(flag, dest=dest, help=help_text,
                      default=False, action="store_true")

  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)