Statistics
| Branch: | Tag: | Revision:

root / lib / rpc / transport.py @ 51b69fc8

History | View | Annotate | Download (6.3 kB)

1 ff1012ef Petr Pudlak
#
2 ff1012ef Petr Pudlak
#
3 ff1012ef Petr Pudlak
4 ff1012ef Petr Pudlak
# Copyright (C) 2013 Google Inc.
5 ff1012ef Petr Pudlak
#
6 ff1012ef Petr Pudlak
# This program is free software; you can redistribute it and/or modify
7 ff1012ef Petr Pudlak
# it under the terms of the GNU General Public License as published by
8 ff1012ef Petr Pudlak
# the Free Software Foundation; either version 2 of the License, or
9 ff1012ef Petr Pudlak
# (at your option) any later version.
10 ff1012ef Petr Pudlak
#
11 ff1012ef Petr Pudlak
# This program is distributed in the hope that it will be useful, but
12 ff1012ef Petr Pudlak
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 ff1012ef Petr Pudlak
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 ff1012ef Petr Pudlak
# General Public License for more details.
15 ff1012ef Petr Pudlak
#
16 ff1012ef Petr Pudlak
# You should have received a copy of the GNU General Public License
17 ff1012ef Petr Pudlak
# along with this program; if not, write to the Free Software
18 ff1012ef Petr Pudlak
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 ff1012ef Petr Pudlak
# 02110-1301, USA.
20 ff1012ef Petr Pudlak
21 ff1012ef Petr Pudlak
22 ff1012ef Petr Pudlak
"""Module that defines a transport for RPC connections.
23 ff1012ef Petr Pudlak

24 ff1012ef Petr Pudlak
A transport can send to and receive messages from some endpoint.
25 ff1012ef Petr Pudlak

26 ff1012ef Petr Pudlak
"""
27 ff1012ef Petr Pudlak
28 ff1012ef Petr Pudlak
import collections
29 ff1012ef Petr Pudlak
import errno
30 ff1012ef Petr Pudlak
import socket
31 ff1012ef Petr Pudlak
import time
32 ff1012ef Petr Pudlak
33 ff1012ef Petr Pudlak
from ganeti import constants
34 ff1012ef Petr Pudlak
from ganeti import utils
35 ff1012ef Petr Pudlak
from ganeti.rpc import errors
36 ff1012ef Petr Pudlak
37 ff1012ef Petr Pudlak
38 ff1012ef Petr Pudlak
DEF_CTMO = constants.LUXI_DEF_CTMO
39 ff1012ef Petr Pudlak
DEF_RWTO = constants.LUXI_DEF_RWTO
40 ff1012ef Petr Pudlak
41 ff1012ef Petr Pudlak
42 ff1012ef Petr Pudlak
class Transport:
43 ff1012ef Petr Pudlak
  """Low-level transport class.
44 ff1012ef Petr Pudlak

45 ff1012ef Petr Pudlak
  This is used on the client side.
46 ff1012ef Petr Pudlak

47 ff1012ef Petr Pudlak
  This could be replace by any other class that provides the same
48 ff1012ef Petr Pudlak
  semantics to the Client. This means:
49 ff1012ef Petr Pudlak
    - can send messages and receive messages
50 ff1012ef Petr Pudlak
    - safe for multithreading
51 ff1012ef Petr Pudlak

52 ff1012ef Petr Pudlak
  """
53 ff1012ef Petr Pudlak
54 ff1012ef Petr Pudlak
  def __init__(self, address, timeouts=None):
55 ff1012ef Petr Pudlak
    """Constructor for the Client class.
56 ff1012ef Petr Pudlak

57 ff1012ef Petr Pudlak
    Arguments:
58 ff1012ef Petr Pudlak
      - address: a valid address the the used transport class
59 ff1012ef Petr Pudlak
      - timeout: a list of timeouts, to be used on connect and read/write
60 ff1012ef Petr Pudlak

61 ff1012ef Petr Pudlak
    There are two timeouts used since we might want to wait for a long
62 ff1012ef Petr Pudlak
    time for a response, but the connect timeout should be lower.
63 ff1012ef Petr Pudlak

64 ff1012ef Petr Pudlak
    If not passed, we use a default of 10 and respectively 60 seconds.
65 ff1012ef Petr Pudlak

66 ff1012ef Petr Pudlak
    Note that on reading data, since the timeout applies to an
67 ff1012ef Petr Pudlak
    invidual receive, it might be that the total duration is longer
68 ff1012ef Petr Pudlak
    than timeout value passed (we make a hard limit at twice the read
69 ff1012ef Petr Pudlak
    timeout).
70 ff1012ef Petr Pudlak

71 ff1012ef Petr Pudlak
    """
72 ff1012ef Petr Pudlak
    self.address = address
73 ff1012ef Petr Pudlak
    if timeouts is None:
74 ff1012ef Petr Pudlak
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
75 ff1012ef Petr Pudlak
    else:
76 ff1012ef Petr Pudlak
      self._ctimeout, self._rwtimeout = timeouts
77 ff1012ef Petr Pudlak
78 ff1012ef Petr Pudlak
    self.socket = None
79 ff1012ef Petr Pudlak
    self._buffer = ""
80 ff1012ef Petr Pudlak
    self._msgs = collections.deque()
81 ff1012ef Petr Pudlak
82 ff1012ef Petr Pudlak
    try:
83 ff1012ef Petr Pudlak
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
84 ff1012ef Petr Pudlak
85 ff1012ef Petr Pudlak
      # Try to connect
86 ff1012ef Petr Pudlak
      try:
87 ff1012ef Petr Pudlak
        utils.Retry(self._Connect, 1.0, self._ctimeout,
88 ff1012ef Petr Pudlak
                    args=(self.socket, address, self._ctimeout))
89 ff1012ef Petr Pudlak
      except utils.RetryTimeout:
90 ff1012ef Petr Pudlak
        raise errors.TimeoutError("Connect timed out")
91 ff1012ef Petr Pudlak
92 ff1012ef Petr Pudlak
      self.socket.settimeout(self._rwtimeout)
93 ff1012ef Petr Pudlak
    except (socket.error, errors.NoMasterError):
94 ff1012ef Petr Pudlak
      if self.socket is not None:
95 ff1012ef Petr Pudlak
        self.socket.close()
96 ff1012ef Petr Pudlak
      self.socket = None
97 ff1012ef Petr Pudlak
      raise
98 ff1012ef Petr Pudlak
99 ff1012ef Petr Pudlak
  @staticmethod
100 ff1012ef Petr Pudlak
  def _Connect(sock, address, timeout):
101 ff1012ef Petr Pudlak
    sock.settimeout(timeout)
102 ff1012ef Petr Pudlak
    try:
103 ff1012ef Petr Pudlak
      sock.connect(address)
104 ff1012ef Petr Pudlak
    except socket.timeout, err:
105 ff1012ef Petr Pudlak
      raise errors.TimeoutError("Connect timed out: %s" % str(err))
106 ff1012ef Petr Pudlak
    except socket.error, err:
107 ff1012ef Petr Pudlak
      error_code = err.args[0]
108 ff1012ef Petr Pudlak
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
109 ff1012ef Petr Pudlak
        raise errors.NoMasterError(address)
110 ff1012ef Petr Pudlak
      elif error_code in (errno.EPERM, errno.EACCES):
111 ff1012ef Petr Pudlak
        raise errors.PermissionError(address)
112 ff1012ef Petr Pudlak
      elif error_code == errno.EAGAIN:
113 ff1012ef Petr Pudlak
        # Server's socket backlog is full at the moment
114 ff1012ef Petr Pudlak
        raise utils.RetryAgain()
115 ff1012ef Petr Pudlak
      raise
116 ff1012ef Petr Pudlak
117 ff1012ef Petr Pudlak
  def _CheckSocket(self):
118 ff1012ef Petr Pudlak
    """Make sure we are connected.
119 ff1012ef Petr Pudlak

120 ff1012ef Petr Pudlak
    """
121 ff1012ef Petr Pudlak
    if self.socket is None:
122 ff1012ef Petr Pudlak
      raise errors.ProtocolError("Connection is closed")
123 ff1012ef Petr Pudlak
124 ff1012ef Petr Pudlak
  def Send(self, msg):
125 ff1012ef Petr Pudlak
    """Send a message.
126 ff1012ef Petr Pudlak

127 ff1012ef Petr Pudlak
    This just sends a message and doesn't wait for the response.
128 ff1012ef Petr Pudlak

129 ff1012ef Petr Pudlak
    """
130 ff1012ef Petr Pudlak
    if constants.LUXI_EOM in msg:
131 ff1012ef Petr Pudlak
      raise errors.ProtocolError("Message terminator found in payload")
132 ff1012ef Petr Pudlak
133 ff1012ef Petr Pudlak
    self._CheckSocket()
134 ff1012ef Petr Pudlak
    try:
135 ff1012ef Petr Pudlak
      # TODO: sendall is not guaranteed to send everything
136 ff1012ef Petr Pudlak
      self.socket.sendall(msg + constants.LUXI_EOM)
137 ff1012ef Petr Pudlak
    except socket.timeout, err:
138 ff1012ef Petr Pudlak
      raise errors.TimeoutError("Sending timeout: %s" % str(err))
139 ff1012ef Petr Pudlak
140 ff1012ef Petr Pudlak
  def Recv(self):
141 ff1012ef Petr Pudlak
    """Try to receive a message from the socket.
142 ff1012ef Petr Pudlak

143 ff1012ef Petr Pudlak
    In case we already have messages queued, we just return from the
144 ff1012ef Petr Pudlak
    queue. Otherwise, we try to read data with a _rwtimeout network
145 ff1012ef Petr Pudlak
    timeout, and making sure we don't go over 2x_rwtimeout as a global
146 ff1012ef Petr Pudlak
    limit.
147 ff1012ef Petr Pudlak

148 ff1012ef Petr Pudlak
    """
149 ff1012ef Petr Pudlak
    self._CheckSocket()
150 ff1012ef Petr Pudlak
    etime = time.time() + self._rwtimeout
151 ff1012ef Petr Pudlak
    while not self._msgs:
152 ff1012ef Petr Pudlak
      if time.time() > etime:
153 ff1012ef Petr Pudlak
        raise errors.TimeoutError("Extended receive timeout")
154 ff1012ef Petr Pudlak
      while True:
155 ff1012ef Petr Pudlak
        try:
156 ff1012ef Petr Pudlak
          data = self.socket.recv(4096)
157 ff1012ef Petr Pudlak
        except socket.timeout, err:
158 ff1012ef Petr Pudlak
          raise errors.TimeoutError("Receive timeout: %s" % str(err))
159 ff1012ef Petr Pudlak
        except socket.error, err:
160 ff1012ef Petr Pudlak
          if err.args and err.args[0] == errno.EAGAIN:
161 ff1012ef Petr Pudlak
            continue
162 ff1012ef Petr Pudlak
          raise
163 ff1012ef Petr Pudlak
        break
164 ff1012ef Petr Pudlak
      if not data:
165 ff1012ef Petr Pudlak
        raise errors.ConnectionClosedError("Connection closed while reading")
166 ff1012ef Petr Pudlak
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
167 ff1012ef Petr Pudlak
      self._buffer = new_msgs.pop()
168 ff1012ef Petr Pudlak
      self._msgs.extend(new_msgs)
169 ff1012ef Petr Pudlak
    return self._msgs.popleft()
170 ff1012ef Petr Pudlak
171 ff1012ef Petr Pudlak
  def Call(self, msg):
172 ff1012ef Petr Pudlak
    """Send a message and wait for the response.
173 ff1012ef Petr Pudlak

174 ff1012ef Petr Pudlak
    This is just a wrapper over Send and Recv.
175 ff1012ef Petr Pudlak

176 ff1012ef Petr Pudlak
    """
177 ff1012ef Petr Pudlak
    self.Send(msg)
178 ff1012ef Petr Pudlak
    return self.Recv()
179 ff1012ef Petr Pudlak
180 f3aebf6f Petr Pudlak
  @staticmethod
181 f3aebf6f Petr Pudlak
  def RetryOnBrokenPipe(fn, on_error):
182 f3aebf6f Petr Pudlak
    """Calls a given function, retrying if it fails on the 'Broken pipe' IO
183 f3aebf6f Petr Pudlak
    exception.
184 f3aebf6f Petr Pudlak

185 f3aebf6f Petr Pudlak
    This allows to re-establish a broken connection and retry an IO operation.
186 f3aebf6f Petr Pudlak

187 f3aebf6f Petr Pudlak
    The function receives one an integer argument stating the current retry
188 f3aebf6f Petr Pudlak
    number, 0 being the first call, 1 being the retry.
189 f3aebf6f Petr Pudlak

190 f3aebf6f Petr Pudlak
    If any exception occurs, on_error is invoked first with the exception given
191 f3aebf6f Petr Pudlak
    as an argument. Then, if the exception is 'Broken pipe', the function call
192 f3aebf6f Petr Pudlak
    is retried once more.
193 f3aebf6f Petr Pudlak

194 f3aebf6f Petr Pudlak
    """
195 f3aebf6f Petr Pudlak
    retries = 2
196 f3aebf6f Petr Pudlak
    for try_no in range(0, retries):
197 f3aebf6f Petr Pudlak
      try:
198 f3aebf6f Petr Pudlak
        return fn(try_no)
199 f3aebf6f Petr Pudlak
      except socket.error, ex:
200 f3aebf6f Petr Pudlak
        on_error(ex)
201 f3aebf6f Petr Pudlak
        # we retry on "Broken pipe", unless it's the last try
202 f3aebf6f Petr Pudlak
        if try_no == retries - 1:
203 f3aebf6f Petr Pudlak
          raise
204 f3aebf6f Petr Pudlak
        elif not (isinstance(ex.args, tuple) and (ex[0] == errno.EPIPE)):
205 f3aebf6f Petr Pudlak
          raise
206 f3aebf6f Petr Pudlak
      except Exception, ex:
207 f3aebf6f Petr Pudlak
        on_error(ex)
208 f3aebf6f Petr Pudlak
        raise
209 f3aebf6f Petr Pudlak
    assert False # we should never get here
210 f3aebf6f Petr Pudlak
211 ff1012ef Petr Pudlak
  def Close(self):
212 ff1012ef Petr Pudlak
    """Close the socket"""
213 ff1012ef Petr Pudlak
    if self.socket is not None:
214 ff1012ef Petr Pudlak
      self.socket.close()
215 ff1012ef Petr Pudlak
      self.socket = None