Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 72737a7f

History | View | Annotate | Download (12.7 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import time
33
import collections
34
import Queue
35
import random
36
import signal
37
import simplejson
38
import logging
39

    
40
from cStringIO import StringIO
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import mcpu
46
from ganeti import opcodes
47
from ganeti import jqueue
48
from ganeti import locking
49
from ganeti import luxi
50
from ganeti import utils
51
from ganeti import errors
52
from ganeti import ssconf
53
from ganeti import logger
54
from ganeti import workerpool
55
from ganeti import rpc
56

    
57

    
58
CLIENT_REQUEST_WORKERS = 16
59

    
60
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
61
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
62

    
63

    
64
class ClientRequestWorker(workerpool.BaseWorker):
  def RunTask(self, server, request, client_address):
    """Process one client request in a pooled worker thread.

    This mirrors the dispatch logic of SocketServer.ThreadingMixIn,
    but runs in a worker from the shared pool instead of a dedicated
    thread per request.

    Args:
      server: the IOServer instance the request arrived on
      request: the request (connection) object to service
      client_address: address of the connecting client

    """
    try:
      server.finish_request(request, client_address)
    except:
      # Bare except is deliberate: no failure in a request handler may
      # kill the worker thread; report it through the server hook.
      server.handle_error(request, client_address)
    finally:
      # Close exactly once, whether or not handling succeeded (the
      # previous code duplicated this call in both branches).
      server.close_request(request)
77

    
78

    
79
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler):
    """IOServer constructor

    Args:
      address: the address to bind this IOServer to
      rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)

    # Thread-backed members are created only after daemonization, in
    # setup_queue(); until then they remain unset.
    self.context = None
    self.request_workers = None

  def setup_queue(self):
    """Create the shared daemon context and the request worker pool."""
    self.context = GanetiContext()
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
                                                 ClientRequestWorker)

  def process_request(self, request, client_address):
    """Hand an incoming request over to the worker pool.

    """
    self.request_workers.AddTask(self, request, client_address)

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
    try:
      while True:
        # SIGINT/SIGTERM set the flag; we exit the accept loop cleanly
        if sighandler.called:
          break
        self.handle_request()
    finally:
      sighandler.Reset()

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
    finally:
      if self.request_workers:
        self.request_workers.TerminateWorkers()
      if self.context:
        self.context.jobqueue.Shutdown()
135

    
136

    
137
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Handles one client connection on the master socket."""
  # Byte used to frame messages on the stream
  EOM = '\3'
  # How much data to pull from the socket per recv() call
  READ_SIZE = 4096

  def setup(self):
    """Per-connection initialization (called by BaseRequestHandler)."""
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    """Serve requests on this connection until the peer disconnects."""
    while True:
      msg = self.read_message()
      if msg is None:
        logging.info("client closed connection")
        break

      request = simplejson.loads(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      try:
        result = self._ops.handle_request(method, args)
        success = True
      except:
        # Report the failure to the client instead of dropping the
        # connection; the error text becomes the result payload.
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])
        success = False

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(simplejson.dumps(response))

  def read_message(self):
    """Return the next complete message, or None on EOF.

    Reads from the socket until at least one EOM-terminated message is
    buffered; any trailing partial message is kept for the next call.

    """
    while not self._msgs:
      chunk = self.request.recv(self.READ_SIZE)
      if not chunk:
        return None
      complete = (self._buffer + chunk).split(self.EOM)
      # The last element is the (possibly empty) unterminated remainder
      self._buffer = complete.pop()
      self._msgs.extend(complete)
    return self._msgs.popleft()

  def send_message(self, msg):
    """Send one message, terminated by the EOM marker."""
    self.request.sendall(msg + self.EOM)
195

    
196

    
197
class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    # The server gives us access to the shared GanetiContext
    self.server = server

  def handle_request(self, method, args):
    """Dispatch one luxi request.

    Job-queue operations go straight to the queue; query operations
    are wrapped in opcodes and executed via _Query.

    """
    queue = self.server.context.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_CANCEL_JOB:
      return queue.CancelJob(args)

    elif method == luxi.REQ_ARCHIVE_JOB:
      return queue.ArchiveJob(args)

    elif method == luxi.REQ_AUTOARCHIVE_JOBS:
      return queue.AutoArchiveJobs(args)

    elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
      (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
      return queue.WaitForJobChanges(job_id, fields, prev_job_info,
                                     prev_log_serial, timeout)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      return queue.QueryJobs(job_ids, fields)

    elif method == luxi.REQ_QUERY_INSTANCES:
      (names, fields) = args
      return self._Query(opcodes.OpQueryInstances(names=names,
                                                  output_fields=fields))

    elif method == luxi.REQ_QUERY_NODES:
      (names, fields) = args
      return self._Query(opcodes.OpQueryNodes(names=names,
                                              output_fields=fields))

    elif method == luxi.REQ_QUERY_EXPORTS:
      return self._Query(opcodes.OpQueryExports(nodes=args))

    elif method == luxi.REQ_QUERY_CONFIG_VALUES:
      return self._Query(opcodes.OpQueryConfigValues(output_fields=args))

    else:
      raise ValueError("Invalid operation")

  def _DummyLog(self, *args):
    """Discard feedback messages produced while running an opcode."""
    pass

  def _Query(self, op):
    """Runs the specified opcode and returns the result.

    """
    proc = mcpu.Processor(self.server.context)
    # TODO: Where should log messages go?
    return proc.ExecOpCode(op, self._DummyLog, None)
265

    
266

    
267
class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.
  It is a singleton: __setattr__ only permits attribute writes while
  the class-level _instance slot is still None, i.e. during __init__.

  """
  # Class-level singleton slot; set at the very end of __init__, which
  # simultaneously freezes the instance (see __setattr__)
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only a GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create global configuration object
    self.cfg = config.ConfigWriter()

    # Locking manager, pre-seeded with the known node and instance names
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # Job queue
    self.jobqueue = jqueue.JobQueue(self)

    # setting this also locks the class against attribute modifications;
    # NOTE: this must be the last statement, since every self.X = ...
    # above relies on _instance still being None in __setattr__
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)

  def AddNode(self, node):
    """Adds a node to the configuration and lock manager.

    """
    # Add it to the configuration
    self.cfg.AddNode(node)

    # If preseeding fails it'll not be added
    self.jobqueue.AddNode(node.name)

    # Add the new node to the Ganeti Lock Manager
    self.glm.add(locking.LEVEL_NODE, node.name)

  def ReaddNode(self, node):
    """Updates a node that's already in the configuration

    """
    # Synchronize the queue again
    self.jobqueue.AddNode(node.name)

  def RemoveNode(self, name):
    """Removes a node from the configuration and lock manager.

    """
    # Remove node from configuration
    self.cfg.RemoveNode(name)

    # Notify job queue
    self.jobqueue.RemoveNode(name)

    # Remove the node from the Ganeti Lock Manager
    self.glm.remove(locking.LEVEL_NODE, name)
337

    
338

    
339
def ParseOptions():
  """Parse the command line options.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_string)

  parser.add_option("-f", "--foreground", dest="fork",
                    action="store_false", default=True,
                    help="Don't detach from the current terminal")
  parser.add_option("-d", "--debug", dest="debug",
                    action="store_true", default=False,
                    help="Enable some debug messages")
  return parser.parse_args()
359

    
360

    
361
def CheckAgreement():
  """Check the agreement on who is the master.

  The function uses a very simple algorithm: we must get more positive
  than negative answers. Since in most of the cases we are the master,
  we'll use our own config file for getting the node list. In the
  future we could collect the current node list from our (possibly
  obsolete) known nodes.

  Returns:
    boolean, True if it is safe to proceed with master startup

  """
  myself = utils.HostInfo().name
  #temp instantiation of a config writer, used only to get the node list
  cfg = config.ConfigWriter()
  node_list = cfg.GetNodeList()
  del cfg
  try:
    node_list.remove(myself)
  except ValueError:
    # list.remove() raises ValueError (not KeyError) when the element
    # is absent; not finding ourselves in the node list is harmless
    pass
  if not node_list:
    # either single node cluster, or a misconfiguration, but I won't
    # break any other node, so I can proceed
    return True
  results = rpc.RpcRunner.call_master_info(node_list)
  if not isinstance(results, dict):
    # this should not happen (unless internal error in rpc)
    logging.critical("Can't complete rpc call, aborting master startup")
    return False
  positive = negative = 0
  other_masters = {}
  for node in results:
    if not isinstance(results[node], (tuple, list)) or len(results[node]) < 3:
      logging.warning("Can't contact node %s", node)
      continue
    # third reply element is the node's idea of who the master is
    master_node = results[node][2]
    if master_node == myself:
      positive += 1
    else:
      negative += 1
      other_masters[master_node] = other_masters.get(master_node, 0) + 1
  if positive <= negative:
    # bad!
    logging.critical("It seems we are not the master (%d votes for,"
                     " %d votes against)", positive, negative)
    if len(other_masters) > 1:
      logging.critical("The other nodes do not agree on a single master")
    elif other_masters:
      # TODO: resync my files from the master
      logging.critical("It seems the real master is %s",
                       other_masters.keys()[0])
    else:
      logging.critical("Can't contact any node for data, aborting startup")
    return False
  return True
417

    
418

    
419
def main():
  """Main function.

  Startup sequence: parse options, verify we are the configured master,
  check mastership agreement with the other nodes, create the master
  socket, daemonize, write the pid file, set up logging, activate the
  master IP, then serve client requests until terminated.

  """

  # args is unused: the daemon takes no positional arguments
  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  ssconf.CheckMaster(options.debug)

  # we believe we are the master, let's ask the other nodes...
  if not CheckAgreement():
    return

  # Bind the master socket before daemonizing so its fd can be kept open
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)

  # become a daemon
  if options.fork:
    # noclose_fds presumably keeps the listening socket open across the
    # daemonization fork -- TODO confirm against utils.Daemonize
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                    noclose_fds=[master.fileno()])

  utils.WritePidFile(constants.MASTERD_PID)

  # Log to stderr only when staying in the foreground
  logger.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
                      stderr_logging=not options.fork)

  logging.info("ganeti master daemon startup")

  # activate ip
  master_node = ssconf.SimpleConfigReader().GetMasterNode()
  if not rpc.RpcRunner.call_node_start_master(master_node, False):
    # best-effort: startup continues even if the IP cannot be activated
    logging.error("Can't activate master IP address")

  master.setup_queue()
  try:
    master.serve_forever()
  finally:
    # Always tear down workers/queue and remove the pid file on exit
    master.server_cleanup()
    utils.RemovePidFile(constants.MASTERD_PID)


if __name__ == "__main__":
  main()