Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 32f93223

History | View | Annotate | Download (10.1 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import time
33
import collections
34
import Queue
35
import random
36
import signal
37
import simplejson
38
import logging
39

    
40
from cStringIO import StringIO
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import mcpu
46
from ganeti import opcodes
47
from ganeti import jqueue
48
from ganeti import locking
49
from ganeti import luxi
50
from ganeti import utils
51
from ganeti import errors
52
from ganeti import ssconf
53
from ganeti import logger
54
from ganeti import workerpool
55
from ganeti import rpc
56

    
57

    
58
CLIENT_REQUEST_WORKERS = 16
59

    
60
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
61
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
62

    
63

    
64
class ClientRequestWorker(workerpool.BaseWorker):
65
  def RunTask(self, server, request, client_address):
66
    """Process the request.
67

    
68
    This is copied from the code in ThreadingMixIn.
69

    
70
    """
71
    try:
72
      server.finish_request(request, client_address)
73
      server.close_request(request)
74
    except:
75
      server.handle_error(request, client_address)
76
      server.close_request(request)
77

    
78

    
79
class IOServer(SocketServer.UnixStreamServer):
80
  """IO thread class.
81

    
82
  This class takes care of initializing the other threads, setting
83
  signal handlers (which are processed only in this thread), and doing
84
  cleanup at shutdown.
85

    
86
  """
87
  def __init__(self, address, rqhandler):
88
    """IOServer constructor
89

    
90
    Args:
91
      address: the address to bind this IOServer to
92
      rqhandler: RequestHandler type object
93

    
94
    """
95
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
96

    
97
    # We'll only start threads once we've forked.
98
    self.context = None
99
    self.request_workers = None
100

    
101
  def setup_queue(self):
102
    self.context = GanetiContext()
103
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
104
                                                 ClientRequestWorker)
105

    
106
  def process_request(self, request, client_address):
107
    """Add task to workerpool to process request.
108

    
109
    """
110
    self.request_workers.AddTask(self, request, client_address)
111

    
112
  def serve_forever(self):
113
    """Handle one request at a time until told to quit."""
114
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
115
    try:
116
      while not sighandler.called:
117
        self.handle_request()
118
    finally:
119
      sighandler.Reset()
120

    
121
  def server_cleanup(self):
122
    """Cleanup the server.
123

    
124
    This involves shutting down the processor threads and the master
125
    socket.
126

    
127
    """
128
    try:
129
      self.server_close()
130
      utils.RemoveFile(constants.MASTER_SOCKET)
131
    finally:
132
      if self.request_workers:
133
        self.request_workers.TerminateWorkers()
134
      if self.context:
135
        self.context.jobqueue.Shutdown()
136

    
137

    
138
class ClientRqHandler(SocketServer.BaseRequestHandler):
139
  """Client handler"""
140
  EOM = '\3'
141
  READ_SIZE = 4096
142

    
143
  def setup(self):
144
    self._buffer = ""
145
    self._msgs = collections.deque()
146
    self._ops = ClientOps(self.server)
147

    
148
  def handle(self):
149
    while True:
150
      msg = self.read_message()
151
      if msg is None:
152
        logging.info("client closed connection")
153
        break
154

    
155
      request = simplejson.loads(msg)
156
      logging.debug("request: %s", request)
157
      if not isinstance(request, dict):
158
        logging.error("wrong request received: %s", msg)
159
        break
160

    
161
      method = request.get(luxi.KEY_METHOD, None)
162
      args = request.get(luxi.KEY_ARGS, None)
163
      if method is None or args is None:
164
        logging.error("no method or args in request")
165
        break
166

    
167
      success = False
168
      try:
169
        result = self._ops.handle_request(method, args)
170
        success = True
171
      except:
172
        logging.error("Unexpected exception", exc_info=True)
173
        err = sys.exc_info()
174
        result = "Caught exception: %s" % str(err[1])
175

    
176
      response = {
177
        luxi.KEY_SUCCESS: success,
178
        luxi.KEY_RESULT: result,
179
        }
180
      logging.debug("response: %s", response)
181
      self.send_message(simplejson.dumps(response))
182

    
183
  def read_message(self):
184
    while not self._msgs:
185
      data = self.request.recv(self.READ_SIZE)
186
      if not data:
187
        return None
188
      new_msgs = (self._buffer + data).split(self.EOM)
189
      self._buffer = new_msgs.pop()
190
      self._msgs.extend(new_msgs)
191
    return self._msgs.popleft()
192

    
193
  def send_message(self, msg):
194
    #print "sending", msg
195
    self.request.sendall(msg + self.EOM)
196

    
197

    
198
class ClientOps:
199
  """Class holding high-level client operations."""
200
  def __init__(self, server):
201
    self.server = server
202

    
203
  def handle_request(self, method, args):
204
    queue = self.server.context.jobqueue
205

    
206
    # TODO: Parameter validation
207

    
208
    if method == luxi.REQ_SUBMIT_JOB:
209
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
210
      return queue.SubmitJob(ops)
211

    
212
    elif method == luxi.REQ_CANCEL_JOB:
213
      job_id = args
214
      return queue.CancelJob(job_id)
215

    
216
    elif method == luxi.REQ_ARCHIVE_JOB:
217
      job_id = args
218
      return queue.ArchiveJob(job_id)
219

    
220
    elif method == luxi.REQ_QUERY_JOBS:
221
      (job_ids, fields) = args
222
      return queue.QueryJobs(job_ids, fields)
223

    
224
    elif method == luxi.REQ_QUERY_INSTANCES:
225
      (names, fields) = args
226
      op = opcodes.OpQueryInstances(names=names, output_fields=fields)
227
      return self._Query(op)
228

    
229
    elif method == luxi.REQ_QUERY_NODES:
230
      (names, fields) = args
231
      op = opcodes.OpQueryNodes(names=names, output_fields=fields)
232
      return self._Query(op)
233

    
234
    elif method == luxi.REQ_QUERY_EXPORTS:
235
      nodes = args
236
      op = opcodes.OpQueryExports(nodes=nodes)
237
      return self._Query(op)
238

    
239
    else:
240
      raise ValueError("Invalid operation")
241

    
242
  def _DummyLog(self, *args):
243
    pass
244

    
245
  def _Query(self, op):
246
    """Runs the specified opcode and returns the result.
247

    
248
    """
249
    proc = mcpu.Processor(self.server.context)
250
    # TODO: Where should log messages go?
251
    return proc.ExecOpCode(op, self._DummyLog)
252

    
253

    
254
class GanetiContext(object):
255
  """Context common to all ganeti threads.
256

    
257
  This class creates and holds common objects shared by all threads.
258

    
259
  """
260
  _instance = None
261

    
262
  def __init__(self):
263
    """Constructs a new GanetiContext object.
264

    
265
    There should be only a GanetiContext object at any time, so this
266
    function raises an error if this is not the case.
267

    
268
    """
269
    assert self.__class__._instance is None, "double GanetiContext instance"
270

    
271
    # Create global configuration object
272
    self.cfg = config.ConfigWriter()
273

    
274
    # Locking manager
275
    self.glm = locking.GanetiLockManager(
276
                self.cfg.GetNodeList(),
277
                self.cfg.GetInstanceList())
278

    
279
    # Job queue
280
    self.jobqueue = jqueue.JobQueue(self)
281

    
282
    # setting this also locks the class against attribute modifications
283
    self.__class__._instance = self
284

    
285
  def __setattr__(self, name, value):
286
    """Setting GanetiContext attributes is forbidden after initialization.
287

    
288
    """
289
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
290
    object.__setattr__(self, name, value)
291

    
292
  def AddNode(self, node):
293
    """Adds a node to the configuration and lock manager.
294

    
295
    """
296
    # Add it to the configuration
297
    self.cfg.AddNode(node)
298

    
299
    # If preseeding fails it'll not be added
300
    self.jobqueue.AddNode(node.name)
301

    
302
    # Add the new node to the Ganeti Lock Manager
303
    self.glm.add(locking.LEVEL_NODE, node.name)
304

    
305
  def ReaddNode(self, node):
306
    """Updates a node that's already in the configuration
307

    
308
    """
309
    # Synchronize the queue again
310
    self.jobqueue.AddNode(node.name)
311

    
312
  def RemoveNode(self, name):
313
    """Removes a node from the configuration and lock manager.
314

    
315
    """
316
    # Remove node from configuration
317
    self.cfg.RemoveNode(name)
318

    
319
    # Notify job queue
320
    self.jobqueue.RemoveNode(name)
321

    
322
    # Remove the node from the Ganeti Lock Manager
323
    self.glm.remove(locking.LEVEL_NODE, name)
324

    
325

    
326
def ParseOptions():
327
  """Parse the command line options.
328

    
329
  Returns:
330
    (options, args) as from OptionParser.parse_args()
331

    
332
  """
333
  parser = OptionParser(description="Ganeti master daemon",
334
                        usage="%prog [-f] [-d]",
335
                        version="%%prog (ganeti) %s" %
336
                        constants.RELEASE_VERSION)
337

    
338
  parser.add_option("-f", "--foreground", dest="fork",
339
                    help="Don't detach from the current terminal",
340
                    default=True, action="store_false")
341
  parser.add_option("-d", "--debug", dest="debug",
342
                    help="Enable some debug messages",
343
                    default=False, action="store_true")
344
  options, args = parser.parse_args()
345
  return options, args
346

    
347

    
348
def main():
349
  """Main function"""
350

    
351
  options, args = ParseOptions()
352
  utils.debug = options.debug
353
  utils.no_fork = True
354

    
355
  ssconf.CheckMaster(options.debug)
356

    
357
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
358

    
359
  # become a daemon
360
  if options.fork:
361
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
362
                    noclose_fds=[master.fileno()])
363

    
364
  utils.WritePidFile(constants.MASTERD_PID)
365

    
366
  logger.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
367
                      stderr_logging=not options.fork)
368

    
369
  logging.info("ganeti master daemon startup")
370

    
371
  # activate ip
372
  master_node = ssconf.SimpleStore().GetMasterNode()
373
  if not rpc.call_node_start_master(master_node, False):
374
    logging.error("Can't activate master IP address")
375

    
376
  master.setup_queue()
377
  try:
378
    master.serve_forever()
379
  finally:
380
    master.server_cleanup()
381
    utils.RemovePidFile(constants.MASTERD_PID)
382

    
383

    
384
if __name__ == "__main__":
385
  main()