Revision 91b63720

b/snf-pithos-backend/pithos/workers/gevent_archipelago.py
1
# -*- coding: utf-8 -
2
#
3
# Copyright 2013 GRNET S.A. All rights reserved.
4
#
5
# Redistribution and use in source and binary forms, with or
6
# without modification, are permitted provided that the following
7
# conditions are met:
8
#
9
#   1. Redistributions of source code must retain the above
10
#      copyright notice, this list of conditions and the following
11
#      disclaimer.
12
#
13
#   2. Redistributions in binary form must reproduce the above
14
#      copyright notice, this list of conditions and the following
15
#      disclaimer in the documentation and/or other materials
16
#      provided with the distribution.
17
#
18
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
19
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
22
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
25
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
28
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
# POSSIBILITY OF SUCH DAMAGE.
30
#
31
# The views and conclusions contained in the software and
32
# documentation are those of the authors and should not be
33
# interpreted as representing official policies, either expressed
34
# or implied, of GRNET S.A.
35

  
36

  
37
from __future__ import with_statement
38

  
39
import os
40
import sys
41
from datetime import datetime
42

  
43
# workaround on osx, disable kqueue
44
if sys.platform == "darwin":
45
    os.environ['EVENT_NOKQUEUE'] = "1"
46

  
47
try:
48
    import gevent
49
except ImportError:
50
    raise RuntimeError("You need gevent installed to use this worker.")
51
from gevent.pool import Pool
52
from gevent.server import StreamServer
53
from gevent import pywsgi
54
from gevent import select
55

  
56
import gunicorn
57
from gunicorn.workers.async import AsyncWorker
58

  
59
VERSION = "gevent/%s gunicorn/%s" % (gevent.__version__, gunicorn.__version__)
60

  
61
BASE_WSGI_ENV = {
62
    'GATEWAY_INTERFACE': 'CGI/1.1',
63
    'SERVER_SOFTWARE': VERSION,
64
    'SCRIPT_NAME': '',
65
    'wsgi.version': (1, 0),
66
    'wsgi.multithread': False,
67
    'wsgi.multiprocess': False,
68
    'wsgi.run_once': False
69
}
70

  
71

  
72
class GeventArchipelagoWorker(AsyncWorker):
73

  
74
    server_class = None
75
    wsgi_handler = None
76

  
77
    def __init__(self, *args, **kwargs):
78
        super(GeventArchipelagoWorker, self).__init__(*args, **kwargs)
79
        self.worker_id = args[0]
80

  
81
    @classmethod
82
    def setup(cls):
83
        from gevent import monkey
84
        monkey.noisy = False
85
        monkey.patch_all()
86

  
87
    def timeout_ctx(self):
88
        return gevent.Timeout(self.cfg.keepalive, False)
89

  
90
    def run(self):
91
        self.socket.setblocking(1)
92

  
93
        pool = Pool(self.worker_connections)
94
        if self.server_class is not None:
95
            server = self.server_class(
96
                self.socket, application=self.wsgi, spawn=pool, log=self.log,
97
                handler_class=self.wsgi_handler)
98
        else:
99
            server = StreamServer(self.socket, handle=self.handle, spawn=pool)
100

  
101
        server.start()
102
        try:
103
            while self.alive:
104
                self.notify()
105
                if self.ppid != os.getppid():
106
                    self.log.info("Parent changed, shutting down: %s", self)
107
                    break
108

  
109
                gevent.sleep(1.0)
110

  
111
        except KeyboardInterrupt:
112
            pass
113

  
114
        try:
115
            # Try to stop connections until timeout
116
            self.notify()
117
            server.stop(timeout=self.cfg.graceful_timeout)
118
        except:
119
            pass
120

  
121
    def handle_request(self, *args):
122
        try:
123
            super(GeventArchipelagoWorker, self).handle_request(*args)
124
        except gevent.GreenletExit:
125
            pass
126

  
127
    if gevent.version_info[0] == 0:
128

  
129
        def init_process(self):
130
            #gevent 0.13 and older doesn't reinitialize dns for us after
131
            #forking here's the workaround
132
            import gevent.core
133
            gevent.core.dns_shutdown(fail_requests=1)
134
            gevent.core.dns_init()
135
            super(GeventArchipelagoWorker, self).init_process()
136

  
137

  
138
class GeventResponse(object):
139

  
140
    status = None
141
    headers = None
142
    response_length = None
143

  
144
    def __init__(self, status, headers, clength):
145
        self.status = status
146
        self.headers = headers
147
        self.response_length = clength
148

  
149

  
150
class PyWSGIHandler(pywsgi.WSGIHandler):
151

  
152
    def log_request(self):
153
        start = datetime.fromtimestamp(self.time_start)
154
        finish = datetime.fromtimestamp(self.time_finish)
155
        response_time = finish - start
156
        resp = GeventResponse(self.status, self.response_headers,
157
                              self.response_length)
158
        req_headers = [h.split(":", 1) for h in self.headers.headers]
159
        self.server.log.access(resp, req_headers, self.environ, response_time)
160

  
161
    def get_environ(self):
162
        env = super(PyWSGIHandler, self).get_environ()
163
        env['gunicorn.sock'] = self.socket
164
        env['RAW_URI'] = self.path
165
        return env
166

  
167

  
168
class PyWSGIServer(pywsgi.WSGIServer):
169
    base_env = BASE_WSGI_ENV
170

  
171

  
172
class GeventPyWSGIWorker(GeventArchipelagoWorker):
173
    "The Gevent StreamServer based workers."
174
    server_class = PyWSGIServer
175
    wsgi_handler = PyWSGIHandler

Also available in: Unified diff