#!/usr/bin/python
#
# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import sys
import SocketServer
import time
import collections
import signal
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except: # pylint: disable-msg=W0702
      server.handle_error(request, client_address)
      server.close_request(request)


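# Note on the dispatch path (a sketch based on the code in this file, and on
# the assumption that the worker pool passes task arguments through
# unchanged): IOServer.process_request() below hands each accepted connection
# to the pool via
#
#   self.request_workers.AddTask(self, request, client_address)
#
# and a free worker thread eventually runs
#
#   ClientRequestWorker.RunTask(server, request, client_address)
#
# with those same arguments, so the actual socket handling happens off the
# main accept loop.

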
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    (pid, uid, gid) = utils.GetSocketCredentials(request)
    logging.info("Accepted connection from pid=%s, uid=%s, gid=%s",
                 pid, uid, gid)

    self.request_workers.AddTask(self, request, client_address)

  def handle_error(self, request, client_address):
    logging.exception("Error while handling request")

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
    """Handle one request at a time until told to quit."""
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


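# Illustrative lifecycle of IOServer, as driven by ExecMasterd() further
# down in this file (shown here as comments only):
#
#   master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
#   master.setup_queue()      # create GanetiContext and the worker pool
#   try:
#     master.serve_forever()  # loop until SIGINT/SIGTERM arrives
#   finally:
#     master.server_cleanup() # stop workers and shut down the job queue

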
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    # pylint: disable-msg=W0201
    # setup() is the API for initialising this class
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None) # pylint: disable-msg=E1103
      args = request.get(luxi.KEY_ARGS, None) # pylint: disable-msg=E1103
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = errors.EncodeException(err)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    # sendall() either transmits all data or raises an exception, so no
    # additional retry loop is needed here.
    self.request.sendall(msg + self.EOM)


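# The wire format handled above, sketched with hypothetical values: every
# message is a JSON document terminated by the EOM byte '\3'. A job query
# could look like
#
#   '{"method": "QueryJobs", "args": [[42], ["status"]]}\x03'
#
# and the framed response like
#
#   '{"success": true, "result": [["running"]]}\x03'
#
# where the key names correspond to luxi.KEY_METHOD, luxi.KEY_ARGS,
# luxi.KEY_SUCCESS and luxi.KEY_RESULT; the job id and fields shown are
# illustrative only.

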
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    """Dispatches a LUXI request to the job queue or to an opcode query."""
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      (nodes, use_locking) = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      (kind, name) = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)


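# For illustration, the same query dispatched through ClientOps directly
# (argument values are hypothetical):
#
#   ops = ClientOps(server)
#   result = ops.handle_request(luxi.REQ_QUERY_JOBS, ([42], ["id", "status"]))
#
# handle_request() unpacks args into (job_ids, fields) and returns
# queue.QueryJobs([42], ["id", "status"]).

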
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # Setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


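# The two assertions above make GanetiContext a write-once singleton. A
# sketch of the resulting behaviour:
#
#   context = GanetiContext()  # first instantiation succeeds
#   GanetiContext()            # AssertionError: double GanetiContext instance
#   context.cfg = None         # AssertionError: Attempt to modify Ganeti Context

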
def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


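# Example usage of _SetWatcherPause() (the timestamp is illustrative):
#
#   _SetWatcherPause(time.time() + 1800)  # pause the watcher for 30 minutes
#   _SetWatcherPause(None)                # remove the pause file again

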
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # Temporary instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


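# A worked example of the majority check above: with hypothetical
# votes = [("node1", 3), ("node2", 2)] and myself == "node1", we get
# all_votes = 5 and top_votes = 3, so "top_votes < all_votes - top_votes"
# is 3 < 2, i.e. false, and the agreement check succeeds. Had the tally
# been [("node2", 3), ("node1", 2)], top_node != myself and startup would
# abort instead.

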
def CheckAgreementWithRpc():
  """Runs CheckAgreement() with the RPC layer initialized.

  """
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not utils.RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)


def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.fail_msg
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)


def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)


if __name__ == "__main__":
  main()