daemons/ganeti-masterd @ 7a1ecaed

#!/usr/bin/python
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Master daemon program.

Some classes deviate from the standard style guide since the
inheritance from parent classes requires it.

"""

import SocketServer
import threading
import time
import collections
import Queue
import random
import signal
import simplejson


from cStringIO import StringIO

from ganeti import constants
from ganeti import errors  # used by PoolWorker's exception handling
from ganeti import mcpu
from ganeti import opcodes
from ganeti import jqueue
from ganeti import luxi
from ganeti import utils


class IOServer(SocketServer.UnixStreamServer):
  """IO thread class.

  This class takes care of initializing the other threads, setting
  signal handlers (which are processed only in this thread), and doing
  cleanup at shutdown.

  """
  QUEUE_PROCESSOR_SIZE = 1

  def __init__(self, address, rqhandler):
    SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
    self.do_quit = False
    self.queue = jqueue.QueueManager()
    self.processors = []
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.processors.append(threading.Thread(target=PoolWorker,
                                              args=(i, self.queue.new_queue)))
    for t in self.processors:
      t.start()
    signal.signal(signal.SIGINT, self.handle_sigint)

  def process_request_thread(self, request, client_address):
    """Process the request.

    This is copied from the code in ThreadingMixIn.

    """
    try:
      self.finish_request(request, client_address)
      self.close_request(request)
    except:
      self.handle_error(request, client_address)
      self.close_request(request)

  def process_request(self, request, client_address):
    """Start a new thread to process the request.

    This is copied from the code in ThreadingMixIn.

    """
    t = threading.Thread(target=self.process_request_thread,
                         args=(request, client_address))
    t.start()
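
  # SIGINT handling: stop accepting requests and wake each worker thread
  # with a None sentinel so the PoolWorker loop can exit cleanly.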
  def handle_sigint(self, signum, frame):
    print "received %s in %s" % (signum, frame)
    self.do_quit = True
    self.server_close()
    for i in range(self.QUEUE_PROCESSOR_SIZE):
      self.queue.new_queue.put(None)

  def serve_forever(self):
    """Handle one request at a time until told to quit."""
    while not self.do_quit:
      self.handle_request()


class ClientRqHandler(SocketServer.BaseRequestHandler):
  """Client handler"""
  EOM = '\3'
  READ_SIZE = 4096
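
  # Wire protocol, as implemented by handle()/read_message()/send_message():
  # every message is a JSON document terminated by the EOM byte.  An
  # illustrative query exchange (see ClientOps.query for the data keys):
  #   client: {"request": "query", "data": {"object": "instances",
  #                                         "fields": [...], "names": [...]}}
  #   server: {"success": true, "result": ...}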

  def setup(self):
    self._buffer = ""
    self._msgs = collections.deque()
    self._ops = ClientOps(self.server)

  def handle(self):
    while True:
      msg = self.read_message()
      if msg is None:
        print "client closed connection"
        break
      request = simplejson.loads(msg)
      if not isinstance(request, dict):
        print "wrong request received: %s" % msg
        break
      method = request.get('request', None)
      data = request.get('data', None)
      if method is None or data is None:
        print "no method or data in request"
        break
      print "request:", method, data
      result = self._ops.handle_request(method, data)
      print "result:", result
      self.send_message(simplejson.dumps({'success': True, 'result': result}))
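
  # Buffer data from the socket until at least one full EOM-terminated
  # message is available; any trailing partial message is kept in
  # self._buffer for the next call.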
  def read_message(self):
    while not self._msgs:
      data = self.request.recv(self.READ_SIZE)
      if not data:
        return None
      new_msgs = (self._buffer + data).split(self.EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def send_message(self, msg):
    #print "sending", msg
    self.request.sendall(msg + self.EOM)


class ClientOps:
  """Class holding high-level client operations."""
  def __init__(self, server):
    self.server = server
    self._cpu = None

  def _getcpu(self):
    if self._cpu is None:
      self._cpu = mcpu.Processor(lambda x: None)
    return self._cpu
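
  # Dispatch a decoded client request: "submit" queues a serialized job,
  # "query" runs a synchronous query against the cluster or the job queue.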
  def handle_request(self, operation, args):
    print operation, args
    if operation == "submit":
      return self.put(args)
    elif operation == "query":
      return self.query(args)
    else:
      raise ValueError("Invalid operation")

  def put(self, args):
    job = luxi.UnserializeJob(args)
    rid = self.server.queue.put(job)
    return rid

  def query(self, args):
    path = args["object"]
    fields = args["fields"]
    names = args["names"]
    if path == "instances":
      opclass = opcodes.OpQueryInstances
    elif path == "jobs":
      # early exit because job querying is special (not via opcodes)
      return self.query_jobs(fields, names)
    else:
      raise ValueError("Invalid object %s" % path)

    op = opclass(output_fields=fields, names=names)
    cpu = self._getcpu()
    result = cpu.ExecOpCode(op)
    return result

  def query_jobs(self, fields, names):
    return self.server.queue.query_jobs(fields, names)


def JobRunner(proc, job):
  """Job executor.

  This function processes a single job in the context of a given
  processor instance.

  """
  job.SetStatus(opcodes.Job.STATUS_RUNNING)
  for op in job.data.op_list:
    proc.ExecOpCode(op)
  job.SetStatus(opcodes.Job.STATUS_FINISHED, result=opcodes.Job.RESULT_OK)


def PoolWorker(worker_id, incoming_queue):
  """A worker thread function.

  This is the actual processor of a single thread of Job execution.

  """
  while True:
    print "worker %s sleeping" % worker_id
    item = incoming_queue.get(True)
    if item is None:
      break
    print "worker %s processing job %s" % (worker_id, item.data.job_id)
    utils.Lock('cmd')
    try:
      proc = mcpu.Processor(feedback=lambda x: None)
      try:
        JobRunner(proc, item)
      except errors.GenericError, err:
        print "ganeti exception %s" % err
    finally:
      utils.Unlock('cmd')
      utils.LockCleanup()
    print "worker %s finished job %s" % (worker_id, item.data.job_id)
  print "worker %s exiting" % worker_id


def main():
  """Main function"""

  master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
  master.serve_forever()


if __name__ == "__main__":
  main()
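
# A minimal client sketch, for illustration only (the query arguments below
# are assumptions; see ClientOps.query for the keys this daemon expects):
#
#   import socket
#   import simplejson
#   from ganeti import constants
#
#   sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
#   sock.connect(constants.MASTER_SOCKET)
#   sock.sendall(simplejson.dumps({"request": "query",
#                                  "data": {"object": "instances",
#                                           "fields": ["name"],
#                                           "names": []}}) + '\3')
#   reply = ""
#   while not reply.endswith('\3'):
#     reply += sock.recv(4096)
#   print simplejson.loads(reply[:-1])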