
daemons/ganeti-masterd @ 7699c3af


#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import os
import sys
import SocketServer
import time
import collections
import signal
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer


CLIENT_REQUEST_WORKERS = 16
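# Number of threads serving client (luxi) requests concurrently; used by
# IOServer.setup_queue below to size its WorkerPool.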

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except: # pylint: disable-msg=W0702
      server.handle_error(request, client_address)
      server.close_request(request)


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)
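    # Note: this replaces the base class's synchronous process_request, so
    # the accept loop never blocks on a single client; the actual handling
    # runs in ClientRequestWorker.RunTask on one of the pool threads.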

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
    """Handle one request at a time until told to quit."""
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()

  def server_cleanup(self):
    """Clean up the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096
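
  # A sketch of the framing this handler speaks (the exact key and method
  # strings live in the luxi module, not here): each request is one JSON
  # document terminated by the EOM byte, e.g.
  #
  #   '{"method": ..., "args": ...}\x03'
  #
  # and each response mirrors it with the success/result keys used in
  # handle() below.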

  def setup(self):
    # pylint: disable-msg=W0201
    # setup() is the API for initialising this class
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = errors.EncodeException(err)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
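      # split() leaves the trailing partial message (possibly empty) as the
      # last element; keep it as the new buffer and queue the complete ones.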
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
    # TODO: sendall is not guaranteed to send everything
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)
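
  # Example of the flow above: a client sending REQ_QUERY_CLUSTER_INFO is
  # dispatched by handle_request to an OpQueryClusterInfo opcode, which
  # _Query runs synchronously (without a job queue entry) via
  # mcpu.Processor, while job submissions go through queue.SubmitJob and
  # are executed asynchronously.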


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)
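    # Note: __init__ itself runs through this method; its assignments
    # succeed only because _instance is still None at that point. Once
    # _instance is set, any further attribute assignment trips the
    # assertion above.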

  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until
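
# Usage example: _SetWatcherPause(time.time() + 1800) writes that timestamp
# to the pause file, keeping the watcher quiet for half an hour, while
# _SetWatcherPause(None) removes the file and re-enables it.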


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for a cold start of all nodes, we retry for up
  to a minute until we get a real answer as the top-voted one. If the
  nodes are further out of sync, manual startup of the master should
  be attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up as well to confirm our status.

  """
  myself = utils.HostInfo().name
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one-node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
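  # Worked example: in a five-node cluster where every node answers,
  # all_votes is 5; we keep mastership only if we are top_node and
  # top_votes >= all_votes - top_votes, i.e. at least 3 votes name us.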

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


def CheckAgreementWithRpc():
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()


def _RunInSeparateProcess(fn):
  """Runs a function in a separate process.

  Note: Only boolean return values are supported.

  @type fn: callable
  @param fn: Function to be called
  @rtype: bool

  """
  pid = os.fork()
  if pid == 0:
    # Child process
    try:
      # Call function
      result = int(bool(fn()))
      assert result in (0, 1)
    except: # pylint: disable-msg=W0702
      logging.exception("Error while calling function in separate process")
      # 0 and 1 are reserved for the return value
      result = 33

    os._exit(result) # pylint: disable-msg=W0212

  # Parent process

  # Avoid zombies and check exit code
  (_, status) = os.waitpid(pid, 0)

  if os.WIFSIGNALED(status):
    signum = os.WTERMSIG(status)
    exitcode = None
  else:
    signum = None
    exitcode = os.WEXITSTATUS(status)

  if not (exitcode in (0, 1) and signum is None):
    logging.error("Child program failed (code=%s, signal=%s)",
                  exitcode, signum)
    sys.exit(constants.EXIT_FAILURE)

  return bool(exitcode)
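
# Usage: _RunInSeparateProcess(CheckAgreementWithRpc) forks, runs the vote
# in the child and maps its exit status back to a boolean; exit code 33 (or
# death by signal) means the child crashed, aborting masterd startup through
# the sys.exit call above.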


def CheckMasterd(options, args):
  """Initial checks on whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not _RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)


def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.fail_msg
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)
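
  # The nested try/finally blocks above enforce the shutdown order: workers
  # and job queue first (server_cleanup), then the RPC layer, and finally
  # removal of the master socket, even if serve_forever raises.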


def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)
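  # daemon.GenericMain is expected to handle option parsing, daemonization
  # and PID file management, running CheckMasterd first and ExecMasterd once
  # the PID file is held (see their docstrings above).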


if __name__ == "__main__":
  main()