Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 4c848b18

History | View | Annotate | Download (9.2 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import time
33
import collections
34
import Queue
35
import random
36
import signal
37
import simplejson
38
import logging
39

    
40
from cStringIO import StringIO
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import mcpu
46
from ganeti import opcodes
47
from ganeti import jqueue
48
from ganeti import locking
49
from ganeti import luxi
50
from ganeti import utils
51
from ganeti import errors
52
from ganeti import ssconf
53
from ganeti import logger
54
from ganeti import workerpool
55
from ganeti import rpc
56

    
57

    
58
CLIENT_REQUEST_WORKERS = 16
59

    
60
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
61
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
62

    
63

    
64
class ClientRequestWorker(workerpool.BaseWorker):
65
  def RunTask(self, server, request, client_address):
66
    """Process the request.
67

    
68
    This is copied from the code in ThreadingMixIn.
69

    
70
    """
71
    try:
72
      server.finish_request(request, client_address)
73
      server.close_request(request)
74
    except:
75
      server.handle_error(request, client_address)
76
      server.close_request(request)
77

    
78

    
79
class IOServer(SocketServer.UnixStreamServer):
80
  """IO thread class.
81

    
82
  This class takes care of initializing the other threads, setting
83
  signal handlers (which are processed only in this thread), and doing
84
  cleanup at shutdown.
85

    
86
  """
87
  def __init__(self, address, rqhandler):
88
    """IOServer constructor
89

    
90
    Args:
91
      address: the address to bind this IOServer to
92
      rqhandler: RequestHandler type object
93

    
94
    """
95
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
96

    
97
    # We'll only start threads once we've forked.
98
    self.context = None
99
    self.request_workers = None
100

    
101
  def setup_queue(self):
102
    self.context = GanetiContext()
103
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
104
                                                 ClientRequestWorker)
105

    
106
  def process_request(self, request, client_address):
107
    """Add task to workerpool to process request.
108

    
109
    """
110
    self.request_workers.AddTask(self, request, client_address)
111

    
112
  def serve_forever(self):
113
    """Handle one request at a time until told to quit."""
114
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
115
    try:
116
      while not sighandler.called:
117
        self.handle_request()
118
    finally:
119
      sighandler.Reset()
120

    
121
  def server_cleanup(self):
122
    """Cleanup the server.
123

    
124
    This involves shutting down the processor threads and the master
125
    socket.
126

    
127
    """
128
    try:
129
      self.server_close()
130
      utils.RemoveFile(constants.MASTER_SOCKET)
131
    finally:
132
      if self.request_workers:
133
        self.request_workers.TerminateWorkers()
134
      if self.context:
135
        self.context.jobqueue.Shutdown()
136

    
137

    
138
class ClientRqHandler(SocketServer.BaseRequestHandler):
139
  """Client handler"""
140
  EOM = '\3'
141
  READ_SIZE = 4096
142

    
143
  def setup(self):
144
    self._buffer = ""
145
    self._msgs = collections.deque()
146
    self._ops = ClientOps(self.server)
147

    
148
  def handle(self):
149
    while True:
150
      msg = self.read_message()
151
      if msg is None:
152
        logging.info("client closed connection")
153
        break
154

    
155
      request = simplejson.loads(msg)
156
      logging.debug("request: %s", request)
157
      if not isinstance(request, dict):
158
        logging.error("wrong request received: %s", msg)
159
        break
160

    
161
      method = request.get(luxi.KEY_METHOD, None)
162
      args = request.get(luxi.KEY_ARGS, None)
163
      if method is None or args is None:
164
        logging.error("no method or args in request")
165
        break
166

    
167
      success = False
168
      try:
169
        result = self._ops.handle_request(method, args)
170
        success = True
171
      except:
172
        logging.error("Unexpected exception", exc_info=True)
173
        err = sys.exc_info()
174
        result = "Caught exception: %s" % str(err[1])
175

    
176
      response = {
177
        luxi.KEY_SUCCESS: success,
178
        luxi.KEY_RESULT: result,
179
        }
180
      logging.debug("response: %s", response)
181
      self.send_message(simplejson.dumps(response))
182

    
183
  def read_message(self):
184
    while not self._msgs:
185
      data = self.request.recv(self.READ_SIZE)
186
      if not data:
187
        return None
188
      new_msgs = (self._buffer + data).split(self.EOM)
189
      self._buffer = new_msgs.pop()
190
      self._msgs.extend(new_msgs)
191
    return self._msgs.popleft()
192

    
193
  def send_message(self, msg):
194
    #print "sending", msg
195
    self.request.sendall(msg + self.EOM)
196

    
197

    
198
class ClientOps:
199
  """Class holding high-level client operations."""
200
  def __init__(self, server):
201
    self.server = server
202

    
203
  def handle_request(self, method, args):
204
    queue = self.server.context.jobqueue
205

    
206
    # TODO: Parameter validation
207

    
208
    if method == luxi.REQ_SUBMIT_JOB:
209
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
210
      return queue.SubmitJob(ops)
211

    
212
    elif method == luxi.REQ_CANCEL_JOB:
213
      job_id = args
214
      return queue.CancelJob(job_id)
215

    
216
    elif method == luxi.REQ_ARCHIVE_JOB:
217
      job_id = args
218
      return queue.ArchiveJob(job_id)
219

    
220
    elif method == luxi.REQ_QUERY_JOBS:
221
      (job_ids, fields) = args
222
      return queue.QueryJobs(job_ids, fields)
223

    
224
    elif method == luxi.REQ_QUERY_INSTANCES:
225
      (names, fields) = args
226
      op = opcodes.OpQueryInstances(names=names, output_fields=fields)
227
      return self._Query(op)
228

    
229
    elif method == luxi.REQ_QUERY_NODES:
230
      (names, fields) = args
231
      op = opcodes.OpQueryNodes(names=names, output_fields=fields)
232
      return self._Query(op)
233

    
234
    else:
235
      raise ValueError("Invalid operation")
236

    
237
  def _DummyLog(self, *args):
238
    pass
239

    
240
  def _Query(self, op):
241
    """Runs the specified opcode and returns the result.
242

    
243
    """
244
    proc = mcpu.Processor(self.server.context)
245
    # TODO: Where should log messages go?
246
    return proc.ExecOpCode(op, self._DummyLog)
247

    
248

    
249
class GanetiContext(object):
250
  """Context common to all ganeti threads.
251

    
252
  This class creates and holds common objects shared by all threads.
253

    
254
  """
255
  _instance = None
256

    
257
  def __init__(self):
258
    """Constructs a new GanetiContext object.
259

    
260
    There should be only a GanetiContext object at any time, so this
261
    function raises an error if this is not the case.
262

    
263
    """
264
    assert self.__class__._instance is None, "double GanetiContext instance"
265

    
266
    # Create global configuration object
267
    self.cfg = config.ConfigWriter()
268

    
269
    # Locking manager
270
    self.glm = locking.GanetiLockManager(
271
                self.cfg.GetNodeList(),
272
                self.cfg.GetInstanceList())
273

    
274
    # Job queue
275
    self.jobqueue = jqueue.JobQueue(self)
276

    
277
    # setting this also locks the class against attribute modifications
278
    self.__class__._instance = self
279

    
280
  def __setattr__(self, name, value):
281
    """Setting GanetiContext attributes is forbidden after initialization.
282

    
283
    """
284
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
285
    object.__setattr__(self, name, value)
286

    
287

    
288
def ParseOptions():
289
  """Parse the command line options.
290

    
291
  Returns:
292
    (options, args) as from OptionParser.parse_args()
293

    
294
  """
295
  parser = OptionParser(description="Ganeti master daemon",
296
                        usage="%prog [-f] [-d]",
297
                        version="%%prog (ganeti) %s" %
298
                        constants.RELEASE_VERSION)
299

    
300
  parser.add_option("-f", "--foreground", dest="fork",
301
                    help="Don't detach from the current terminal",
302
                    default=True, action="store_false")
303
  parser.add_option("-d", "--debug", dest="debug",
304
                    help="Enable some debug messages",
305
                    default=False, action="store_true")
306
  options, args = parser.parse_args()
307
  return options, args
308

    
309

    
310
def main():
311
  """Main function"""
312

    
313
  options, args = ParseOptions()
314
  utils.debug = options.debug
315
  utils.no_fork = True
316

    
317
  ssconf.CheckMaster(options.debug)
318

    
319
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
320

    
321
  # become a daemon
322
  if options.fork:
323
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
324
                    noclose_fds=[master.fileno()])
325

    
326
  utils.WritePidFile(constants.MASTERD_PID)
327

    
328
  logger.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
329
                      stderr_logging=not options.fork)
330

    
331
  logging.info("ganeti master daemon startup")
332

    
333
  # activate ip
334
  master_node = ssconf.SimpleStore().GetMasterNode()
335
  if not rpc.call_node_start_master(master_node, False):
336
    logging.error("Can't activate master IP address")
337

    
338
  master.setup_queue()
339
  try:
340
    master.serve_forever()
341
  finally:
342
    master.server_cleanup()
343
    utils.RemovePidFile(constants.MASTERD_PID)
344

    
345

    
346
if __name__ == "__main__":
347
  main()