#
#

# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch
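    # Dispatch: job-related requests go directly to the job queue, lock
    # queries go to the lock manager, and the remaining requests are wrapped
    # in an opcode and executed synchronously through self._Query().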

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    if method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      req = objects.QueryRequest.FromDict(args)

      if req.what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
                                             qfilter=req.qfilter))
      elif req.what == constants.QR_LOCK:
        if req.qfilter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered")
        return self.server.context.glm.QueryLocks(req.fields)
      elif req.what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      req = objects.QueryFieldsRequest.FromDict(args)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_LOCKS:
      (fields, sync) = args
      logging.info("Received locks query request")
      if sync:
        raise NotImplementedError("Synchronous queries are not implemented")
      return self.server.context.glm.OldStyleQueryLocks(fields)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetNodeGroupList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # RPC runner
    self.rpc = rpc.RpcRunner(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too in order to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
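  # GatherMasterVotes returns (node, votes) pairs sorted by decreasing vote
  # count, with None standing in for nodes that could not be contacted; a
  # None entry at the top therefore means most nodes have not answered yet,
  # and we retry for up to a minute (6 attempts, 10 seconds apart).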
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


@rpc.RunWithRPC
def ActivateMasterIP():
  """Activates the master IP address on the master node.

  """
  master_node = ssconf.SimpleStore().GetMasterNode()
  result = rpc.RpcRunner.call_node_activate_master_ip(master_node)
  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
                        options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)