#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""


import sys
import SocketServer
import threading
import time
import collections
import Queue
import random
import signal
import simplejson
import logging

from cStringIO import StringIO
from optparse import OptionParser

from ganeti import config
from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import locking
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import logger


EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  def __init__(self, address, rqhandler, context):
    """IOServer constructor

    Args:
      address: the address to bind this IOServer to
      rqhandler: RequestHandler type object
      context: Context Object common to all worker threads

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
    self.do_quit = False
    self.context = context

    # We'll only start threads once we've forked.
    self.jobqueue = None

    signal.signal(signal.SIGINT, self.handle_quit_signals)
    signal.signal(signal.SIGTERM, self.handle_quit_signals)

  def setup_queue(self):
    self.jobqueue = jqueue.JobQueue(self.context)

  def process_request_thread(self, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      self.finish_request(request, client_address)
      self.close_request(request)
    except:
      self.handle_error(request, client_address)
      self.close_request(request)

  def process_request(self, request, client_address):
    """Start a new thread to process the request.

    This is copied from the code in ThreadingMixIn.

    """
    t = threading.Thread(target=self.process_request_thread,
                         args=(request, client_address))
    t.start()

  def handle_quit_signals(self, signum, frame):
    print "received %s in %s" % (signum, frame)
    self.do_quit = True

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    while not self.do_quit:
      self.handle_request()
      print "served request, quit=%s" % (self.do_quit)

  def server_cleanup(self):
    """Clean up the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    try:
      self.server_close()
      utils.RemoveFile(constants.MASTER_SOCKET)
    finally:
      if self.jobqueue:
        self.jobqueue.Shutdown()


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        logging.info("client closed connection")
        break

      request = simplejson.loads(msg)
      logging.debug("request: %s", request)
      if not isinstance(request, dict):
        logging.error("wrong request received: %s", msg)
        break

      method = request.get(luxi.KEY_METHOD, None)
      args = request.get(luxi.KEY_ARGS, None)
      if method is None or args is None:
        logging.error("no method or args in request")
        break

      success = False
      try:
        result = self._ops.handle_request(method, args)
        success = True
      except:
        logging.error("Unexpected exception", exc_info=True)
        err = sys.exc_info()
        result = "Caught exception: %s" % str(err[1])

      response = {
        luxi.KEY_SUCCESS: success,
        luxi.KEY_RESULT: result,
        }
      logging.debug("response: %s", response)
      self.send_message(simplejson.dumps(response))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server

  def handle_request(self, method, args):
    queue = self.server.jobqueue

    # TODO: Parameter validation

    if method == luxi.REQ_SUBMIT_JOB:
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
      return queue.SubmitJob(ops)

    elif method == luxi.REQ_CANCEL_JOB:
      (job_id, ) = args
      return queue.CancelJob(job_id)

    elif method == luxi.REQ_ARCHIVE_JOB:
      (job_id, ) = args
      return queue.ArchiveJob(job_id)

    elif method == luxi.REQ_QUERY_JOBS:
      (job_ids, fields) = args
      return queue.QueryJobs(job_ids, fields)

    else:
      raise ValueError("Invalid operation")
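

# The helper below is an illustrative sketch only, added for documentation
# purposes; it is not used by the daemon, and the real client-side
# implementation lives in the luxi module.  It shows the wire format
# ClientRqHandler expects: a JSON-encoded dict carrying the luxi method/args
# keys, terminated by the '\3' end-of-message byte.  The job field names in
# the usage example are assumptions.
def _ExampleClientCall(method, args):
  """Send one request to the master socket and return (success, result).

  Example usage (field names illustrative only):
    _ExampleClientCall(luxi.REQ_QUERY_JOBS, ([], ["id", "status"]))

  """
  import socket
  sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  sock.connect(constants.MASTER_SOCKET)
  try:
    # One message is the JSON request followed by the EOM marker.
    request = {luxi.KEY_METHOD: method, luxi.KEY_ARGS: args}
    sock.sendall(simplejson.dumps(request) + ClientRqHandler.EOM)
    # Read until the EOM marker shows up, then decode the response; a
    # well-behaved server always terminates its reply with EOM.
    data = ""
    while ClientRqHandler.EOM not in data:
      chunk = sock.recv(ClientRqHandler.READ_SIZE)
      if not chunk:
        break
      data += chunk
  finally:
    sock.close()
  response = simplejson.loads(data.split(ClientRqHandler.EOM)[0])
  return response[luxi.KEY_SUCCESS], response[luxi.KEY_RESULT]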


def JobRunner(proc, job, context):
  """Job executor.

  This function processes a single job in the context of a given
  processor instance.

  Args:
    proc: Ganeti Processor to run the job on
    job: The job to run (unserialized format)
    context: Ganeti shared context

  """
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
  fail = False
  for idx, op in enumerate(job.data.op_list):
    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
    try:
      job.data.op_result[idx] = proc.ExecOpCode(op)
      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
    except (errors.OpPrereqError, errors.OpExecError), err:
      fail = True
      job.data.op_result[idx] = str(err)
      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
  if fail:
    job.SetStatus(opcodes.Job.STATUS_FAIL)
  else:
    job.SetStatus(opcodes.Job.STATUS_SUCCESS)


def PoolWorker(worker_id, incoming_queue, context):
  """A worker thread function.

  This is the actual processor of a single thread of Job execution.

  Args:
    worker_id: the unique id for this worker
    incoming_queue: a queue to get jobs from
    context: the common server context, containing all shared data and
             synchronization structures.

  """
  while True:
    logging.debug("worker %s sleeping", worker_id)
    item = incoming_queue.get(True)
    if item is None:
      break
    logging.debug("worker %s processing job %s", worker_id, item.data.job_id)
    proc = mcpu.Processor(context, feedback=lambda x: None)
    try:
      JobRunner(proc, item, context)
    except errors.GenericError, err:
      msg = "ganeti exception"
      logging.error(msg, exc_info=err)
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
    except Exception, err:
      msg = "unhandled exception"
      logging.error(msg, exc_info=err)
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
    except:
      msg = "unhandled unknown exception"
      logging.error(msg, exc_info=True)
      item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
    logging.debug("worker %s finished job %s", worker_id, item.data.job_id)
  logging.debug("worker %s exiting", worker_id)


class GanetiContext(object):
  """Context common to all ganeti threads.

  This class creates and holds common objects shared by all threads.

  """
  _instance = None

  def __init__(self):
    """Constructs a new GanetiContext object.

    There should be only one GanetiContext object at any time, so this
    function raises an error if this is not the case.

    """
    assert self.__class__._instance is None, "double GanetiContext instance"

    # Create a ConfigWriter...
    self.cfg = config.ConfigWriter()
    # And a GanetiLockingManager...
    self.glm = locking.GanetiLockManager(
                self.cfg.GetNodeList(),
                self.cfg.GetInstanceList())

    # setting this also locks the class against attribute modifications
    self.__class__._instance = self

  def __setattr__(self, name, value):
    """Setting GanetiContext attributes is forbidden after initialization.

    """
    assert self.__class__._instance is None, "Attempt to modify Ganeti Context"
    object.__setattr__(self, name, value)


def CheckMaster(debug):
  """Checks the node setup.

  If this is the master, the function will return. Otherwise it will
  exit with an exit code based on the node status.

  """
  try:
    ss = ssconf.SimpleStore()
    master_name = ss.GetMasterNode()
  except errors.ConfigurationError, err:
    print "Cluster configuration incomplete: '%s'" % str(err)
    sys.exit(EXIT_NODESETUP_ERROR)

  try:
    myself = utils.HostInfo()
  except errors.ResolverError, err:
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
    sys.exit(EXIT_NODESETUP_ERROR)

  if myself.name != master_name:
    if debug:
      sys.stderr.write("Not master, exiting.\n")
    sys.exit(EXIT_NOTMASTER)


def ParseOptions():
  """Parse the command line options.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  options, args = parser.parse_args()
  return options, args
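

# Example invocation (illustrative): running "ganeti-masterd -f -d" keeps the
# daemon attached to the terminal with debug messages enabled, while running
# it with no options daemonizes it and logs to constants.LOG_MASTERDAEMON.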


def main():
  """Main function"""

  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  CheckMaster(options.debug)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler, GanetiContext())

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                    noclose_fds=[master.fileno()])

  logger.SetupDaemon(constants.LOG_MASTERDAEMON, debug=options.debug)

  logging.info("ganeti master daemon startup")

  master.setup_queue()
  try:
    master.serve_forever()
  finally:
    master.server_cleanup()


if __name__ == "__main__":
  main()