root / lib / impexpd / __init__.py @ 3ae259d3
History | View | Annotate | Download (16.3 kB)
1 | bb44b1ae | Michael Hanselmann | #
|
---|---|---|---|
2 | bb44b1ae | Michael Hanselmann | #
|
3 | bb44b1ae | Michael Hanselmann | |
4 | bb44b1ae | Michael Hanselmann | # Copyright (C) 2010 Google Inc.
|
5 | bb44b1ae | Michael Hanselmann | #
|
6 | bb44b1ae | Michael Hanselmann | # This program is free software; you can redistribute it and/or modify
|
7 | bb44b1ae | Michael Hanselmann | # it under the terms of the GNU General Public License as published by
|
8 | bb44b1ae | Michael Hanselmann | # the Free Software Foundation; either version 2 of the License, or
|
9 | bb44b1ae | Michael Hanselmann | # (at your option) any later version.
|
10 | bb44b1ae | Michael Hanselmann | #
|
11 | bb44b1ae | Michael Hanselmann | # This program is distributed in the hope that it will be useful, but
|
12 | bb44b1ae | Michael Hanselmann | # WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 | bb44b1ae | Michael Hanselmann | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 | bb44b1ae | Michael Hanselmann | # General Public License for more details.
|
15 | bb44b1ae | Michael Hanselmann | #
|
16 | bb44b1ae | Michael Hanselmann | # You should have received a copy of the GNU General Public License
|
17 | bb44b1ae | Michael Hanselmann | # along with this program; if not, write to the Free Software
|
18 | bb44b1ae | Michael Hanselmann | # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 | bb44b1ae | Michael Hanselmann | # 02110-1301, USA.
|
20 | bb44b1ae | Michael Hanselmann | |
21 | bb44b1ae | Michael Hanselmann | |
22 | bb44b1ae | Michael Hanselmann | """Classes and functions for import/export daemon.
|
23 | bb44b1ae | Michael Hanselmann |
|
24 | bb44b1ae | Michael Hanselmann | """
|
25 | bb44b1ae | Michael Hanselmann | |
26 | c08d76f5 | Michael Hanselmann | import os |
27 | 34c9ee7b | Michael Hanselmann | import re |
28 | 34c9ee7b | Michael Hanselmann | import socket |
29 | 53dbf14c | Michael Hanselmann | import logging |
30 | c08d76f5 | Michael Hanselmann | import signal |
31 | c08d76f5 | Michael Hanselmann | import errno |
32 | c08d76f5 | Michael Hanselmann | import time |
33 | bb44b1ae | Michael Hanselmann | from cStringIO import StringIO |
34 | bb44b1ae | Michael Hanselmann | |
35 | bb44b1ae | Michael Hanselmann | from ganeti import constants |
36 | bb44b1ae | Michael Hanselmann | from ganeti import errors |
37 | bb44b1ae | Michael Hanselmann | from ganeti import utils |
38 | 58bb385c | Michael Hanselmann | from ganeti import netutils |
39 | b8028dcf | Michael Hanselmann | from ganeti import compat |
40 | bb44b1ae | Michael Hanselmann | |
41 | bb44b1ae | Michael Hanselmann | |
42 | 34c9ee7b | Michael Hanselmann | #: Used to recognize point at which socat(1) starts to listen on its socket.
|
43 | 34c9ee7b | Michael Hanselmann | #: The local address is required for the remote peer to connect (in particular
|
44 | 34c9ee7b | Michael Hanselmann | #: the port number).
|
45 | 34c9ee7b | Michael Hanselmann | LISTENING_RE = re.compile(r"^listening on\s+"
|
46 | 34c9ee7b | Michael Hanselmann | r"AF=(?P<family>\d+)\s+"
|
47 | 34c9ee7b | Michael Hanselmann | r"(?P<address>.+):(?P<port>\d+)$", re.I)
|
48 | 34c9ee7b | Michael Hanselmann | |
49 | 34c9ee7b | Michael Hanselmann | #: Used to recognize point at which socat(1) is sending data over the wire
|
50 | 34c9ee7b | Michael Hanselmann | TRANSFER_LOOP_RE = re.compile(r"^starting data transfer loop with FDs\s+.*$",
|
51 | 34c9ee7b | Michael Hanselmann | re.I) |
52 | 34c9ee7b | Michael Hanselmann | |
53 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_DEBUG = "D"
|
54 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_INFO = "I"
|
55 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_NOTICE = "N"
|
56 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_WARNING = "W"
|
57 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_ERROR = "E"
|
58 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_FATAL = "F"
|
59 | 34c9ee7b | Michael Hanselmann | |
60 | b8028dcf | Michael Hanselmann | SOCAT_LOG_IGNORE = compat.UniqueFrozenset([ |
61 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_DEBUG, |
62 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_INFO, |
63 | 34c9ee7b | Michael Hanselmann | SOCAT_LOG_NOTICE, |
64 | 34c9ee7b | Michael Hanselmann | ]) |
65 | 34c9ee7b | Michael Hanselmann | |
66 | c08d76f5 | Michael Hanselmann | #: Used to parse GNU dd(1) statistics
|
67 | c08d76f5 | Michael Hanselmann | DD_INFO_RE = re.compile(r"^(?P<bytes>\d+)\s*byte(?:|s)\s.*\scopied,\s*"
|
68 | c08d76f5 | Michael Hanselmann | r"(?P<seconds>[\d.]+)\s*s(?:|econds),.*$", re.I)
|
69 | c08d76f5 | Michael Hanselmann | |
70 | c08d76f5 | Michael Hanselmann | #: Used to ignore "N+N records in/out" on dd(1)'s stderr
|
71 | c08d76f5 | Michael Hanselmann | DD_STDERR_IGNORE = re.compile(r"^\d+\+\d+\s*records\s+(?:in|out)$", re.I)
|
72 | c08d76f5 | Michael Hanselmann | |
73 | c08d76f5 | Michael Hanselmann | #: Signal upon which dd(1) will print statistics (on some platforms, SIGINFO is
|
74 | c08d76f5 | Michael Hanselmann | #: unavailable and SIGUSR1 is used instead)
|
75 | c08d76f5 | Michael Hanselmann | DD_INFO_SIGNAL = getattr(signal, "SIGINFO", signal.SIGUSR1) |
76 | c08d76f5 | Michael Hanselmann | |
77 | bb44b1ae | Michael Hanselmann | #: Buffer size: at most this many bytes are transferred at once
|
78 | bb44b1ae | Michael Hanselmann | BUFSIZE = 1024 * 1024 |
79 | bb44b1ae | Michael Hanselmann | |
80 | bb44b1ae | Michael Hanselmann | # Common options for socat
|
81 | bb44b1ae | Michael Hanselmann | SOCAT_TCP_OPTS = ["keepalive", "keepidle=60", "keepintvl=10", "keepcnt=5"] |
82 | 971bbd84 | Michael Hanselmann | SOCAT_OPENSSL_OPTS = ["verify=1", "method=TLSv1", |
83 | 971bbd84 | Michael Hanselmann | "cipher=%s" % constants.OPENSSL_CIPHERS]
|
84 | bb44b1ae | Michael Hanselmann | |
85 | e90739d6 | Michael Hanselmann | if constants.SOCAT_USE_COMPRESS:
|
86 | e90739d6 | Michael Hanselmann | # Disables all compression in by OpenSSL. Only supported in patched versions
|
87 | e90739d6 | Michael Hanselmann | # of socat (as of November 2010). See INSTALL for more information.
|
88 | e90739d6 | Michael Hanselmann | SOCAT_OPENSSL_OPTS.append("compress=none")
|
89 | e90739d6 | Michael Hanselmann | |
90 | 0559f745 | Michael Hanselmann | SOCAT_OPTION_MAXLEN = 400
|
91 | 0559f745 | Michael Hanselmann | |
92 | 34c9ee7b | Michael Hanselmann | (PROG_OTHER, |
93 | c08d76f5 | Michael Hanselmann | PROG_SOCAT, |
94 | c08d76f5 | Michael Hanselmann | PROG_DD, |
95 | f9323011 | Michael Hanselmann | PROG_DD_PID, |
96 | f9323011 | Michael Hanselmann | PROG_EXP_SIZE) = range(1, 6) |
97 | b8028dcf | Michael Hanselmann | |
98 | b8028dcf | Michael Hanselmann | PROG_ALL = compat.UniqueFrozenset([ |
99 | 34c9ee7b | Michael Hanselmann | PROG_OTHER, |
100 | 34c9ee7b | Michael Hanselmann | PROG_SOCAT, |
101 | c08d76f5 | Michael Hanselmann | PROG_DD, |
102 | c08d76f5 | Michael Hanselmann | PROG_DD_PID, |
103 | f9323011 | Michael Hanselmann | PROG_EXP_SIZE, |
104 | 34c9ee7b | Michael Hanselmann | ]) |
105 | 34c9ee7b | Michael Hanselmann | |
106 | bb44b1ae | Michael Hanselmann | |
107 | bb44b1ae | Michael Hanselmann | class CommandBuilder(object): |
108 | c08d76f5 | Michael Hanselmann | def __init__(self, mode, opts, socat_stderr_fd, dd_stderr_fd, dd_pid_fd): |
109 | bb44b1ae | Michael Hanselmann | """Initializes this class.
|
110 | bb44b1ae | Michael Hanselmann |
|
111 | bb44b1ae | Michael Hanselmann | @param mode: Daemon mode (import or export)
|
112 | bb44b1ae | Michael Hanselmann | @param opts: Options object
|
113 | bb44b1ae | Michael Hanselmann | @type socat_stderr_fd: int
|
114 | bb44b1ae | Michael Hanselmann | @param socat_stderr_fd: File descriptor socat should write its stderr to
|
115 | c08d76f5 | Michael Hanselmann | @type dd_stderr_fd: int
|
116 | c08d76f5 | Michael Hanselmann | @param dd_stderr_fd: File descriptor dd should write its stderr to
|
117 | c08d76f5 | Michael Hanselmann | @type dd_pid_fd: int
|
118 | c08d76f5 | Michael Hanselmann | @param dd_pid_fd: File descriptor the child should write dd's PID to
|
119 | bb44b1ae | Michael Hanselmann |
|
120 | bb44b1ae | Michael Hanselmann | """
|
121 | bb44b1ae | Michael Hanselmann | self._opts = opts
|
122 | bb44b1ae | Michael Hanselmann | self._mode = mode
|
123 | bb44b1ae | Michael Hanselmann | self._socat_stderr_fd = socat_stderr_fd
|
124 | c08d76f5 | Michael Hanselmann | self._dd_stderr_fd = dd_stderr_fd
|
125 | c08d76f5 | Michael Hanselmann | self._dd_pid_fd = dd_pid_fd
|
126 | bb44b1ae | Michael Hanselmann | |
127 | 1d3dfa29 | Michael Hanselmann | assert (self._opts.magic is None or |
128 | 1d3dfa29 | Michael Hanselmann | constants.IE_MAGIC_RE.match(self._opts.magic))
|
129 | 1d3dfa29 | Michael Hanselmann | |
130 | bb44b1ae | Michael Hanselmann | @staticmethod
|
131 | bb44b1ae | Michael Hanselmann | def GetBashCommand(cmd): |
132 | bb44b1ae | Michael Hanselmann | """Prepares a command to be run in Bash.
|
133 | bb44b1ae | Michael Hanselmann |
|
134 | bb44b1ae | Michael Hanselmann | """
|
135 | bb44b1ae | Michael Hanselmann | return ["bash", "-o", "errexit", "-o", "pipefail", "-c", cmd] |
136 | bb44b1ae | Michael Hanselmann | |
137 | bb44b1ae | Michael Hanselmann | def _GetSocatCommand(self): |
138 | bb44b1ae | Michael Hanselmann | """Returns the socat command.
|
139 | bb44b1ae | Michael Hanselmann |
|
140 | bb44b1ae | Michael Hanselmann | """
|
141 | bb44b1ae | Michael Hanselmann | common_addr_opts = SOCAT_TCP_OPTS + SOCAT_OPENSSL_OPTS + [ |
142 | bb44b1ae | Michael Hanselmann | "key=%s" % self._opts.key, |
143 | bb44b1ae | Michael Hanselmann | "cert=%s" % self._opts.cert, |
144 | bb44b1ae | Michael Hanselmann | "cafile=%s" % self._opts.ca, |
145 | bb44b1ae | Michael Hanselmann | ] |
146 | bb44b1ae | Michael Hanselmann | |
147 | bb44b1ae | Michael Hanselmann | if self._opts.bind is not None: |
148 | bb44b1ae | Michael Hanselmann | common_addr_opts.append("bind=%s" % self._opts.bind) |
149 | bb44b1ae | Michael Hanselmann | |
150 | 58bb385c | Michael Hanselmann | assert not (self._opts.ipv4 and self._opts.ipv6) |
151 | 58bb385c | Michael Hanselmann | |
152 | 58bb385c | Michael Hanselmann | if self._opts.ipv4: |
153 | 58bb385c | Michael Hanselmann | common_addr_opts.append("pf=ipv4")
|
154 | 58bb385c | Michael Hanselmann | elif self._opts.ipv6: |
155 | 58bb385c | Michael Hanselmann | common_addr_opts.append("pf=ipv6")
|
156 | 58bb385c | Michael Hanselmann | |
157 | bb44b1ae | Michael Hanselmann | if self._mode == constants.IEM_IMPORT: |
158 | bb44b1ae | Michael Hanselmann | if self._opts.port is None: |
159 | bb44b1ae | Michael Hanselmann | port = 0
|
160 | bb44b1ae | Michael Hanselmann | else:
|
161 | bb44b1ae | Michael Hanselmann | port = self._opts.port
|
162 | bb44b1ae | Michael Hanselmann | |
163 | bb44b1ae | Michael Hanselmann | addr1 = [ |
164 | bb44b1ae | Michael Hanselmann | "OPENSSL-LISTEN:%s" % port,
|
165 | bb44b1ae | Michael Hanselmann | "reuseaddr",
|
166 | bb44b1ae | Michael Hanselmann | |
167 | bb44b1ae | Michael Hanselmann | # Retry to listen if connection wasn't established successfully, up to
|
168 | bb44b1ae | Michael Hanselmann | # 100 times a second. Note that this still leaves room for DoS attacks.
|
169 | bb44b1ae | Michael Hanselmann | "forever",
|
170 | bb44b1ae | Michael Hanselmann | "intervall=0.01",
|
171 | bb44b1ae | Michael Hanselmann | ] + common_addr_opts |
172 | bb44b1ae | Michael Hanselmann | addr2 = ["stdout"]
|
173 | bb44b1ae | Michael Hanselmann | |
174 | bb44b1ae | Michael Hanselmann | elif self._mode == constants.IEM_EXPORT: |
175 | 58bb385c | Michael Hanselmann | if self._opts.host and netutils.IP6Address.IsValid(self._opts.host): |
176 | 58bb385c | Michael Hanselmann | host = "[%s]" % self._opts.host |
177 | 58bb385c | Michael Hanselmann | else:
|
178 | 58bb385c | Michael Hanselmann | host = self._opts.host
|
179 | 58bb385c | Michael Hanselmann | |
180 | bb44b1ae | Michael Hanselmann | addr1 = ["stdin"]
|
181 | bb44b1ae | Michael Hanselmann | addr2 = [ |
182 | 58bb385c | Michael Hanselmann | "OPENSSL:%s:%s" % (host, self._opts.port), |
183 | bb44b1ae | Michael Hanselmann | |
184 | bb44b1ae | Michael Hanselmann | # How long to wait per connection attempt
|
185 | bb44b1ae | Michael Hanselmann | "connect-timeout=%s" % self._opts.connect_timeout, |
186 | bb44b1ae | Michael Hanselmann | |
187 | bb44b1ae | Michael Hanselmann | # Retry a few times before giving up to connect (once per second)
|
188 | bb44b1ae | Michael Hanselmann | "retry=%s" % self._opts.connect_retries, |
189 | bb44b1ae | Michael Hanselmann | "intervall=1",
|
190 | bb44b1ae | Michael Hanselmann | ] + common_addr_opts |
191 | bb44b1ae | Michael Hanselmann | |
192 | bb44b1ae | Michael Hanselmann | else:
|
193 | bb44b1ae | Michael Hanselmann | raise errors.GenericError("Invalid mode '%s'" % self._mode) |
194 | bb44b1ae | Michael Hanselmann | |
195 | bb44b1ae | Michael Hanselmann | for i in [addr1, addr2]: |
196 | bb44b1ae | Michael Hanselmann | for value in i: |
197 | 0559f745 | Michael Hanselmann | if len(value) > SOCAT_OPTION_MAXLEN: |
198 | 0559f745 | Michael Hanselmann | raise errors.GenericError("Socat option longer than %s" |
199 | 0559f745 | Michael Hanselmann | " characters: %r" %
|
200 | 0559f745 | Michael Hanselmann | (SOCAT_OPTION_MAXLEN, value)) |
201 | bb44b1ae | Michael Hanselmann | if "," in value: |
202 | bb44b1ae | Michael Hanselmann | raise errors.GenericError("Comma not allowed in socat option" |
203 | bb44b1ae | Michael Hanselmann | " value: %r" % value)
|
204 | bb44b1ae | Michael Hanselmann | |
205 | bb44b1ae | Michael Hanselmann | return [
|
206 | bb44b1ae | Michael Hanselmann | constants.SOCAT_PATH, |
207 | bb44b1ae | Michael Hanselmann | |
208 | bb44b1ae | Michael Hanselmann | # Log to stderr
|
209 | bb44b1ae | Michael Hanselmann | "-ls",
|
210 | bb44b1ae | Michael Hanselmann | |
211 | bb44b1ae | Michael Hanselmann | # Log level
|
212 | bb44b1ae | Michael Hanselmann | "-d", "-d", |
213 | bb44b1ae | Michael Hanselmann | |
214 | bb44b1ae | Michael Hanselmann | # Buffer size
|
215 | bb44b1ae | Michael Hanselmann | "-b%s" % BUFSIZE,
|
216 | bb44b1ae | Michael Hanselmann | |
217 | bb44b1ae | Michael Hanselmann | # Unidirectional mode, the first address is only used for reading, and the
|
218 | bb44b1ae | Michael Hanselmann | # second address is only used for writing
|
219 | bb44b1ae | Michael Hanselmann | "-u",
|
220 | bb44b1ae | Michael Hanselmann | |
221 | 3c286190 | Dimitris Aragiorgis | ",".join(addr1), ",".join(addr2), |
222 | bb44b1ae | Michael Hanselmann | ] |
223 | bb44b1ae | Michael Hanselmann | |
224 | 1d3dfa29 | Michael Hanselmann | def _GetMagicCommand(self): |
225 | 1d3dfa29 | Michael Hanselmann | """Returns the command to read/write the magic value.
|
226 | 1d3dfa29 | Michael Hanselmann |
|
227 | 1d3dfa29 | Michael Hanselmann | """
|
228 | 1d3dfa29 | Michael Hanselmann | if not self._opts.magic: |
229 | 1d3dfa29 | Michael Hanselmann | return None |
230 | 1d3dfa29 | Michael Hanselmann | |
231 | 1d3dfa29 | Michael Hanselmann | # Prefix to ensure magic isn't interpreted as option to "echo"
|
232 | 1d3dfa29 | Michael Hanselmann | magic = "M=%s" % self._opts.magic |
233 | 1d3dfa29 | Michael Hanselmann | |
234 | 1d3dfa29 | Michael Hanselmann | cmd = StringIO() |
235 | 1d3dfa29 | Michael Hanselmann | |
236 | 1d3dfa29 | Michael Hanselmann | if self._mode == constants.IEM_IMPORT: |
237 | 1d3dfa29 | Michael Hanselmann | cmd.write("{ ")
|
238 | 1d3dfa29 | Michael Hanselmann | cmd.write(utils.ShellQuoteArgs(["read", "-n", str(len(magic)), "magic"])) |
239 | 1d3dfa29 | Michael Hanselmann | cmd.write(" && ")
|
240 | 1d3dfa29 | Michael Hanselmann | cmd.write("if test \"$magic\" != %s; then" % utils.ShellQuote(magic))
|
241 | 1d3dfa29 | Michael Hanselmann | cmd.write(" echo %s >&2;" % utils.ShellQuote("Magic value mismatch")) |
242 | 1d3dfa29 | Michael Hanselmann | cmd.write(" exit 1;")
|
243 | 1d3dfa29 | Michael Hanselmann | cmd.write("fi;")
|
244 | 1d3dfa29 | Michael Hanselmann | cmd.write(" }")
|
245 | 1d3dfa29 | Michael Hanselmann | |
246 | 1d3dfa29 | Michael Hanselmann | elif self._mode == constants.IEM_EXPORT: |
247 | 1d3dfa29 | Michael Hanselmann | cmd.write(utils.ShellQuoteArgs(["echo", "-E", "-n", magic])) |
248 | 1d3dfa29 | Michael Hanselmann | |
249 | 1d3dfa29 | Michael Hanselmann | else:
|
250 | 1d3dfa29 | Michael Hanselmann | raise errors.GenericError("Invalid mode '%s'" % self._mode) |
251 | 1d3dfa29 | Michael Hanselmann | |
252 | 1d3dfa29 | Michael Hanselmann | return cmd.getvalue()
|
253 | 1d3dfa29 | Michael Hanselmann | |
254 | fbb6b864 | Michael Hanselmann | def _GetDdCommand(self): |
255 | fbb6b864 | Michael Hanselmann | """Returns the command for measuring throughput.
|
256 | bb44b1ae | Michael Hanselmann |
|
257 | bb44b1ae | Michael Hanselmann | """
|
258 | c08d76f5 | Michael Hanselmann | dd_cmd = StringIO() |
259 | 1d3dfa29 | Michael Hanselmann | |
260 | 1d3dfa29 | Michael Hanselmann | magic_cmd = self._GetMagicCommand()
|
261 | 1d3dfa29 | Michael Hanselmann | if magic_cmd:
|
262 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write("{ ")
|
263 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write(magic_cmd) |
264 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write(" && ")
|
265 | 1d3dfa29 | Michael Hanselmann | |
266 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write("{ ")
|
267 | 2ed0e208 | Iustin Pop | # Setting LC_ALL since we want to parse the output and explicitly
|
268 | 2ed0e208 | Iustin Pop | # redirecting stdin, as the background process (dd) would have
|
269 | 2ed0e208 | Iustin Pop | # /dev/null as stdin otherwise
|
270 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write("LC_ALL=C dd bs=%s <&0 2>&%d & pid=${!};" %
|
271 | c08d76f5 | Michael Hanselmann | (BUFSIZE, self._dd_stderr_fd))
|
272 | c08d76f5 | Michael Hanselmann | # Send PID to daemon
|
273 | c08d76f5 | Michael Hanselmann | dd_cmd.write(" echo $pid >&%d;" % self._dd_pid_fd) |
274 | c08d76f5 | Michael Hanselmann | # And wait for dd
|
275 | c08d76f5 | Michael Hanselmann | dd_cmd.write(" wait $pid;")
|
276 | c08d76f5 | Michael Hanselmann | dd_cmd.write(" }")
|
277 | 1d3dfa29 | Michael Hanselmann | |
278 | 1d3dfa29 | Michael Hanselmann | if magic_cmd:
|
279 | 1d3dfa29 | Michael Hanselmann | dd_cmd.write(" }")
|
280 | 1d3dfa29 | Michael Hanselmann | |
281 | fbb6b864 | Michael Hanselmann | return dd_cmd.getvalue()
|
282 | fbb6b864 | Michael Hanselmann | |
283 | fbb6b864 | Michael Hanselmann | def _GetTransportCommand(self): |
284 | fbb6b864 | Michael Hanselmann | """Returns the command for the transport part of the daemon.
|
285 | fbb6b864 | Michael Hanselmann |
|
286 | fbb6b864 | Michael Hanselmann | """
|
287 | fbb6b864 | Michael Hanselmann | socat_cmd = ("%s 2>&%d" %
|
288 | fbb6b864 | Michael Hanselmann | (utils.ShellQuoteArgs(self._GetSocatCommand()),
|
289 | fbb6b864 | Michael Hanselmann | self._socat_stderr_fd))
|
290 | fbb6b864 | Michael Hanselmann | dd_cmd = self._GetDdCommand()
|
291 | c08d76f5 | Michael Hanselmann | |
292 | bb44b1ae | Michael Hanselmann | compr = self._opts.compress
|
293 | bb44b1ae | Michael Hanselmann | |
294 | bb44b1ae | Michael Hanselmann | assert compr in constants.IEC_ALL |
295 | bb44b1ae | Michael Hanselmann | |
296 | fbb6b864 | Michael Hanselmann | parts = [] |
297 | fbb6b864 | Michael Hanselmann | |
298 | bb44b1ae | Michael Hanselmann | if self._mode == constants.IEM_IMPORT: |
299 | fbb6b864 | Michael Hanselmann | parts.append(socat_cmd) |
300 | fbb6b864 | Michael Hanselmann | |
301 | bb44b1ae | Michael Hanselmann | if compr == constants.IEC_GZIP:
|
302 | fbb6b864 | Michael Hanselmann | parts.append("gunzip -c")
|
303 | fbb6b864 | Michael Hanselmann | |
304 | fbb6b864 | Michael Hanselmann | parts.append(dd_cmd) |
305 | c08d76f5 | Michael Hanselmann | |
306 | bb44b1ae | Michael Hanselmann | elif self._mode == constants.IEM_EXPORT: |
307 | fbb6b864 | Michael Hanselmann | parts.append(dd_cmd) |
308 | fbb6b864 | Michael Hanselmann | |
309 | bb44b1ae | Michael Hanselmann | if compr == constants.IEC_GZIP:
|
310 | fbb6b864 | Michael Hanselmann | parts.append("gzip -c")
|
311 | fbb6b864 | Michael Hanselmann | |
312 | fbb6b864 | Michael Hanselmann | parts.append(socat_cmd) |
313 | c08d76f5 | Michael Hanselmann | |
314 | bb44b1ae | Michael Hanselmann | else:
|
315 | bb44b1ae | Michael Hanselmann | raise errors.GenericError("Invalid mode '%s'" % self._mode) |
316 | bb44b1ae | Michael Hanselmann | |
317 | bb44b1ae | Michael Hanselmann | # TODO: Run transport as separate user
|
318 | bb44b1ae | Michael Hanselmann | # The transport uses its own shell to simplify running it as a separate user
|
319 | bb44b1ae | Michael Hanselmann | # in the future.
|
320 | fbb6b864 | Michael Hanselmann | return self.GetBashCommand(" | ".join(parts)) |
321 | bb44b1ae | Michael Hanselmann | |
322 | bb44b1ae | Michael Hanselmann | def GetCommand(self): |
323 | bb44b1ae | Michael Hanselmann | """Returns the complete child process command.
|
324 | bb44b1ae | Michael Hanselmann |
|
325 | bb44b1ae | Michael Hanselmann | """
|
326 | bb44b1ae | Michael Hanselmann | transport_cmd = self._GetTransportCommand()
|
327 | bb44b1ae | Michael Hanselmann | |
328 | bb44b1ae | Michael Hanselmann | buf = StringIO() |
329 | bb44b1ae | Michael Hanselmann | |
330 | bb44b1ae | Michael Hanselmann | if self._opts.cmd_prefix: |
331 | bb44b1ae | Michael Hanselmann | buf.write(self._opts.cmd_prefix)
|
332 | bb44b1ae | Michael Hanselmann | buf.write(" ")
|
333 | bb44b1ae | Michael Hanselmann | |
334 | bb44b1ae | Michael Hanselmann | buf.write(utils.ShellQuoteArgs(transport_cmd)) |
335 | bb44b1ae | Michael Hanselmann | |
336 | bb44b1ae | Michael Hanselmann | if self._opts.cmd_suffix: |
337 | bb44b1ae | Michael Hanselmann | buf.write(" ")
|
338 | bb44b1ae | Michael Hanselmann | buf.write(self._opts.cmd_suffix)
|
339 | bb44b1ae | Michael Hanselmann | |
340 | bb44b1ae | Michael Hanselmann | return self.GetBashCommand(buf.getvalue()) |
341 | 34c9ee7b | Michael Hanselmann | |
342 | 34c9ee7b | Michael Hanselmann | |
343 | 34c9ee7b | Michael Hanselmann | def _VerifyListening(family, address, port): |
344 | 34c9ee7b | Michael Hanselmann | """Verify address given as listening address by socat.
|
345 | 34c9ee7b | Michael Hanselmann |
|
346 | 34c9ee7b | Michael Hanselmann | """
|
347 | 58bb385c | Michael Hanselmann | if family not in (socket.AF_INET, socket.AF_INET6): |
348 | 34c9ee7b | Michael Hanselmann | raise errors.GenericError("Address family %r not supported" % family) |
349 | 34c9ee7b | Michael Hanselmann | |
350 | 58bb385c | Michael Hanselmann | if (family == socket.AF_INET6 and address.startswith("[") and |
351 | 58bb385c | Michael Hanselmann | address.endswith("]")):
|
352 | 58bb385c | Michael Hanselmann | address = address.lstrip("[").rstrip("]") |
353 | 58bb385c | Michael Hanselmann | |
354 | 34c9ee7b | Michael Hanselmann | try:
|
355 | 34c9ee7b | Michael Hanselmann | packed_address = socket.inet_pton(family, address) |
356 | 34c9ee7b | Michael Hanselmann | except socket.error:
|
357 | 34c9ee7b | Michael Hanselmann | raise errors.GenericError("Invalid address %r for family %s" % |
358 | 34c9ee7b | Michael Hanselmann | (address, family)) |
359 | 34c9ee7b | Michael Hanselmann | |
360 | 34c9ee7b | Michael Hanselmann | return (socket.inet_ntop(family, packed_address), port)
|
361 | 34c9ee7b | Michael Hanselmann | |
362 | 34c9ee7b | Michael Hanselmann | |
363 | 34c9ee7b | Michael Hanselmann | class ChildIOProcessor(object): |
364 | f9323011 | Michael Hanselmann | def __init__(self, debug, status_file, logger, throughput_samples, exp_size): |
365 | 34c9ee7b | Michael Hanselmann | """Initializes this class.
|
366 | 34c9ee7b | Michael Hanselmann |
|
367 | 34c9ee7b | Michael Hanselmann | """
|
368 | 34c9ee7b | Michael Hanselmann | self._debug = debug
|
369 | 34c9ee7b | Michael Hanselmann | self._status_file = status_file
|
370 | 34c9ee7b | Michael Hanselmann | self._logger = logger
|
371 | 34c9ee7b | Michael Hanselmann | |
372 | 34c9ee7b | Michael Hanselmann | self._splitter = dict([(prog, utils.LineSplitter(self._ProcessOutput, prog)) |
373 | 34c9ee7b | Michael Hanselmann | for prog in PROG_ALL]) |
374 | 34c9ee7b | Michael Hanselmann | |
375 | c08d76f5 | Michael Hanselmann | self._dd_pid = None |
376 | c08d76f5 | Michael Hanselmann | self._dd_ready = False |
377 | c08d76f5 | Michael Hanselmann | self._dd_tp_samples = throughput_samples
|
378 | c08d76f5 | Michael Hanselmann | self._dd_progress = []
|
379 | c08d76f5 | Michael Hanselmann | |
380 | c08d76f5 | Michael Hanselmann | # Expected size of transferred data
|
381 | f9323011 | Michael Hanselmann | self._exp_size = exp_size
|
382 | c08d76f5 | Michael Hanselmann | |
383 | 34c9ee7b | Michael Hanselmann | def GetLineSplitter(self, prog): |
384 | 34c9ee7b | Michael Hanselmann | """Returns the line splitter for a program.
|
385 | 34c9ee7b | Michael Hanselmann |
|
386 | 34c9ee7b | Michael Hanselmann | """
|
387 | 34c9ee7b | Michael Hanselmann | return self._splitter[prog] |
388 | 34c9ee7b | Michael Hanselmann | |
389 | 34c9ee7b | Michael Hanselmann | def FlushAll(self): |
390 | 34c9ee7b | Michael Hanselmann | """Flushes all line splitters.
|
391 | 34c9ee7b | Michael Hanselmann |
|
392 | 34c9ee7b | Michael Hanselmann | """
|
393 | 34c9ee7b | Michael Hanselmann | for ls in self._splitter.itervalues(): |
394 | 34c9ee7b | Michael Hanselmann | ls.flush() |
395 | 34c9ee7b | Michael Hanselmann | |
396 | 34c9ee7b | Michael Hanselmann | def CloseAll(self): |
397 | 34c9ee7b | Michael Hanselmann | """Closes all line splitters.
|
398 | 34c9ee7b | Michael Hanselmann |
|
399 | 34c9ee7b | Michael Hanselmann | """
|
400 | 34c9ee7b | Michael Hanselmann | for ls in self._splitter.itervalues(): |
401 | 34c9ee7b | Michael Hanselmann | ls.close() |
402 | 34c9ee7b | Michael Hanselmann | self._splitter.clear()
|
403 | 34c9ee7b | Michael Hanselmann | |
404 | c08d76f5 | Michael Hanselmann | def NotifyDd(self): |
405 | c08d76f5 | Michael Hanselmann | """Tells dd(1) to write statistics.
|
406 | c08d76f5 | Michael Hanselmann |
|
407 | c08d76f5 | Michael Hanselmann | """
|
408 | c08d76f5 | Michael Hanselmann | if self._dd_pid is None: |
409 | c08d76f5 | Michael Hanselmann | # Can't notify
|
410 | c08d76f5 | Michael Hanselmann | return False |
411 | c08d76f5 | Michael Hanselmann | |
412 | c08d76f5 | Michael Hanselmann | if not self._dd_ready: |
413 | c08d76f5 | Michael Hanselmann | # There's a race condition between starting the program and sending
|
414 | c08d76f5 | Michael Hanselmann | # signals. The signal handler is only registered after some time, so we
|
415 | c08d76f5 | Michael Hanselmann | # have to check whether the program is ready. If it isn't, sending a
|
416 | c08d76f5 | Michael Hanselmann | # signal will invoke the default handler (and usually abort the program).
|
417 | c08d76f5 | Michael Hanselmann | if not utils.IsProcessHandlingSignal(self._dd_pid, DD_INFO_SIGNAL): |
418 | c08d76f5 | Michael Hanselmann | logging.debug("dd is not yet ready for signal %s", DD_INFO_SIGNAL)
|
419 | c08d76f5 | Michael Hanselmann | return False |
420 | c08d76f5 | Michael Hanselmann | |
421 | c08d76f5 | Michael Hanselmann | logging.debug("dd is now handling signal %s", DD_INFO_SIGNAL)
|
422 | c08d76f5 | Michael Hanselmann | self._dd_ready = True |
423 | c08d76f5 | Michael Hanselmann | |
424 | c08d76f5 | Michael Hanselmann | logging.debug("Sending signal %s to PID %s", DD_INFO_SIGNAL, self._dd_pid) |
425 | c08d76f5 | Michael Hanselmann | try:
|
426 | c08d76f5 | Michael Hanselmann | os.kill(self._dd_pid, DD_INFO_SIGNAL)
|
427 | c08d76f5 | Michael Hanselmann | except EnvironmentError, err: |
428 | c08d76f5 | Michael Hanselmann | if err.errno != errno.ESRCH:
|
429 | c08d76f5 | Michael Hanselmann | raise
|
430 | c08d76f5 | Michael Hanselmann | |
431 | c08d76f5 | Michael Hanselmann | # Process no longer exists
|
432 | 560cbec1 | Michael Hanselmann | logging.debug("dd exited")
|
433 | c08d76f5 | Michael Hanselmann | self._dd_pid = None |
434 | c08d76f5 | Michael Hanselmann | |
435 | c08d76f5 | Michael Hanselmann | return True |
436 | c08d76f5 | Michael Hanselmann | |
437 | 34c9ee7b | Michael Hanselmann | def _ProcessOutput(self, line, prog): |
438 | 34c9ee7b | Michael Hanselmann | """Takes care of child process output.
|
439 | 34c9ee7b | Michael Hanselmann |
|
440 | 34c9ee7b | Michael Hanselmann | @type line: string
|
441 | 34c9ee7b | Michael Hanselmann | @param line: Child output line
|
442 | 34c9ee7b | Michael Hanselmann | @type prog: number
|
443 | 34c9ee7b | Michael Hanselmann | @param prog: Program from which the line originates
|
444 | 34c9ee7b | Michael Hanselmann |
|
445 | 34c9ee7b | Michael Hanselmann | """
|
446 | 34c9ee7b | Michael Hanselmann | force_update = False
|
447 | 34c9ee7b | Michael Hanselmann | forward_line = line |
448 | 34c9ee7b | Michael Hanselmann | |
449 | 34c9ee7b | Michael Hanselmann | if prog == PROG_SOCAT:
|
450 | 34c9ee7b | Michael Hanselmann | level = None
|
451 | 34c9ee7b | Michael Hanselmann | parts = line.split(None, 4) |
452 | 34c9ee7b | Michael Hanselmann | |
453 | 34c9ee7b | Michael Hanselmann | if len(parts) == 5: |
454 | 34c9ee7b | Michael Hanselmann | (_, _, _, level, msg) = parts |
455 | 34c9ee7b | Michael Hanselmann | |
456 | 34c9ee7b | Michael Hanselmann | force_update = self._ProcessSocatOutput(self._status_file, level, msg) |
457 | 34c9ee7b | Michael Hanselmann | |
458 | 34c9ee7b | Michael Hanselmann | if self._debug or (level and level not in SOCAT_LOG_IGNORE): |
459 | 34c9ee7b | Michael Hanselmann | forward_line = "socat: %s %s" % (level, msg)
|
460 | 34c9ee7b | Michael Hanselmann | else:
|
461 | 34c9ee7b | Michael Hanselmann | forward_line = None
|
462 | 34c9ee7b | Michael Hanselmann | else:
|
463 | 34c9ee7b | Michael Hanselmann | forward_line = "socat: %s" % line
|
464 | 34c9ee7b | Michael Hanselmann | |
465 | c08d76f5 | Michael Hanselmann | elif prog == PROG_DD:
|
466 | c08d76f5 | Michael Hanselmann | (should_forward, force_update) = self._ProcessDdOutput(line)
|
467 | c08d76f5 | Michael Hanselmann | |
468 | c08d76f5 | Michael Hanselmann | if should_forward or self._debug: |
469 | c08d76f5 | Michael Hanselmann | forward_line = "dd: %s" % line
|
470 | c08d76f5 | Michael Hanselmann | else:
|
471 | c08d76f5 | Michael Hanselmann | forward_line = None
|
472 | c08d76f5 | Michael Hanselmann | |
473 | c08d76f5 | Michael Hanselmann | elif prog == PROG_DD_PID:
|
474 | c08d76f5 | Michael Hanselmann | if self._dd_pid: |
475 | c08d76f5 | Michael Hanselmann | raise RuntimeError("dd PID reported more than once") |
476 | c08d76f5 | Michael Hanselmann | logging.debug("Received dd PID %r", line)
|
477 | c08d76f5 | Michael Hanselmann | self._dd_pid = int(line) |
478 | c08d76f5 | Michael Hanselmann | forward_line = None
|
479 | c08d76f5 | Michael Hanselmann | |
480 | f9323011 | Michael Hanselmann | elif prog == PROG_EXP_SIZE:
|
481 | f9323011 | Michael Hanselmann | logging.debug("Received predicted size %r", line)
|
482 | f9323011 | Michael Hanselmann | forward_line = None
|
483 | f9323011 | Michael Hanselmann | |
484 | f9323011 | Michael Hanselmann | if line:
|
485 | f9323011 | Michael Hanselmann | try:
|
486 | f9323011 | Michael Hanselmann | exp_size = utils.BytesToMebibyte(int(line))
|
487 | f9323011 | Michael Hanselmann | except (ValueError, TypeError), err: |
488 | f9323011 | Michael Hanselmann | logging.error("Failed to convert predicted size %r to number: %s",
|
489 | f9323011 | Michael Hanselmann | line, err) |
490 | f9323011 | Michael Hanselmann | exp_size = None
|
491 | f9323011 | Michael Hanselmann | else:
|
492 | f9323011 | Michael Hanselmann | exp_size = None
|
493 | f9323011 | Michael Hanselmann | |
494 | f9323011 | Michael Hanselmann | self._exp_size = exp_size
|
495 | f9323011 | Michael Hanselmann | |
496 | 34c9ee7b | Michael Hanselmann | if forward_line:
|
497 | 34c9ee7b | Michael Hanselmann | self._logger.info(forward_line)
|
498 | 34c9ee7b | Michael Hanselmann | self._status_file.AddRecentOutput(forward_line)
|
499 | 34c9ee7b | Michael Hanselmann | |
500 | 34c9ee7b | Michael Hanselmann | self._status_file.Update(force_update)
|
501 | 34c9ee7b | Michael Hanselmann | |
502 | 34c9ee7b | Michael Hanselmann | @staticmethod
|
503 | 34c9ee7b | Michael Hanselmann | def _ProcessSocatOutput(status_file, level, msg): |
504 | 34c9ee7b | Michael Hanselmann | """Interprets socat log output.
|
505 | 34c9ee7b | Michael Hanselmann |
|
506 | 34c9ee7b | Michael Hanselmann | """
|
507 | 34c9ee7b | Michael Hanselmann | if level == SOCAT_LOG_NOTICE:
|
508 | 34c9ee7b | Michael Hanselmann | if status_file.GetListenPort() is None: |
509 | 34c9ee7b | Michael Hanselmann | # TODO: Maybe implement timeout to not listen forever
|
510 | 34c9ee7b | Michael Hanselmann | m = LISTENING_RE.match(msg) |
511 | 34c9ee7b | Michael Hanselmann | if m:
|
512 | 34c9ee7b | Michael Hanselmann | (_, port) = _VerifyListening(int(m.group("family")), |
513 | 34c9ee7b | Michael Hanselmann | m.group("address"),
|
514 | 34c9ee7b | Michael Hanselmann | int(m.group("port"))) |
515 | 34c9ee7b | Michael Hanselmann | |
516 | 34c9ee7b | Michael Hanselmann | status_file.SetListenPort(port) |
517 | 34c9ee7b | Michael Hanselmann | return True |
518 | 34c9ee7b | Michael Hanselmann | |
519 | 34c9ee7b | Michael Hanselmann | if not status_file.GetConnected(): |
520 | 34c9ee7b | Michael Hanselmann | m = TRANSFER_LOOP_RE.match(msg) |
521 | 34c9ee7b | Michael Hanselmann | if m:
|
522 | 53dbf14c | Michael Hanselmann | logging.debug("Connection established")
|
523 | 34c9ee7b | Michael Hanselmann | status_file.SetConnected() |
524 | 34c9ee7b | Michael Hanselmann | return True |
525 | 34c9ee7b | Michael Hanselmann | |
526 | 34c9ee7b | Michael Hanselmann | return False |
527 | c08d76f5 | Michael Hanselmann | |
528 | c08d76f5 | Michael Hanselmann | def _ProcessDdOutput(self, line): |
529 | c08d76f5 | Michael Hanselmann | """Interprets a line of dd(1)'s output.
|
530 | c08d76f5 | Michael Hanselmann |
|
531 | c08d76f5 | Michael Hanselmann | """
|
532 | c08d76f5 | Michael Hanselmann | m = DD_INFO_RE.match(line) |
533 | c08d76f5 | Michael Hanselmann | if m:
|
534 | c08d76f5 | Michael Hanselmann | seconds = float(m.group("seconds")) |
535 | c08d76f5 | Michael Hanselmann | mbytes = utils.BytesToMebibyte(int(m.group("bytes"))) |
536 | c08d76f5 | Michael Hanselmann | self._UpdateDdProgress(seconds, mbytes)
|
537 | c08d76f5 | Michael Hanselmann | return (False, True) |
538 | c08d76f5 | Michael Hanselmann | |
539 | c08d76f5 | Michael Hanselmann | m = DD_STDERR_IGNORE.match(line) |
540 | c08d76f5 | Michael Hanselmann | if m:
|
541 | c08d76f5 | Michael Hanselmann | # Ignore
|
542 | c08d76f5 | Michael Hanselmann | return (False, False) |
543 | c08d76f5 | Michael Hanselmann | |
544 | c08d76f5 | Michael Hanselmann | # Forward line
|
545 | c08d76f5 | Michael Hanselmann | return (True, False) |
546 | c08d76f5 | Michael Hanselmann | |
547 | c08d76f5 | Michael Hanselmann | def _UpdateDdProgress(self, seconds, mbytes): |
548 | c08d76f5 | Michael Hanselmann | """Updates the internal status variables for dd(1) progress.
|
549 | c08d76f5 | Michael Hanselmann |
|
550 | c08d76f5 | Michael Hanselmann | @type seconds: float
|
551 | c08d76f5 | Michael Hanselmann | @param seconds: Timestamp of this update
|
552 | c08d76f5 | Michael Hanselmann | @type mbytes: float
|
553 | c08d76f5 | Michael Hanselmann | @param mbytes: Total number of MiB transferred so far
|
554 | c08d76f5 | Michael Hanselmann |
|
555 | c08d76f5 | Michael Hanselmann | """
|
556 | c08d76f5 | Michael Hanselmann | # Add latest sample
|
557 | c08d76f5 | Michael Hanselmann | self._dd_progress.append((seconds, mbytes))
|
558 | c08d76f5 | Michael Hanselmann | |
559 | c08d76f5 | Michael Hanselmann | # Remove old samples
|
560 | c08d76f5 | Michael Hanselmann | del self._dd_progress[:-self._dd_tp_samples] |
561 | c08d76f5 | Michael Hanselmann | |
562 | c08d76f5 | Michael Hanselmann | # Calculate throughput
|
563 | c08d76f5 | Michael Hanselmann | throughput = _CalcThroughput(self._dd_progress)
|
564 | c08d76f5 | Michael Hanselmann | |
565 | c08d76f5 | Michael Hanselmann | # Calculate percent and ETA
|
566 | c08d76f5 | Michael Hanselmann | percent = None
|
567 | c08d76f5 | Michael Hanselmann | eta = None
|
568 | c08d76f5 | Michael Hanselmann | |
569 | c08d76f5 | Michael Hanselmann | if self._exp_size is not None: |
570 | c08d76f5 | Michael Hanselmann | if self._exp_size != 0: |
571 | c08d76f5 | Michael Hanselmann | percent = max(0, min(100, (100.0 * mbytes) / self._exp_size)) |
572 | c08d76f5 | Michael Hanselmann | |
573 | c08d76f5 | Michael Hanselmann | if throughput:
|
574 | c08d76f5 | Michael Hanselmann | eta = max(0, float(self._exp_size - mbytes) / throughput) |
575 | c08d76f5 | Michael Hanselmann | |
576 | c08d76f5 | Michael Hanselmann | self._status_file.SetProgress(mbytes, throughput, percent, eta)
|
577 | c08d76f5 | Michael Hanselmann | |
578 | c08d76f5 | Michael Hanselmann | |
579 | c08d76f5 | Michael Hanselmann | def _CalcThroughput(samples): |
580 | c08d76f5 | Michael Hanselmann | """Calculates the throughput in MiB/second.
|
581 | c08d76f5 | Michael Hanselmann |
|
582 | c08d76f5 | Michael Hanselmann | @type samples: sequence
|
583 | c08d76f5 | Michael Hanselmann | @param samples: List of samples, each consisting of a (timestamp, mbytes)
|
584 | c08d76f5 | Michael Hanselmann | tuple
|
585 | c08d76f5 | Michael Hanselmann | @rtype: float or None
|
586 | c08d76f5 | Michael Hanselmann | @return: Throughput in MiB/second
|
587 | c08d76f5 | Michael Hanselmann |
|
588 | c08d76f5 | Michael Hanselmann | """
|
589 | c08d76f5 | Michael Hanselmann | if len(samples) < 2: |
590 | c08d76f5 | Michael Hanselmann | # Can't calculate throughput
|
591 | c08d76f5 | Michael Hanselmann | return None |
592 | c08d76f5 | Michael Hanselmann | |
593 | c08d76f5 | Michael Hanselmann | (start_time, start_mbytes) = samples[0]
|
594 | c08d76f5 | Michael Hanselmann | (end_time, end_mbytes) = samples[-1]
|
595 | c08d76f5 | Michael Hanselmann | |
596 | c08d76f5 | Michael Hanselmann | return (float(end_mbytes) - start_mbytes) / (float(end_time) - start_time) |