root / daemons / ganeti-masterd @ fe7c59d5


#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since
inheritance from parent classes requires it.

"""

# pylint: disable-msg=C0103
# C0103: Invalid name ganeti-masterd

import sys
import SocketServer
import time
import collections
import signal
import logging

from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import daemon
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  # pylint: disable-msg=W0221
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except: # pylint: disable-msg=W0702
      server.handle_error(request, client_address)
      server.close_request(request)


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool("ClientReq",
                                                 CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)
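
  # Note: overriding process_request() hands each accepted connection to
  # the shared worker pool instead of serving it inline; the pooled
  # ClientRequestWorker then runs the same finish/close steps that
  # ThreadingMixIn would otherwise perform in a dedicated thread.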

  @utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
  def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
    """Handle one request at a time until told to quit."""
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Since we use SignalHandled only once, the resulting dict will map all
    # signals to the same handler. We'll just use the first one.
    sighandler = signal_handlers.values()[0]
    while not sighandler.called:
      self.handle_request()
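
  # Design note: utils.SignalHandled installs SIGINT/SIGTERM handlers for
  # the duration of this call and passes them in via signal_handlers. The
  # handler merely records that it fired, so the loop above checks
  # sighandler.called between requests instead of aborting mid-request.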

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096
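
  # Wire format: each message on the socket is a JSON document terminated
  # by the single EOM byte '\3'. read_message() below buffers raw recv()
  # data and splits on EOM, so one recv() may carry several messages or
  # only part of one. Illustrative exchange (assuming the luxi key
  # constants are the literal strings "method", "args", "success" and
  # "result"):
  #   C: {"method": "QueryClusterInfo", "args": []}\x03
  #   S: {"success": true, "result": {...}}\x03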
  def setup(self):
    # pylint: disable-msg=W0201
    # setup() is the API for initialising this class
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None) # pylint: disable-msg=E1103
      args = request.get(luxi.KEY_ARGS, None) # pylint: disable-msg=E1103
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = errors.EncodeException(err)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))
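
  # handle() error model: known GenericErrors are serialized with
  # errors.EncodeException() so the client side can reconstruct the
  # proper exception class, while unexpected exceptions are logged with
  # a traceback and flattened to a plain string result.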
  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()
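
  # read_message() invariant: after splitting on EOM, every complete
  # message is queued in self._msgs and self._buffer holds only the
  # trailing partial message (possibly ""), so no bytes are lost or
  # duplicated across recv() calls.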
  def send_message(self, msg):
    #print "sending", msg
    # TODO: sendall is not guaranteed to send everything
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args): # pylint: disable-msg=R0911
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    # TODO: Rewrite to not exit in each 'if/elif' branch
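
    # Dispatch overview: each luxi.REQ_* constant below names one
    # client-visible operation. Job operations go straight to the
    # JobQueue object, while read-only queries are wrapped in an opcode
    # and executed synchronously via _Query() without entering the job
    # queue. In the submit branches, each element of args is an opcode
    # state dict; opcodes.OpCode.LoadOpCode() rebuilds the opcode
    # object from it.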
    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_SUBMIT_MANY_JOBS:
      logging.info("Received multiple jobs")
      jobs = []
      for ops in args:
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
      return queue.SubmitManyJobs(jobs)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = utils.CommaJoin(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed",
                                   errors.ECODE_INVAL)
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUERY_TAGS:
      kind, name = args
      logging.info("Received tags query request")
      op = opcodes.OpGetTags(kind=kind, name=name)
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)
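
    # The next branch stores a pause deadline on local disk (see
    # _SetWatcherPause below); ganeti-watcher is expected to check that
    # file and skip its runs until the deadline has passed.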
    elif method == luxi.REQ_SET_WATCHER_PAUSE:
      (until, ) = args

      if until is None:
        logging.info("Received request to no longer pause the watcher")
      else:
        if not isinstance(until, (int, float)):
          raise TypeError("Duration must be an integer or float")

        if until < time.time():
          raise errors.GenericError("Unable to set pause end time in the past")

        logging.info("Received request to pause the watcher until %s", until)

      return _SetWatcherPause(until)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    # Queries don't have a job id
    proc = mcpu.Processor(self.server.context, None)
    return proc.ExecOpCode(op, None)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # pylint: disable-msg=W0212
  # we do want to ensure a singleton here
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)
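
  # Note: __setattr__ only permits writes while _instance is still None,
  # i.e. during __init__; once __init__ stores the instance in the class,
  # any further attribute assignment trips the assertion above.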
  def AddNode(self, node, ec_id):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node, ec_id)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def _SetWatcherPause(until):
  """Creates or removes the watcher pause file.

  @type until: None or int
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    utils.RemoveFile(constants.WATCHER_PAUSEFILE)
  else:
    utils.WriteFile(constants.WATCHER_PAUSEFILE,
                    data="%d\n" % (until, ))

  return until


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are badly out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    logging.critical("Use the --no-voting option if you understand what"
                     " effects it has on the cluster state")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]

  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


def CheckAgreementWithRpc():
  rpc.Init()
  try:
    return CheckAgreement()
  finally:
    rpc.Shutdown()


def CheckMasterd(options, args):
  """Initial checks whether to run or exit with a failure.

  """
  if args: # masterd doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMaster(options.debug)

  # If CheckMaster didn't fail we believe we are the master, but we have to
  # confirm with the other nodes.
  if options.no_voting:
    if options.yes_do_it:
      return

    sys.stdout.write("The 'no voting' option has been selected.\n")
    sys.stdout.write("This is dangerous, please confirm by"
                     " typing uppercase 'yes': ")
    sys.stdout.flush()

    confirmation = sys.stdin.readline().strip()
    if confirmation != "YES":
      print >> sys.stderr, "Aborting."
      sys.exit(constants.EXIT_FAILURE)

    return

  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
  # process before we call utils.Daemonize in the current process.
  if not utils.RunInSeparateProcess(CheckAgreementWithRpc):
    sys.exit(constants.EXIT_FAILURE)


def ExecMasterd(options, args): # pylint: disable-msg=W0613
  """Main master daemon function, executed with the PID file held.

  """
  # This is safe to do as the pid file guarantees against
  # concurrent execution.
  utils.RemoveFile(constants.MASTER_SOCKET)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  try:
    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleStore().GetMasterNode()
      result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
      msg = result.fail_msg
      if msg:
        logging.error("Can't activate master IP address: %s", msg)

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemoveFile(constants.MASTER_SOCKET)
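
# Shutdown ordering note: server_cleanup() stops the request workers and
# the job queue first (inner finally), rpc.Shutdown() runs after that,
# and the master socket file is removed last. The socket file is also
# removed before binding, which is safe because the PID file guarantees
# against concurrent masterd instances.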


def main():
  """Main function"""
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
          (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
         ]
  daemon.GenericMain(constants.MASTERD, parser, dirs,
                     CheckMasterd, ExecMasterd)


if __name__ == "__main__":
  main()