#
#

# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects


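# Number of worker threads used to serve LUXI client requests.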
CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # awake the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable-msg=W0702
      logging.exception("Send error")
      client.close_log()


class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1
  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)
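    # The socket is created under a temporary name and only renamed to its
    # final address after permissions and ownership are set, so clients can
    # never connect to it with the wrong access rights.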

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      req = objects.QueryRequest.FromDict(args)

      if req.what in constants.QR_OP_QUERY:
        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
                                             filter=req.filter))
      elif req.what in constants.QR_OP_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      req = objects.QueryFieldsRequest.FromDict(args)

      if req.what in constants.QR_OP_QUERY:
        result = self._Query(opcodes.OpQueryFields(what=req.what,
                                                   fields=req.fields))
      elif req.what in constants.QR_OP_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryGroups(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_LOCKS:
      (fields, sync) = args
      logging.info("Received locks query request")
      return self.server.context.glm.QueryLocks(fields, sync)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetNodeGroupList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)
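    # While __init__ runs, the singleton is not registered yet, so the
    # assertion above passes; once __init__ has stored the object in
    # _instance, any further attribute assignment fails.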

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync than that, manual startup of the master
  should be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too in order to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
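  # We claim mastership only if we are the top-voted node and our own votes
  # amount to at least half of all votes cast.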
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  master_node = ssconf.SimpleStore().GetMasterNode()
  result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group does not exist on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not utils.RunInSeparateProcess(CheckAgreement):
    sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it again via a
  # separate process.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
                        options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
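  # Tear down in reverse order of setup: the mainloop exits first, then the
  # worker pool and job queue are shut down, then the RPC layer, and the
  # master socket is removed last.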
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)