#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime
from ganeti import pathutils
from ganeti import ht


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


def _LogNewJob(status, info, ops):
  """Log information about a recently submitted job.

  """
  op_summary = utils.CommaJoin(op.Summary() for op in ops)

  if status:
    logging.info("New job with id %s, summary: %s", info, op_summary)
  else:
    logging.info("Failed to submit job, reason: '%s', summary: %s",
                 info, op_summary)


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify the client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # Wake up the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()


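# For illustration only (the actual wire format is defined in ganeti/luxi.py,
# not here): a LUXI exchange handled by ClientRequestWorker is a pair of JSON
# messages on the master socket, each terminated by the LUXI_EOM marker,
# roughly of the form
#
#   request:  {"method": "SubmitJob", "args": [[<opcode dicts>]],
#              "version": <LUXI_VERSION>}
#   response: {"success": true, "result": <job id>}
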
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class _MasterShutdownCheck:
  """Logic for master daemon shutdown.

  """
  #: How long to wait between checks
  _CHECK_INTERVAL = 5.0

  #: How long to wait after all jobs are done (e.g. to give clients time to
  #: retrieve the job status)
  _SHUTDOWN_LINGER = 5.0

  def __init__(self):
    """Initializes this class.

    """
    self._had_active_jobs = None
    self._linger_timeout = None

  def __call__(self, jq_prepare_result):
    """Determines if the master daemon is ready for shutdown.

    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
    @rtype: None or number
    @return: None if the master daemon is ready, timeout if the check must be
             repeated

    """
    if jq_prepare_result:
      # Check again shortly
      logging.info("Job queue has been notified for shutdown but is still"
                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
      self._had_active_jobs = True
      return self._CHECK_INTERVAL

    if not self._had_active_jobs:
      # Can shut down as there were no active jobs on the first check
      return None

    # No jobs are running anymore, but maybe some clients want to collect some
    # information. Give them a short amount of time.
    if self._linger_timeout is None:
      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)

    remaining = self._linger_timeout.Remaining()

    logging.info("Job queue no longer busy; shutting down master daemon"
                 " in %s seconds", remaining)

    # TODO: Should the master daemon socket be closed at this point? Doing so
    # wouldn't affect existing connections.

    if remaining < 0:
      return None
    else:
      return remaining


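# Illustrative call sequence for _MasterShutdownCheck, assuming jobs were
# still running when shutdown was requested (return values in seconds):
#
#   check(True)  -> 5.0   # queue busy, re-check after _CHECK_INTERVAL
#   check(True)  -> 5.0   # still busy
#   check(False) -> ~5.0  # queue drained, _SHUTDOWN_LINGER countdown starts
#   check(False) -> None  # linger expired, the daemon may shut down
#
# If no jobs were active at the very first check, check(False) returns None
# immediately and shutdown proceeds without lingering.
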
class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, address, uid, gid):
    """MasterServer constructor.

    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # Create the socket under a temporary name, fix up its mode and
    # ownership, then atomically rename it into place, so clients never
    # observe the socket with the wrong permissions.
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

    self._shutdown_check = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def WaitForShutdown(self):
    """Prepares server for shutdown.

    """
    if self._shutdown_check is None:
      self._shutdown_check = _MasterShutdownCheck()

    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    context = self.server.context
    queue = context.jobqueue

    # TODO: Parameter validation
    if not isinstance(args, (tuple, list)):
      logging.info("Received invalid arguments of type '%s'", type(args))
      raise ValueError("Invalid arguments type '%s'" % type(args))

    if method not in luxi.REQ_ALL:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Receiving new job")
      (job_def, ) = args
      ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def]
      job_id = queue.SubmitJob(ops)
      _LogNewJob(True, job_id, ops)
      return job_id

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Receiving multiple jobs")
      (job_defs, ) = args
      jobs = []
      for ops in job_defs:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      job_ids = queue.SubmitManyJobs(jobs)
      for ((status, job_id), ops) in zip(job_ids, jobs):
        _LogNewJob(status, job_id, ops)
      return job_ids

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_CHANGE_JOB_PRIORITY:
      (job_id, priority) = args
      logging.info("Received request to change priority for job %s to %s",
                   job_id, priority)
      return queue.ChangeJobPriority(job_id, priority)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTO_ARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      (what, fields, qfilter) = args

      if what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=what, fields=fields,
                                             qfilter=qfilter))
      elif what == constants.QR_LOCK:
        if qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered",
                                     errors.ECODE_INVAL)
        return context.glm.QueryLocks(fields)
      elif what == constants.QR_JOB:
        return queue.QueryJobs(fields, qfilter)
      elif what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      (what, fields) = args
      req = objects.QueryFieldsRequest(what=what, fields=fields)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.OldStyleQueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NETWORKS:
      (names, fields, use_locking) = args
      logging.info("Received network query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNetworkQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      (fields, ) = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name, use_locking=False)
      return self._Query(op)

    elif method == luxi.REQ_SET_DRAIN_FLAG:
      (drain_flag, ) = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      return _SetWatcherPause(context, until)

    else:
      logging.critical("Request '%s' in luxi.REQ_ALL, but not known", method)
      raise errors.ProgrammerError("Operation '%s' in luxi.REQ_ALL,"
                                   " but not implemented" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None, enable_locks=False)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


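# For illustration, the client side of these handlers (a sketch, assuming
# the stock LUXI client from ganeti/luxi.py):
#
#   from ganeti import luxi, opcodes
#
#   cl = luxi.Client()                      # connects to the master socket
#   job_id = cl.SubmitJob([opcodes.OpClusterQuery()])  # REQ_SUBMIT_JOB
#   info = cl.QueryClusterInfo()            # REQ_QUERY_CLUSTER_INFO
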
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
      self.cfg.GetNodeList(),
      self.cfg.GetNodeGroupList(),
      self.cfg.GetInstanceList(),
      self.cfg.GetNetworkList())

    self.cfg.SetContext(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)
    self.glm.add(locking.LEVEL_NODE_RES, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
    self.glm.remove(locking.LEVEL_NODE_RES, name)


def _SetWatcherPause(context, until):
  """Creates or removes the watcher pause file.

  @type context: L{GanetiContext}
  @param context: Global Ganeti context
  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  node_names = context.cfg.GetNodeList()

  if until is None:
    logging.info("Received request to no longer pause watcher")
  else:
    if not ht.TNumber(until):
      raise TypeError("Duration must be numeric")

    if until < time.time():
      raise errors.GenericError("Unable to set pause end time in the past")

    logging.info("Received request to pause watcher until %s", until)

  result = context.rpc.call_set_watcher_pause(node_names, until)

  errmsg = utils.CommaJoin("%s (%s)" % (node_name, nres.fail_msg)
                           for (node_name, nres) in result.items()
                           if nres.fail_msg and not nres.offline)
  if errmsg:
    raise errors.OpExecError("Watcher pause was set where possible, but failed"
                             " on the following node(s): %s" % errmsg)

  return until


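# For illustration, pausing the watcher for one hour and unpausing it again
# would go through the REQ_SET_WATCHER_PAUSE branch above as roughly:
#
#   _SetWatcherPause(context, time.time() + 3600)  # returns the timestamp
#   _SetWatcherPause(context, None)                # remove the pause
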
@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # Temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


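# Worked example: on a five-node cluster, GatherMasterVotes might return
# [("node1", 3), (None, 2)] (two nodes unreachable). If we are node1, then
# top_votes = 3 >= all_votes - top_votes = 2 and the check passes; on any
# other node, top_node differs from ourselves and startup is aborted.
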
@rpc.RunWithRPC
def ActivateMasterIP():
  # Activate the master IP address
  cfg = config.ConfigWriter()
  master_params = cfg.GetMasterNetworkParameters()
  ems = cfg.GetUseExternalMipScript()
  runner = rpc.BootstrapRunner()
  result = runner.call_node_activate_master_ip(master_params.name,
                                               master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks to decide whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group does not exist on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(pathutils.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(pathutils.MASTER_SOCKET, options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(pathutils.MASTER_SOCKET)

  logging.info("Clean master daemon shutdown")


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)
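

# Typical invocation, for illustration only (the daemon is normally started
# by the Ganeti init scripts; -f/--foreground and -d/--debug are generic
# daemon options handled by daemon.GenericMain):
#
#   ganeti-masterd [-f] [-d] [--no-voting [--yes-do-it]]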