root / lib / rpc / transport.py @ 51b69fc8
History | View | Annotate | Download (6.3 kB)
1 | ff1012ef | Petr Pudlak | #
|
---|---|---|---|
2 | ff1012ef | Petr Pudlak | #
|
3 | ff1012ef | Petr Pudlak | |
4 | ff1012ef | Petr Pudlak | # Copyright (C) 2013 Google Inc.
|
5 | ff1012ef | Petr Pudlak | #
|
6 | ff1012ef | Petr Pudlak | # This program is free software; you can redistribute it and/or modify
|
7 | ff1012ef | Petr Pudlak | # it under the terms of the GNU General Public License as published by
|
8 | ff1012ef | Petr Pudlak | # the Free Software Foundation; either version 2 of the License, or
|
9 | ff1012ef | Petr Pudlak | # (at your option) any later version.
|
10 | ff1012ef | Petr Pudlak | #
|
11 | ff1012ef | Petr Pudlak | # This program is distributed in the hope that it will be useful, but
|
12 | ff1012ef | Petr Pudlak | # WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 | ff1012ef | Petr Pudlak | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 | ff1012ef | Petr Pudlak | # General Public License for more details.
|
15 | ff1012ef | Petr Pudlak | #
|
16 | ff1012ef | Petr Pudlak | # You should have received a copy of the GNU General Public License
|
17 | ff1012ef | Petr Pudlak | # along with this program; if not, write to the Free Software
|
18 | ff1012ef | Petr Pudlak | # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 | ff1012ef | Petr Pudlak | # 02110-1301, USA.
|
20 | ff1012ef | Petr Pudlak | |
21 | ff1012ef | Petr Pudlak | |
22 | ff1012ef | Petr Pudlak | """Module that defines a transport for RPC connections.
|
23 | ff1012ef | Petr Pudlak |
|
24 | ff1012ef | Petr Pudlak | A transport can send to and receive messages from some endpoint.
|
25 | ff1012ef | Petr Pudlak |
|
26 | ff1012ef | Petr Pudlak | """
|
27 | ff1012ef | Petr Pudlak | |
28 | ff1012ef | Petr Pudlak | import collections |
29 | ff1012ef | Petr Pudlak | import errno |
30 | ff1012ef | Petr Pudlak | import socket |
31 | ff1012ef | Petr Pudlak | import time |
32 | ff1012ef | Petr Pudlak | |
33 | ff1012ef | Petr Pudlak | from ganeti import constants |
34 | ff1012ef | Petr Pudlak | from ganeti import utils |
35 | ff1012ef | Petr Pudlak | from ganeti.rpc import errors |
36 | ff1012ef | Petr Pudlak | |
37 | ff1012ef | Petr Pudlak | |
38 | ff1012ef | Petr Pudlak | DEF_CTMO = constants.LUXI_DEF_CTMO |
39 | ff1012ef | Petr Pudlak | DEF_RWTO = constants.LUXI_DEF_RWTO |
40 | ff1012ef | Petr Pudlak | |
41 | ff1012ef | Petr Pudlak | |
42 | ff1012ef | Petr Pudlak | class Transport: |
43 | ff1012ef | Petr Pudlak | """Low-level transport class.
|
44 | ff1012ef | Petr Pudlak |
|
45 | ff1012ef | Petr Pudlak | This is used on the client side.
|
46 | ff1012ef | Petr Pudlak |
|
47 | ff1012ef | Petr Pudlak | This could be replace by any other class that provides the same
|
48 | ff1012ef | Petr Pudlak | semantics to the Client. This means:
|
49 | ff1012ef | Petr Pudlak | - can send messages and receive messages
|
50 | ff1012ef | Petr Pudlak | - safe for multithreading
|
51 | ff1012ef | Petr Pudlak |
|
52 | ff1012ef | Petr Pudlak | """
|
53 | ff1012ef | Petr Pudlak | |
54 | ff1012ef | Petr Pudlak | def __init__(self, address, timeouts=None): |
55 | ff1012ef | Petr Pudlak | """Constructor for the Client class.
|
56 | ff1012ef | Petr Pudlak |
|
57 | ff1012ef | Petr Pudlak | Arguments:
|
58 | ff1012ef | Petr Pudlak | - address: a valid address the the used transport class
|
59 | ff1012ef | Petr Pudlak | - timeout: a list of timeouts, to be used on connect and read/write
|
60 | ff1012ef | Petr Pudlak |
|
61 | ff1012ef | Petr Pudlak | There are two timeouts used since we might want to wait for a long
|
62 | ff1012ef | Petr Pudlak | time for a response, but the connect timeout should be lower.
|
63 | ff1012ef | Petr Pudlak |
|
64 | ff1012ef | Petr Pudlak | If not passed, we use a default of 10 and respectively 60 seconds.
|
65 | ff1012ef | Petr Pudlak |
|
66 | ff1012ef | Petr Pudlak | Note that on reading data, since the timeout applies to an
|
67 | ff1012ef | Petr Pudlak | invidual receive, it might be that the total duration is longer
|
68 | ff1012ef | Petr Pudlak | than timeout value passed (we make a hard limit at twice the read
|
69 | ff1012ef | Petr Pudlak | timeout).
|
70 | ff1012ef | Petr Pudlak |
|
71 | ff1012ef | Petr Pudlak | """
|
72 | ff1012ef | Petr Pudlak | self.address = address
|
73 | ff1012ef | Petr Pudlak | if timeouts is None: |
74 | ff1012ef | Petr Pudlak | self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO |
75 | ff1012ef | Petr Pudlak | else:
|
76 | ff1012ef | Petr Pudlak | self._ctimeout, self._rwtimeout = timeouts |
77 | ff1012ef | Petr Pudlak | |
78 | ff1012ef | Petr Pudlak | self.socket = None |
79 | ff1012ef | Petr Pudlak | self._buffer = "" |
80 | ff1012ef | Petr Pudlak | self._msgs = collections.deque()
|
81 | ff1012ef | Petr Pudlak | |
82 | ff1012ef | Petr Pudlak | try:
|
83 | ff1012ef | Petr Pudlak | self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
84 | ff1012ef | Petr Pudlak | |
85 | ff1012ef | Petr Pudlak | # Try to connect
|
86 | ff1012ef | Petr Pudlak | try:
|
87 | ff1012ef | Petr Pudlak | utils.Retry(self._Connect, 1.0, self._ctimeout, |
88 | ff1012ef | Petr Pudlak | args=(self.socket, address, self._ctimeout)) |
89 | ff1012ef | Petr Pudlak | except utils.RetryTimeout:
|
90 | ff1012ef | Petr Pudlak | raise errors.TimeoutError("Connect timed out") |
91 | ff1012ef | Petr Pudlak | |
92 | ff1012ef | Petr Pudlak | self.socket.settimeout(self._rwtimeout) |
93 | ff1012ef | Petr Pudlak | except (socket.error, errors.NoMasterError):
|
94 | ff1012ef | Petr Pudlak | if self.socket is not None: |
95 | ff1012ef | Petr Pudlak | self.socket.close()
|
96 | ff1012ef | Petr Pudlak | self.socket = None |
97 | ff1012ef | Petr Pudlak | raise
|
98 | ff1012ef | Petr Pudlak | |
99 | ff1012ef | Petr Pudlak | @staticmethod
|
100 | ff1012ef | Petr Pudlak | def _Connect(sock, address, timeout): |
101 | ff1012ef | Petr Pudlak | sock.settimeout(timeout) |
102 | ff1012ef | Petr Pudlak | try:
|
103 | ff1012ef | Petr Pudlak | sock.connect(address) |
104 | ff1012ef | Petr Pudlak | except socket.timeout, err:
|
105 | ff1012ef | Petr Pudlak | raise errors.TimeoutError("Connect timed out: %s" % str(err)) |
106 | ff1012ef | Petr Pudlak | except socket.error, err:
|
107 | ff1012ef | Petr Pudlak | error_code = err.args[0]
|
108 | ff1012ef | Petr Pudlak | if error_code in (errno.ENOENT, errno.ECONNREFUSED): |
109 | ff1012ef | Petr Pudlak | raise errors.NoMasterError(address)
|
110 | ff1012ef | Petr Pudlak | elif error_code in (errno.EPERM, errno.EACCES): |
111 | ff1012ef | Petr Pudlak | raise errors.PermissionError(address)
|
112 | ff1012ef | Petr Pudlak | elif error_code == errno.EAGAIN:
|
113 | ff1012ef | Petr Pudlak | # Server's socket backlog is full at the moment
|
114 | ff1012ef | Petr Pudlak | raise utils.RetryAgain()
|
115 | ff1012ef | Petr Pudlak | raise
|
116 | ff1012ef | Petr Pudlak | |
117 | ff1012ef | Petr Pudlak | def _CheckSocket(self): |
118 | ff1012ef | Petr Pudlak | """Make sure we are connected.
|
119 | ff1012ef | Petr Pudlak |
|
120 | ff1012ef | Petr Pudlak | """
|
121 | ff1012ef | Petr Pudlak | if self.socket is None: |
122 | ff1012ef | Petr Pudlak | raise errors.ProtocolError("Connection is closed") |
123 | ff1012ef | Petr Pudlak | |
124 | ff1012ef | Petr Pudlak | def Send(self, msg): |
125 | ff1012ef | Petr Pudlak | """Send a message.
|
126 | ff1012ef | Petr Pudlak |
|
127 | ff1012ef | Petr Pudlak | This just sends a message and doesn't wait for the response.
|
128 | ff1012ef | Petr Pudlak |
|
129 | ff1012ef | Petr Pudlak | """
|
130 | ff1012ef | Petr Pudlak | if constants.LUXI_EOM in msg: |
131 | ff1012ef | Petr Pudlak | raise errors.ProtocolError("Message terminator found in payload") |
132 | ff1012ef | Petr Pudlak | |
133 | ff1012ef | Petr Pudlak | self._CheckSocket()
|
134 | ff1012ef | Petr Pudlak | try:
|
135 | ff1012ef | Petr Pudlak | # TODO: sendall is not guaranteed to send everything
|
136 | ff1012ef | Petr Pudlak | self.socket.sendall(msg + constants.LUXI_EOM)
|
137 | ff1012ef | Petr Pudlak | except socket.timeout, err:
|
138 | ff1012ef | Petr Pudlak | raise errors.TimeoutError("Sending timeout: %s" % str(err)) |
139 | ff1012ef | Petr Pudlak | |
140 | ff1012ef | Petr Pudlak | def Recv(self): |
141 | ff1012ef | Petr Pudlak | """Try to receive a message from the socket.
|
142 | ff1012ef | Petr Pudlak |
|
143 | ff1012ef | Petr Pudlak | In case we already have messages queued, we just return from the
|
144 | ff1012ef | Petr Pudlak | queue. Otherwise, we try to read data with a _rwtimeout network
|
145 | ff1012ef | Petr Pudlak | timeout, and making sure we don't go over 2x_rwtimeout as a global
|
146 | ff1012ef | Petr Pudlak | limit.
|
147 | ff1012ef | Petr Pudlak |
|
148 | ff1012ef | Petr Pudlak | """
|
149 | ff1012ef | Petr Pudlak | self._CheckSocket()
|
150 | ff1012ef | Petr Pudlak | etime = time.time() + self._rwtimeout
|
151 | ff1012ef | Petr Pudlak | while not self._msgs: |
152 | ff1012ef | Petr Pudlak | if time.time() > etime:
|
153 | ff1012ef | Petr Pudlak | raise errors.TimeoutError("Extended receive timeout") |
154 | ff1012ef | Petr Pudlak | while True: |
155 | ff1012ef | Petr Pudlak | try:
|
156 | ff1012ef | Petr Pudlak | data = self.socket.recv(4096) |
157 | ff1012ef | Petr Pudlak | except socket.timeout, err:
|
158 | ff1012ef | Petr Pudlak | raise errors.TimeoutError("Receive timeout: %s" % str(err)) |
159 | ff1012ef | Petr Pudlak | except socket.error, err:
|
160 | ff1012ef | Petr Pudlak | if err.args and err.args[0] == errno.EAGAIN: |
161 | ff1012ef | Petr Pudlak | continue
|
162 | ff1012ef | Petr Pudlak | raise
|
163 | ff1012ef | Petr Pudlak | break
|
164 | ff1012ef | Petr Pudlak | if not data: |
165 | ff1012ef | Petr Pudlak | raise errors.ConnectionClosedError("Connection closed while reading") |
166 | ff1012ef | Petr Pudlak | new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
|
167 | ff1012ef | Petr Pudlak | self._buffer = new_msgs.pop()
|
168 | ff1012ef | Petr Pudlak | self._msgs.extend(new_msgs)
|
169 | ff1012ef | Petr Pudlak | return self._msgs.popleft() |
170 | ff1012ef | Petr Pudlak | |
171 | ff1012ef | Petr Pudlak | def Call(self, msg): |
172 | ff1012ef | Petr Pudlak | """Send a message and wait for the response.
|
173 | ff1012ef | Petr Pudlak |
|
174 | ff1012ef | Petr Pudlak | This is just a wrapper over Send and Recv.
|
175 | ff1012ef | Petr Pudlak |
|
176 | ff1012ef | Petr Pudlak | """
|
177 | ff1012ef | Petr Pudlak | self.Send(msg)
|
178 | ff1012ef | Petr Pudlak | return self.Recv() |
179 | ff1012ef | Petr Pudlak | |
180 | f3aebf6f | Petr Pudlak | @staticmethod
|
181 | f3aebf6f | Petr Pudlak | def RetryOnBrokenPipe(fn, on_error): |
182 | f3aebf6f | Petr Pudlak | """Calls a given function, retrying if it fails on the 'Broken pipe' IO
|
183 | f3aebf6f | Petr Pudlak | exception.
|
184 | f3aebf6f | Petr Pudlak |
|
185 | f3aebf6f | Petr Pudlak | This allows to re-establish a broken connection and retry an IO operation.
|
186 | f3aebf6f | Petr Pudlak |
|
187 | f3aebf6f | Petr Pudlak | The function receives one an integer argument stating the current retry
|
188 | f3aebf6f | Petr Pudlak | number, 0 being the first call, 1 being the retry.
|
189 | f3aebf6f | Petr Pudlak |
|
190 | f3aebf6f | Petr Pudlak | If any exception occurs, on_error is invoked first with the exception given
|
191 | f3aebf6f | Petr Pudlak | as an argument. Then, if the exception is 'Broken pipe', the function call
|
192 | f3aebf6f | Petr Pudlak | is retried once more.
|
193 | f3aebf6f | Petr Pudlak |
|
194 | f3aebf6f | Petr Pudlak | """
|
195 | f3aebf6f | Petr Pudlak | retries = 2
|
196 | f3aebf6f | Petr Pudlak | for try_no in range(0, retries): |
197 | f3aebf6f | Petr Pudlak | try:
|
198 | f3aebf6f | Petr Pudlak | return fn(try_no)
|
199 | f3aebf6f | Petr Pudlak | except socket.error, ex:
|
200 | f3aebf6f | Petr Pudlak | on_error(ex) |
201 | f3aebf6f | Petr Pudlak | # we retry on "Broken pipe", unless it's the last try
|
202 | f3aebf6f | Petr Pudlak | if try_no == retries - 1: |
203 | f3aebf6f | Petr Pudlak | raise
|
204 | f3aebf6f | Petr Pudlak | elif not (isinstance(ex.args, tuple) and (ex[0] == errno.EPIPE)): |
205 | f3aebf6f | Petr Pudlak | raise
|
206 | f3aebf6f | Petr Pudlak | except Exception, ex: |
207 | f3aebf6f | Petr Pudlak | on_error(ex) |
208 | f3aebf6f | Petr Pudlak | raise
|
209 | f3aebf6f | Petr Pudlak | assert False # we should never get here |
210 | f3aebf6f | Petr Pudlak | |
211 | ff1012ef | Petr Pudlak | def Close(self): |
212 | ff1012ef | Petr Pudlak | """Close the socket"""
|
213 | ff1012ef | Petr Pudlak | if self.socket is not None: |
214 | ff1012ef | Petr Pudlak | self.socket.close()
|
215 | ff1012ef | Petr Pudlak | self.socket = None |