Statistics
| Branch: | Tag: | Revision:

root / lib / server / masterd.py @ b2acdbdc

History | View | Annotate | Download (21.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

27
"""
28

    
29
# pylint: disable=C0103
30
# C0103: Invalid name ganeti-masterd
31

    
32
import grp
33
import os
34
import pwd
35
import sys
36
import socket
37
import time
38
import tempfile
39
import logging
40

    
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import daemon
46
from ganeti import mcpu
47
from ganeti import opcodes
48
from ganeti import jqueue
49
from ganeti import locking
50
from ganeti import luxi
51
from ganeti import utils
52
from ganeti import errors
53
from ganeti import ssconf
54
from ganeti import workerpool
55
from ganeti import rpc
56
from ganeti import bootstrap
57
from ganeti import netutils
58
from ganeti import objects
59
from ganeti import query
60

    
61

    
62
CLIENT_REQUEST_WORKERS = 16
63

    
64
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
65
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
66

    
67

    
68
class ClientRequestWorker(workerpool.BaseWorker):
69
  # pylint: disable=W0221
70
  def RunTask(self, server, message, client):
71
    """Process the request.
72

73
    """
74
    client_ops = ClientOps(server)
75

    
76
    try:
77
      (method, args, version) = luxi.ParseRequest(message)
78
    except luxi.ProtocolError, err:
79
      logging.error("Protocol Error: %s", err)
80
      client.close_log()
81
      return
82

    
83
    success = False
84
    try:
85
      # Verify client's version if there was one in the request
86
      if version is not None and version != constants.LUXI_VERSION:
87
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
88
                               (constants.LUXI_VERSION, version))
89

    
90
      result = client_ops.handle_request(method, args)
91
      success = True
92
    except errors.GenericError, err:
93
      logging.exception("Unexpected exception")
94
      success = False
95
      result = errors.EncodeException(err)
96
    except:
97
      logging.exception("Unexpected exception")
98
      err = sys.exc_info()
99
      result = "Caught exception: %s" % str(err[1])
100

    
101
    try:
102
      reply = luxi.FormatResponse(success, result)
103
      client.send_message(reply)
104
      # awake the main thread so that it can write out the data.
105
      server.awaker.signal()
106
    except: # pylint: disable=W0702
107
      logging.exception("Send error")
108
      client.close_log()
109

    
110

    
111
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
112
  """Handler for master peers.
113

114
  """
115
  _MAX_UNHANDLED = 1
116

    
117
  def __init__(self, server, connected_socket, client_address, family):
118
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
119
                                                 client_address,
120
                                                 constants.LUXI_EOM,
121
                                                 family, self._MAX_UNHANDLED)
122
    self.server = server
123

    
124
  def handle_message(self, message, _):
125
    self.server.request_workers.AddTask((self.server, message, self))
126

    
127

    
128
class MasterServer(daemon.AsyncStreamServer):
129
  """Master Server.
130

131
  This is the main asynchronous master server. It handles connections to the
132
  master socket.
133

134
  """
135
  family = socket.AF_UNIX
136

    
137
  def __init__(self, mainloop, address, uid, gid):
138
    """MasterServer constructor
139

140
    @type mainloop: ganeti.daemon.Mainloop
141
    @param mainloop: Mainloop used to poll for I/O events
142
    @param address: the unix socket address to bind the MasterServer to
143
    @param uid: The uid of the owner of the socket
144
    @param gid: The gid of the owner of the socket
145

146
    """
147
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
148
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
149
    os.chmod(temp_name, 0770)
150
    os.chown(temp_name, uid, gid)
151
    os.rename(temp_name, address)
152

    
153
    self.mainloop = mainloop
154
    self.awaker = daemon.AsyncAwaker()
155

    
156
    # We'll only start threads once we've forked.
157
    self.context = None
158
    self.request_workers = None
159

    
160
  def handle_connection(self, connected_socket, client_address):
161
    # TODO: add connection count and limit the number of open connections to a
162
    # maximum number to avoid breaking for lack of file descriptors or memory.
163
    MasterClientHandler(self, connected_socket, client_address, self.family)
164

    
165
  def setup_queue(self):
166
    self.context = GanetiContext()
167
    self.request_workers = workerpool.WorkerPool("ClientReq",
168
                                                 CLIENT_REQUEST_WORKERS,
169
                                                 ClientRequestWorker)
170

    
171
  def server_cleanup(self):
172
    """Cleanup the server.
173

174
    This involves shutting down the processor threads and the master
175
    socket.
176

177
    """
178
    try:
179
      self.close()
180
    finally:
181
      if self.request_workers:
182
        self.request_workers.TerminateWorkers()
183
      if self.context:
184
        self.context.jobqueue.Shutdown()
185

    
186

    
187
class ClientOps:
188
  """Class holding high-level client operations."""
189
  def __init__(self, server):
190
    self.server = server
191

    
192
  def handle_request(self, method, args): # pylint: disable=R0911
193
    queue = self.server.context.jobqueue
194

    
195
    # TODO: Parameter validation
196
    if not isinstance(args, (tuple, list)):
197
      logging.info("Received invalid arguments of type '%s'", type(args))
198
      raise ValueError("Invalid arguments type '%s'" % type(args))
199

    
200
    # TODO: Rewrite to not exit in each 'if/elif' branch
201

    
202
    if method == luxi.REQ_SUBMIT_JOB:
203
      logging.info("Received new job")
204
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
205
      return queue.SubmitJob(ops)
206

    
207
    if method == luxi.REQ_SUBMIT_MANY_JOBS:
208
      logging.info("Received multiple jobs")
209
      jobs = []
210
      for ops in args:
211
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
212
      return queue.SubmitManyJobs(jobs)
213

    
214
    elif method == luxi.REQ_CANCEL_JOB:
215
      (job_id, ) = args
216
      logging.info("Received job cancel request for %s", job_id)
217
      return queue.CancelJob(job_id)
218

    
219
    elif method == luxi.REQ_ARCHIVE_JOB:
220
      (job_id, ) = args
221
      logging.info("Received job archive request for %s", job_id)
222
      return queue.ArchiveJob(job_id)
223

    
224
    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
225
      (age, timeout) = args
226
      logging.info("Received job autoarchive request for age %s, timeout %s",
227
                   age, timeout)
228
      return queue.AutoArchiveJobs(age, timeout)
229

    
230
    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
231
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
232
      logging.info("Received job poll request for %s", job_id)
233
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
234
                                     prev_log_serial, timeout)
235

    
236
    elif method == luxi.REQ_QUERY:
237
      (what, fields, qfilter) = args
238
      req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
239

    
240
      if req.what in constants.QR_VIA_OP:
241
        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
242
                                             qfilter=req.qfilter))
243
      elif req.what == constants.QR_LOCK:
244
        if req.qfilter is not None:
245
          raise errors.OpPrereqError("Lock queries can't be filtered")
246
        return self.server.context.glm.QueryLocks(req.fields)
247
      elif req.what in constants.QR_VIA_LUXI:
248
        raise NotImplementedError
249
      else:
250
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
251
                                   errors.ECODE_INVAL)
252

    
253
      return result
254

    
255
    elif method == luxi.REQ_QUERY_FIELDS:
256
      (what, fields) = args
257
      req = objects.QueryFieldsRequest(what=what, fields=fields)
258

    
259
      try:
260
        fielddefs = query.ALL_FIELDS[req.what]
261
      except KeyError:
262
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
263
                                   errors.ECODE_INVAL)
264

    
265
      return query.QueryFields(fielddefs, req.fields)
266

    
267
    elif method == luxi.REQ_QUERY_JOBS:
268
      (job_ids, fields) = args
269
      if isinstance(job_ids, (tuple, list)) and job_ids:
270
        msg = utils.CommaJoin(job_ids)
271
      else:
272
        msg = str(job_ids)
273
      logging.info("Received job query request for %s", msg)
274
      return queue.QueryJobs(job_ids, fields)
275

    
276
    elif method == luxi.REQ_QUERY_INSTANCES:
277
      (names, fields, use_locking) = args
278
      logging.info("Received instance query request for %s", names)
279
      if use_locking:
280
        raise errors.OpPrereqError("Sync queries are not allowed",
281
                                   errors.ECODE_INVAL)
282
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
283
                                   use_locking=use_locking)
284
      return self._Query(op)
285

    
286
    elif method == luxi.REQ_QUERY_NODES:
287
      (names, fields, use_locking) = args
288
      logging.info("Received node query request for %s", names)
289
      if use_locking:
290
        raise errors.OpPrereqError("Sync queries are not allowed",
291
                                   errors.ECODE_INVAL)
292
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
293
                               use_locking=use_locking)
294
      return self._Query(op)
295

    
296
    elif method == luxi.REQ_QUERY_GROUPS:
297
      (names, fields, use_locking) = args
298
      logging.info("Received group query request for %s", names)
299
      if use_locking:
300
        raise errors.OpPrereqError("Sync queries are not allowed",
301
                                   errors.ECODE_INVAL)
302
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
303
      return self._Query(op)
304

    
305
    elif method == luxi.REQ_QUERY_EXPORTS:
306
      (nodes, use_locking) = args
307
      if use_locking:
308
        raise errors.OpPrereqError("Sync queries are not allowed",
309
                                   errors.ECODE_INVAL)
310
      logging.info("Received exports query request")
311
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
312
      return self._Query(op)
313

    
314
    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
315
      (fields, ) = args
316
      logging.info("Received config values query request for %s", fields)
317
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
318
      return self._Query(op)
319

    
320
    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
321
      logging.info("Received cluster info query request")
322
      op = opcodes.OpClusterQuery()
323
      return self._Query(op)
324

    
325
    elif method == luxi.REQ_QUERY_TAGS:
326
      (kind, name) = args
327
      logging.info("Received tags query request")
328
      op = opcodes.OpTagsGet(kind=kind, name=name)
329
      return self._Query(op)
330

    
331
    elif method == luxi.REQ_QUERY_LOCKS:
332
      (fields, sync) = args
333
      logging.info("Received locks query request")
334
      if sync:
335
        raise NotImplementedError("Synchronous queries are not implemented")
336
      return self.server.context.glm.OldStyleQueryLocks(fields)
337

    
338
    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
339
      (drain_flag, ) = args
340
      logging.info("Received queue drain flag change request to %s",
341
                   drain_flag)
342
      return queue.SetDrainFlag(drain_flag)
343

    
344
    elif method == luxi.REQ_SET_WATCHER_PAUSE:
345
      (until, ) = args
346

    
347
      if until is None:
348
        logging.info("Received request to no longer pause the watcher")
349
      else:
350
        if not isinstance(until, (int, float)):
351
          raise TypeError("Duration must be an integer or float")
352

    
353
        if until < time.time():
354
          raise errors.GenericError("Unable to set pause end time in the past")
355

    
356
        logging.info("Received request to pause the watcher until %s", until)
357

    
358
      return _SetWatcherPause(until)
359

    
360
    else:
361
      logging.info("Received invalid request '%s'", method)
362
      raise ValueError("Invalid operation '%s'" % method)
363

    
364
  def _Query(self, op):
365
    """Runs the specified opcode and returns the result.
366

367
    """
368
    # Queries don't have a job id
369
    proc = mcpu.Processor(self.server.context, None)
370

    
371
    # TODO: Executing an opcode using locks will acquire them in blocking mode.
372
    # Consider using a timeout for retries.
373
    return proc.ExecOpCode(op, None)
374

    
375

    
376
class GanetiContext(object):
377
  """Context common to all ganeti threads.
378

379
  This class creates and holds common objects shared by all threads.
380

381
  """
382
  # pylint: disable=W0212
383
  # we do want to ensure a singleton here
384
  _instance = None
385

    
386
  def __init__(self):
387
    """Constructs a new GanetiContext object.
388

389
    There should be only a GanetiContext object at any time, so this
390
    function raises an error if this is not the case.
391

392
    """
393
    assert self.__class__._instance is None, "double GanetiContext instance"
394

    
395
    # Create global configuration object
396
    self.cfg = config.ConfigWriter()
397

    
398
    # Locking manager
399
    self.glm = locking.GanetiLockManager(
400
                self.cfg.GetNodeList(),
401
                self.cfg.GetNodeGroupList(),
402
                self.cfg.GetInstanceList())
403

    
404
    self.cfg.SetContext(self)
405

    
406
    # Job queue
407
    self.jobqueue = jqueue.JobQueue(self)
408

    
409
    # RPC runner
410
    self.rpc = rpc.RpcRunner(self)
411

    
412
    # setting this also locks the class against attribute modifications
413
    self.__class__._instance = self
414

    
415
  def __setattr__(self, name, value):
416
    """Setting GanetiContext attributes is forbidden after initialization.
417

418
    """
419
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
420
    object.__setattr__(self, name, value)
421

    
422
  def AddNode(self, node, ec_id):
423
    """Adds a node to the configuration and lock manager.
424

425
    """
426
    # Add it to the configuration
427
    self.cfg.AddNode(node, ec_id)
428

    
429
    # If preseeding fails it'll not be added
430
    self.jobqueue.AddNode(node)
431

    
432
    # Add the new node to the Ganeti Lock Manager
433
    self.glm.add(locking.LEVEL_NODE, node.name)
434
    self.glm.add(locking.LEVEL_NODE_RES, node.name)
435

    
436
  def ReaddNode(self, node):
437
    """Updates a node that's already in the configuration
438

439
    """
440
    # Synchronize the queue again
441
    self.jobqueue.AddNode(node)
442

    
443
  def RemoveNode(self, name):
444
    """Removes a node from the configuration and lock manager.
445

446
    """
447
    # Remove node from configuration
448
    self.cfg.RemoveNode(name)
449

    
450
    # Notify job queue
451
    self.jobqueue.RemoveNode(name)
452

    
453
    # Remove the node from the Ganeti Lock Manager
454
    self.glm.remove(locking.LEVEL_NODE, name)
455
    self.glm.remove(locking.LEVEL_NODE_RES, name)
456

    
457

    
458
def _SetWatcherPause(until):
459
  """Creates or removes the watcher pause file.
460

461
  @type until: None or int
462
  @param until: Unix timestamp saying until when the watcher shouldn't run
463

464
  """
465
  if until is None:
466
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
467
  else:
468
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
469
                    data="%d\n" % (until, ))
470

    
471
  return until
472

    
473

    
474
@rpc.RunWithRPC
475
def CheckAgreement():
476
  """Check the agreement on who is the master.
477

478
  The function uses a very simple algorithm: we must get more positive
479
  than negative answers. Since in most of the cases we are the master,
480
  we'll use our own config file for getting the node list. In the
481
  future we could collect the current node list from our (possibly
482
  obsolete) known nodes.
483

484
  In order to account for cold-start of all nodes, we retry for up to
485
  a minute until we get a real answer as the top-voted one. If the
486
  nodes are more out-of-sync, for now manual startup of the master
487
  should be attempted.
488

489
  Note that for a even number of nodes cluster, we need at least half
490
  of the nodes (beside ourselves) to vote for us. This creates a
491
  problem on two-node clusters, since in this case we require the
492
  other node to be up too to confirm our status.
493

494
  """
495
  myself = netutils.Hostname.GetSysName()
496
  #temp instantiation of a config writer, used only to get the node list
497
  cfg = config.ConfigWriter()
498
  node_list = cfg.GetNodeList()
499
  del cfg
500
  retries = 6
501
  while retries > 0:
502
    votes = bootstrap.GatherMasterVotes(node_list)
503
    if not votes:
504
      # empty node list, this is a one node cluster
505
      return True
506
    if votes[0][0] is None:
507
      retries -= 1
508
      time.sleep(10)
509
      continue
510
    break
511
  if retries == 0:
512
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
513
                     " after multiple retries. Aborting startup")
514
    logging.critical("Use the --no-voting option if you understand what"
515
                     " effects it has on the cluster state")
516
    return False
517
  # here a real node is at the top of the list
518
  all_votes = sum(item[1] for item in votes)
519
  top_node, top_votes = votes[0]
520

    
521
  result = False
522
  if top_node != myself:
523
    logging.critical("It seems we are not the master (top-voted node"
524
                     " is %s with %d out of %d votes)", top_node, top_votes,
525
                     all_votes)
526
  elif top_votes < all_votes - top_votes:
527
    logging.critical("It seems we are not the master (%d votes for,"
528
                     " %d votes against)", top_votes, all_votes - top_votes)
529
  else:
530
    result = True
531

    
532
  return result
533

    
534

    
535
@rpc.RunWithRPC
536
def ActivateMasterIP():
537
  # activate ip
538
  cfg = config.ConfigWriter()
539
  master_params = cfg.GetMasterNetworkParameters()
540
  runner = rpc.BootstrapRunner()
541
  result = runner.call_node_activate_master_ip(master_params.name,
542
                                               master_params)
543

    
544
  msg = result.fail_msg
545
  if msg:
546
    logging.error("Can't activate master IP address: %s", msg)
547

    
548

    
549
def CheckMasterd(options, args):
550
  """Initial checks whether to run or exit with a failure.
551

552
  """
553
  if args: # masterd doesn't take any arguments
554
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
555
    sys.exit(constants.EXIT_FAILURE)
556

    
557
  ssconf.CheckMaster(options.debug)
558

    
559
  try:
560
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
561
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
562
  except KeyError:
563
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
564
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
565
    sys.exit(constants.EXIT_FAILURE)
566

    
567
  # Check the configuration is sane before anything else
568
  try:
569
    config.ConfigWriter()
570
  except errors.ConfigVersionMismatch, err:
571
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
572
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
573
    print >> sys.stderr,  \
574
        ("Configuration version mismatch. The current Ganeti software"
575
         " expects version %s, but the on-disk configuration file has"
576
         " version %s. This is likely the result of upgrading the"
577
         " software without running the upgrade procedure. Please contact"
578
         " your cluster administrator or complete the upgrade using the"
579
         " cfgupgrade utility, after reading the upgrade notes." %
580
         (v1, v2))
581
    sys.exit(constants.EXIT_FAILURE)
582
  except errors.ConfigurationError, err:
583
    print >> sys.stderr, \
584
        ("Configuration error while opening the configuration file: %s\n"
585
         "This might be caused by an incomplete software upgrade or"
586
         " by a corrupted configuration file. Until the problem is fixed"
587
         " the master daemon cannot start." % str(err))
588
    sys.exit(constants.EXIT_FAILURE)
589

    
590
  # If CheckMaster didn't fail we believe we are the master, but we have to
591
  # confirm with the other nodes.
592
  if options.no_voting:
593
    if not options.yes_do_it:
594
      sys.stdout.write("The 'no voting' option has been selected.\n")
595
      sys.stdout.write("This is dangerous, please confirm by"
596
                       " typing uppercase 'yes': ")
597
      sys.stdout.flush()
598

    
599
      confirmation = sys.stdin.readline().strip()
600
      if confirmation != "YES":
601
        print >> sys.stderr, "Aborting."
602
        sys.exit(constants.EXIT_FAILURE)
603

    
604
  else:
605
    # CheckAgreement uses RPC and threads, hence it needs to be run in
606
    # a separate process before we call utils.Daemonize in the current
607
    # process.
608
    if not utils.RunInSeparateProcess(CheckAgreement):
609
      sys.exit(constants.EXIT_FAILURE)
610

    
611
  # ActivateMasterIP also uses RPC/threads, so we run it again via a
612
  # separate process.
613

    
614
  # TODO: decide whether failure to activate the master IP is a fatal error
615
  utils.RunInSeparateProcess(ActivateMasterIP)
616

    
617

    
618
def PrepMasterd(options, _):
619
  """Prep master daemon function, executed with the PID file held.
620

621
  """
622
  # This is safe to do as the pid file guarantees against
623
  # concurrent execution.
624
  utils.RemoveFile(constants.MASTER_SOCKET)
625

    
626
  mainloop = daemon.Mainloop()
627
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
628
                        options.uid, options.gid)
629
  return (mainloop, master)
630

    
631

    
632
def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
633
  """Main master daemon function, executed with the PID file held.
634

635
  """
636
  (mainloop, master) = prep_data
637
  try:
638
    rpc.Init()
639
    try:
640
      master.setup_queue()
641
      try:
642
        mainloop.Run()
643
      finally:
644
        master.server_cleanup()
645
    finally:
646
      rpc.Shutdown()
647
  finally:
648
    utils.RemoveFile(constants.MASTER_SOCKET)
649

    
650

    
651
def Main():
652
  """Main function"""
653
  parser = OptionParser(description="Ganeti master daemon",
654
                        usage="%prog [-f] [-d]",
655
                        version="%%prog (ganeti) %s" %
656
                        constants.RELEASE_VERSION)
657
  parser.add_option("--no-voting", dest="no_voting",
658
                    help="Do not check that the nodes agree on this node"
659
                    " being the master and start the daemon unconditionally",
660
                    default=False, action="store_true")
661
  parser.add_option("--yes-do-it", dest="yes_do_it",
662
                    help="Override interactive check for --no-voting",
663
                    default=False, action="store_true")
664
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
665
                     ExecMasterd, multithreaded=True)