root / lib / impexpd / __init__.py @ b43dcc5a
History | View | Annotate | Download (15.5 kB)
1 | bb44b1ae | Michael Hanselmann | #
|
---|---|---|---|
2 | bb44b1ae | Michael Hanselmann | #
|
3 | bb44b1ae | Michael Hanselmann | |
4 | bb44b1ae | Michael Hanselmann | # Copyright (C) 2010 Google Inc.
|
5 | bb44b1ae | Michael Hanselmann | #
|
6 | bb44b1ae | Michael Hanselmann | # This program is free software; you can redistribute it and/or modify
|
7 | bb44b1ae | Michael Hanselmann | # it under the terms of the GNU General Public License as published by
|
8 | bb44b1ae | Michael Hanselmann | # the Free Software Foundation; either version 2 of the License, or
|
9 | bb44b1ae | Michael Hanselmann | # (at your option) any later version.
|
10 | bb44b1ae | Michael Hanselmann | #
|
11 | bb44b1ae | Michael Hanselmann | # This program is distributed in the hope that it will be useful, but
|
12 | bb44b1ae | Michael Hanselmann | # WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 | bb44b1ae | Michael Hanselmann | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 | bb44b1ae | Michael Hanselmann | # General Public License for more details.
|
15 | bb44b1ae | Michael Hanselmann | #
|
16 | bb44b1ae | Michael Hanselmann | # You should have received a copy of the GNU General Public License
|
17 | bb44b1ae | Michael Hanselmann | # along with this program; if not, write to the Free Software
|
18 | bb44b1ae | Michael Hanselmann | # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 | bb44b1ae | Michael Hanselmann | # 02110-1301, USA.
|
20 | bb44b1ae | Michael Hanselmann | |
21 | bb44b1ae | Michael Hanselmann | |
22 | bb44b1ae | Michael Hanselmann | """Classes and functions for import/export daemon.
|
23 | bb44b1ae | Michael Hanselmann |
|
24 | bb44b1ae | Michael Hanselmann | """
|
25 | bb44b1ae | Michael Hanselmann | |
26 | c08d76f5 | Michael Hanselmann | import os |
27 | 34c9ee7b | Michael Hanselmann | import re |
28 | 34c9ee7b | Michael Hanselmann | import socket |
29 | 53dbf14c | Michael Hanselmann | import logging |
30 | c08d76f5 | Michael Hanselmann | import signal |
31 | c08d76f5 | Michael Hanselmann | import errno |
32 | c08d76f5 | Michael Hanselmann | import time |
33 | bb44b1ae | Michael Hanselmann | from cStringIO import StringIO |
34 | bb44b1ae | Michael Hanselmann | |
35 | bb44b1ae | Michael Hanselmann | from ganeti import constants |
36 | bb44b1ae | Michael Hanselmann | from ganeti import errors |
37 | bb44b1ae | Michael Hanselmann | from ganeti import utils |
38 | bb44b1ae | Michael Hanselmann | |
39 | bb44b1ae | Michael Hanselmann | |
40 | 34c9ee7b | Michael Hanselmann | #: Used to recognize point at which socat(1) starts to listen on its socket.
|
41 | 34c9ee7b | Michael Hanselmann | #: The local address is required for the remote peer to connect (in particular
|
42 | 34c9ee7b | Michael Hanselmann | #: the port number).
|
43 | 34c9ee7b | Michael Hanselmann | LISTENING_RE = re.compile(r"^listening on\s+"
|
44 | 34c9ee7b | Michael Hanselmann | r"AF=(?P<family>\d+)\s+"
|
45 | 34c9ee7b | Michael Hanselmann | r"(?P<address>.+):(?P<port>\d+)$", re.I)
|
46 | 34c9ee7b | Michael Hanselmann | |
47 | 34c9ee7b | Michael Hanselmann | #: Used to recognize point at which socat(1) is sending data over the wire
|
48 | 34c9ee7b | Michael Hanselmann | TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
|
49 | 34c9ee7b | Michael Hanselmann | re.I) |
50 | 34c9ee7b | Michael Hanselmann | |
51 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_DEBUG = "D"
|
52 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_INFO = "I"
|
53 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_NOTICE = "N"
|
54 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_WARNING = "W"
|
55 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_ERROR = "E"
|
56 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_FATAL = "F"
|
57 | 34c9ee7b | Michael Hanselmann | |
58 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_IGNORE = frozenset([
|
59 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_DEBUG, |
60 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_INFO, |
61 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_NOTICE, |
62 | 34c9ee7b | Michael Hanselmann | ]) |
63 | 34c9ee7b | Michael Hanselmann | |
64 | c08d76f5 | Michael Hanselmann | #: Used to parse GNU dd(1) statistics
|
65 | c08d76f5 | Michael Hanselmann | DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
|
66 | c08d76f5 | Michael Hanselmann | r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
|
67 | c08d76f5 | Michael Hanselmann | |
68 | c08d76f5 | Michael Hanselmann | #: Used to ignore "N+N records in/out" on dd(1)'s stderr
|
69 | c08d76f5 | Michael Hanselmann | DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
|
70 | c08d76f5 | Michael Hanselmann | |
71 | c08d76f5 | Michael Hanselmann | #: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
|
72 | c08d76f5 | Michael Hanselmann | #: unavailable and SIGUSR1 is used instead)
|
73 | c08d76f5 | Michael Hanselmann | DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1) |
74 | c08d76f5 | Michael Hanselmann | |
75 | bb44b1ae | Michael Hanselmann | #: Buffer size: at most this many bytes are transferred at once
|
76 | bb44b1ae | Michael Hanselmann | BUFSIZE = 1024 * 1024 |
77 | bb44b1ae | Michael Hanselmann | |
78 | bb44b1ae | Michael Hanselmann | # Common options for socat
|
79 | bb44b1ae | Michael Hanselmann | SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] |
80 | 971bbd84 | Michael Hanselmann | SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1", |
81 | 971bbd84 | Michael Hanselmann | "cipher=%s" % constants.OPENSSL_CIPHERS]
|
82 | bb44b1ae | Michael Hanselmann | |
83 | 0559f745 | Michael Hanselmann | SOCAT_OPTION_MAXLEN = 400
|
84 | 0559f745 | Michael Hanselmann | |
85 | 34c9ee7b | Michael Hanselmann | (PROG_OTHER, |
86 | c08d76f5 | Michael Hanselmann | PROG_SOCAT, |
87 | c08d76f5 | Michael Hanselmann | PROG_DD, |
88 | f9323011 | Michael Hanselmann | PROG_DD_PID, |
89 | f9323011 | Michael Hanselmann | PROG_EXP_SIZE) = range(1, 6) |
90 | 34c9ee7b | Michael Hanselmann | PROG_ALL = frozenset([
|
91 | 34c9ee7b | Michael Hanselmann | PROG_OTHER, |
92 | 34c9ee7b | Michael Hanselmann | PROG_SOCAT, |
93 | c08d76f5 | Michael Hanselmann | PROG_DD, |
94 | c08d76f5 | Michael Hanselmann | PROG_DD_PID, |
95 | f9323011 | Michael Hanselmann | PROG_EXP_SIZE, |
96 | 34c9ee7b | Michael Hanselmann | ]) |
97 | 34c9ee7b | Michael Hanselmann | |
98 | bb44b1ae | Michael Hanselmann | |
99 | bb44b1ae | Michael Hanselmann | class CommandBuilder(object): |
100 | c08d76f5 | Michael Hanselmann | def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd): |
101 | bb44b1ae | Michael Hanselmann | """Initializes this class.
|
102 | bb44b1ae | Michael Hanselmann |
|
103 | bb44b1ae | Michael Hanselmann | @param mode: Daemon mode (import or export)
|
104 | bb44b1ae | Michael Hanselmann | @param opts: Options object
|
105 | bb44b1ae | Michael Hanselmann | @type socat_stderr_fd: int
|
106 | bb44b1ae | Michael Hanselmann | @param socat_stderr_fd: File descriptor socat should write its stderr to
|
107 | c08d76f5 | Michael Hanselmann | @type dd_stderr_fd: int
|
108 | c08d76f5 | Michael Hanselmann | @param dd_stderr_fd: File descriptor dd should write its stderr to
|
109 | c08d76f5 | Michael Hanselmann | @type dd_pid_fd: int
|
110 | c08d76f5 | Michael Hanselmann | @param dd_pid_fd: File descriptor the child should write dd's PID to
|
111 | bb44b1ae | Michael Hanselmann |
|
112 | bb44b1ae | Michael Hanselmann | """
|
113 | bb44b1ae | Michael Hanselmann | self._opts = opts
|
114 | bb44b1ae | Michael Hanselmann | self._mode = mode
|
115 | bb44b1ae | Michael Hanselmann | self._socat_stderr_fd = socat_stderr_fd
|
116 | c08d76f5 | Michael Hanselmann | self._dd_stderr_fd = dd_stderr_fd
|
117 | c08d76f5 | Michael Hanselmann | self._dd_pid_fd = dd_pid_fd
|
118 | bb44b1ae | Michael Hanselmann | |
119 | 1d3dfa29 | Michael Hanselmann | assert (self._opts.magic is None or |
120 | 1d3dfa29 | Michael Hanselmann | constants.IE_MAGIC_RE.match(self._opts.magic))
|
121 | 1d3dfa29 | Michael Hanselmann | |
122 | bb44b1ae | Michael Hanselmann | @staticmethod
|
123 | bb44b1ae | Michael Hanselmann | def GetBashCommand(cmd): |
124 | bb44b1ae | Michael Hanselmann | """Prepares a command to be run in Bash.
|
125 | bb44b1ae | Michael Hanselmann |
|
126 | bb44b1ae | Michael Hanselmann | """
|
127 | bb44b1ae | Michael Hanselmann | return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd] |
128 | bb44b1ae | Michael Hanselmann | |
129 | bb44b1ae | Michael Hanselmann | def _GetSocatCommand(self): |
130 | bb44b1ae | Michael Hanselmann | """Returns the socat command.
|
131 | bb44b1ae | Michael Hanselmann |
|
132 | bb44b1ae | Michael Hanselmann | """
|
133 | bb44b1ae | Michael Hanselmann | common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [ |
134 | bb44b1ae | Michael Hanselmann | "key=%s" % self._opts.key, |
135 | bb44b1ae | Michael Hanselmann | "cert=%s" % self._opts.cert, |
136 | bb44b1ae | Michael Hanselmann | "cafile=%s" % self._opts.ca, |
137 | bb44b1ae | Michael Hanselmann | ] |
138 | bb44b1ae | Michael Hanselmann | |
139 | bb44b1ae | Michael Hanselmann | if self._opts.bind is not None: |
140 | bb44b1ae | Michael Hanselmann | common_addr_opts.append("bind=%s" % self._opts.bind) |
141 | bb44b1ae | Michael Hanselmann | |
142 | bb44b1ae | Michael Hanselmann | if self._mode == constants.IEM_IMPORT: |
143 | bb44b1ae | Michael Hanselmann | if self._opts.port is None: |
144 | bb44b1ae | Michael Hanselmann | port = 0
|
145 | bb44b1ae | Michael Hanselmann | else:
|
146 | bb44b1ae | Michael Hanselmann | port = self._opts.port
|
147 | bb44b1ae | Michael Hanselmann | |
148 | bb44b1ae | Michael Hanselmann | addr1 = [ |
149 | bb44b1ae | Michael Hanselmann | "OPENSSL-LISTEN:%s" % port,
|
150 | bb44b1ae | Michael Hanselmann | "reuseaddr",
|
151 | bb44b1ae | Michael Hanselmann | |
152 | bb44b1ae | Michael Hanselmann | # Retry to listen if connection wasn't established successfully, up to
|
153 | bb44b1ae | Michael Hanselmann | # 100 times a second. Note that this still leaves room for DoS attacks.
|
154 | bb44b1ae | Michael Hanselmann | "forever",
|
155 | bb44b1ae | Michael Hanselmann | "intervall=0.01",
|
156 | bb44b1ae | Michael Hanselmann | ] + common_addr_opts |
157 | bb44b1ae | Michael Hanselmann | addr2 = ["stdout"]
|
158 | bb44b1ae | Michael Hanselmann | |
159 | bb44b1ae | Michael Hanselmann | elif self._mode == constants.IEM_EXPORT: |
160 | bb44b1ae | Michael Hanselmann | addr1 = ["stdin"]
|
161 | bb44b1ae | Michael Hanselmann | addr2 = [ |
162 | bb44b1ae | Michael Hanselmann | "OPENSSL:%s:%s" % (self._opts.host, self._opts.port), |
163 | bb44b1ae | Michael Hanselmann | |
164 | bb44b1ae | Michael Hanselmann | # How long to wait per connection attempt
|
165 | bb44b1ae | Michael Hanselmann | "connect-timeout=%s" % self._opts.connect_timeout, |
166 | bb44b1ae | Michael Hanselmann | |
167 | bb44b1ae | Michael Hanselmann | # Retry a few times before giving up to connect (once per second)
|
168 | bb44b1ae | Michael Hanselmann | "retry=%s" % self._opts.connect_retries, |
169 | bb44b1ae | Michael Hanselmann | "intervall=1",
|
170 | bb44b1ae | Michael Hanselmann | ] + common_addr_opts |
171 | bb44b1ae | Michael Hanselmann | |
172 | bb44b1ae | Michael Hanselmann | else:
|
173 | bb44b1ae | Michael Hanselmann | raise errors.GenericError("Invalid mode '%s'" % self._mode) |
174 | bb44b1ae | Michael Hanselmann | |
175 | bb44b1ae | Michael Hanselmann | for i in [addr1, addr2]: |
176 | bb44b1ae | Michael Hanselmann | for value in i: |
177 | 0559f745 | Michael Hanselmann | if len(value) > SOCAT_OPTION_MAXLEN: |
178 | 0559f745 | Michael Hanselmann | raise errors.GenericError("Socat option longer than %s" |
179 | 0559f745 | Michael Hanselmann | " characters: %r" %
|
180 | 0559f745 | Michael Hanselmann | (SOCAT_OPTION_MAXLEN, value)) |
181 | bb44b1ae | Michael Hanselmann | if "," in value: |
182 | bb44b1ae | Michael Hanselmann | raise errors.GenericError("Comma not allowed in socat option" |
183 | bb44b1ae | Michael Hanselmann | " value: %r" % value)
|
184 | bb44b1ae | Michael Hanselmann | |
185 | bb44b1ae | Michael Hanselmann | return [
|
186 | bb44b1ae | Michael Hanselmann | constants.SOCAT_PATH, |
187 | bb44b1ae | Michael Hanselmann | |
188 | bb44b1ae | Michael Hanselmann | # Log to stderr
|
189 | bb44b1ae | Michael Hanselmann | "-ls",
|
190 | bb44b1ae | Michael Hanselmann | |
191 | bb44b1ae | Michael Hanselmann | # Log level
|
192 | bb44b1ae | Michael Hanselmann | "-d", "-d", |
193 | bb44b1ae | Michael Hanselmann | |
194 | bb44b1ae | Michael Hanselmann | # Buffer size
|
195 | bb44b1ae | Michael Hanselmann | "-b%s" % BUFSIZE,
|
196 | bb44b1ae | Michael Hanselmann | |
197 | bb44b1ae | Michael Hanselmann | # Unidirectional mode, the first address is only used for reading, and the
|
198 | bb44b1ae | Michael Hanselmann | # second address is only used for writing
|
199 | bb44b1ae | Michael Hanselmann | "-u",
|
200 | bb44b1ae | Michael Hanselmann | |
201 | bb44b1ae | Michael Hanselmann | ",".join(addr1), ",".join(addr2) |
202 | bb44b1ae | Michael Hanselmann | ] |
203 | bb44b1ae | Michael Hanselmann | |
204 | 1d3dfa29 | Michael Hanselmann | def _GetMagicCommand(self): |
205 | 1d3dfa29 | Michael Hanselmann | """Returns the command to read/write the magic value.
|
206 | 1d3dfa29 | Michael Hanselmann |
|
207 | 1d3dfa29 | Michael Hanselmann | """
|
208 | 1d3dfa29 | Michael Hanselmann | if not self._opts.magic: |
209 | 1d3dfa29 | Michael Hanselmann | return None |
210 | 1d3dfa29 | Michael Hanselmann | |
211 | 1d3dfa29 | Michael Hanselmann | # Prefix to ensure magic isn't interpreted as option to "echo"
|
212 | 1d3dfa29 | Michael Hanselmann | magic = "M=%s" % self._opts.magic |
213 | 1d3dfa29 | Michael Hanselmann | |
214 | 1d3dfa29 | Michael Hanselmann | cmd = StringIO() |
215 | 1d3dfa29 | Michael Hanselmann | |
216 | 1d3dfa29 | Michael Hanselmann | if self._mode == constants.IEM_IMPORT: |
217 | 1d3dfa29 | Michael Hanselmann | cmd.write("{ ")
|
218 | 1d3dfa29 | Michael Hanselmann | cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"])) |
219 | 1d3dfa29 | Michael Hanselmann | cmd.write(" && ")
|
220 | 1d3dfa29 | Michael Hanselmann | cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic))
|
221 | 1d3dfa29 | Michael Hanselmann | cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch")) |
222 | 1d3dfa29 | Michael Hanselmann | cmd.write(" exit 1;")
|
223 | 1d3dfa29 | Michael Hanselmann | cmd.write("fi;")
|
224 | 1d3dfa29 | Michael Hanselmann | cmd.write(" }")
|
225 | 1d3dfa29 | Michael Hanselmann | |
226 | 1d3dfa29 | Michael Hanselmann | elif self._mode == constants.IEM_EXPORT: |
227 | 1d3dfa29 | Michael Hanselmann | cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic])) |
228 | 1d3dfa29 | Michael Hanselmann | |
229 | 1d3dfa29 | Michael Hanselmann | else:
|
230 | 1d3dfa29 | Michael Hanselmann | raise errors.GenericError("Invalid mode '%s'" % self._mode) |
231 | 1d3dfa29 | Michael Hanselmann | |
232 | 1d3dfa29 | Michael Hanselmann | return cmd.getvalue()
|
233 | 1d3dfa29 | Michael Hanselmann | |
234 | fbb6b864 | Michael Hanselmann | def _GetDdCommand(self): |
235 | fbb6b864 | Michael Hanselmann | """Returns the command for measuring throughput.
|
236 | bb44b1ae | Michael Hanselmann |
|
237 | bb44b1ae | Michael Hanselmann | """
|
238 | c08d76f5 | Michael Hanselmann | dd_cmd = StringIO() |
239 | 1d3dfa29 | Michael Hanselmann | |
240 | 1d3dfa29 | Michael Hanselmann | magic_cmd = self._GetMagicCommand()
|
241 | 1d3dfa29 | Michael Hanselmann | if magic_cmd:
|
242 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write("{ ")
|
243 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write(magic_cmd) |
244 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write(" && ")
|
245 | 1d3dfa29 | Michael Hanselmann | |
246 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write("{ ")
|
247 | c08d76f5 | Michael Hanselmann | # Setting LC_ALL since we want to parse the output and explicitely
|
248 | c08d76f5 | Michael Hanselmann | # redirecting stdin, as the background process (dd) would have /dev/null as
|
249 | c08d76f5 | Michael Hanselmann | # stdin otherwise
|
250 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
|
251 | c08d76f5 | Michael Hanselmann | (BUFSIZE, self._dd_stderr_fd))
|
252 | c08d76f5 | Michael Hanselmann | # Send PID to daemon
|
253 | c08d76f5 | Michael Hanselmann | dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd) |
254 | c08d76f5 | Michael Hanselmann | # And wait for dd
|
255 | c08d76f5 | Michael Hanselmann | dd_cmd.write(" wait $pid;")
|
256 | c08d76f5 | Michael Hanselmann | dd_cmd.write(" }")
|
257 | 1d3dfa29 | Michael Hanselmann | |
258 | 1d3dfa29 | Michael Hanselmann | if magic_cmd:
|
259 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write(" }")
|
260 | 1d3dfa29 | Michael Hanselmann | |
261 | fbb6b864 | Michael Hanselmann | return dd_cmd.getvalue()
|
262 | fbb6b864 | Michael Hanselmann | |
263 | fbb6b864 | Michael Hanselmann | def _GetTransportCommand(self): |
264 | fbb6b864 | Michael Hanselmann | """Returns the command for the transport part of the daemon.
|
265 | fbb6b864 | Michael Hanselmann |
|
266 | fbb6b864 | Michael Hanselmann | """
|
267 | fbb6b864 | Michael Hanselmann | socat_cmd = ("%s 2>&%d" %
|
268 | fbb6b864 | Michael Hanselmann | (utils.ShellQuoteArgs(self._GetSocatCommand()),
|
269 | fbb6b864 | Michael Hanselmann | self._socat_stderr_fd))
|
270 | fbb6b864 | Michael Hanselmann | dd_cmd = self._GetDdCommand()
|
271 | c08d76f5 | Michael Hanselmann | |
272 | bb44b1ae | Michael Hanselmann | compr = self._opts.compress
|
273 | bb44b1ae | Michael Hanselmann | |
274 | bb44b1ae | Michael Hanselmann | assert compr in constants.IEC_ALL |
275 | bb44b1ae | Michael Hanselmann | |
276 | fbb6b864 | Michael Hanselmann | parts = [] |
277 | fbb6b864 | Michael Hanselmann | |
278 | bb44b1ae | Michael Hanselmann | if self._mode == constants.IEM_IMPORT: |
279 | fbb6b864 | Michael Hanselmann | parts.append(socat_cmd) |
280 | fbb6b864 | Michael Hanselmann | |
281 | bb44b1ae | Michael Hanselmann | if compr == constants.IEC_GZIP:
|
282 | fbb6b864 | Michael Hanselmann | parts.append("gunzip -c")
|
283 | fbb6b864 | Michael Hanselmann | |
284 | fbb6b864 | Michael Hanselmann | parts.append(dd_cmd) |
285 | c08d76f5 | Michael Hanselmann | |
286 | bb44b1ae | Michael Hanselmann | elif self._mode == constants.IEM_EXPORT: |
287 | fbb6b864 | Michael Hanselmann | parts.append(dd_cmd) |
288 | fbb6b864 | Michael Hanselmann | |
289 | bb44b1ae | Michael Hanselmann | if compr == constants.IEC_GZIP:
|
290 | fbb6b864 | Michael Hanselmann | parts.append("gzip -c")
|
291 | fbb6b864 | Michael Hanselmann | |
292 | fbb6b864 | Michael Hanselmann | parts.append(socat_cmd) |
293 | c08d76f5 | Michael Hanselmann | |
294 | bb44b1ae | Michael Hanselmann | else:
|
295 | bb44b1ae | Michael Hanselmann | raise errors.GenericError("Invalid mode '%s'" % self._mode) |
296 | bb44b1ae | Michael Hanselmann | |
297 | bb44b1ae | Michael Hanselmann | # TODO: Run transport as separate user
|
298 | bb44b1ae | Michael Hanselmann | # The transport uses its own shell to simplify running it as a separate user
|
299 | bb44b1ae | Michael Hanselmann | # in the future.
|
300 | fbb6b864 | Michael Hanselmann | return self.GetBashCommand(" | ".join(parts)) |
301 | bb44b1ae | Michael Hanselmann | |
302 | bb44b1ae | Michael Hanselmann | def GetCommand(self): |
303 | bb44b1ae | Michael Hanselmann | """Returns the complete child process command.
|
304 | bb44b1ae | Michael Hanselmann |
|
305 | bb44b1ae | Michael Hanselmann | """
|
306 | bb44b1ae | Michael Hanselmann | transport_cmd = self._GetTransportCommand()
|
307 | bb44b1ae | Michael Hanselmann | |
308 | bb44b1ae | Michael Hanselmann | buf = StringIO() |
309 | bb44b1ae | Michael Hanselmann | |
310 | bb44b1ae | Michael Hanselmann | if self._opts.cmd_prefix: |
311 | bb44b1ae | Michael Hanselmann | buf.write(self._opts.cmd_prefix)
|
312 | bb44b1ae | Michael Hanselmann | buf.write(" ")
|
313 | bb44b1ae | Michael Hanselmann | |
314 | bb44b1ae | Michael Hanselmann | buf.write(utils.ShellQuoteArgs(transport_cmd)) |
315 | bb44b1ae | Michael Hanselmann | |
316 | bb44b1ae | Michael Hanselmann | if self._opts.cmd_suffix: |
317 | bb44b1ae | Michael Hanselmann | buf.write(" ")
|
318 | bb44b1ae | Michael Hanselmann | buf.write(self._opts.cmd_suffix)
|
319 | bb44b1ae | Michael Hanselmann | |
320 | bb44b1ae | Michael Hanselmann | return self.GetBashCommand(buf.getvalue()) |
321 | 34c9ee7b | Michael Hanselmann | |
322 | 34c9ee7b | Michael Hanselmann | |
323 | 34c9ee7b | Michael Hanselmann | def _VerifyListening(family, address, port): |
324 | 34c9ee7b | Michael Hanselmann | """Verify address given as listening address by socat.
|
325 | 34c9ee7b | Michael Hanselmann |
|
326 | 34c9ee7b | Michael Hanselmann | """
|
327 | 34c9ee7b | Michael Hanselmann | # TODO: Implement IPv6 support
|
328 | 34c9ee7b | Michael Hanselmann | if family != socket.AF_INET:
|
329 | 34c9ee7b | Michael Hanselmann | raise errors.GenericError("Address family %r not supported" % family) |
330 | 34c9ee7b | Michael Hanselmann | |
331 | 34c9ee7b | Michael Hanselmann | try:
|
332 | 34c9ee7b | Michael Hanselmann | packed_address = socket.inet_pton(family, address) |
333 | 34c9ee7b | Michael Hanselmann | except socket.error:
|
334 | 34c9ee7b | Michael Hanselmann | raise errors.GenericError("Invalid address %r for family %s" % |
335 | 34c9ee7b | Michael Hanselmann | (address, family)) |
336 | 34c9ee7b | Michael Hanselmann | |
337 | 34c9ee7b | Michael Hanselmann | return (socket.inet_ntop(family, packed_address), port)
|
338 | 34c9ee7b | Michael Hanselmann | |
339 | 34c9ee7b | Michael Hanselmann | |
340 | 34c9ee7b | Michael Hanselmann | class ChildIOProcessor(object): |
341 | f9323011 | Michael Hanselmann | def __init__(self, debug, status_file, logger, throughput_samples, exp_size): |
342 | 34c9ee7b | Michael Hanselmann | """Initializes this class.
|
343 | 34c9ee7b | Michael Hanselmann |
|
344 | 34c9ee7b | Michael Hanselmann | """
|
345 | 34c9ee7b | Michael Hanselmann | self._debug = debug
|
346 | 34c9ee7b | Michael Hanselmann | self._status_file = status_file
|
347 | 34c9ee7b | Michael Hanselmann | self._logger = logger
|
348 | 34c9ee7b | Michael Hanselmann | |
349 | 34c9ee7b | Michael Hanselmann | self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog)) |
350 | 34c9ee7b | Michael Hanselmann | for prog in PROG_ALL]) |
351 | 34c9ee7b | Michael Hanselmann | |
352 | c08d76f5 | Michael Hanselmann | self._dd_pid = None |
353 | c08d76f5 | Michael Hanselmann | self._dd_ready = False |
354 | c08d76f5 | Michael Hanselmann | self._dd_tp_samples = throughput_samples
|
355 | c08d76f5 | Michael Hanselmann | self._dd_progress = []
|
356 | c08d76f5 | Michael Hanselmann | |
357 | c08d76f5 | Michael Hanselmann | # Expected size of transferred data
|
358 | f9323011 | Michael Hanselmann | self._exp_size = exp_size
|
359 | c08d76f5 | Michael Hanselmann | |
360 | 34c9ee7b | Michael Hanselmann | def GetLineSplitter(self, prog): |
361 | 34c9ee7b | Michael Hanselmann | """Returns the line splitter for a program.
|
362 | 34c9ee7b | Michael Hanselmann |
|
363 | 34c9ee7b | Michael Hanselmann | """
|
364 | 34c9ee7b | Michael Hanselmann | return self._splitter[prog] |
365 | 34c9ee7b | Michael Hanselmann | |
366 | 34c9ee7b | Michael Hanselmann | def FlushAll(self): |
367 | 34c9ee7b | Michael Hanselmann | """Flushes all line splitters.
|
368 | 34c9ee7b | Michael Hanselmann |
|
369 | 34c9ee7b | Michael Hanselmann | """
|
370 | 34c9ee7b | Michael Hanselmann | for ls in self._splitter.itervalues(): |
371 | 34c9ee7b | Michael Hanselmann | ls.flush() |
372 | 34c9ee7b | Michael Hanselmann | |
373 | 34c9ee7b | Michael Hanselmann | def CloseAll(self): |
374 | 34c9ee7b | Michael Hanselmann | """Closes all line splitters.
|
375 | 34c9ee7b | Michael Hanselmann |
|
376 | 34c9ee7b | Michael Hanselmann | """
|
377 | 34c9ee7b | Michael Hanselmann | for ls in self._splitter.itervalues(): |
378 | 34c9ee7b | Michael Hanselmann | ls.close() |
379 | 34c9ee7b | Michael Hanselmann | self._splitter.clear()
|
380 | 34c9ee7b | Michael Hanselmann | |
381 | c08d76f5 | Michael Hanselmann | def NotifyDd(self): |
382 | c08d76f5 | Michael Hanselmann | """Tells dd(1) to write statistics.
|
383 | c08d76f5 | Michael Hanselmann |
|
384 | c08d76f5 | Michael Hanselmann | """
|
385 | c08d76f5 | Michael Hanselmann | if self._dd_pid is None: |
386 | c08d76f5 | Michael Hanselmann | # Can't notify
|
387 | c08d76f5 | Michael Hanselmann | return False |
388 | c08d76f5 | Michael Hanselmann | |
389 | c08d76f5 | Michael Hanselmann | if not self._dd_ready: |
390 | c08d76f5 | Michael Hanselmann | # There's a race condition between starting the program and sending
|
391 | c08d76f5 | Michael Hanselmann | # signals. The signal handler is only registered after some time, so we
|
392 | c08d76f5 | Michael Hanselmann | # have to check whether the program is ready. If it isn't, sending a
|
393 | c08d76f5 | Michael Hanselmann | # signal will invoke the default handler (and usually abort the program).
|
394 | c08d76f5 | Michael Hanselmann | if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL): |
395 | c08d76f5 | Michael Hanselmann | logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
|
396 | c08d76f5 | Michael Hanselmann | return False |
397 | c08d76f5 | Michael Hanselmann | |
398 | c08d76f5 | Michael Hanselmann | logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
|
399 | c08d76f5 | Michael Hanselmann | self._dd_ready = True |
400 | c08d76f5 | Michael Hanselmann | |
401 | c08d76f5 | Michael Hanselmann | logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid) |
402 | c08d76f5 | Michael Hanselmann | try:
|
403 | c08d76f5 | Michael Hanselmann | os.kill(self._dd_pid, DD_INFO_SIGNAL)
|
404 | c08d76f5 | Michael Hanselmann | except EnvironmentError, err: |
405 | c08d76f5 | Michael Hanselmann | if err.errno != errno.ESRCH:
|
406 | c08d76f5 | Michael Hanselmann | raise
|
407 | c08d76f5 | Michael Hanselmann | |
408 | c08d76f5 | Michael Hanselmann | # Process no longer exists
|
409 | 560cbec1 | Michael Hanselmann | logging.debug("dd exited")
|
410 | c08d76f5 | Michael Hanselmann | self._dd_pid = None |
411 | c08d76f5 | Michael Hanselmann | |
412 | c08d76f5 | Michael Hanselmann | return True |
413 | c08d76f5 | Michael Hanselmann | |
414 | 34c9ee7b | Michael Hanselmann | def _ProcessOutput(self, line, prog): |
415 | 34c9ee7b | Michael Hanselmann | """Takes care of child process output.
|
416 | 34c9ee7b | Michael Hanselmann |
|
417 | 34c9ee7b | Michael Hanselmann | @type line: string
|
418 | 34c9ee7b | Michael Hanselmann | @param line: Child output line
|
419 | 34c9ee7b | Michael Hanselmann | @type prog: number
|
420 | 34c9ee7b | Michael Hanselmann | @param prog: Program from which the line originates
|
421 | 34c9ee7b | Michael Hanselmann |
|
422 | 34c9ee7b | Michael Hanselmann | """
|
423 | 34c9ee7b | Michael Hanselmann | force_update = False
|
424 | 34c9ee7b | Michael Hanselmann | forward_line = line |
425 | 34c9ee7b | Michael Hanselmann | |
426 | 34c9ee7b | Michael Hanselmann | if prog == PROG_SOCAT:
|
427 | 34c9ee7b | Michael Hanselmann | level = None
|
428 | 34c9ee7b | Michael Hanselmann | parts = line.split(None, 4) |
429 | 34c9ee7b | Michael Hanselmann | |
430 | 34c9ee7b | Michael Hanselmann | if len(parts) == 5: |
431 | 34c9ee7b | Michael Hanselmann | (_, _, _, level, msg) = parts |
432 | 34c9ee7b | Michael Hanselmann | |
433 | 34c9ee7b | Michael Hanselmann | force_update = self._ProcessSocatOutput(self._status_file, level, msg) |
434 | 34c9ee7b | Michael Hanselmann | |
435 | 34c9ee7b | Michael Hanselmann | if self._debug or (level and level not in SOCAT_LOG_IGNORE): |
436 | 34c9ee7b | Michael Hanselmann | forward_line = "socat: %s %s" % (level, msg)
|
437 | 34c9ee7b | Michael Hanselmann | else:
|
438 | 34c9ee7b | Michael Hanselmann | forward_line = None
|
439 | 34c9ee7b | Michael Hanselmann | else:
|
440 | 34c9ee7b | Michael Hanselmann | forward_line = "socat: %s" % line
|
441 | 34c9ee7b | Michael Hanselmann | |
442 | c08d76f5 | Michael Hanselmann | elif prog == PROG_DD:
|
443 | c08d76f5 | Michael Hanselmann | (should_forward, force_update) = self._ProcessDdOutput(line)
|
444 | c08d76f5 | Michael Hanselmann | |
445 | c08d76f5 | Michael Hanselmann | if should_forward or self._debug: |
446 | c08d76f5 | Michael Hanselmann | forward_line = "dd: %s" % line
|
447 | c08d76f5 | Michael Hanselmann | else:
|
448 | c08d76f5 | Michael Hanselmann | forward_line = None
|
449 | c08d76f5 | Michael Hanselmann | |
450 | c08d76f5 | Michael Hanselmann | elif prog == PROG_DD_PID:
|
451 | c08d76f5 | Michael Hanselmann | if self._dd_pid: |
452 | c08d76f5 | Michael Hanselmann | raise RuntimeError("dd PID reported more than once") |
453 | c08d76f5 | Michael Hanselmann | logging.debug("Received dd PID %r", line)
|
454 | c08d76f5 | Michael Hanselmann | self._dd_pid = int(line) |
455 | c08d76f5 | Michael Hanselmann | forward_line = None
|
456 | c08d76f5 | Michael Hanselmann | |
457 | f9323011 | Michael Hanselmann | elif prog == PROG_EXP_SIZE:
|
458 | f9323011 | Michael Hanselmann | logging.debug("Received predicted size %r", line)
|
459 | f9323011 | Michael Hanselmann | forward_line = None
|
460 | f9323011 | Michael Hanselmann | |
461 | f9323011 | Michael Hanselmann | if line:
|
462 | f9323011 | Michael Hanselmann | try:
|
463 | f9323011 | Michael Hanselmann | exp_size = utils.BytesToMebibyte(int(line))
|
464 | f9323011 | Michael Hanselmann | except (ValueError, TypeError), err: |
465 | f9323011 | Michael Hanselmann | logging.error("Failed to convert predicted size %r to number: %s",
|
466 | f9323011 | Michael Hanselmann | line, err) |
467 | f9323011 | Michael Hanselmann | exp_size = None
|
468 | f9323011 | Michael Hanselmann | else:
|
469 | f9323011 | Michael Hanselmann | exp_size = None
|
470 | f9323011 | Michael Hanselmann | |
471 | f9323011 | Michael Hanselmann | self._exp_size = exp_size
|
472 | f9323011 | Michael Hanselmann | |
473 | 34c9ee7b | Michael Hanselmann | if forward_line:
|
474 | 34c9ee7b | Michael Hanselmann | self._logger.info(forward_line)
|
475 | 34c9ee7b | Michael Hanselmann | self._status_file.AddRecentOutput(forward_line)
|
476 | 34c9ee7b | Michael Hanselmann | |
477 | 34c9ee7b | Michael Hanselmann | self._status_file.Update(force_update)
|
478 | 34c9ee7b | Michael Hanselmann | |
479 | 34c9ee7b | Michael Hanselmann | @staticmethod
|
480 | 34c9ee7b | Michael Hanselmann | def _ProcessSocatOutput(status_file, level, msg): |
481 | 34c9ee7b | Michael Hanselmann | """Interprets socat log output.
|
482 | 34c9ee7b | Michael Hanselmann |
|
483 | 34c9ee7b | Michael Hanselmann | """
|
484 | 34c9ee7b | Michael Hanselmann | if level == SOCAT_LOG_NOTICE:
|
485 | 34c9ee7b | Michael Hanselmann | if status_file.GetListenPort() is None: |
486 | 34c9ee7b | Michael Hanselmann | # TODO: Maybe implement timeout to not listen forever
|
487 | 34c9ee7b | Michael Hanselmann | m = LISTENING_RE.match(msg) |
488 | 34c9ee7b | Michael Hanselmann | if m:
|
489 | 34c9ee7b | Michael Hanselmann | (_, port) = _VerifyListening(int(m.group("family")), |
490 | 34c9ee7b | Michael Hanselmann | m.group("address"),
|
491 | 34c9ee7b | Michael Hanselmann | int(m.group("port"))) |
492 | 34c9ee7b | Michael Hanselmann | |
493 | 34c9ee7b | Michael Hanselmann | status_file.SetListenPort(port) |
494 | 34c9ee7b | Michael Hanselmann | return True |
495 | 34c9ee7b | Michael Hanselmann | |
496 | 34c9ee7b | Michael Hanselmann | if not status_file.GetConnected(): |
497 | 34c9ee7b | Michael Hanselmann | m = TRANSFER_LOOP_RE.match(msg) |
498 | 34c9ee7b | Michael Hanselmann | if m:
|
499 | 53dbf14c | Michael Hanselmann | logging.debug("Connection established")
|
500 | 34c9ee7b | Michael Hanselmann | status_file.SetConnected() |
501 | 34c9ee7b | Michael Hanselmann | return True |
502 | 34c9ee7b | Michael Hanselmann | |
503 | 34c9ee7b | Michael Hanselmann | return False |
504 | c08d76f5 | Michael Hanselmann | |
505 | c08d76f5 | Michael Hanselmann | def _ProcessDdOutput(self, line): |
506 | c08d76f5 | Michael Hanselmann | """Interprets a line of dd(1)'s output.
|
507 | c08d76f5 | Michael Hanselmann |
|
508 | c08d76f5 | Michael Hanselmann | """
|
509 | c08d76f5 | Michael Hanselmann | m = DD_INFO_RE.match(line) |
510 | c08d76f5 | Michael Hanselmann | if m:
|
511 | c08d76f5 | Michael Hanselmann | seconds = float(m.group("seconds")) |
512 | c08d76f5 | Michael Hanselmann | mbytes = utils.BytesToMebibyte(int(m.group("bytes"))) |
513 | c08d76f5 | Michael Hanselmann | self._UpdateDdProgress(seconds, mbytes)
|
514 | c08d76f5 | Michael Hanselmann | return (False, True) |
515 | c08d76f5 | Michael Hanselmann | |
516 | c08d76f5 | Michael Hanselmann | m = DD_STDERR_IGNORE.match(line) |
517 | c08d76f5 | Michael Hanselmann | if m:
|
518 | c08d76f5 | Michael Hanselmann | # Ignore
|
519 | c08d76f5 | Michael Hanselmann | return (False, False) |
520 | c08d76f5 | Michael Hanselmann | |
521 | c08d76f5 | Michael Hanselmann | # Forward line
|
522 | c08d76f5 | Michael Hanselmann | return (True, False) |
523 | c08d76f5 | Michael Hanselmann | |
524 | c08d76f5 | Michael Hanselmann | def _UpdateDdProgress(self, seconds, mbytes): |
525 | c08d76f5 | Michael Hanselmann | """Updates the internal status variables for dd(1) progress.
|
526 | c08d76f5 | Michael Hanselmann |
|
527 | c08d76f5 | Michael Hanselmann | @type seconds: float
|
528 | c08d76f5 | Michael Hanselmann | @param seconds: Timestamp of this update
|
529 | c08d76f5 | Michael Hanselmann | @type mbytes: float
|
530 | c08d76f5 | Michael Hanselmann | @param mbytes: Total number of MiB transferred so far
|
531 | c08d76f5 | Michael Hanselmann |
|
532 | c08d76f5 | Michael Hanselmann | """
|
533 | c08d76f5 | Michael Hanselmann | # Add latest sample
|
534 | c08d76f5 | Michael Hanselmann | self._dd_progress.append((seconds, mbytes))
|
535 | c08d76f5 | Michael Hanselmann | |
536 | c08d76f5 | Michael Hanselmann | # Remove old samples
|
537 | c08d76f5 | Michael Hanselmann | del self._dd_progress[:-self._dd_tp_samples] |
538 | c08d76f5 | Michael Hanselmann | |
539 | c08d76f5 | Michael Hanselmann | # Calculate throughput
|
540 | c08d76f5 | Michael Hanselmann | throughput = _CalcThroughput(self._dd_progress)
|
541 | c08d76f5 | Michael Hanselmann | |
542 | c08d76f5 | Michael Hanselmann | # Calculate percent and ETA
|
543 | c08d76f5 | Michael Hanselmann | percent = None
|
544 | c08d76f5 | Michael Hanselmann | eta = None
|
545 | c08d76f5 | Michael Hanselmann | |
546 | c08d76f5 | Michael Hanselmann | if self._exp_size is not None: |
547 | c08d76f5 | Michael Hanselmann | if self._exp_size != 0: |
548 | c08d76f5 | Michael Hanselmann | percent = max(0, min(100, (100.0 * mbytes) / self._exp_size)) |
549 | c08d76f5 | Michael Hanselmann | |
550 | c08d76f5 | Michael Hanselmann | if throughput:
|
551 | c08d76f5 | Michael Hanselmann | eta = max(0, float(self._exp_size - mbytes) / throughput) |
552 | c08d76f5 | Michael Hanselmann | |
553 | c08d76f5 | Michael Hanselmann | self._status_file.SetProgress(mbytes, throughput, percent, eta)
|
554 | c08d76f5 | Michael Hanselmann | |
555 | c08d76f5 | Michael Hanselmann | |
556 | c08d76f5 | Michael Hanselmann | def _CalcThroughput(samples): |
557 | c08d76f5 | Michael Hanselmann | """Calculates the throughput in MiB/second.
|
558 | c08d76f5 | Michael Hanselmann |
|
559 | c08d76f5 | Michael Hanselmann | @type samples: sequence
|
560 | c08d76f5 | Michael Hanselmann | @param samples: List of samples, each consisting of a (timestamp, mbytes)
|
561 | c08d76f5 | Michael Hanselmann | tuple
|
562 | c08d76f5 | Michael Hanselmann | @rtype: float or None
|
563 | c08d76f5 | Michael Hanselmann | @return: Throughput in MiB/second
|
564 | c08d76f5 | Michael Hanselmann |
|
565 | c08d76f5 | Michael Hanselmann | """
|
566 | c08d76f5 | Michael Hanselmann | if len(samples) < 2: |
567 | c08d76f5 | Michael Hanselmann | # Can't calculate throughput
|
568 | c08d76f5 | Michael Hanselmann | return None |
569 | c08d76f5 | Michael Hanselmann | |
570 | c08d76f5 | Michael Hanselmann | (start_time, start_mbytes) = samples[0]
|
571 | c08d76f5 | Michael Hanselmann | (end_time, end_mbytes) = samples[-1]
|
572 | c08d76f5 | Michael Hanselmann | |
573 | c08d76f5 | Michael Hanselmann | return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time) |