Statistics
| Branch: | Tag: | Revision:

root / vncauthproxy / proxy.py @ b129b0c0

History | View | Annotate | Download (25 kB)

1
#!/usr/bin/env python
2
"""
3
vncauthproxy - a VNC authentication proxy
4
"""
5
#
6
# Copyright (c) 2010-2013 Greek Research and Technology Network S.A.
7
#
8
# This program is free software; you can redistribute it and/or modify
9
# it under the terms of the GNU General Public License as published by
10
# the Free Software Foundation; either version 2 of the License, or
11
# (at your option) any later version.
12
#
13
# This program is distributed in the hope that it will be useful, but
14
# WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16
# General Public License for more details.
17
#
18
# You should have received a copy of the GNU General Public License
19
# along with this program; if not, write to the Free Software
20
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21
# 02110-1301, USA.
22

    
23
DEFAULT_BIND_ADDRESS = None
24
DEFAULT_LPORT = 24999
25
DEFAULT_LOG_FILE = "/var/log/vncauthproxy/vncauthproxy.log"
26
DEFAULT_PID_FILE = "/var/run/vncauthproxy/vncauthproxy.pid"
27
DEFAULT_CONNECT_TIMEOUT = 30
28
DEFAULT_CONNECT_RETRIES = 3
29
DEFAULT_RETRY_WAIT = 0.1
30
DEFAULT_BACKLOG = 256
31
DEFAULT_SOCK_TIMEOUT = 60.0
32
# We must take care not to fall into the ephemeral port range,
33
# this can lead to transient failures to bind a chosen port.
34
#
35
# By default, Linux uses 32768 to 61000, see:
36
# http://www.ncftp.com/ncftpd/doc/misc/ephemeral_ports.html#Linux
37
# so 25000-30000 seems to be a sensible default.
38
DEFAULT_MIN_PORT = 25000
39
DEFAULT_MAX_PORT = 30000
40

    
41
import os
42
import sys
43
import logging
44
import gevent
45
import gevent.event
46
import daemon
47
import random
48
import daemon.runner
49

    
50
import rfb
51

    
52
try:
53
    import simplejson as json
54
except ImportError:
55
    import json
56

    
57
from gevent import socket
58
from signal import SIGINT, SIGTERM
59
from gevent.select import select
60

    
61
from lockfile import LockTimeout, AlreadyLocked
62
# Take care of differences between python-daemon versions.
63
try:
64
    from daemon import pidfile as pidlockfile
65
except:
66
    from daemon import pidlockfile
67

    
68

    
69
logger = None
70

    
71

    
72
# Currently, gevent uses libevent-dns for asynchronous DNS resolution,
73
# which opens a socket upon initialization time. Since we can't get the fd
74
# reliably, We have to maintain all file descriptors open (which won't harm
75
# anyway)
76
class AllFilesDaemonContext(daemon.DaemonContext):
77
    """DaemonContext class keeping all file descriptors open"""
78
    def _get_exclude_file_descriptors(self):
79
        class All:
80
            def __contains__(self, value):
81
                return True
82
        return All()
83

    
84

    
85
class VncAuthProxy(gevent.Greenlet):
86
    """
87
    Simple class implementing a VNC Forwarder with MITM authentication as a
88
    Greenlet
89

90
    VncAuthProxy forwards VNC traffic from a specified port of the local host
91
    to a specified remote host:port. Furthermore, it implements VNC
92
    Authentication, intercepting the client/server handshake and asking the
93
    client for authentication even if the backend requires none.
94

95
    It is primarily intended for use in virtualization environments, as a VNC
96
    ``switch''.
97

98
    """
99
    id = 1
100

    
101
    def __init__(self, logger, listeners, pool, daddr, dport, server, password,
102
                 connect_timeout):
103
        """
104
        @type logger: logging.Logger
105
        @param logger: the logger to use
106
        @type listeners: list
107
        @param listeners: list of listening sockets to use for clients
108
        @type pool: list
109
        @param pool: if not None, return the client number into this port pool
110
        @type daddr: str
111
        @param daddr: destination address (IPv4, IPv6 or hostname)
112
        @type dport: int
113
        @param dport: destination port
114
        @type server: socket
115
        @param server: VNC server socket
116
        @type password: str
117
        @param password: password to request from the client
118
        @type connect_timeout: int
119
        @param connect_timeout: how long to wait for client connections
120
                                (seconds)
121

122
        """
123
        gevent.Greenlet.__init__(self)
124
        self.id = VncAuthProxy.id
125
        VncAuthProxy.id += 1
126
        self.log = logger
127
        self.listeners = listeners
128
        # A list of worker/forwarder greenlets, one for each direction
129
        self.workers = []
130
        # All listening sockets are assumed to be on the same port
131
        self.sport = listeners[0].getsockname()[1]
132
        self.pool = pool
133
        self.daddr = daddr
134
        self.dport = dport
135
        self.server = server
136
        self.password = password
137
        self.client = None
138
        self.timeout = connect_timeout
139

    
140
    def _cleanup(self):
141
        """Cleanup everything: workers, sockets, ports
142

143
        Kill all remaining forwarder greenlets, close all active sockets,
144
        return the source port to the pool if applicable, then exit
145
        gracefully.
146

147
        """
148
        # Make sure all greenlets are dead, then clean them up
149
        self.debug("Cleaning up %d workers", len(self.workers))
150
        for g in self.workers:
151
            g.kill()
152
        gevent.joinall(self.workers)
153
        del self.workers
154

    
155
        self.debug("Cleaning up sockets")
156
        while self.listeners:
157
            self.listeners.pop().close()
158
        if self.server:
159
            self.server.close()
160
        if self.client:
161
            self.client.close()
162

    
163
        # Reintroduce the port number of the client socket in
164
        # the port pool, if applicable.
165
        if not self.pool is None:
166
            self.pool.append(self.sport)
167
            self.debug("Returned port %d to port pool, contains %d ports",
168
                       self.sport, len(self.pool))
169

    
170
        self.info("Cleaned up connection, all done")
171
        raise gevent.GreenletExit
172

    
173
    def __str__(self):
174
        return "VncAuthProxy: %d -> %s:%d" % (self.sport, self.daddr,
175
                                              self.dport)
176

    
177
    def _forward(self, source, dest):
178
        """
179
        Forward traffic from source to dest
180

181
        @type source: socket
182
        @param source: source socket
183
        @type dest: socket
184
        @param dest: destination socket
185

186
        """
187

    
188
        while True:
189
            d = source.recv(16384)
190
            if d == '':
191
                if source == self.client:
192
                    self.info("Client connection closed")
193
                else:
194
                    self.info("Server connection closed")
195
                break
196
            dest.sendall(d)
197
        # No need to close the source and dest sockets here.
198
        # They are owned by and will be closed by the original greenlet.
199

    
200
    def _client_handshake(self):
201
        """
202
        Perform handshake/authentication with a connecting client
203

204
        Outline:
205
        1. Client connects
206
        2. We fake RFB 3.8 protocol and require VNC authentication
207
           [processing also supports RFB 3.3]
208
        3. Client accepts authentication method
209
        4. We send an authentication challenge
210
        5. Client sends the authentication response
211
        6. We check the authentication
212

213
        Upon return, self.client socket is connected to the client.
214

215
        """
216
        self.client.send(rfb.RFB_VERSION_3_8 + "\n")
217
        client_version_str = self.client.recv(1024)
218
        client_version = rfb.check_version(client_version_str)
219
        if not client_version:
220
            self.error("Invalid version: %s", client_version_str)
221
            raise gevent.GreenletExit
222

    
223
        # Both for RFB 3.3 and 3.8
224
        self.debug("Requesting authentication")
225
        auth_request = rfb.make_auth_request(rfb.RFB_AUTHTYPE_VNC,
226
                                             version=client_version)
227
        self.client.send(auth_request)
228

    
229
        # The client gets to propose an authtype only for RFB 3.8
230
        if client_version == rfb.RFB_VERSION_3_8:
231
            res = self.client.recv(1024)
232
            type = rfb.parse_client_authtype(res)
233
            if type == rfb.RFB_AUTHTYPE_ERROR:
234
                self.warn("Client refused authentication: %s", res[1:])
235
            else:
236
                self.debug("Client requested authtype %x", type)
237

    
238
            if type != rfb.RFB_AUTHTYPE_VNC:
239
                self.error("Wrong auth type: %d", type)
240
                self.client.send(rfb.to_u32(rfb.RFB_AUTH_ERROR))
241
                raise gevent.GreenletExit
242

    
243
        # Generate the challenge
244
        challenge = os.urandom(16)
245
        self.client.send(challenge)
246
        response = self.client.recv(1024)
247
        if len(response) != 16:
248
            self.error("Wrong response length %d, should be 16", len(response))
249
            raise gevent.GreenletExit
250

    
251
        if rfb.check_password(challenge, response, self.password):
252
            self.debug("Authentication successful")
253
        else:
254
            self.warn("Authentication failed")
255
            self.client.send(rfb.to_u32(rfb.RFB_AUTH_ERROR))
256
            raise gevent.GreenletExit
257

    
258
        # Accept the authentication
259
        self.client.send(rfb.to_u32(rfb.RFB_AUTH_SUCCESS))
260

    
261
    def _run(self):
262
        try:
263
            self.info("Waiting for a client to connect at %s",
264
                      ", ".join(["%s:%d" % s.getsockname()[:2]
265
                                 for s in self.listeners]))
266
            rlist, _, _ = select(self.listeners, [], [], timeout=self.timeout)
267

    
268
            if not rlist:
269
                self.info("Timed out, no connection after %d sec",
270
                          self.timeout)
271
                raise gevent.GreenletExit
272

    
273
            for sock in rlist:
274
                self.client, addrinfo = sock.accept()
275
                self.info("Connection from %s:%d", *addrinfo[:2])
276

    
277
                # Close all listening sockets, we only want a one-shot
278
                # connection from a single client.
279
                while self.listeners:
280
                    self.listeners.pop().close()
281
                break
282

    
283
            # Perform RFB handshake with the client.
284
            self._client_handshake()
285

    
286
            # Bridge both connections through two "forwarder" greenlets.
287
            # This greenlet will wait until any of the workers dies.
288
            # Final cleanup will take place in _cleanup().
289
            dead = gevent.event.Event()
290
            dead.clear()
291

    
292
            # This callback will get called if any of the two workers dies.
293
            def callback(g):
294
                self.debug("Worker %d/%d died", self.workers.index(g),
295
                           len(self.workers))
296
                dead.set()
297

    
298
            self.workers.append(gevent.spawn(self._forward,
299
                                             self.client, self.server))
300
            self.workers.append(gevent.spawn(self._forward,
301
                                             self.server, self.client))
302
            for g in self.workers:
303
                g.link(callback)
304

    
305
            # Wait until any of the workers dies
306
            self.debug("Waiting for any of %d workers to die",
307
                       len(self.workers))
308
            dead.wait()
309

    
310
            # We can go now, _cleanup() will take care of
311
            # all worker, socket and port cleanup
312
            self.debug("A forwarder died, our work here is done")
313
            raise gevent.GreenletExit
314
        except Exception, e:
315
            # Any unhandled exception in the previous block
316
            # is an error and must be logged accordingly
317
            if not isinstance(e, gevent.GreenletExit):
318
                self.exception(e)
319
            raise e
320
        finally:
321
            self._cleanup()
322

    
323
# Logging support inside VncAuthproxy
324
# Wrap all common logging functions in logging-specific methods
325
for funcname in ["info", "debug", "warn", "error", "critical",
326
                 "exception"]:
327

    
328
    def gen(funcname):
329
        def wrapped_log_func(self, *args, **kwargs):
330
            func = getattr(self.log, funcname)
331
            func("[C%d] %s" % (self.id, args[0]), *args[1:], **kwargs)
332
        return wrapped_log_func
333
    setattr(VncAuthProxy, funcname, gen(funcname))
334

    
335

    
336
def fatal_signal_handler(signame):
337
    logger.info("Caught %s, will raise SystemExit", signame)
338
    raise SystemExit
339

    
340

    
341
def get_listening_sockets(sport):
342
    sockets = []
343

    
344
    # Use two sockets, one for IPv4, one for IPv6. IPv4-to-IPv6 mapped
345
    # addresses do not work reliably everywhere (under linux it may have
346
    # been disabled in /proc/sys/net/ipv6/bind_ipv6_only).
347
    for res in socket.getaddrinfo(None, sport, socket.AF_UNSPEC,
348
                                  socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
349
        af, socktype, proto, canonname, sa = res
350
        try:
351
            s = None
352
            s = socket.socket(af, socktype, proto)
353
            if af == socket.AF_INET6:
354
                # Bind v6 only when AF_INET6, otherwise either v4 or v6 bind
355
                # will fail.
356
                s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
357
            s.bind(sa)
358
            s.listen(1)
359
            sockets.append(s)
360
            logger.debug("Listening on %s:%d", *sa[:2])
361
        except socket.error, msg:
362
            logger.error("Error binding to %s:%d: %s", sa[0], sa[1], msg[1])
363
            if s:
364
                s.close()
365
            while sockets:
366
                sockets.pop().close()
367

    
368
            # Make sure we fail immediately if we cannot get a socket
369
            raise msg
370

    
371
    return sockets
372

    
373

    
374
def perform_server_handshake(daddr, dport, tries, retry_wait, sock_timeout):
375
    """
376
    Initiate a connection with the backend server and perform basic
377
    RFB 3.8 handshake with it.
378

379
    Return a socket connected to the backend server.
380

381
    """
382
    server = None
383

    
384
    while tries:
385
        tries -= 1
386

    
387
        # Initiate server connection
388
        for res in socket.getaddrinfo(daddr, dport, socket.AF_UNSPEC,
389
                                      socket.SOCK_STREAM, 0,
390
                                      socket.AI_PASSIVE):
391
            af, socktype, proto, canonname, sa = res
392
            try:
393
                server = socket.socket(af, socktype, proto)
394
            except socket.error:
395
                server = None
396
                continue
397

    
398
            # Set socket timeout for the initial handshake
399
            server.settimeout(sock_timeout)
400

    
401
            try:
402
                logger.debug("Connecting to %s:%s", *sa[:2])
403
                server.connect(sa)
404
                logger.debug("Connection to %s:%s successful", *sa[:2])
405
            except socket.error:
406
                server.close()
407
                server = None
408
                continue
409

    
410
            # We succesfully connected to the server
411
            tries = 0
412
            break
413

    
414
        # Wait and retry
415
        gevent.sleep(retry_wait)
416

    
417
    if server is None:
418
        raise Exception("Failed to connect to server")
419

    
420
    version = server.recv(1024)
421
    if not rfb.check_version(version):
422
        raise Exception("Unsupported RFB version: %s" % version.strip())
423

    
424
    server.send(rfb.RFB_VERSION_3_8 + "\n")
425

    
426
    res = server.recv(1024)
427
    types = rfb.parse_auth_request(res)
428
    if not types:
429
        raise Exception("Error handshaking with the server")
430

    
431
    else:
432
        logger.debug("Supported authentication types: %s",
433
                     " ".join([str(x) for x in types]))
434

    
435
    if rfb.RFB_AUTHTYPE_NONE not in types:
436
        raise Exception("Error, server demands authentication")
437

    
438
    server.send(rfb.to_u8(rfb.RFB_AUTHTYPE_NONE))
439

    
440
    # Check authentication response
441
    res = server.recv(4)
442
    res = rfb.from_u32(res)
443

    
444
    if res != 0:
445
        raise Exception("Authentication error")
446

    
447
    # Reset the timeout for the rest of the session
448
    server.settimeout(None)
449

    
450
    return server
451

    
452

    
453
def parse_arguments(args):
454
    from optparse import OptionParser
455

    
456
    parser = OptionParser()
457
    parser.add_option("--bind", dest="bind_address",
458
                      default=DEFAULT_BIND_ADDRESS,
459
                      metavar="ADDRESS",
460
                      help=("Address to listen for control connections"))
461
    parser.add_option( "--lport", dest="lport",
462
                      default=DEFAULT_LPORT,
463
                      metavar="LPORT",
464
                      help=("Port to listen for control connections"))
465
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
466
                      help="Enable debugging information")
467
    parser.add_option("-l", "--log", dest="log_file",
468
                      default=DEFAULT_LOG_FILE,
469
                      metavar="FILE",
470
                      help=("Write log to FILE instead of %s" %
471
                            DEFAULT_LOG_FILE))
472
    parser.add_option('--pid-file', dest="pid_file",
473
                      default=DEFAULT_PID_FILE,
474
                      metavar='PIDFILE',
475
                      help=("Save PID to file (default: %s)" %
476
                            DEFAULT_PID_FILE))
477
    parser.add_option("-t", "--connect-timeout", dest="connect_timeout",
478
                      default=DEFAULT_CONNECT_TIMEOUT, type="int",
479
                      metavar="SECONDS", help=("Wait SECONDS sec for a client "
480
                                               "to connect"))
481
    parser.add_option("-r", "--connect-retries", dest="connect_retries",
482
                      default=DEFAULT_CONNECT_RETRIES, type="int",
483
                      metavar="RETRIES",
484
                      help="How many times to try to connect to the server")
485
    parser.add_option("-w", "--retry-wait", dest="retry_wait",
486
                      default=DEFAULT_RETRY_WAIT, type="float",
487
                      metavar="SECONDS", help=("Retry connection to server "
488
                                               "every SECONDS sec"))
489
    parser.add_option("-p", "--min-port", dest="min_port",
490
                      default=DEFAULT_MIN_PORT, type="int", metavar="MIN_PORT",
491
                      help=("The minimum port number to use for automatically-"
492
                            "allocated ephemeral ports"))
493
    parser.add_option("-P", "--max-port", dest="max_port",
494
                      default=DEFAULT_MAX_PORT, type="int", metavar="MAX_PORT",
495
                      help=("The maximum port number to use for automatically-"
496
                            "allocated ephemeral ports"))
497
    parser.add_option("-b", "--backlog", dest="backlog",
498
                      default=DEFAULT_BACKLOG, type="int", metavar="BACKLOG",
499
                      help=("Length of the backlog queue for the control"
500
                            "connection socket"))
501
    parser.add_option("--socket-timeout", dest="sock_timeout",
502
                      default=DEFAULT_SOCK_TIMEOUT, type="float",
503
                      metavar="SOCK_TIMEOUT",
504
                      help=("Socket timeout for the server handshake"))
505

    
506
    return parser.parse_args(args)
507

    
508

    
509
def establish_connection(client, addr, ports, opts):
510
    # Receive and parse a client request.
511
    response = {
512
        "source_port": 0,
513
        "status": "FAILED",
514
    }
515
    try:
516
        # TODO: support multiple forwardings in the same message?
517
        #
518
        # Control request, in JSON:
519
        #
520
        # {
521
        #     "source_port":
522
        #         <source port or 0 for automatic allocation>,
523
        #     "destination_address":
524
        #         <destination address of backend server>,
525
        #     "destination_port":
526
        #         <destination port>
527
        #     "password":
528
        #         <the password to use to authenticate clients>
529
        # }
530
        #
531
        # The <password> is used for MITM authentication of clients
532
        # connecting to <source_port>, who will subsequently be
533
        # forwarded to a VNC server listening at
534
        # <destination_address>:<destination_port>
535
        #
536
        # Control reply, in JSON:
537
        # {
538
        #     "source_port": <the allocated source port>
539
        #     "status": <one of "OK" or "FAILED">
540
        # }
541
        #
542
        buf = client.recv(1024)
543
        req = json.loads(buf)
544

    
545
        sport_orig = int(req['source_port'])
546
        daddr = req['destination_address']
547
        dport = int(req['destination_port'])
548
        password = req['password']
549
    except Exception, e:
550
        logger.warn("Malformed request: %s", buf)
551
        client.send(json.dumps(response))
552
        client.close()
553

    
554
    # Spawn a new Greenlet to service the request.
555
    server = None
556
    try:
557
        # If the client has so indicated, pick an ephemeral source port
558
        # randomly, and remove it from the port pool.
559
        if sport_orig == 0:
560
            while True:
561
                try:
562
                    sport = random.choice(ports)
563
                    ports.remove(sport)
564
                    break
565
                except ValueError:
566
                    logger.debug("Port %d already taken", sport)
567

    
568
            logger.debug("Got port %d from pool, %d remaining",
569
                         sport, len(ports))
570
            pool = ports
571
        else:
572
            sport = sport_orig
573
            pool = None
574

    
575
        listeners = get_listening_sockets(sport)
576
        server = perform_server_handshake(daddr, dport,
577
                                          opts.connect_retries,
578
                                          opts.retry_wait, opts.sock_timeout)
579

    
580
        VncAuthProxy.spawn(logger, listeners, pool, daddr, dport,
581
                           server, password, opts.connect_timeout)
582

    
583
        logger.info("New forwarding: %d (client req'd: %d) -> %s:%d",
584
                    sport, sport_orig, daddr, dport)
585
        response = {"source_port": sport,
586
                    "status": "OK"}
587
    except IndexError:
588
        logger.error(("FAILED forwarding, out of ports for [req'd by "
589
                      "client: %d -> %s:%d]"),
590
                     sport_orig, daddr, dport)
591
    except Exception, msg:
592
        logger.error(msg)
593
        logger.error(("FAILED forwarding: %d (client req'd: %d) -> "
594
                      "%s:%d"), sport, sport_orig, daddr, dport)
595
        if not pool is None:
596
            pool.append(sport)
597
            logger.debug("Returned port %d to pool, %d remanining",
598
                         sport, len(pool))
599
        if not server is None:
600
            server.close()
601
    finally:
602
        client.send(json.dumps(response))
603
        client.close()
604

    
605

    
606
def main():
607
    """Run the daemon from the command line"""
608

    
609
    (opts, args) = parse_arguments(sys.argv[1:])
610

    
611
    # Create pidfile
612
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
613

    
614
    # Initialize logger
615
    lvl = logging.DEBUG if opts.debug else logging.INFO
616

    
617
    global logger
618
    logger = logging.getLogger("vncauthproxy")
619
    logger.setLevel(lvl)
620
    formatter = logging.Formatter(("%(asctime)s %(module)s[%(process)d] "
621
                                   " %(levelname)s: %(message)s"),
622
                                  "%Y-%m-%d %H:%M:%S")
623
    handler = logging.FileHandler(opts.log_file)
624
    handler.setFormatter(formatter)
625
    logger.addHandler(handler)
626

    
627
    # Become a daemon:
628
    # Redirect stdout and stderr to handler.stream to catch
629
    # early errors in the daemonization process [e.g., pidfile creation]
630
    # which will otherwise go to /dev/null.
631
    daemon_context = AllFilesDaemonContext(
632
        pidfile=pidf,
633
        umask=0022,
634
        stdout=handler.stream,
635
        stderr=handler.stream,
636
        files_preserve=[handler.stream])
637

    
638
    # Remove any stale PID files, left behind by previous invocations
639
    if daemon.runner.is_pidfile_stale(pidf):
640
        logger.warning("Removing stale PID lock file %s", pidf.path)
641
        pidf.break_lock()
642

    
643
    try:
644
        daemon_context.open()
645
    except (AlreadyLocked, LockTimeout):
646
        logger.critical(("Failed to lock PID file %s, another instance "
647
                         "running?"), pidf.path)
648
        sys.exit(1)
649
    logger.info("Became a daemon")
650

    
651
    # A fork() has occured while daemonizing,
652
    # we *must* reinit gevent
653
    gevent.reinit()
654

    
655
    sockets = []
656
    for res in socket.getaddrinfo(opts.bind_address, opts.lport,
657
                             socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
658
                             socket.AI_PASSIVE):
659
        af, socktype, proto, canonname, sa = res
660
        try:
661
            s = None
662
            s = socket.socket(af, socktype, proto)
663
            if af == socket.AF_INET6:
664
                # Bind v6 only when AF_INET6, otherwise either v4 or v6 bind
665
                # will fail.
666
                s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
667
            s.bind(sa)
668
            s.listen(opts.backlog)
669
            sockets.append(s)
670
            logger.info("Control socket listening on %s:%d", *sa[:2])
671
        except socket.error, msg:
672
            logger.critical("Error binding control socket to %s:%d: %s",
673
                         sa[0], sa[1], msg[1])
674
            if s:
675
                s.close()
676
            while sockets:
677
                sockets.pop.close()
678

    
679
            sys.exit(1)
680

    
681
    # Catch signals to ensure graceful shutdown,
682
    # e.g., to make sure the control socket gets unlink()ed.
683
    #
684
    # Uses gevent.signal so the handler fires even during
685
    # gevent.socket.accept()
686
    gevent.signal(SIGINT, fatal_signal_handler, "SIGINT")
687
    gevent.signal(SIGTERM, fatal_signal_handler, "SIGTERM")
688

    
689
    # Init ephemeral port pool
690
    ports = range(opts.min_port, opts.max_port + 1)
691

    
692
    while True:
693
        try:
694
            rlist, _, _ = select(sockets, [], [])
695
            for ctrl in rlist:
696
                client, addr = ctrl.accept()
697
                logger.info("New control connection")
698

    
699
                gevent.Greenlet.spawn(establish_connection, client, addr,
700
                                      ports, opts)
701
        except Exception, e:
702
            logger.exception(e)
703
            continue
704
        except SystemExit:
705
            break
706

    
707
    logger.info("Closing control sockets")
708
    while sockets:
709
        sockets.pop.close()
710
    daemon_context.close()
711
    sys.exit(0)