#
#

# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd

import grp
import os
import pwd
import sys
import socket
import time
import tempfile
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import netutils
from ganeti import objects
from ganeti import query
from ganeti import runtime


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable=W0221
  def RunTask(self, server, message, client):
    """Process the request.

    """
    client_ops = ClientOps(server)

    try:
      (method, args, version) = luxi.ParseRequest(message)
    except luxi.ProtocolError, err:
      logging.error("Protocol Error: %s", err)
      client.close_log()
      return

    success = False
    try:
      # Verify client's version if there was one in the request
      if version is not None and version != constants.LUXI_VERSION:
        raise errors.LuxiError("LUXI version mismatch, server %s, request %s" %
                               (constants.LUXI_VERSION, version))

      result = client_ops.handle_request(method, args)
      success = True
    except errors.GenericError, err:
      logging.exception("Unexpected exception")
      success = False
      result = errors.EncodeException(err)
    except:
      logging.exception("Unexpected exception")
      err = sys.exc_info()
      result = "Caught exception: %s" % str(err[1])

    try:
      reply = luxi.FormatResponse(success, result)
      client.send_message(reply)
      # wake up the main thread so that it can write out the data.
      server.awaker.signal()
    except: # pylint: disable=W0702
      logging.exception("Send error")
      client.close_log()


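# An illustrative sketch, not part of the daemon logic: the message parsed by
# luxi.ParseRequest above is a serialized LUXI request carrying a method name,
# its arguments and an optional protocol version, roughly of the shape
#   {"method": "SubmitJob", "args": [...], "version": <LUXI_VERSION>}
# while luxi.FormatResponse builds the matching reply, roughly
#   {"success": True, "result": <return value>}
# The exact field names and encoding are defined in the luxi module.

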
class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
  """Handler for master peers.

  """
  _MAX_UNHANDLED = 1

  def __init__(self, server, connected_socket, client_address, family):
    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
                                                 client_address,
                                                 constants.LUXI_EOM,
                                                 family, self._MAX_UNHANDLED)
    self.server = server

  def handle_message(self, message, _):
    self.server.request_workers.AddTask((self.server, message, self))


class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  family = socket.AF_UNIX

  def __init__(self, mainloop, address, uid, gid):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    self.mainloop = mainloop
    self.awaker = daemon.AsyncAwaker()

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    # TODO: add connection count and limit the number of open connections to a
    # maximum number to avoid breaking for lack of file descriptors or memory.
    MasterClientHandler(self, connected_socket, client_address, self.family)

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY:
      req = objects.QueryRequest.FromDict(args)

      if req.what in constants.QR_VIA_OP:
        result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
                                             filter=req.filter))
      elif req.what == constants.QR_LOCK:
        if req.filter is not None:
          raise errors.OpPrereqError("Lock queries can't be filtered")
        return self.server.context.glm.QueryLocks(req.fields)
      elif req.what in constants.QR_VIA_LUXI:
        raise NotImplementedError
      else:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return result

    elif method == luxi.REQ_QUERY_FIELDS:
      req = objects.QueryFieldsRequest.FromDict(args)

      try:
        fielddefs = query.ALL_FIELDS[req.what]
      except KeyError:
        raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
                                   errors.ECODE_INVAL)

      return query.QueryFields(fielddefs, req.fields)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
                                   use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpNodeQuery(names=names, output_fields=fields,
                               use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_GROUPS:
      (names, fields, use_locking) = args
      logging.info("Received group query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpGroupQuery(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpBackupQuery(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpClusterConfigQuery(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpClusterQuery()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpTagsGet(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_LOCKS:
      (fields, sync) = args
      logging.info("Received locks query request")
      if sync:
        raise NotImplementedError("Synchronous queries are not implemented")
      return self.server.context.glm.OldStyleQueryLocks(fields)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)

    # TODO: Executing an opcode using locks will acquire them in blocking mode.
    # Consider using a timeout for retries.
    return proc.ExecOpCode(op, None)


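# Client-side view, for illustration only (a sketch assuming the standard LUXI
# client library and the default master socket):
#   from ganeti import luxi, opcodes
#   cl = luxi.Client()
#   job_id = cl.SubmitJob([opcodes.OpClusterQuery()])
# Such a call arrives at ClientOps.handle_request above as
# method == luxi.REQ_SUBMIT_JOB, with the serialized opcodes as its arguments.

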
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetNodeGroupList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

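  # Explanatory note: while __init__ runs, the class attribute _instance is
  # still None, so the assertion above allows the constructor's assignments.
  # Binding self.__class__._instance at the end of __init__ sets a class
  # attribute and therefore bypasses this instance-level __setattr__; after
  # that, any further attribute assignment fails the assertion, effectively
  # freezing the context.
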
  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


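# For example (timestamp illustrative): _SetWatcherPause(1300000000) writes
# the single line "1300000000\n" to constants.WATCHER_PAUSEFILE, while
# _SetWatcherPause(None) removes the file, unpausing the watcher.

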
@rpc.RunWithRPC
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are too far out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = netutils.Hostname.GetSysName()
  # temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

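  # Worked example (illustrative values): if we are "node1" and votes ==
  # [("node1", 3), ("node2", 2)], then all_votes is 5, top_node is "node1"
  # and top_votes is 3; since 3 >= 5 - 3, the checks below confirm us as
  # master.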
  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


@rpc.RunWithRPC
def ActivateMasterIP():
  # activate ip
  master_node = ssconf.SimpleStore().GetMasterNode()
  result = rpc.RpcRunner.call_node_activate_master_ip(master_node)
  msg = result.fail_msg
  if msg:
    logging.error("Can't activate master IP address: %s", msg)


def CheckMasterd(options, args):
  """Initial checks to decide whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group does not exist on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)

  # Determine static runtime architecture information
  runtime.InitArchInfo()

  # Check the configuration is sane before anything else
  try:
    config.ConfigWriter()
  except errors.ConfigVersionMismatch, err:
    v1 = "%s.%s.%s" % constants.SplitVersion(err.args[0])
    v2 = "%s.%s.%s" % constants.SplitVersion(err.args[1])
    print >> sys.stderr, \
        ("Configuration version mismatch. The current Ganeti software"
         " expects version %s, but the on-disk configuration file has"
         " version %s. This is likely the result of upgrading the"
         " software without running the upgrade procedure. Please contact"
         " your cluster administrator or complete the upgrade using the"
         " cfgupgrade utility, after reading the upgrade notes." %
         (v1, v2))
    sys.exit(constants.EXIT_FAILURE)
  except errors.ConfigurationError, err:
    print >> sys.stderr, \
        ("Configuration error while opening the configuration file: %s\n"
         "This might be caused by an incomplete software upgrade or"
         " by a corrupted configuration file. Until the problem is fixed"
         " the master daemon cannot start." % str(err))
    sys.exit(constants.EXIT_FAILURE)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()

      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print >> sys.stderr, "Aborting."
        sys.exit(constants.EXIT_FAILURE)

  else:
    # CheckAgreement uses RPC and threads, hence it needs to be run in
    # a separate process before we call utils.Daemonize in the current
    # process.
    if not utils.RunInSeparateProcess(CheckAgreement):
      sys.exit(constants.EXIT_FAILURE)

  # ActivateMasterIP also uses RPC/threads, so we run it in a separate
  # process as well.

  # TODO: decide whether failure to activate the master IP is a fatal error
  utils.RunInSeparateProcess(ActivateMasterIP)


def PrepMasterd(options, _):
  """Prep master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET,
                        options.uid, options.gid)
  return (mainloop, master)


def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
  """Main master daemon function, executed with the PID file held.

  """
  (mainloop, master) = prep_data
  try:
    rpc.Init()
    try:
      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def Main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  daemon.GenericMain(constants.MASTERD, parser, CheckMasterd, PrepMasterd,
                     ExecMasterd, multithreaded=True)