#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

import os
import errno
import sys
import SocketServer
import time
import collections
import Queue
import random
import signal
import simplejson
import logging

from cStringIO import StringIO
from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import workerpool
from ganeti import rpc
from ganeti import bootstrap


CLIENT_REQUEST_WORKERS = 16

EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


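# Each client connection is dispatched to one worker out of a fixed pool
# of CLIENT_REQUEST_WORKERS threads.  RunTask below follows the stdlib
# SocketServer.ThreadingMixIn.process_request_thread logic, including its
# deliberately broad except clause, so an error while serving one request
# is reported via handle_error rather than killing the pool thread.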
class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      server.handle_error(request, client_address)
      server.close_request(request)


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    Args:
      address: the address to bind this IOServer to
      rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # We'll only start threads once we've forked.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not sighandler.called:
        self.handle_request()
    finally:
      sighandler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()


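# The wire format handled below is a stream of JSON documents, each one
# terminated by the EOM byte '\3'.  An illustrative request/response pair
# (assuming the usual luxi key names; the authoritative constants are
# luxi.KEY_METHOD, luxi.KEY_ARGS, luxi.KEY_SUCCESS and luxi.KEY_RESULT):
#
#   C: {"method": "QueryJobs", "args": [["1"], ["status"]]}\x03
#   S: {"success": true, "result": [["success"]]}\x03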
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.info("client closed connection")
        break

      request = simplejson.loads(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except errors.GenericError, err:
        success = False
        result = (err.__class__.__name__, err.args)
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(simplejson.dumps(response))

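  # A worked example of the framing below (illustrative): if recv() has
  # delivered "foo\3bar\3ba" so far, split(EOM) gives ["foo", "bar", "ba"];
  # the trailing, still-incomplete "ba" is pushed back into self._buffer
  # while "foo" and "bar" are queued for handle().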
  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    self.request.sendall(msg + self.EOM)


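# Clients normally reach the dispatcher below through the luxi library
# rather than hand-rolled sockets; a rough sketch (hypothetical usage,
# assuming the luxi.Client API shipped with this tree):
#
#   from ganeti import luxi, opcodes
#   cl = luxi.Client(address=constants.MASTER_SOCKET)
#   job_id = cl.SubmitJob([opcodes.OpQueryNodes(names=[],
#                                               output_fields=["name"])])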
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_CANCEL_JOB:
      job_id = args
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      job_id = args
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      age = args
      return queue.AutoArchiveJobs(age)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields) = args
      op = opcodes.OpQueryInstances(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields) = args
      op = opcodes.OpQueryNodes(names=names, output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_EXPORTS:
      nodes = args
      op = opcodes.OpQueryExports(nodes=nodes)
      return self._Query(op)

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      fields = args
      op = opcodes.OpQueryConfigValues(output_fields=fields)
      return self._Query(op)

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      drain_flag = args
      return queue.SetDrainFlag(drain_flag)

    else:
      raise ValueError("Invalid operation '%s'" % method)

  def _DummyLog(self, *args):
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)


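# GanetiContext below is a process-wide singleton: the first (and only)
# instantiation stores itself in _instance, after which __setattr__
# rejects any further attribute writes.  An illustrative consequence,
# assuming assertions are enabled: `context.cfg = None` after
# construction would trip the assert in __setattr__.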
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration.

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)


def ParseOptions():
  """Parse the command line options.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  options, args = parser.parse_args()
  return options, args


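# Worked example of the voting arithmetic in CheckAgreement (illustrative):
# in a five-node cluster where three nodes vote for us and two vote for
# another node, top_votes = 3 and all_votes - top_votes = 2, so we keep
# the master role; with only two of five votes in our favour (three
# against), we would refuse to start.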
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  In order to account for cold-start of all nodes, we retry for up to
  a minute until we get a real answer as the top-voted one. If the
  nodes are more out of sync, manual startup of the master should be
  attempted for now.

  Note that for a cluster with an even number of nodes, we need at
  least half of the nodes (besides ourselves) to vote for us. This
  creates a problem on two-node clusters, since in this case we
  require the other node to be up too to confirm our status.

  """
  myself = utils.HostInfo().name
  # Temporary instantiation of a config writer, used only to get the
  # node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  retries = 6
  while retries > 0:
    votes = bootstrap.GatherMasterVotes(node_list)
    if not votes:
      # empty node list, this is a one node cluster
      return True
    if votes[0][0] is None:
      retries -= 1
      time.sleep(10)
      continue
    break
  if retries == 0:
    logging.critical("Cluster inconsistent, most of the nodes didn't answer"
                     " after multiple retries. Aborting startup")
    return False
  # here a real node is at the top of the list
  all_votes = sum(item[1] for item in votes)
  top_node, top_votes = votes[0]
  result = False
  if top_node != myself:
    logging.critical("It seems we are not the master (top-voted node"
                     " is %s)", top_node)
  elif top_votes < all_votes - top_votes:
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", top_votes, all_votes - top_votes)
  else:
    result = True

  return result


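# Startup sequence implemented by main(): parse options, verify via
# ssconf that this node believes it is the master, confirm that belief
# with a cluster-wide vote (CheckAgreement), create the LUXI socket,
# optionally daemonize, write the pid file, activate the master IP and
# then serve requests until SIGINT/SIGTERM.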
def main():
  """Main function"""

  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  rpc.Init()
  try:
    ssconf.CheckMaster(options.debug)

    # we believe we are the master, let's ask the other nodes...
    if not CheckAgreement():
      return

    try:
      os.mkdir(constants.SOCKET_DIR, constants.SOCKET_DIR_MODE)
    except EnvironmentError, err:
      if err.errno != errno.EEXIST:
        raise errors.GenericError("Cannot create socket directory"
          " '%s': %s" % (constants.SOCKET_DIR, err))

    # This is safe to do as the pid file guarantees against
    # concurrent execution.
    utils.RemoveFile(constants.MASTER_SOCKET)

    master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  finally:
    rpc.Shutdown()

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                    noclose_fds=[master.fileno()])

  utils.WritePidFile(constants.MASTERD_PID)
  try:
    utils.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                       stderr_logging=not options.fork)

    logging.info("Ganeti master daemon startup")

    rpc.Init()
    try:
      # activate the master IP address
      master_node = ssconf.SimpleConfigReader().GetMasterNode()
      if not rpc.RpcRunner.call_node_start_master(master_node, False):
        logging.error("Can't activate master IP address")

      master.setup_queue()
      try:
        master.serve_forever()
      finally:
        master.server_cleanup()
    finally:
      rpc.Shutdown()
  finally:
    utils.RemovePidFile(constants.MASTERD_PID)
    utils.RemoveFile(constants.MASTER_SOCKET)


if __name__ == "__main__":
  main()