Statistics
| Branch: | Tag: | Revision:

root / vncauthproxy / proxy.py @ 376a8634

History | View | Annotate | Download (20.6 kB)

1
#!/usr/bin/env python
2
"""
3
vncauthproxy - a VNC authentication proxy
4
"""
5
#
6
# Copyright (c) 2010-2011 Greek Research and Technology Network S.A.
7
#
8
# This program is free software; you can redistribute it and/or modify
9
# it under the terms of the GNU General Public License as published by
10
# the Free Software Foundation; either version 2 of the License, or
11
# (at your option) any later version.
12
#
13
# This program is distributed in the hope that it will be useful, but
14
# WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16
# General Public License for more details.
17
#
18
# You should have received a copy of the GNU General Public License
19
# along with this program; if not, write to the Free Software
20
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21
# 02110-1301, USA.
22

    
23
DEFAULT_CTRL_SOCKET = "/var/run/vncauthproxy/ctrl.sock"
24
DEFAULT_LOG_FILE = "/var/log/vncauthproxy/vncauthproxy.log"
25
DEFAULT_PID_FILE = "/var/run/vncauthproxy/vncauthproxy.pid"
26
DEFAULT_CONNECT_TIMEOUT = 30
27
# Default values per http://www.iana.org/assignments/port-numbers
28
DEFAULT_MIN_PORT = 49152 
29
DEFAULT_MAX_PORT = 65535
30

    
31
import os
32
import sys
33
import logging
34
import gevent
35
import daemon
36
import random
37
import daemon.pidlockfile
38

    
39
import rfb
40
 
41
try:
42
    import simplejson as json
43
except ImportError:
44
    import json
45

    
46
from gevent import socket
47
from signal import SIGINT, SIGTERM
48
from gevent import signal
49
from gevent.select import select
50

    
51
logger = None
52

    
53
# Currently, gevent uses libevent-dns for asynchornous DNS resolution,
54
# which opens a socket upon initialization time. Since we can't get the fd
55
# reliably, We have to maintain all file descriptors open (which won't harm
56
# anyway)
57

    
58
class AllFilesDaemonContext(daemon.DaemonContext):
59
    """DaemonContext class keeping all file descriptors open"""
60
    def _get_exclude_file_descriptors(self):
61
        class All:
62
            def __contains__(self, value):
63
                return True
64
        return All()
65

    
66

    
67
class VncAuthProxy(gevent.Greenlet):
68
    """
69
    Simple class implementing a VNC Forwarder with MITM authentication as a
70
    Greenlet
71

72
    VncAuthProxy forwards VNC traffic from a specified port of the local host
73
    to a specified remote host:port. Furthermore, it implements VNC
74
    Authentication, intercepting the client/server handshake and asking the
75
    client for authentication even if the backend requires none.
76

77
    It is primarily intended for use in virtualization environments, as a VNC
78
    ``switch''.
79

80
    """
81
    id = 1
82

    
83
    def __init__(self, logger, listeners, pool, daddr, dport, password, connect_timeout):
84
        """
85
        @type logger: logging.Logger
86
        @param logger: the logger to use
87
        @type listeners: list
88
        @param listeners: list of listening sockets to use for client connections
89
        @type pool: list
90
        @param pool: if not None, return the client port number into this port pool
91
        @type daddr: str
92
        @param daddr: destination address (IPv4, IPv6 or hostname)
93
        @type dport: int
94
        @param dport: destination port
95
        @type password: str
96
        @param password: password to request from the client
97
        @type connect_timeout: int
98
        @param connect_timeout: how long to wait for client connections
99
                                (seconds)
100

101
        """
102
        gevent.Greenlet.__init__(self)
103
        self.id = VncAuthProxy.id
104
        VncAuthProxy.id += 1
105
        self.log = logger
106
        self.listeners = listeners
107
        # All listening sockets are assumed to be on the same port
108
        self.sport = listeners[0].getsockname()[1]
109
        self.pool = pool
110
        self.daddr = daddr
111
        self.dport = dport
112
        self.password = password
113
        self.server = None
114
        self.client = None
115
        self.timeout = connect_timeout
116

    
117
    def _cleanup(self):
118
        """Close all active sockets and exit gracefully"""
119
        # Reintroduce the port number of the client socket in
120
        # the port pool, if applicable.
121
        if not self.pool is None:
122
            self.pool.append(self.sport)
123
            self.log.debug("Returned port %d to port pool, contains %d ports",
124
                self.sport, len(self.pool))
125

    
126
        while self.listeners:
127
            self.listeners.pop().close()
128
        if self.server:
129
            self.server.close()
130
        if self.client:
131
            self.client.close()
132

    
133
        raise gevent.GreenletExit
134

    
135
    def info(self, msg):
136
        self.log.info("[C%d] %s" % (self.id, msg))
137

    
138
    def debug(self, msg):
139
        self.log.debug("[C%d] %s" % (self.id, msg))
140

    
141
    def warn(self, msg):
142
        self.log.warn("[C%d] %s" % (self.id, msg))
143

    
144
    def error(self, msg):
145
        self.log.error("[C%d] %s" % (self.id, msg))
146

    
147
    def critical(self, msg):
148
        self.log.critical("[C%d] %s" % (self.id, msg))
149

    
150
    def __str__(self):
151
        return "VncAuthProxy: %d -> %s:%d" % (self.sport, self.daddr, self.dport)
152

    
153
    def _forward(self, source, dest):
154
        """
155
        Forward traffic from source to dest
156

157
        @type source: socket
158
        @param source: source socket
159
        @type dest: socket
160
        @param dest: destination socket
161

162
        """
163

    
164
        while True:
165
            d = source.recv(16384)
166
            if d == '':
167
                if source == self.client:
168
                    self.info("Client connection closed")
169
                else:
170
                    self.info("Server connection closed")
171
                break
172
            dest.sendall(d)
173
        # No need to close the source and dest sockets here.
174
        # They are owned by and will be closed by the original greenlet.
175

    
176
    def _handshake(self):
177
        """
178
        Perform handshake/authentication with a connecting client
179

180
        Outline:
181
        1. Client connects
182
        2. We fake RFB 3.8 protocol and require VNC authentication [also supports RFB 3.3]
183
        3. Client accepts authentication method
184
        4. We send an authentication challenge
185
        5. Client sends the authentication response
186
        6. We check the authentication
187
        7. We initiate a connection with the backend server and perform basic
188
           RFB 3.8 handshake with it.
189

190
        Upon return, self.client and self.server are sockets
191
        connected to the client and the backend server, respectively.
192

193
        """
194
        self.client.send(rfb.RFB_VERSION_3_8 + "\n")
195
        client_version_str = self.client.recv(1024)
196
        client_version = rfb.check_version(client_version_str)
197
        if not client_version:
198
            self.error("Invalid version: %s" % client_version_str)
199
            raise gevent.GreenletExit
200

    
201
        # Both for RFB 3.3 and 3.8
202
        self.debug("Requesting authentication")
203
        auth_request = rfb.make_auth_request(rfb.RFB_AUTHTYPE_VNC,
204
            version=client_version)
205
        self.client.send(auth_request)
206

    
207
        # The client gets to propose an authtype only for RFB 3.8
208
        if client_version == rfb.RFB_VERSION_3_8:
209
            res = self.client.recv(1024)
210
            type = rfb.parse_client_authtype(res)
211
            if type == rfb.RFB_AUTHTYPE_ERROR:
212
                self.warn("Client refused authentication: %s" % res[1:])
213
            else:
214
                self.debug("Client requested authtype %x" % type)
215

    
216
            if type != rfb.RFB_AUTHTYPE_VNC:
217
                self.error("Wrong auth type: %d" % type)
218
                self.client.send(rfb.to_u32(rfb.RFB_AUTH_ERROR))
219
                raise gevent.GreenletExit
220
        
221
        # Generate the challenge
222
        challenge = os.urandom(16)
223
        self.client.send(challenge)
224
        response = self.client.recv(1024)
225
        if len(response) != 16:
226
            self.error("Wrong response length %d, should be 16" % len(response))
227
            raise gevent.GreenletExit
228

    
229
        if rfb.check_password(challenge, response, self.password):
230
            self.debug("Authentication successful!")
231
        else:
232
            self.warn("Authentication failed")
233
            self.client.send(rfb.to_u32(rfb.RFB_AUTH_ERROR))
234
            raise gevent.GreenletExit
235

    
236
        # Accept the authentication
237
        self.client.send(rfb.to_u32(rfb.RFB_AUTH_SUCCESS))
238

    
239
        # Try to connect to the server
240
        tries = 50
241

    
242
        while tries:
243
            tries -= 1
244

    
245
            # Initiate server connection
246
            for res in socket.getaddrinfo(self.daddr, self.dport, socket.AF_UNSPEC,
247
                                          socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
248
                af, socktype, proto, canonname, sa = res
249
                try:
250
                    self.server = socket.socket(af, socktype, proto)
251
                except socket.error, msg:
252
                    self.server = None
253
                    continue
254

    
255
                try:
256
                    self.debug("Connecting to %s:%s" % sa[:2])
257
                    self.server.connect(sa)
258
                    self.debug("Connection to %s:%s successful" % sa[:2])
259
                except socket.error, msg:
260
                    self.server.close()
261
                    self.server = None
262
                    continue
263

    
264
                # We succesfully connected to the server
265
                tries = 0
266
                break
267

    
268
            # Wait and retry
269
            gevent.sleep(0.2)
270

    
271
        if self.server is None:
272
            self.error("Failed to connect to server")
273
            raise gevent.GreenletExit
274

    
275
        version = self.server.recv(1024)
276
        if not rfb.check_version(version):
277
            self.error("Unsupported RFB version: %s" % version.strip())
278
            raise gevent.GreenletExit
279

    
280
        self.server.send(rfb.RFB_VERSION_3_8 + "\n")
281

    
282
        res = self.server.recv(1024)
283
        types = rfb.parse_auth_request(res)
284
        if not types:
285
            self.error("Error handshaking with the server")
286
            raise gevent.GreenletExit
287

    
288
        else:
289
            self.debug("Supported authentication types: %s" %
290
                           " ".join([str(x) for x in types]))
291

    
292
        if rfb.RFB_AUTHTYPE_NONE not in types:
293
            self.error("Error, server demands authentication")
294
            raise gevent.GreenletExit
295

    
296
        self.server.send(rfb.to_u8(rfb.RFB_AUTHTYPE_NONE))
297

    
298
        # Check authentication response
299
        res = self.server.recv(4)
300
        res = rfb.from_u32(res)
301

    
302
        if res != 0:
303
            self.error("Authentication error")
304
            raise gevent.GreenletExit
305
       
306
    def _run(self):
307
        try:
308
            self.log.debug("Waiting for client to connect")
309
            rlist, _, _ = select(self.listeners, [], [], timeout=self.timeout)
310

    
311
            if not rlist:
312
                self.info("Timed out, no connection after %d sec" % self.timeout)
313
                raise gevent.GreenletExit
314

    
315
            for sock in rlist:
316
                self.client, addrinfo = sock.accept()
317
                self.info("Connection from %s:%d" % addrinfo[:2])
318

    
319
                # Close all listening sockets, we only want a one-shot connection
320
                # from a single client.
321
                while self.listeners:
322
                    self.listeners.pop().close()
323
                break
324
       
325
            # Perform RFB handshake with the client and the backend server.
326
            # If all goes as planned, we have two connected sockets,
327
            # self.client and self.server.
328
            self._handshake()
329

    
330
            # Bridge both connections through two "forwarder" greenlets.
331
            self.workers = [gevent.spawn(self._forward, self.client, self.server),
332
                gevent.spawn(self._forward, self.server, self.client)]
333
            
334
            # If one greenlet goes, the other has to go too.
335
            self.workers[0].link(self.workers[1])
336
            self.workers[1].link(self.workers[0])
337
            gevent.joinall(self.workers)
338
            del self.workers
339
            raise gevent.GreenletExit
340
        except Exception, e:
341
            # Any unhandled exception in the previous block
342
            # is an error and must be logged accordingly
343
            if not isinstance(e, gevent.GreenletExit):
344
                self.log.exception(e)
345
            raise e
346
        finally:
347
            self._cleanup()
348

    
349

    
350
def fatal_signal_handler(signame):
351
    logger.info("Caught %s, will raise SystemExit" % signame)
352
    raise SystemExit
353

    
354
def get_listening_sockets(sport):
355
    sockets = []
356

    
357
    # Use two sockets, one for IPv4, one for IPv6. IPv4-to-IPv6 mapped
358
    # addresses do not work reliably everywhere (under linux it may have
359
    # been disabled in /proc/sys/net/ipv6/bind_ipv6_only).
360
    for res in socket.getaddrinfo(None, sport, socket.AF_UNSPEC,
361
                                  socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
362
        af, socktype, proto, canonname, sa = res
363
        try:
364
            s = None
365
            s = socket.socket(af, socktype, proto)
366
            if af == socket.AF_INET6:
367
                # Bind v6 only when AF_INET6, otherwise either v4 or v6 bind
368
                # will fail.
369
                s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
370
            s.bind(sa)
371
            s.listen(1)
372
            sockets.append(s)
373
            logger.debug("Listening on %s:%d" % sa[:2])
374
        except socket.error, msg:
375
            logger.error("Error binding to %s:%d: %s" %
376
                           (sa[0], sa[1], msg[1]))
377
            if s:
378
                s.close()
379
            while sockets:
380
                sockets.pop().close()
381
            
382
            # Make sure we fail immediately if we cannot get a socket
383
            raise msg
384
    
385
    return sockets
386

    
387
def parse_arguments(args):
388
    from optparse import OptionParser
389

    
390
    parser = OptionParser()
391
    parser.add_option("-s", "--socket", dest="ctrl_socket",
392
                      default=DEFAULT_CTRL_SOCKET,
393
                      metavar="PATH",
394
                      help="UNIX socket path for control connections (default: %s" %
395
                          DEFAULT_CTRL_SOCKET)
396
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
397
                      help="Enable debugging information")
398
    parser.add_option("-l", "--log", dest="log_file",
399
                      default=DEFAULT_LOG_FILE,
400
                      metavar="FILE",
401
                      help="Write log to FILE instead of %s" % DEFAULT_LOG_FILE),
402
    parser.add_option('--pid-file', dest="pid_file",
403
                      default=DEFAULT_PID_FILE,
404
                      metavar='PIDFILE',
405
                      help="Save PID to file (default: %s)" %
406
                          DEFAULT_PID_FILE)
407
    parser.add_option("-t", "--connect-timeout", dest="connect_timeout",
408
                      default=DEFAULT_CONNECT_TIMEOUT, type="int", metavar="SECONDS",
409
                      help="How long to listen for clients to forward")
410
    parser.add_option("-p", "--min-port", dest="min_port",
411
                      default=DEFAULT_MIN_PORT, type="int", metavar="MIN_PORT",
412
                      help="The minimum port to use for automatically-allocated ephemeral ports")
413
    parser.add_option("-P", "--max-port", dest="max_port",
414
                      default=DEFAULT_MAX_PORT, type="int", metavar="MAX_PORT",
415
                      help="The minimum port to use for automatically-allocated ephemeral ports")
416

    
417
    return parser.parse_args(args)
418

    
419

    
420
def main():
421
    """Run the daemon from the command line."""
422

    
423
    (opts, args) = parse_arguments(sys.argv[1:])
424

    
425
    # Create pidfile
426
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(
427
        opts.pid_file, 10)
428
    
429
    # Initialize logger
430
    lvl = logging.DEBUG if opts.debug else logging.INFO
431

    
432
    global logger
433
    logger = logging.getLogger("vncauthproxy")
434
    logger.setLevel(lvl)
435
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
436
        "%Y-%m-%d %H:%M:%S")
437
    handler = logging.FileHandler(opts.log_file)
438
    handler.setFormatter(formatter)
439
    logger.addHandler(handler)
440

    
441
    # Become a daemon:
442
    # Redirect stdout and stderr to handler.stream to catch
443
    # early errors in the daemonization process [e.g., pidfile creation]
444
    # which will otherwise go to /dev/null.
445
    daemon_context = AllFilesDaemonContext(
446
        pidfile=pidf,
447
        umask=0022,
448
        stdout=handler.stream,
449
        stderr=handler.stream,
450
        files_preserve=[handler.stream])
451
    daemon_context.open()
452
    logger.info("Became a daemon")
453

    
454
    # A fork() has occured while daemonizing,
455
    # we *must* reinit gevent
456
    gevent.reinit()
457

    
458
    if os.path.exists(opts.ctrl_socket):
459
        logger.critical("Socket '%s' already exists" % opts.ctrl_socket)
460
        sys.exit(1)
461

    
462
    # TODO: make this tunable? chgrp as well?
463
    old_umask = os.umask(0007)
464

    
465
    ctrl = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
466
    ctrl.bind(opts.ctrl_socket)
467

    
468
    os.umask(old_umask)
469

    
470
    ctrl.listen(1)
471
    logger.info("Initialized, waiting for control connections at %s" %
472
                 opts.ctrl_socket)
473

    
474
    # Catch signals to ensure graceful shutdown,
475
    # e.g., to make sure the control socket gets unlink()ed.
476
    #
477
    # Uses gevent.signal so the handler fires even during
478
    # gevent.socket.accept()
479
    gevent.signal(SIGINT, fatal_signal_handler, "SIGINT")
480
    gevent.signal(SIGTERM, fatal_signal_handler, "SIGTERM")
481

    
482
    # Init ephemeral port pool
483
    ports = range(opts.min_port, opts.max_port + 1) 
484

    
485
    while True:
486
        try:
487
            client, addr = ctrl.accept()
488
            logger.info("New control connection")
489
           
490
            # Receive and parse a client request.
491
            response = {
492
                "source_port": 0,
493
                "status": "FAILED"
494
            }
495
            try:
496
                # TODO: support multiple forwardings in the same message?
497
                # 
498
                # Control request, in JSON:
499
                #
500
                # {
501
                #     "source_port": <source port or 0 for automatic allocation>,
502
                #     "destination_address": <destination address of backend server>,
503
                #     "destination_port": <destination port>
504
                #     "password": <the password to use for MITM authentication of clients>
505
                # }
506
                # 
507
                # The <password> is used for MITM authentication of clients
508
                # connecting to <source_port>, who will subsequently be forwarded
509
                # to a VNC server at <destination_address>:<destination_port>
510
                #
511
                # Control reply, in JSON:
512
                # {
513
                #     "source_port": <the allocated source port>
514
                #     "status": <one of "OK" or "FAILED">
515
                # }
516
                buf = client.recv(1024)
517
                req = json.loads(buf)
518
                
519
                sport_orig = int(req['source_port'])
520
                daddr = req['destination_address']
521
                dport = int(req['destination_port'])
522
                password = req['password']
523
            except Exception, e:
524
                logger.warn("Malformed request: %s" % buf)
525
                client.send(json.dumps(response))
526
                client.close()
527
                continue
528
            
529
            # Spawn a new Greenlet to service the request.
530
            try:
531
                # If the client has so indicated, pick an ephemeral source port
532
                # randomly, and remove it from the port pool.
533
                if sport_orig == 0:
534
                    sport = random.choice(ports)
535
                    ports.remove(sport)
536
                    logger.debug("Got port %d from port pool, contains %d ports",
537
                        sport, len(ports))
538
                    pool = ports
539
                else:
540
                    sport = sport_orig
541
                    pool = None
542
                listeners = get_listening_sockets(sport)
543
                VncAuthProxy.spawn(logger, listeners, pool, daddr, dport,
544
                    password, opts.connect_timeout)
545
                logger.info("New forwarding [%d (req'd by client: %d) -> %s:%d]" %
546
                    (sport, sport_orig, daddr, dport))
547
                response = {
548
                    "source_port": sport,
549
                    "status": "OK"
550
                }
551
            except IndexError:
552
                logger.error("FAILED forwarding, out of ports for [req'd by "
553
                    "client: %d -> %s:%d]" % (sport_orig, daddr, dport))
554
            except socket.error, msg:
555
                logger.error("FAILED forwarding [%d (req'd by client: %d) -> %s:%d]" %
556
                    (sport, sport_orig, daddr, dport))
557
                if not pool is None:
558
                    pool.append(sport)
559
                    logger.debug("Returned port %d to port pool, contains %d ports",
560
                        sport, len(pool))
561
            finally:
562
                client.send(json.dumps(response))
563
                client.close()
564
        except Exception, e:
565
            logger.exception(e)
566
            continue
567
        except SystemExit:
568
            break
569
 
570
    logger.info("Unlinking control socket at %s" %
571
                 opts.ctrl_socket)
572
    os.unlink(opts.ctrl_socket)
573
    daemon_context.close()
574
    sys.exit(0)