Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ 9113300d

History | View | Annotate | Download (9.4 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviates from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import time
33
import collections
34
import Queue
35
import random
36
import signal
37
import simplejson
38
import logging
39

    
40
from cStringIO import StringIO
41
from optparse import OptionParser
42

    
43
from ganeti import config
44
from ganeti import constants
45
from ganeti import mcpu
46
from ganeti import opcodes
47
from ganeti import jqueue
48
from ganeti import locking
49
from ganeti import luxi
50
from ganeti import utils
51
from ganeti import errors
52
from ganeti import ssconf
53
from ganeti import logger
54
from ganeti import workerpool
55
from ganeti import rpc
56

    
57

    
58
CLIENT_REQUEST_WORKERS = 16
59

    
60
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
61
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
62

    
63

    
64
class ClientRequestWorker(workerpool.BaseWorker):
65
  def RunTask(self, server, request, client_address):
66
    """Process the request.
67

    
68
    This is copied from the code in ThreadingMixIn.
69

    
70
    """
71
    try:
72
      server.finish_request(request, client_address)
73
      server.close_request(request)
74
    except:
75
      server.handle_error(request, client_address)
76
      server.close_request(request)
77

    
78

    
79
class IOServer(SocketServer.UnixStreamServer):
80
  """IO thread class.
81

    
82
  This class takes care of initializing the other threads, setting
83
  signal handlers (which are processed only in this thread), and doing
84
  cleanup at shutdown.
85

    
86
  """
87
  def __init__(self, address, rqhandler):
88
    """IOServer constructor
89

    
90
    Args:
91
      address: the address to bind this IOServer to
92
      rqhandler: RequestHandler type object
93

    
94
    """
95
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
96

    
97
    # We'll only start threads once we've forked.
98
    self.context = None
99
    self.request_workers = None
100

    
101
  def setup_queue(self):
102
    self.context = GanetiContext()
103
    self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
104
                                                 ClientRequestWorker)
105

    
106
  def process_request(self, request, client_address):
107
    """Add task to workerpool to process request.
108

    
109
    """
110
    self.request_workers.AddTask(self, request, client_address)
111

    
112
  def serve_forever(self):
113
    """Handle one request at a time until told to quit."""
114
    sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
115
    try:
116
      while not sighandler.called:
117
        self.handle_request()
118
    finally:
119
      sighandler.Reset()
120

    
121
  def server_cleanup(self):
122
    """Cleanup the server.
123

    
124
    This involves shutting down the processor threads and the master
125
    socket.
126

    
127
    """
128
    try:
129
      self.server_close()
130
      utils.RemoveFile(constants.MASTER_SOCKET)
131
    finally:
132
      if self.request_workers:
133
        self.request_workers.TerminateWorkers()
134
      if self.context:
135
        self.context.jobqueue.Shutdown()
136

    
137

    
138
class ClientRqHandler(SocketServer.BaseRequestHandler):
139
  """Client handler"""
140
  EOM = '\3'
141
  READ_SIZE = 4096
142

    
143
  def setup(self):
144
    self._buffer = ""
145
    self._msgs = collections.deque()
146
    self._ops = ClientOps(self.server)
147

    
148
  def handle(self):
149
    while True:
150
      msg = self.read_message()
151
      if msg is None:
152
        logging.info("client closed connection")
153
        break
154

    
155
      request = simplejson.loads(msg)
156
      logging.debug("request: %s", request)
157
      if not isinstance(request, dict):
158
        logging.error("wrong request received: %s", msg)
159
        break
160

    
161
      method = request.get(luxi.KEY_METHOD, None)
162
      args = request.get(luxi.KEY_ARGS, None)
163
      if method is None or args is None:
164
        logging.error("no method or args in request")
165
        break
166

    
167
      success = False
168
      try:
169
        result = self._ops.handle_request(method, args)
170
        success = True
171
      except:
172
        logging.error("Unexpected exception", exc_info=True)
173
        err = sys.exc_info()
174
        result = "Caught exception: %s" % str(err[1])
175

    
176
      response = {
177
        luxi.KEY_SUCCESS: success,
178
        luxi.KEY_RESULT: result,
179
        }
180
      logging.debug("response: %s", response)
181
      self.send_message(simplejson.dumps(response))
182

    
183
  def read_message(self):
184
    while not self._msgs:
185
      data = self.request.recv(self.READ_SIZE)
186
      if not data:
187
        return None
188
      new_msgs = (self._buffer + data).split(self.EOM)
189
      self._buffer = new_msgs.pop()
190
      self._msgs.extend(new_msgs)
191
    return self._msgs.popleft()
192

    
193
  def send_message(self, msg):
194
    #print "sending", msg
195
    self.request.sendall(msg + self.EOM)
196

    
197

    
198
class ClientOps:
199
  """Class holding high-level client operations."""
200
  def __init__(self, server):
201
    self.server = server
202

    
203
  def handle_request(self, method, args):
204
    queue = self.server.context.jobqueue
205

    
206
    # TODO: Parameter validation
207

    
208
    if method == luxi.REQ_SUBMIT_JOB:
209
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
210
      # we need to compute the node list here, since from now on all
211
      # operations require locks on the queue or the storage, and we
212
      # shouldn't get another lock
213
      node_list = self.server.context.cfg.GetNodeList()
214
      return queue.SubmitJob(ops, node_list)
215

    
216
    elif method == luxi.REQ_CANCEL_JOB:
217
      job_id = args
218
      return queue.CancelJob(job_id)
219

    
220
    elif method == luxi.REQ_ARCHIVE_JOB:
221
      job_id = args
222
      return queue.ArchiveJob(job_id)
223

    
224
    elif method == luxi.REQ_QUERY_JOBS:
225
      (job_ids, fields) = args
226
      return queue.QueryJobs(job_ids, fields)
227

    
228
    elif method == luxi.REQ_QUERY_INSTANCES:
229
      (names, fields) = args
230
      op = opcodes.OpQueryInstances(names=names, output_fields=fields)
231
      return self._Query(op)
232

    
233
    elif method == luxi.REQ_QUERY_NODES:
234
      (names, fields) = args
235
      op = opcodes.OpQueryNodes(names=names, output_fields=fields)
236
      return self._Query(op)
237

    
238
    else:
239
      raise ValueError("Invalid operation")
240

    
241
  def _DummyLog(self, *args):
242
    pass
243

    
244
  def _Query(self, op):
245
    """Runs the specified opcode and returns the result.
246

    
247
    """
248
    proc = mcpu.Processor(self.server.context)
249
    # TODO: Where should log messages go?
250
    return proc.ExecOpCode(op, self._DummyLog)
251

    
252

    
253
class GanetiContext(object):
254
  """Context common to all ganeti threads.
255

    
256
  This class creates and holds common objects shared by all threads.
257

    
258
  """
259
  _instance = None
260

    
261
  def __init__(self):
262
    """Constructs a new GanetiContext object.
263

    
264
    There should be only a GanetiContext object at any time, so this
265
    function raises an error if this is not the case.
266

    
267
    """
268
    assert self.__class__._instance is None, "double GanetiContext instance"
269

    
270
    # Create global configuration object
271
    self.cfg = config.ConfigWriter()
272

    
273
    # Locking manager
274
    self.glm = locking.GanetiLockManager(
275
                self.cfg.GetNodeList(),
276
                self.cfg.GetInstanceList())
277

    
278
    # Job queue
279
    self.jobqueue = jqueue.JobQueue(self)
280

    
281
    # setting this also locks the class against attribute modifications
282
    self.__class__._instance = self
283

    
284
  def __setattr__(self, name, value):
285
    """Setting GanetiContext attributes is forbidden after initialization.
286

    
287
    """
288
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
289
    object.__setattr__(self, name, value)
290

    
291

    
292
def ParseOptions():
293
  """Parse the command line options.
294

    
295
  Returns:
296
    (options, args) as from OptionParser.parse_args()
297

    
298
  """
299
  parser = OptionParser(description="Ganeti master daemon",
300
                        usage="%prog [-f] [-d]",
301
                        version="%%prog (ganeti) %s" %
302
                        constants.RELEASE_VERSION)
303

    
304
  parser.add_option("-f", "--foreground", dest="fork",
305
                    help="Don't detach from the current terminal",
306
                    default=True, action="store_false")
307
  parser.add_option("-d", "--debug", dest="debug",
308
                    help="Enable some debug messages",
309
                    default=False, action="store_true")
310
  options, args = parser.parse_args()
311
  return options, args
312

    
313

    
314
def main():
315
  """Main function"""
316

    
317
  options, args = ParseOptions()
318
  utils.debug = options.debug
319
  utils.no_fork = True
320

    
321
  ssconf.CheckMaster(options.debug)
322

    
323
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
324

    
325
  # become a daemon
326
  if options.fork:
327
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
328
                    noclose_fds=[master.fileno()])
329

    
330
  utils.WritePidFile(constants.MASTERD_PID)
331

    
332
  logger.SetupLogging(constants.LOG_MASTERDAEMON, debug=options.debug,
333
                      stderr_logging=not options.fork)
334

    
335
  logging.info("ganeti master daemon startup")
336

    
337
  # activate ip
338
  master_node = ssconf.SimpleStore().GetMasterNode()
339
  if not rpc.call_node_start_master(master_node, False):
340
    logging.error("Can't activate master IP address")
341

    
342
  master.setup_queue()
343
  try:
344
    master.serve_forever()
345
  finally:
346
    master.server_cleanup()
347
    utils.RemovePidFile(constants.MASTERD_PID)
348

    
349

    
350
if __name__ == "__main__":
351
  main()