root / daemons / ganeti-masterd @ 9dae41ad

#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since inheritance
from their parent classes requires it.

"""


import os
import errno
import sys
import SocketServer
import time
import collections
import Queue
import random
import signal
import simplejson
import logging

from cStringIO import StringIO
from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      server.handle_error(request, client_address)
      server.close_request(request)


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    @param address: the address to bind this IOServer to
    @param rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)
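    # Note: AddTask only enqueues the connection; the request is then
    # served asynchronously by one of the CLIENT_REQUEST_WORKERS
    # ClientRequestWorker threads from the pool.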

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not sighandler.called:
        self.handle_request()
    finally:
      sighandler.Reset()
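    # Note: the SignalHandler flag is only re-checked between requests,
    # so a pending SIGINT/SIGTERM takes effect once the current
    # handle_request() call returns.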

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.debug("client closed connection")
        break

      request = simplejson.loads(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = (err.__class__.__name__, err.args)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(simplejson.dumps(response))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()
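    # Illustration of the framing above (EOM = '\x03'): if recv() returns
    # '{"a": 1}\x03{"b": 2}\x03{"c"', the split yields two complete
    # messages, while the partial '{"c"' stays in self._buffer until more
    # data ending in another EOM byte arrives.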

  def send_message(self, msg):
    #print "sending", msg
    self.request.sendall(msg + self.EOM)
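    # Example exchange over the socket (illustrative values only; the
    # actual key names and method strings are defined in the luxi module):
    #   -> '{"method": "QueryJobs", "args": [["1"], ["status"]]}\x03'
    #   <- '{"success": true, "result": [[["running"]]]}\x03'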


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      (age, timeout) = args
      return queue.AutoArchiveJobs(age, timeout)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields, use_locking) = args
      op = opcodes.OpQueryInstances(names=names, output_fields=fields,
                                    use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields, use_locking) = args
      op = opcodes.OpQueryNodes(names=names, output_fields=fields,
                                use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes, use_locking = args
      op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CLUSTER_INFO:
      op = opcodes.OpQueryClusterInfo()
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      return queue.SetDrainFlag(drain_flag)

    else:
      raise ValueError("Invalid operation")

  def _DummyLog(self, *args):
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
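    # Note: unlike submitted jobs, queries are executed synchronously in
    # the calling worker thread and bypass the job queue; any log messages
    # the opcode produces are discarded via _DummyLog (see TODO above).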


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)
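    # Once _instance is set at the end of __init__, any further attribute
    # assignment trips the assert above; e.g. "context.cfg = None" raises
    # AssertionError (as long as assertions are enabled).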

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  parser.add_option("--no-voting", dest="no_voting",
                    help="Do not check that the nodes agree on this node"
                    " being the master and start the daemon unconditionally",
                    default=False, action="store_true")
  options, args = parser.parse_args()
  return options, args
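  # Typical invocations (for illustration): "ganeti-masterd" forks and
  # detaches, while "ganeti-masterd -f -d" stays in the foreground with
  # debug logging enabled.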


def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most cases we are the master, we'll
  use our own configuration file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one-node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s with %d out of %d votes)", top_node, top_votes,
                     all_votes)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


def main():
  """Main function"""

  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  if options.fork:
    utils.CloseFDs()

  rpc.Init()
  try:
    ssconf.CheckMaster(options.debug)

    # we believe we are the master, let's ask the other nodes...
    if options.no_voting:
      sys.stdout.write("The 'no voting' option has been selected.\n")
      sys.stdout.write("This is dangerous, please confirm by"
                       " typing uppercase 'yes': ")
      sys.stdout.flush()
      confirmation = sys.stdin.readline().strip()
      if confirmation != "YES":
        print "Aborting."
        return
    else:
      if not CheckAgreement():
        return

    dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE),
            (constants.SOCKET_DIR, constants.SOCKET_DIR_MODE),
           ]
    utils.EnsureDirs(dirs)

    # This is safe to do as the pid file guarantees against
    # concurrent execution.
    utils.RemoveFile(constants.MASTER_SOCKET)

    master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  finally:
    rpc.Shutdown()

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON)

  utils.WritePidFile(constants.MASTERD_PID)
  try:
    utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                       stderr_logging=not options.fork, multithreaded=True)

    logging.info("Ganeti master daemon startup")

    rpc.Init()
    try:
      # activate ip
      master_node = ssconf.SimpleConfigReader().GetMasterNode()
      if not rpc.RpcRunner.call_node_start_master(master_node, False):
        logging.error("Can't activate master IP address")

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemovePidFile(constants.MASTERD_PID)
    utils.RemoveFile(constants.MASTER_SOCKET)


if __name__ == "__main__":
  main()