Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ bbfd0568

History | View | Annotate | Download (17.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29
# pylint: disable-msg=C0103
30
# C0103: Invalid name ganeti-masterd
31

    
32
import grp
33
import os
34
import pwd
35
import sys
36
import socket
37
import SocketServer
38
import time
39
import tempfile
40
import collections
41
import logging
42

    
43
from optparse import OptionParser
44

    
45
from ganeti import config
46
from ganeti import constants
47
from ganeti import daemon
48
from ganeti import mcpu
49
from ganeti import opcodes
50
from ganeti import jqueue
51
from ganeti import locking
52
from ganeti import luxi
53
from ganeti import utils
54
from ganeti import errors
55
from ganeti import ssconf
56
from ganeti import workerpool
57
from ganeti import rpc
58
from ganeti import bootstrap
59

    
60

    
61
CLIENT_REQUEST_WORKERS = 16
62

    
63
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
64
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
65

    
66

    
67
class ClientRequestWorker(workerpool.BaseWorker):
  """Worker servicing a single queued client connection."""

  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Hand the connection over to the server's request handler class.

    The connection socket is closed afterwards in all cases, even when
    the handler raises.

    """
    handler_class = server.request_handler_class
    try:
      handler_class(request, client_address, server)
    finally:
      request.close()
77

    
78

    
79
class MasterServer(daemon.AsyncStreamServer):
  """Master Server.

  This is the main asynchronous master server. It handles connections to the
  master socket.

  """
  def __init__(self, mainloop, address, handler_class, uid, gid):
    """MasterServer constructor

    @type mainloop: ganeti.daemon.Mainloop
    @param mainloop: Mainloop used to poll for I/O events
    @param address: the unix socket address to bind the MasterServer to
    @param handler_class: handler class for the connections
    @param uid: The uid of the owner of the socket
    @param gid: The gid of the owner of the socket

    """
    # Bind the socket under a temporary name first, so permissions and
    # ownership can be set before it becomes reachable at the well-known
    # address; the final os.rename() then publishes it atomically.
    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
    daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, temp_name)
    # Order is important here: restrict mode/ownership while the socket is
    # only known by its temporary name, then rename into place
    os.chmod(temp_name, 0770)
    os.chown(temp_name, uid, gid)
    os.rename(temp_name, address)

    # Class used to handle each accepted connection
    self.request_handler_class = handler_class
    self.mainloop = mainloop

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def handle_connection(self, connected_socket, client_address):
    # Queue the accepted connection for the worker pool; the actual
    # processing happens in ClientRequestWorker.RunTask
    self.request_workers.AddTask(self, connected_socket, client_address)

  def setup_queue(self):
    """Creates the shared context and the client request worker pool."""
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.close()
    finally:
      # These may still be None if setup_queue() was never called
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
133

    
134

    
135
class ClientRqHandler(SocketServer.BaseRequestHandler):
136
  """Client handler"""
137
  READ_SIZE = 4096
138

    
139
  def setup(self):
140
    # pylint: disable-msg=W0201
141
    # setup() is the api for initialising for this class
142
    self._buffer = ""
143
    self._msgs = collections.deque()
144
    self._ops = ClientOps(self.server)
145

    
146
  def handle(self):
147
    while True:
148
      msg = self.read_message()
149
      if msg is None:
150
        logging.debug("client closed connection")
151
        break
152

    
153
      (method, args) = luxi.ParseRequest(msg)
154

    
155
      success = False
156
      try:
157
        result = self._ops.handle_request(method, args)
158
        success = True
159
      except errors.GenericError, err:
160
        logging.exception("Unexpected exception")
161
        result = errors.EncodeException(err)
162
      except:
163
        logging.exception("Unexpected exception")
164
        result = "Caught exception: %s" % str(sys.exc_info()[1])
165

    
166
      self.send_message(luxi.FormatResponse(success, result))
167

    
168
  def read_message(self):
169
    while not self._msgs:
170
      data = self.request.recv(self.READ_SIZE)
171
      if not data:
172
        return None
173
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
174
      self._buffer = new_msgs.pop()
175
      self._msgs.extend(new_msgs)
176
    return self._msgs.popleft()
177

    
178
  def send_message(self, msg):
179
    # TODO: sendall is not guaranteed to send everything
180
    self.request.sendall(msg + constants.LUXI_EOM)
181

    
182

    
183
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    """Dispatches one luxi request to the job queue or a query opcode.

    @param method: request method name, one of the luxi.REQ_* constants
    @param args: method-specific argument(s), already deserialized
    @return: method-specific result, to be serialized back to the client
    @raise ValueError: if the method is not recognized
    @raise errors.OpPrereqError: for queries requesting synchronization
        (locking), which are not allowed

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    # Note: this used to be a standalone "if", breaking the dispatch chain;
    # made "elif" for consistency (behaviour unchanged, since the previous
    # branch always returns)
    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        # The pause end is an absolute timestamp and must lie in the future
        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    @param op: the (query) opcode instance to execute

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)
317

    
318

    
319
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    # While __init__ runs, _instance is still None, so the assignments
    # above pass this check; once _instance is set the instance becomes
    # effectively read-only
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    @param node: the node object to add
    @param ec_id: passed through to the configuration writer; presumably
        an execution context id — TODO confirm against cfg.AddNode

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    @param name: name of the node to remove

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
391

    
392

    
393
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run
  @return: the C{until} value that was passed in

  """
  pause_file = constants.WATCHER_PAUSEFILE
  if until is not None:
    utils.WriteFile(pause_file,
                    data="%d\n" % (until, ))
  else:
    utils.RemoveFile(pause_file)

  return until
407

    
408

    
409
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out-of-sync, for now manual startup of the master
  should be attempted.

  Note that for a even number of nodes cluster, we need at least half
  of the nodes (beside ourselves) to vote for us. This creates a
  problem on two-node clusters, since in this case we require the
  other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg

  # Up to six attempts, ten seconds apart, waiting for a real top vote
  for _ in range(6):
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is not None:
      # a real node is at the top of the list
      break
    time.sleep(10)
  else:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False

  total_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
  votes_against = total_votes - top_votes

  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     total_votes)
    return False
  if top_votes < votes_against:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, votes_against)
    return False

  return True
467

    
468

    
469
def CheckAgreementWithRpc():
  """Runs CheckAgreement inside an initialized RPC layer."""
  rpc.Init()
  try:
    result = CheckAgreement()
  finally:
    rpc.Shutdown()
  return result
475

    
476

    
477
def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  Verifies command line usage, that this node is configured as master,
  that the masterd user and group exist, and (unless voting was
  explicitly disabled) that the other nodes agree we are the master.

  @param options: parsed command line options
  @param args: remaining positional arguments (must be empty)

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  # Resolve the uid/gid owning the master socket; stored on the options
  # object so ExecMasterd can use them later
  try:
    options.uid = pwd.getpwnam(constants.MASTERD_USER).pw_uid
    options.gid = grp.getgrnam(constants.DAEMONS_GROUP).gr_gid
  except KeyError:
    print >> sys.stderr, ("User or group not existing on system: %s:%s" %
                          (constants.MASTERD_USER, constants.DAEMONS_GROUP))
    sys.exit(constants.EXIT_FAILURE)


  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    # --no-voting without --yes-do-it requires interactive confirmation
    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not utils.RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)
518

    
519

    
520
def ExecMasterd (options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  @param options: parsed command line options; uid/gid must already have
      been resolved by CheckMasterd
  @param args: unused positional arguments

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  mainloop = daemon.Mainloop()
  master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler,
                        options.uid, options.gid)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.fail_msg
      if msg:
        # Non-fatal: the daemon keeps running even without the master IP
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        mainloop.Run()
      finally:
        # Stop workers and the job queue before shutting down RPC
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    # Always remove the master socket on exit
    utils.RemoveFile(constants.MASTER_SOCKET)
550

    
551

    
552
def main():
  """Entry point: builds the option parser and starts the generic daemon."""
  opt_parser = OptionParser(description="Ganeti master daemon",
                            usage="%prog [-f] [-d]",
                            version="%%prog (ganeti) %s" %
                            constants.RELEASE_VERSION)
  opt_parser.add_option("--no-voting", dest="no_voting",
                        help="Do not check that the nodes agree on this node"
                        " being the master and start the daemon"
                        " unconditionally",
                        default=False, action="store_true")
  opt_parser.add_option("--yes-do-it", dest="yes_do_it",
                        help="Override interactive check for --no-voting",
                        default=False, action="store_true")

  # Directories that must exist (with their modes) before the daemon runs
  needed_dirs = [
    (constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
    (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
    ]

  daemon.GenericMain(constants.MASTERD, opt_parser, needed_dirs,
                     CheckMasterd, ExecMasterd,
                     multithreaded=True)


if __name__ == "__main__":
  main()