Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-masterd @ a4af651e

History | View | Annotate | Download (9.8 kB)

1
#!/usr/bin/python -u
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Master daemon program.
23

    
24
Some classes deviate from the standard style guide since the
25
inheritance from parent classes requires it.
26

    
27
"""
28

    
29

    
30
import sys
31
import SocketServer
32
import threading
33
import time
34
import collections
35
import Queue
36
import random
37
import signal
38
import simplejson
39

    
40

    
41
from cStringIO import StringIO
42
from optparse import OptionParser
43

    
44
from ganeti import constants
45
from ganeti import mcpu
46
from ganeti import opcodes
47
from ganeti import jqueue
48
from ganeti import luxi
49
from ganeti import utils
50
from ganeti import errors
51
from ganeti import ssconf
52

    
53

    
54
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
55
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
56

    
57

    
58
class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  # number of PoolWorker threads spawned by setup_processors(); must match
  # the number of None sentinels pushed in server_cleanup()
  QUEUE_PROCESSOR_SIZE = 1

  def __init__(self, address, rqhandler):
    """Initialize the listening socket and the job queue.

    Args:
      address: path of the unix socket to listen on
      rqhandler: the SocketServer request handler class (ClientRqHandler)

    """
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
    self.do_quit = False
    self.queue = jqueue.QueueManager()
    self.processors = []
    # the handlers only flip do_quit; serve_forever() polls it between requests
    signal.signal(signal.SIGINT, self.handle_quit_signals)
    signal.signal(signal.SIGTERM, self.handle_quit_signals)

  def setup_processors(self):
    """Spawn the processors threads.

    This initializes the queue and the thread processors. It is done
    separately from the constructor because we want the clone()
    syscalls to happen after the daemonize part.

    """
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.processors.append(threading.Thread(target=PoolWorker,
                                              args=(i, self.queue.new_queue)))
    for t in self.processors:
      t.start()

  def process_request_thread(self, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      self.finish_request(request, client_address)
      self.close_request(request)
    except:
      # NOTE(review): bare except mirrors the stdlib ThreadingMixIn code it
      # was copied from; it keeps a misbehaving client from killing the
      # daemon, at the cost of hiding programming errors
      self.handle_error(request, client_address)
      self.close_request(request)

  def process_request(self, request, client_address):
    """Start a new thread to process the request.

    This is copied from the code in ThreadingMixIn.

    """
    t = threading.Thread(target=self.process_request_thread,
                         args=(request, client_address))
    t.start()

  def handle_quit_signals(self, signum, frame):
    """Signal handler for SIGINT/SIGTERM: request a clean shutdown."""
    print "received %s in %s" % (signum, frame)
    self.do_quit = True

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    # NOTE(review): do_quit is only re-checked after handle_request()
    # returns, so shutdown completes only after one more client connects
    while not self.do_quit:
      self.handle_request()
      print "served request, quit=%s" % (self.do_quit)

  def server_cleanup(self):
    """Cleanup the server.

    This involves shutting down the processor threads and the master
    socket.

    """
    self.server_close()
    utils.RemoveFile(constants.MASTER_SOCKET)
    # one None sentinel per worker: each PoolWorker exits its loop on None
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.queue.new_queue.put(None)
    for idx, t in enumerate(self.processors):
      print "waiting for processor thread %s..." % idx
      t.join()
    print "done threads"
138

    
139

    
140
class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  # messages on the stream socket are framed by this end-of-message byte
  EOM = '\3'
  READ_SIZE = 4096

  def setup(self):
    """Per-connection initialization, called by BaseRequestHandler."""
    self._buffer = ""  # trailing bytes received but not yet EOM-terminated
    self._msgs = collections.deque()  # complete messages awaiting dispatch
    self._ops = ClientOps(self.server)

  def handle(self):
    """Main per-connection loop: read a request, dispatch it, reply."""
    while True:
      msg = self.read_message()
      if msg is None:
        print "client closed connection"
        break
      # each message is a JSON object: {"request": <method>, "data": <args>}
      request = simplejson.loads(msg)
      if not isinstance(request, dict):
        print "wrong request received: %s" % msg
        break
      method = request.get('request', None)
      data = request.get('data', None)
      if method is None or data is None:
        print "no method or data in request"
        break
      print "request:", method, data
      result = self._ops.handle_request(method, data)
      print "result:", result
      # NOTE(review): success is hardcoded True; exceptions raised by the
      # handler propagate and drop the connection instead of being reported
      self.send_message(simplejson.dumps({'success': True, 'result': result}))

  def read_message(self):
    """Return the next EOM-delimited message, or None on EOF."""
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      # split on EOM; the final element is the (possibly empty) incomplete
      # tail, which is kept in the buffer for the next recv
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    """Send one message to the client, appending the EOM terminator."""
    #print "sending", msg
    self.request.sendall(msg + self.EOM)
183

    
184

    
185
class ClientOps:
186
  """Class holding high-level client operations."""
187
  def __init__(self, server):
188
    self.server = server
189
    self._cpu = None
190

    
191
  def _getcpu(self):
192
    if self._cpu is None:
193
      self._cpu = mcpu.Processor(lambda x: None)
194
    return self._cpu
195

    
196
  def handle_request(self, operation, args):
197
    print operation, args
198
    if operation == "submit":
199
      return self.put(args)
200
    elif operation == "query":
201
      return self.query(args)
202
    else:
203
      raise ValueError("Invalid operation")
204

    
205
  def put(self, args):
206
    job = luxi.UnserializeJob(args)
207
    rid = self.server.queue.put(job)
208
    return rid
209

    
210
  def query(self, args):
211
    path = args["object"]
212
    fields = args["fields"]
213
    names = args["names"]
214
    if path == "instances":
215
      opclass = opcodes.OpQueryInstances
216
    elif path == "jobs":
217
      # early exit because job query-ing is special (not via opcodes)
218
      return self.query_jobs(fields, names)
219
    else:
220
      raise ValueError("Invalid object %s" % path)
221

    
222
    op = opclass(output_fields = fields, names=names)
223
    cpu = self._getcpu()
224
    result = cpu.ExecOpCode(op)
225
    return result
226

    
227
  def query_jobs(self, fields, names):
228
    return self.server.queue.query_jobs(fields, names)
229

    
230

    
231
def JobRunner(proc, job):
232
  """Job executor.
233

    
234
  This functions processes a single job in the context of given
235
  processor instance.
236

    
237
  """
238
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
239
  fail = False
240
  for idx, op in enumerate(job.data.op_list):
241
    job.data.op_status[idx] = opcodes.Job.STATUS_RUNNING
242
    try:
243
      job.data.op_result[idx] = proc.ExecOpCode(op)
244
      job.data.op_status[idx] = opcodes.Job.STATUS_SUCCESS
245
    except (errors.OpPrereqError, errors.OpExecError), err:
246
      fail = True
247
      job.data.op_result[idx] = str(err)
248
      job.data.op_status[idx] = opcodes.Job.STATUS_FAIL
249
  if fail:
250
    job.SetStatus(opcodes.Job.STATUS_FAIL)
251
  else:
252
    job.SetStatus(opcodes.Job.STATUS_SUCCESS)
253

    
254

    
255
def PoolWorker(worker_id, incoming_queue):
256
  """A worker thread function.
257

    
258
  This is the actual processor of a single thread of Job execution.
259

    
260
  """
261
  while True:
262
    print "worker %s sleeping" % worker_id
263
    item = incoming_queue.get(True)
264
    if item is None:
265
      break
266
    print "worker %s processing job %s" % (worker_id, item.data.job_id)
267
    #utils.Lock('cmd')
268
    try:
269
      proc = mcpu.Processor(feedback=lambda x: None)
270
      try:
271
        JobRunner(proc, item)
272
      except errors.GenericError, err:
273
        print "ganeti exception %s" % err
274
    finally:
275
      #utils.Unlock('cmd')
276
      #utils.LockCleanup()
277
      pass
278
    print "worker %s finish job %s" % (worker_id, item.data.job_id)
279
  print "worker %s exiting" % worker_id
280

    
281

    
282
def CheckMaster(debug):
283
  """Checks the node setup.
284

    
285
  If this is the master, the function will return. Otherwise it will
286
  exit with an exit code based on the node status.
287

    
288
  """
289
  try:
290
    ss = ssconf.SimpleStore()
291
    master_name = ss.GetMasterNode()
292
  except errors.ConfigurationError, err:
293
    print "Cluster configuration incomplete: '%s'" % str(err)
294
    sys.exit(EXIT_NODESETUP_ERROR)
295

    
296
  try:
297
    myself = utils.HostInfo()
298
  except errors.ResolverError, err:
299
    sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
300
    sys.exit(EXIT_NODESETUP_ERROR)
301

    
302
  if myself.name != master_name:
303
    if debug:
304
      sys.stderr.write("Not master, exiting.\n")
305
    sys.exit(EXIT_NOTMASTER)
306

    
307

    
308
def ParseOptions():
  """Parse the command line options.

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  version_string = "%%prog (ganeti) %s" % constants.RELEASE_VERSION
  parser = OptionParser(description="Ganeti master daemon",
                        usage="%prog [-f] [-d]",
                        version=version_string)

  # -f keeps the daemon in the foreground (fork defaults to True)
  parser.add_option("-f", "--foreground", dest="fork",
                    action="store_false", default=True,
                    help="Don't detach from the current terminal")
  parser.add_option("-d", "--debug", dest="debug",
                    action="store_true", default=False,
                    help="Enable some debug messages")
  return parser.parse_args()
328

    
329

    
330
def main():
  """Main function"""

  options, args = ParseOptions()
  utils.debug = options.debug

  # refuse to run on a non-master node
  CheckMaster(options.debug)

  # the listening socket must be created before daemonizing so its fd
  # can be kept open across Daemonize() via noclose_fds
  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)

  # become a daemon
  if options.fork:
    utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
                    noclose_fds=[master.fileno()])

  # take the cluster-wide 'cmd' lock; on failure clean up the socket
  # we already created and bail out
  try:
    utils.Lock('cmd', debug=options.debug)
  except errors.LockError, err:
    print >> sys.stderr, str(err)
    master.server_cleanup()
    return

  # worker threads are started only after daemonizing (see
  # IOServer.setup_processors); the nested try/finally guarantees the
  # socket/threads are cleaned up before the lock is released
  try:
    master.setup_processors()
    try:
      master.serve_forever()
    finally:
      master.server_cleanup()
  finally:
    utils.Unlock('cmd')
    utils.LockCleanup()
361

    
362

    
363
if __name__ == "__main__":
364
  main()