Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 3ccafd0e

History | View | Annotate | Download (12.9 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import time
33
import collections
34
import Queue
35
import random
36
import signal
37
import simplejson
38
import logging
39

    
40
from cStringIO import StringIO
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import mcpu
46
from ganeti import opcodes
47
from ganeti import jqueue
48
from ganeti import locking
49
from ganeti import luxi
50
from ganeti import utils
51
from ganeti import errors
52
from ganeti import ssconf
53
from ganeti import logger
54
from ganeti import workerpool
55
from ganeti import rpc
56

    
57

    
58
CLIENT_REQUEST_WORKERS = 16
59

    
60
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
61
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
62

    
63

    
64
class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    Args:
      server: the IOServer instance that accepted the connection
      request: the connected request socket
      client_address: address of the connecting client

    """
    try:
      server.finish_request(request, client_address)
      server.close_request(request)
    except:
      # Deliberate bare except, mirroring SocketServer.ThreadingMixIn:
      # any failure in the handler is reported via handle_error() and
      # the connection is still closed, keeping the worker alive.
      server.handle_error(request, client_address)
      server.close_request(request)
77

    
78

    
79
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  Accepts client connections on a unix socket and dispatches each
  request to a worker pool.  It also initializes the other threads,
  installs signal handlers (processed only in this thread) and does
  the cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    Args:
      address: the address to bind this IOServer to
      rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # Thread-backed members are created only after forking, in
    # setup_queue(), so the daemonization does not lose them.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Create the shared context and the client request worker pool."""
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Add task to workerpool to process request.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle requests until SIGINT or SIGTERM arrives."""
    handler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while not handler.called:
        self.handle_request()
    finally:
      handler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.  The socket is closed first; worker pool and job queue are
    only torn down when they were actually set up.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
135

    
136

    
137
class ClientRqHandler(SocketServer.BaseRequestHandler):
138
  """Client handler"""
139
  EOM = '\3'
140
  READ_SIZE = 4096
141

    
142
  def setup(self):
143
    self._buffer = ""
144
    self._msgs = collections.deque()
145
    self._ops = ClientOps(self.server)
146

    
147
  def handle(self):
148
    while True:
149
      msg = self.read_message()
150
      if msg is None:
151
        logging.info("client closed connection")
152
        break
153

    
154
      request = simplejson.loads(msg)
155
      logging.debug("request: %s", request)
156
      if not isinstance(request, dict):
157
        logging.error("wrong request received: %s", msg)
158
        break
159

    
160
      method = request.get(luxi.KEY_METHOD, None)
161
      args = request.get(luxi.KEY_ARGS, None)
162
      if method is None or args is None:
163
        logging.error("no method or args in request")
164
        break
165

    
166
      success = False
167
      try:
168
        result = self._ops.handle_request(method, args)
169
        success = True
170
      except errors.GenericError, err:
171
        success = False
172
        result = (err.__class__.__name__, err.args)
173
      except:
174
        logging.error("Unexpected exception", exc_info=True)
175
        err = sys.exc_info()
176
        result = "Caught exception: %s" % str(err[1])
177

    
178
      response = {
179
        luxi.KEY_SUCCESS: success,
180
        luxi.KEY_RESULT: result,
181
        }
182
      logging.debug("response: %s", response)
183
      self.send_message(simplejson.dumps(response))
184

    
185
  def read_message(self):
186
    while not self._msgs:
187
      data = self.request.recv(self.READ_SIZE)
188
      if not data:
189
        return None
190
      new_msgs = (self._buffer + data).split(self.EOM)
191
      self._buffer = new_msgs.pop()
192
      self._msgs.extend(new_msgs)
193
    return self._msgs.popleft()
194

    
195
  def send_message(self, msg):
196
    #print "sending", msg
197
    self.request.sendall(msg + self.EOM)
198

    
199

    
200
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    """Dispatch one luxi request to the job queue or to an opcode query."""
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      return queue.SubmitJob([opcodes.OpCode.LoadOpCode(state)
                              for state in args])

    elif method == luxi.REQ_CANCEL_JOB:
      return queue.CancelJob(args)

    elif method == luxi.REQ_ARCHIVE_JOB:
      return queue.ArchiveJob(args)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      return queue.AutoArchiveJobs(args)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields) = args
      return self._Query(opcodes.OpQueryInstances(names=names,
                                                  output_fields=fields))

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields) = args
      return self._Query(opcodes.OpQueryNodes(names=names,
                                              output_fields=fields))

    elif method == luxi.REQ_QUERY_EXPORTS:
      return self._Query(opcodes.OpQueryExports(nodes=args))

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      return self._Query(opcodes.OpQueryConfigValues(output_fields=args))

    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
      return queue.SetDrainFlag(args)

    else:
      raise ValueError("Invalid operation")

  def _DummyLog(self, *args):
    """No-op feedback callback for opcode execution."""
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
272

    
273

    
274
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  # Class-level singleton marker; also consulted by __setattr__ below.
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications;
    # __setattr__ only allows assignments while _instance is still None,
    # so all of the assignments above must happen before this line.
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node.name)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node.name)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
344

    
345

    
346
def ParseOptions():
  """Parse the command line options.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_string)

  parser.add_option("-f", "--foreground", dest="fork",
                    action="store_false", default=True,
                    help="Don't detach from the current terminal")
  parser.add_option("-d", "--debug", dest="debug",
                    action="store_true", default=False,
                    help="Enable some debug messages")

  return parser.parse_args()
366

    
367

    
368
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  Returns:
    True if this node should proceed as master, False otherwise

  """
  myself = utils.HostInfo().name
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  try:
    node_list.remove(myself)
  except ValueError:
    # FIX: list.remove() raises ValueError, not KeyError, when the item
    # is missing; catching KeyError here would crash the daemon if we
    # are not in our own node list
    pass
  if not node_list:
    # either single node cluster, or a misconfiguration, but I won't
    # break any other node, so I can proceed
    return True
  results = rpc.RpcRunner.call_master_info(node_list)
  if not isinstance(results, dict):
    # this should not happen (unless internal error in rpc)
    logging.critical("Can't complete rpc call, aborting master startup")
    return False
  positive = negative = 0
  other_masters = {}
  for node in results:
    # each answer must be a (name, ip, master_name, ...) tuple/list
    if not isinstance(results[node], (tuple, list)) or len(results[node]) < 3:
      logging.warning("Can't contact node %s", node)
      continue
    master_node = results[node][2]
    if master_node == myself:
      positive += 1
    else:
      negative += 1
      if master_node not in other_masters:
        other_masters[master_node] = 0
      other_masters[master_node] += 1
  if positive <= negative:
    # bad!
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", positive, negative)
    if len(other_masters) > 1:
      logging.critical("The other nodes do not agree on a single master")
    elif other_masters:
      # TODO: resync my files from the master
      logging.critical("It seems the real master is %s",
                       other_masters.keys()[0])
    else:
      logging.critical("Can't contact any node for data, aborting startup")
    return False
  return True
424

    
425

    
426
def main():
  """Main function: startup sequence of the master daemon.

  The order of operations below matters: we must verify mastership
  before binding the socket, daemonize before writing the pid file and
  setting up logging, and activate the master IP before serving.

  """
  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  # Abort early if this node is not configured as the master
  ssconf.CheckMaster(options.debug)

  # we believe we are the master, let's ask the other nodes...
  if not CheckAgreement():
    return

  # Bind the listening socket before forking so it survives daemonization
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                    noclose_fds=[master.fileno()])

  utils.WritePidFile(constants.MASTERD_PID)

  # Log to stderr only when staying in the foreground
  logger.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                      stderr_logging=not options.fork)

  logging.info("ganeti master daemon startup")

  # activate ip
  master_node = ssconf.SimpleConfigReader().GetMasterNode()
  if not rpc.RpcRunner.call_node_start_master(master_node, False):
    # non-fatal: log and continue serving without the master IP
    logging.error("Can't activate master IP address")

  # Start the worker threads only after daemonizing
  master.setup_queue()
  try:
    master.serve_forever()
  finally:
    master.server_cleanup()
    utils.RemovePidFile(constants.MASTERD_PID)

    
465

    
466
# Script entry point.
if __name__ == "__main__":
  main()