#!/usr/bin/python -u
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""


import sys
import SocketServer
import threading
import time
import collections
import Queue
import random
import signal
import simplejson


from cStringIO import StringIO
from optparse import OptionParser

from ganeti import constants
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf


EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  QUEUE_PROCESSOR_SIZE = 1

  def __init__(self, address, rqhandler):
    """IOServer constructor

    Args:
      address: the address to bind this IOServer to
      rqhandler: RequestHandler type object

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
    self.do_quit = False
    self.queue = jqueue.QueueManager()
    self.processors = []
    signal.signal(signal.SIGINT, self.handle_quit_signals)
    signal.signal(signal.SIGTERM, self.handle_quit_signals)

  def setup_processors(self):
    """Spawn the processor threads.

    This initializes the queue and the thread processors. It is done
    separately from the constructor because we want the clone()
    syscalls to happen after the daemonize part.

    """
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.processors.append(threading.Thread(target=PoolWorker,
                                              args=(i, self.queue.new_queue)))
    for t in self.processors:
      t.start()

  def process_request_thread(self, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      self.finish_request(request, client_address)
      self.close_request(request)
    except:
      self.handle_error(request, client_address)
      self.close_request(request)

  def process_request(self, request, client_address):
    """Start a new thread to process the request.

    This is copied from the code in ThreadingMixIn.

    """
    t = threading.Thread(target=self.process_request_thread,
                         args=(request, client_address))
    t.start()
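
  # Installed for SIGINT and SIGTERM in the constructor; these handlers only
  # set the do_quit flag, which serve_forever() checks after each handled
  # request.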
  def handle_quit_signals(self, signum, frame):
    print "received %s in %s" % (signum, frame)
    self.do_quit = True

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    while not self.do_quit:
      self.handle_request()
      print "served request, quit=%s" % (self.do_quit)

  def server_cleanup(self):
    """Clean up the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    self.server_close()
    utils.RemoveFile(constants.MASTER_SOCKET)
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.queue.new_queue.put(None)
    for idx, t in enumerate(self.processors):
      print "waiting for processor thread %s..." % idx
      t.join()
    print "done threads"


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096
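  # Wire framing: each message on the master socket is a JSON document
  # terminated by the EOM control character.  Incoming requests are dicts
  # with 'request' and 'data' keys; replies carry 'success' and 'result'
  # (see handle() and read_message() below).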

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        print "client closed connection"
        break
      request = simplejson.loads(msg)
      if not isinstance(request, dict):
        print "wrong request received: %s" % msg
        break
      method = request.get('request', None)
      data = request.get('data', None)
      if method is None or data is None:
        print "no method or data in request"
        break
      print "request:", method, data
      result = self._ops.handle_request(method, data)
      print "result:", result
      self.send_message(simplejson.dumps({'success': True, 'result': result}))

  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server
    self._cpu = None

  def _getcpu(self):
    if self._cpu is None:
      self._cpu = mcpu.Processor(lambda x: None)
    return self._cpu
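
  # Requests arrive from ClientRqHandler.handle() already split into
  # (operation, args).  A hypothetical "query" payload could look like:
  #   {"object": "instances", "fields": ["name"], "names": []}
  # (the field and name values here are illustrative only).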
  def handle_request(self, operation, args):
    print operation, args
    if operation == "submit":
      return self.put(args)
    elif operation == "query":
      return self.query(args)
    else:
      raise ValueError("Invalid operation")

  def put(self, args):
    job = luxi.UnserializeJob(args)
    rid = self.server.queue.put(job)
    return rid

  def query(self, args):
    path = args["object"]
    fields = args["fields"]
    names = args["names"]
    if path == "instances":
      opclass = opcodes.OpQueryInstances
    elif path == "jobs":
      # early exit because job query-ing is special (not via opcodes)
      return self.query_jobs(fields, names)
    else:
      raise ValueError("Invalid object %s" % path)

    op = opclass(output_fields=fields, names=names)
    cpu = self._getcpu()
    result = cpu.ExecOpCode(op)
    return result

  def query_jobs(self, fields, names):
    return self.server.queue.query_jobs(fields, names)


def JobRunner(proc, job):
  """Job executor.

  This function processes a single job in the context of a given
  processor instance.

  Args:
    proc: Ganeti Processor to run the job on
    job: The job to run (unserialized format)

  """
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
  fail = False
  for idx, op in enumerate(job.data.op_list):
    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
    try:
      job.data.op_result[idx] = proc.ExecOpCode(op)
      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
    except (errors.OpPrereqError, errors.OpExecError), err:
      fail = True
      job.data.op_result[idx] = str(err)
      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
  if fail:
    job.SetStatus(opcodes.Job.STATUS_FAIL)
  else:
    job.SetStatus(opcodes.Job.STATUS_SUCCESS)


def PoolWorker(worker_id, incoming_queue):
  """A worker thread function.

  This is the actual processor of a single thread of Job execution.

  Args:
    worker_id: the unique id for this worker
    incoming_queue: a queue to get jobs from

  """
  while True:
    print "worker %s sleeping" % worker_id
    item = incoming_queue.get(True)
    if item is None:
      break
    print "worker %s processing job %s" % (worker_id, item.data.job_id)
    #utils.Lock('cmd')
    try:
      proc = mcpu.Processor(feedback=lambda x: None)
      try:
        JobRunner(proc, item)
      except errors.GenericError, err:
        msg = "ganeti exception %s" % err
        item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
        print msg
      except Exception, err:
        msg = "unhandled exception %s" % err
        item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
        print msg
      except:
        msg = "unhandled unknown exception"
        item.SetStatus(opcodes.Job.STATUS_FAIL, result=[msg])
        print msg

    finally:
      #utils.Unlock('cmd')
      #utils.LockCleanup()
      pass
    print "worker %s finished job %s" % (worker_id, item.data.job_id)
  print "worker %s exiting" % worker_id


def CheckMaster(debug):
  """Checks the node setup.

  If this is the master, the function will return. Otherwise it will
  exit with an exit code based on the node status.

  """
  try:
    ss = ssconf.SimpleStore()
    master_name = ss.GetMasterNode()
  except errors.ConfigurationError, err:
    print "Cluster configuration incomplete: '%s'" % str(err)
    sys.exit(EXIT_NODESETUP_ERROR)

  try:
    myself = utils.HostInfo()
  except errors.ResolverError, err:
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
    sys.exit(EXIT_NODESETUP_ERROR)

  if myself.name != master_name:
    if debug:
      sys.stderr.write("Not master, exiting.\n")
    sys.exit(EXIT_NOTMASTER)


def ParseOptions():
  """Parse the command line options.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  options, args = parser.parse_args()
  return options, args


def main():
  """Main function"""

  options, args = ParseOptions()
  utils.debug = options.debug
  utils.no_fork = True

  CheckMaster(options.debug)

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)

  # become a daemon
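  # (the listening socket was already created above; its file descriptor is
  # kept open across the fork via noclose_fds)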
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                    noclose_fds=[master.fileno()])

  try:
    utils.Lock('cmd', debug=options.debug)
  except errors.LockError, err:
    print >> sys.stderr, str(err)
    master.server_cleanup()
    return

  try:
    master.setup_processors()
    try:
      master.serve_forever()
    finally:
      master.server_cleanup()
  finally:
    utils.Unlock('cmd')
    utils.LockCleanup()


if __name__ == "__main__":
  main()