#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide, since inheritance
from parent classes requires it.

"""


import os
import errno
import sys
import SocketServer
import time
import collections
import Queue
import random
import signal
import logging

from cStringIO import StringIO
from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap
from ganeti import serializer


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      server.handle_error(request, client_address)
      server.close_request(request)


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor.

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not sighandler.called:
        self.handle_request()
    finally:
      sighandler.Reset()

  def server_cleanup(self):
    """Clean up the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
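

# How a client request flows through the classes above (a sketch,
# inferred from this file): the main thread blocks in handle_request(),
# accepting a connection on the master Unix socket; process_request()
# then hands (server, request, client_address) to the WorkerPool, where
# one of the CLIENT_REQUEST_WORKERS threads runs
# ClientRequestWorker.RunTask(), i.e. the finish_request()/
# close_request() pair that SocketServer.ThreadingMixIn would otherwise
# run in a fresh thread per connection. The accept loop therefore never
# blocks on a slow client; only the pool threads do.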


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = serializer.LoadJson(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = (err.__class__.__name__, err.args)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(serializer.DumpJson(response))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
    self.request.sendall(msg + self.EOM)
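

# The wire format handled above: each message is a JSON document
# terminated by the EOM byte '\x03'. Requests carry luxi.KEY_METHOD and
# luxi.KEY_ARGS; responses carry luxi.KEY_SUCCESS and luxi.KEY_RESULT.
# A minimal client-side sketch of the same framing (illustrative only,
# assuming the constants above; the real client lives in ganeti/luxi.py):
#
#   import socket
#   from ganeti import constants, luxi, serializer
#
#   def call_masterd(method, args):
#     sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#     sock.connect(constants.MASTER_SOCKET)
#     request = {luxi.KEY_METHOD: method, luxi.KEY_ARGS: args}
#     sock.sendall(serializer.DumpJson(request) + '\x03')
#     buf = ""
#     while '\x03' not in buf:
#       data = sock.recv(4096)
#       if not data:
#         raise luxi.ProtocolError("connection closed mid-message")
#       buf += data
#     response = serializer.LoadJson(buf.split('\x03', 1)[0])
#     return response[luxi.KEY_SUCCESS], response[luxi.KEY_RESULT]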


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      logging.info("Received new job")
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      logging.info("Received job cancel request for %s", job_id)
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      logging.info("Received job archive request for %s", job_id)
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      logging.info("Received job autoarchive request for age %s, timeout %s",
                   age, timeout)
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      logging.info("Received job poll request for %s", job_id)
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      if isinstance(job_ids, (tuple, list)) and job_ids:
        msg = ", ".join(job_ids)
      else:
        msg = str(job_ids)
      logging.info("Received job query request for %s", msg)
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      logging.info("Received instance query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      logging.info("Received node query request for %s", names)
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      if use_locking:
        raise errors.OpPrereqError("Sync queries are not allowed")
      logging.info("Received exports query request")
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      logging.info("Received config values query request for %s", fields)
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      logging.info("Received cluster info query request")
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      logging.info("Received queue drain flag change request to %s",
                   drain_flag)
      return queue.SetDrainFlag(drain_flag)

    else:
      logging.info("Received invalid request '%s'", method)
      raise ValueError("Invalid operation '%s'" % method)

  def _DummyLog(self, *args):
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
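

# For luxi.REQ_SUBMIT_JOB the args are a list of serialized opcodes,
# rebuilt above via opcodes.OpCode.LoadOpCode(). A hypothetical
# submission through the call_masterd() sketch shown earlier:
#
#   op = opcodes.OpQueryClusterInfo()
#   success, job_id = call_masterd(luxi.REQ_SUBMIT_JOB,
#                                  [op.__getstate__()])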


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
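

# Note how the __setattr__ override above effectively freezes the
# context: once __init__ has stored the instance in _instance, any
# further attribute assignment trips the assertion (when assertions are
# enabled), so cfg, glm and jobqueue can be shared across threads
# without being rebound.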


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  parser.add_option("--yes-do-it", dest="yes_do_it",
                    help="Override interactive check for --no-voting",
                    default=False, action="store_true")

  options, args = parser.parse_args()
  return options, args


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own config file for getting the node list. In the future we
  could collect the current node list from our (possibly obsolete)
  known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result
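

# Worked example for the check above: on a five-node cluster where
# three nodes vote for us and two vote for another node, all_votes is 5
# and top_votes is 3; since 3 < (5 - 3) is false, we proceed as master.
# As the docstring notes, on a two-node cluster this means the other
# node must be up and voting for us.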


def main():
  """Main function"""

  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  if options.fork:
    utils.CloseFDs()

  rpc.Init()
  try:
    ssconf.CheckMaster(options.debug)

    # we believe we are the master, let's ask the other nodes...
    if options.no_voting and not options.yes_do_it:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()
      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print "Aborting."
        return
    elif not options.no_voting:
      if not CheckAgreement():
        return

    dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
            (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
           ]
    utils.EnsureDirs(dirs)

    # This is safe to do as the pid file guarantees against
    # concurrent execution.
    utils.RemoveFile(constants.MASTER_SOCKET)

    master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  finally:
    rpc.Shutdown()
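
  # Ordering note (as the code stands): the listening socket was created
  # above, before daemonizing, so a bind error is reported on the
  # terminal rather than lost after the fork; the pid file and logging
  # are set up below, in the detached child, the process that actually
  # keeps running.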

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON)

  utils.WritePidFile(constants.MASTERD_PID)
  try:
    utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                       stderr_logging=not options.fork, multithreaded=True)

    logging.info("Ganeti master daemon startup")

    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleConfigReader().GetMasterNode()
      if not rpc.RpcRunner.call_node_start_master(master_node, False):
        logging.error("Can't activate master IP address")

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemovePidFile(constants.MASTERD_PID)
    utils.RemoveFile(constants.MASTER_SOCKET)


if __name__ == "__main__":
  main()