Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ 73cdf9a3

History | View | Annotate | Download (44.2 kB)

1 033a1d00 Michael Hanselmann
#
2 033a1d00 Michael Hanselmann
#
3 033a1d00 Michael Hanselmann
4 c9300bb3 Iustin Pop
# Copyright (C) 2010, 2011 Google Inc.
5 033a1d00 Michael Hanselmann
#
6 033a1d00 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 033a1d00 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 033a1d00 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 033a1d00 Michael Hanselmann
# (at your option) any later version.
10 033a1d00 Michael Hanselmann
#
11 033a1d00 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 033a1d00 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 033a1d00 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 033a1d00 Michael Hanselmann
# General Public License for more details.
15 033a1d00 Michael Hanselmann
#
16 033a1d00 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 033a1d00 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 033a1d00 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 033a1d00 Michael Hanselmann
# 02110-1301, USA.
20 033a1d00 Michael Hanselmann
21 033a1d00 Michael Hanselmann
22 033a1d00 Michael Hanselmann
"""Instance-related functions and classes for masterd.
23 033a1d00 Michael Hanselmann

24 033a1d00 Michael Hanselmann
"""
25 033a1d00 Michael Hanselmann
26 033a1d00 Michael Hanselmann
import logging
27 033a1d00 Michael Hanselmann
import time
28 4a96f1d1 Michael Hanselmann
import OpenSSL
29 033a1d00 Michael Hanselmann
30 033a1d00 Michael Hanselmann
from ganeti import constants
31 033a1d00 Michael Hanselmann
from ganeti import errors
32 033a1d00 Michael Hanselmann
from ganeti import compat
33 387794f8 Michael Hanselmann
from ganeti import utils
34 387794f8 Michael Hanselmann
from ganeti import objects
35 a744b676 Manuel Franceschini
from ganeti import netutils
36 033a1d00 Michael Hanselmann
37 033a1d00 Michael Hanselmann
38 033a1d00 Michael Hanselmann
class _ImportExportError(Exception):
39 033a1d00 Michael Hanselmann
  """Local exception to report import/export errors.
40 033a1d00 Michael Hanselmann

41 033a1d00 Michael Hanselmann
  """
42 033a1d00 Michael Hanselmann
43 033a1d00 Michael Hanselmann
44 033a1d00 Michael Hanselmann
class ImportExportTimeouts(object):
45 033a1d00 Michael Hanselmann
  #: Time until daemon starts writing status file
46 033a1d00 Michael Hanselmann
  DEFAULT_READY_TIMEOUT = 10
47 033a1d00 Michael Hanselmann
48 033a1d00 Michael Hanselmann
  #: Length of time until errors cause hard failure
49 033a1d00 Michael Hanselmann
  DEFAULT_ERROR_TIMEOUT = 10
50 033a1d00 Michael Hanselmann
51 033a1d00 Michael Hanselmann
  #: Time after which daemon must be listening
52 033a1d00 Michael Hanselmann
  DEFAULT_LISTEN_TIMEOUT = 10
53 033a1d00 Michael Hanselmann
54 1a2e7fe9 Michael Hanselmann
  #: Progress update interval
55 1a2e7fe9 Michael Hanselmann
  DEFAULT_PROGRESS_INTERVAL = 60
56 1a2e7fe9 Michael Hanselmann
57 033a1d00 Michael Hanselmann
  __slots__ = [
58 033a1d00 Michael Hanselmann
    "error",
59 033a1d00 Michael Hanselmann
    "ready",
60 033a1d00 Michael Hanselmann
    "listen",
61 033a1d00 Michael Hanselmann
    "connect",
62 1a2e7fe9 Michael Hanselmann
    "progress",
63 033a1d00 Michael Hanselmann
    ]
64 033a1d00 Michael Hanselmann
65 033a1d00 Michael Hanselmann
  def __init__(self, connect,
66 033a1d00 Michael Hanselmann
               listen=DEFAULT_LISTEN_TIMEOUT,
67 033a1d00 Michael Hanselmann
               error=DEFAULT_ERROR_TIMEOUT,
68 1a2e7fe9 Michael Hanselmann
               ready=DEFAULT_READY_TIMEOUT,
69 1a2e7fe9 Michael Hanselmann
               progress=DEFAULT_PROGRESS_INTERVAL):
70 033a1d00 Michael Hanselmann
    """Initializes this class.
71 033a1d00 Michael Hanselmann

72 033a1d00 Michael Hanselmann
    @type connect: number
73 033a1d00 Michael Hanselmann
    @param connect: Timeout for establishing connection
74 033a1d00 Michael Hanselmann
    @type listen: number
75 033a1d00 Michael Hanselmann
    @param listen: Timeout for starting to listen for connections
76 033a1d00 Michael Hanselmann
    @type error: number
77 033a1d00 Michael Hanselmann
    @param error: Length of time until errors cause hard failure
78 033a1d00 Michael Hanselmann
    @type ready: number
79 033a1d00 Michael Hanselmann
    @param ready: Timeout for daemon to become ready
80 1a2e7fe9 Michael Hanselmann
    @type progress: number
81 1a2e7fe9 Michael Hanselmann
    @param progress: Progress update interval
82 033a1d00 Michael Hanselmann

83 033a1d00 Michael Hanselmann
    """
84 033a1d00 Michael Hanselmann
    self.error = error
85 033a1d00 Michael Hanselmann
    self.ready = ready
86 033a1d00 Michael Hanselmann
    self.listen = listen
87 033a1d00 Michael Hanselmann
    self.connect = connect
88 1a2e7fe9 Michael Hanselmann
    self.progress = progress
89 033a1d00 Michael Hanselmann
90 033a1d00 Michael Hanselmann
91 033a1d00 Michael Hanselmann
class ImportExportCbBase(object):
92 033a1d00 Michael Hanselmann
  """Callbacks for disk import/export.
93 033a1d00 Michael Hanselmann

94 033a1d00 Michael Hanselmann
  """
95 033a1d00 Michael Hanselmann
  def ReportListening(self, ie, private):
96 033a1d00 Michael Hanselmann
    """Called when daemon started listening.
97 033a1d00 Michael Hanselmann

98 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
99 033a1d00 Michael Hanselmann
    @param ie: Import/export object
100 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
101 033a1d00 Michael Hanselmann

102 033a1d00 Michael Hanselmann
    """
103 033a1d00 Michael Hanselmann
104 033a1d00 Michael Hanselmann
  def ReportConnected(self, ie, private):
105 033a1d00 Michael Hanselmann
    """Called when a connection has been established.
106 033a1d00 Michael Hanselmann

107 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
108 033a1d00 Michael Hanselmann
    @param ie: Import/export object
109 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
110 033a1d00 Michael Hanselmann

111 033a1d00 Michael Hanselmann
    """
112 033a1d00 Michael Hanselmann
113 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
114 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
115 1a2e7fe9 Michael Hanselmann

116 1a2e7fe9 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
117 1a2e7fe9 Michael Hanselmann
    @param ie: Import/export object
118 1a2e7fe9 Michael Hanselmann
    @param private: Private data passed to import/export object
119 1a2e7fe9 Michael Hanselmann

120 1a2e7fe9 Michael Hanselmann
    """
121 1a2e7fe9 Michael Hanselmann
122 033a1d00 Michael Hanselmann
  def ReportFinished(self, ie, private):
123 033a1d00 Michael Hanselmann
    """Called when a transfer has finished.
124 033a1d00 Michael Hanselmann

125 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
126 033a1d00 Michael Hanselmann
    @param ie: Import/export object
127 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
128 033a1d00 Michael Hanselmann

129 033a1d00 Michael Hanselmann
    """
130 033a1d00 Michael Hanselmann
131 033a1d00 Michael Hanselmann
132 033a1d00 Michael Hanselmann
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
133 033a1d00 Michael Hanselmann
  """Checks whether a timeout has expired.
134 033a1d00 Michael Hanselmann

135 033a1d00 Michael Hanselmann
  """
136 033a1d00 Michael Hanselmann
  return _time_fn() > (epoch + timeout)
137 033a1d00 Michael Hanselmann
138 033a1d00 Michael Hanselmann
139 033a1d00 Michael Hanselmann
class _DiskImportExportBase(object):
140 033a1d00 Michael Hanselmann
  MODE_TEXT = None
141 033a1d00 Michael Hanselmann
142 eb630f50 Michael Hanselmann
  def __init__(self, lu, node_name, opts,
143 033a1d00 Michael Hanselmann
               instance, timeouts, cbs, private=None):
144 033a1d00 Michael Hanselmann
    """Initializes this class.
145 033a1d00 Michael Hanselmann

146 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
147 033a1d00 Michael Hanselmann
    @type node_name: string
148 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
149 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
150 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
151 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
152 033a1d00 Michael Hanselmann
    @param instance: Instance object
153 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
154 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
155 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
156 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
157 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
158 033a1d00 Michael Hanselmann

159 033a1d00 Michael Hanselmann
    """
160 033a1d00 Michael Hanselmann
    assert self.MODE_TEXT
161 033a1d00 Michael Hanselmann
162 033a1d00 Michael Hanselmann
    self._lu = lu
163 033a1d00 Michael Hanselmann
    self.node_name = node_name
164 4478301b Michael Hanselmann
    self._opts = opts.Copy()
165 033a1d00 Michael Hanselmann
    self._instance = instance
166 033a1d00 Michael Hanselmann
    self._timeouts = timeouts
167 033a1d00 Michael Hanselmann
    self._cbs = cbs
168 033a1d00 Michael Hanselmann
    self._private = private
169 033a1d00 Michael Hanselmann
170 4478301b Michael Hanselmann
    # Set master daemon's timeout in options for import/export daemon
171 4478301b Michael Hanselmann
    assert self._opts.connect_timeout is None
172 4478301b Michael Hanselmann
    self._opts.connect_timeout = timeouts.connect
173 4478301b Michael Hanselmann
174 033a1d00 Michael Hanselmann
    # Parent loop
175 033a1d00 Michael Hanselmann
    self._loop = None
176 033a1d00 Michael Hanselmann
177 033a1d00 Michael Hanselmann
    # Timestamps
178 033a1d00 Michael Hanselmann
    self._ts_begin = None
179 033a1d00 Michael Hanselmann
    self._ts_connected = None
180 033a1d00 Michael Hanselmann
    self._ts_finished = None
181 033a1d00 Michael Hanselmann
    self._ts_cleanup = None
182 1a2e7fe9 Michael Hanselmann
    self._ts_last_progress = None
183 033a1d00 Michael Hanselmann
    self._ts_last_error = None
184 033a1d00 Michael Hanselmann
185 033a1d00 Michael Hanselmann
    # Transfer status
186 033a1d00 Michael Hanselmann
    self.success = None
187 033a1d00 Michael Hanselmann
    self.final_message = None
188 033a1d00 Michael Hanselmann
189 033a1d00 Michael Hanselmann
    # Daemon status
190 033a1d00 Michael Hanselmann
    self._daemon_name = None
191 033a1d00 Michael Hanselmann
    self._daemon = None
192 033a1d00 Michael Hanselmann
193 033a1d00 Michael Hanselmann
  @property
194 033a1d00 Michael Hanselmann
  def recent_output(self):
195 033a1d00 Michael Hanselmann
    """Returns the most recent output from the daemon.
196 033a1d00 Michael Hanselmann

197 033a1d00 Michael Hanselmann
    """
198 033a1d00 Michael Hanselmann
    if self._daemon:
199 c9300bb3 Iustin Pop
      return "\n".join(self._daemon.recent_output)
200 033a1d00 Michael Hanselmann
201 033a1d00 Michael Hanselmann
    return None
202 033a1d00 Michael Hanselmann
203 033a1d00 Michael Hanselmann
  @property
204 1a2e7fe9 Michael Hanselmann
  def progress(self):
205 1a2e7fe9 Michael Hanselmann
    """Returns transfer progress information.
206 1a2e7fe9 Michael Hanselmann

207 1a2e7fe9 Michael Hanselmann
    """
208 1a2e7fe9 Michael Hanselmann
    if not self._daemon:
209 1a2e7fe9 Michael Hanselmann
      return None
210 1a2e7fe9 Michael Hanselmann
211 1a2e7fe9 Michael Hanselmann
    return (self._daemon.progress_mbytes,
212 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_throughput,
213 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_percent,
214 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_eta)
215 1a2e7fe9 Michael Hanselmann
216 1a2e7fe9 Michael Hanselmann
  @property
217 af1d39b1 Michael Hanselmann
  def magic(self):
218 af1d39b1 Michael Hanselmann
    """Returns the magic value for this import/export.
219 af1d39b1 Michael Hanselmann

220 af1d39b1 Michael Hanselmann
    """
221 af1d39b1 Michael Hanselmann
    return self._opts.magic
222 af1d39b1 Michael Hanselmann
223 af1d39b1 Michael Hanselmann
  @property
224 033a1d00 Michael Hanselmann
  def active(self):
225 033a1d00 Michael Hanselmann
    """Determines whether this transport is still active.
226 033a1d00 Michael Hanselmann

227 033a1d00 Michael Hanselmann
    """
228 033a1d00 Michael Hanselmann
    return self.success is None
229 033a1d00 Michael Hanselmann
230 033a1d00 Michael Hanselmann
  @property
231 033a1d00 Michael Hanselmann
  def loop(self):
232 033a1d00 Michael Hanselmann
    """Returns parent loop.
233 033a1d00 Michael Hanselmann

234 033a1d00 Michael Hanselmann
    @rtype: L{ImportExportLoop}
235 033a1d00 Michael Hanselmann

236 033a1d00 Michael Hanselmann
    """
237 033a1d00 Michael Hanselmann
    return self._loop
238 033a1d00 Michael Hanselmann
239 033a1d00 Michael Hanselmann
  def SetLoop(self, loop):
240 033a1d00 Michael Hanselmann
    """Sets the parent loop.
241 033a1d00 Michael Hanselmann

242 033a1d00 Michael Hanselmann
    @type loop: L{ImportExportLoop}
243 033a1d00 Michael Hanselmann

244 033a1d00 Michael Hanselmann
    """
245 033a1d00 Michael Hanselmann
    if self._loop:
246 033a1d00 Michael Hanselmann
      raise errors.ProgrammerError("Loop can only be set once")
247 033a1d00 Michael Hanselmann
248 033a1d00 Michael Hanselmann
    self._loop = loop
249 033a1d00 Michael Hanselmann
250 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
251 033a1d00 Michael Hanselmann
    """Starts the import/export daemon.
252 033a1d00 Michael Hanselmann

253 033a1d00 Michael Hanselmann
    """
254 033a1d00 Michael Hanselmann
    raise NotImplementedError()
255 033a1d00 Michael Hanselmann
256 033a1d00 Michael Hanselmann
  def CheckDaemon(self):
257 033a1d00 Michael Hanselmann
    """Checks whether daemon has been started and if not, starts it.
258 033a1d00 Michael Hanselmann

259 033a1d00 Michael Hanselmann
    @rtype: string
260 033a1d00 Michael Hanselmann
    @return: Daemon name
261 033a1d00 Michael Hanselmann

262 033a1d00 Michael Hanselmann
    """
263 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
264 033a1d00 Michael Hanselmann
265 033a1d00 Michael Hanselmann
    if self._daemon_name is None:
266 033a1d00 Michael Hanselmann
      assert self._ts_begin is None
267 033a1d00 Michael Hanselmann
268 033a1d00 Michael Hanselmann
      result = self._StartDaemon()
269 033a1d00 Michael Hanselmann
      if result.fail_msg:
270 033a1d00 Michael Hanselmann
        raise _ImportExportError("Failed to start %s on %s: %s" %
271 033a1d00 Michael Hanselmann
                                 (self.MODE_TEXT, self.node_name,
272 033a1d00 Michael Hanselmann
                                  result.fail_msg))
273 033a1d00 Michael Hanselmann
274 033a1d00 Michael Hanselmann
      daemon_name = result.payload
275 033a1d00 Michael Hanselmann
276 033a1d00 Michael Hanselmann
      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
277 033a1d00 Michael Hanselmann
                   self.node_name)
278 033a1d00 Michael Hanselmann
279 033a1d00 Michael Hanselmann
      self._ts_begin = time.time()
280 033a1d00 Michael Hanselmann
      self._daemon_name = daemon_name
281 033a1d00 Michael Hanselmann
282 033a1d00 Michael Hanselmann
    return self._daemon_name
283 033a1d00 Michael Hanselmann
284 033a1d00 Michael Hanselmann
  def GetDaemonName(self):
285 033a1d00 Michael Hanselmann
    """Returns the daemon name.
286 033a1d00 Michael Hanselmann

287 033a1d00 Michael Hanselmann
    """
288 033a1d00 Michael Hanselmann
    assert self._daemon_name, "Daemon has not been started"
289 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
290 033a1d00 Michael Hanselmann
    return self._daemon_name
291 033a1d00 Michael Hanselmann
292 033a1d00 Michael Hanselmann
  def Abort(self):
293 033a1d00 Michael Hanselmann
    """Sends SIGTERM to import/export daemon (if still active).
294 033a1d00 Michael Hanselmann

295 033a1d00 Michael Hanselmann
    """
296 033a1d00 Michael Hanselmann
    if self._daemon_name:
297 033a1d00 Michael Hanselmann
      self._lu.LogWarning("Aborting %s %r on %s",
298 033a1d00 Michael Hanselmann
                          self.MODE_TEXT, self._daemon_name, self.node_name)
299 033a1d00 Michael Hanselmann
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
300 033a1d00 Michael Hanselmann
      if result.fail_msg:
301 033a1d00 Michael Hanselmann
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
302 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
303 033a1d00 Michael Hanselmann
                            self.node_name, result.fail_msg)
304 033a1d00 Michael Hanselmann
        return False
305 033a1d00 Michael Hanselmann
306 033a1d00 Michael Hanselmann
    return True
307 033a1d00 Michael Hanselmann
308 033a1d00 Michael Hanselmann
  def _SetDaemonData(self, data):
309 033a1d00 Michael Hanselmann
    """Internal function for updating status daemon data.
310 033a1d00 Michael Hanselmann

311 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
312 033a1d00 Michael Hanselmann
    @param data: Daemon status data
313 033a1d00 Michael Hanselmann

314 033a1d00 Michael Hanselmann
    """
315 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
316 033a1d00 Michael Hanselmann
317 033a1d00 Michael Hanselmann
    if not data:
318 033a1d00 Michael Hanselmann
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
319 033a1d00 Michael Hanselmann
        raise _ImportExportError("Didn't become ready after %s seconds" %
320 033a1d00 Michael Hanselmann
                                 self._timeouts.ready)
321 033a1d00 Michael Hanselmann
322 033a1d00 Michael Hanselmann
      return False
323 033a1d00 Michael Hanselmann
324 033a1d00 Michael Hanselmann
    self._daemon = data
325 033a1d00 Michael Hanselmann
326 033a1d00 Michael Hanselmann
    return True
327 033a1d00 Michael Hanselmann
328 033a1d00 Michael Hanselmann
  def SetDaemonData(self, success, data):
329 033a1d00 Michael Hanselmann
    """Updates daemon status data.
330 033a1d00 Michael Hanselmann

331 033a1d00 Michael Hanselmann
    @type success: bool
332 033a1d00 Michael Hanselmann
    @param success: Whether fetching data was successful or not
333 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
334 033a1d00 Michael Hanselmann
    @param data: Daemon status data
335 033a1d00 Michael Hanselmann

336 033a1d00 Michael Hanselmann
    """
337 033a1d00 Michael Hanselmann
    if not success:
338 033a1d00 Michael Hanselmann
      if self._ts_last_error is None:
339 033a1d00 Michael Hanselmann
        self._ts_last_error = time.time()
340 033a1d00 Michael Hanselmann
341 033a1d00 Michael Hanselmann
      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
342 033a1d00 Michael Hanselmann
        raise _ImportExportError("Too many errors while updating data")
343 033a1d00 Michael Hanselmann
344 033a1d00 Michael Hanselmann
      return False
345 033a1d00 Michael Hanselmann
346 033a1d00 Michael Hanselmann
    self._ts_last_error = None
347 033a1d00 Michael Hanselmann
348 033a1d00 Michael Hanselmann
    return self._SetDaemonData(data)
349 033a1d00 Michael Hanselmann
350 033a1d00 Michael Hanselmann
  def CheckListening(self):
351 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
352 033a1d00 Michael Hanselmann

353 033a1d00 Michael Hanselmann
    """
354 033a1d00 Michael Hanselmann
    raise NotImplementedError()
355 033a1d00 Michael Hanselmann
356 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
357 033a1d00 Michael Hanselmann
    """Returns timeout to calculate connect timeout.
358 033a1d00 Michael Hanselmann

359 033a1d00 Michael Hanselmann
    """
360 033a1d00 Michael Hanselmann
    raise NotImplementedError()
361 033a1d00 Michael Hanselmann
362 033a1d00 Michael Hanselmann
  def CheckConnected(self):
363 033a1d00 Michael Hanselmann
    """Checks whether the daemon is connected.
364 033a1d00 Michael Hanselmann

365 033a1d00 Michael Hanselmann
    @rtype: bool
366 033a1d00 Michael Hanselmann
    @return: Whether the daemon is connected
367 033a1d00 Michael Hanselmann

368 033a1d00 Michael Hanselmann
    """
369 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
370 033a1d00 Michael Hanselmann
371 033a1d00 Michael Hanselmann
    if self._ts_connected is not None:
372 033a1d00 Michael Hanselmann
      return True
373 033a1d00 Michael Hanselmann
374 033a1d00 Michael Hanselmann
    if self._daemon.connected:
375 033a1d00 Michael Hanselmann
      self._ts_connected = time.time()
376 033a1d00 Michael Hanselmann
377 033a1d00 Michael Hanselmann
      # TODO: Log remote peer
378 033a1d00 Michael Hanselmann
      logging.debug("%s %r on %s is now connected",
379 033a1d00 Michael Hanselmann
                    self.MODE_TEXT, self._daemon_name, self.node_name)
380 033a1d00 Michael Hanselmann
381 033a1d00 Michael Hanselmann
      self._cbs.ReportConnected(self, self._private)
382 033a1d00 Michael Hanselmann
383 033a1d00 Michael Hanselmann
      return True
384 033a1d00 Michael Hanselmann
385 033a1d00 Michael Hanselmann
    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
386 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not connected after %s seconds" %
387 033a1d00 Michael Hanselmann
                               self._timeouts.connect)
388 033a1d00 Michael Hanselmann
389 033a1d00 Michael Hanselmann
    return False
390 033a1d00 Michael Hanselmann
391 1a2e7fe9 Michael Hanselmann
  def _CheckProgress(self):
392 1a2e7fe9 Michael Hanselmann
    """Checks whether a progress update should be reported.
393 1a2e7fe9 Michael Hanselmann

394 1a2e7fe9 Michael Hanselmann
    """
395 1a2e7fe9 Michael Hanselmann
    if ((self._ts_last_progress is None or
396 1a2e7fe9 Michael Hanselmann
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
397 1a2e7fe9 Michael Hanselmann
        self._daemon and
398 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_mbytes is not None and
399 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_throughput is not None):
400 1a2e7fe9 Michael Hanselmann
      self._cbs.ReportProgress(self, self._private)
401 1a2e7fe9 Michael Hanselmann
      self._ts_last_progress = time.time()
402 1a2e7fe9 Michael Hanselmann
403 033a1d00 Michael Hanselmann
  def CheckFinished(self):
404 033a1d00 Michael Hanselmann
    """Checks whether the daemon exited.
405 033a1d00 Michael Hanselmann

406 033a1d00 Michael Hanselmann
    @rtype: bool
407 033a1d00 Michael Hanselmann
    @return: Whether the transfer is finished
408 033a1d00 Michael Hanselmann

409 033a1d00 Michael Hanselmann
    """
410 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
411 033a1d00 Michael Hanselmann
412 033a1d00 Michael Hanselmann
    if self._ts_finished:
413 033a1d00 Michael Hanselmann
      return True
414 033a1d00 Michael Hanselmann
415 033a1d00 Michael Hanselmann
    if self._daemon.exit_status is None:
416 1a2e7fe9 Michael Hanselmann
      # TODO: Adjust delay for ETA expiring soon
417 1a2e7fe9 Michael Hanselmann
      self._CheckProgress()
418 033a1d00 Michael Hanselmann
      return False
419 033a1d00 Michael Hanselmann
420 033a1d00 Michael Hanselmann
    self._ts_finished = time.time()
421 033a1d00 Michael Hanselmann
422 033a1d00 Michael Hanselmann
    self._ReportFinished(self._daemon.exit_status == 0,
423 033a1d00 Michael Hanselmann
                         self._daemon.error_message)
424 033a1d00 Michael Hanselmann
425 033a1d00 Michael Hanselmann
    return True
426 033a1d00 Michael Hanselmann
427 033a1d00 Michael Hanselmann
  def _ReportFinished(self, success, message):
428 033a1d00 Michael Hanselmann
    """Transfer is finished or daemon exited.
429 033a1d00 Michael Hanselmann

430 033a1d00 Michael Hanselmann
    @type success: bool
431 033a1d00 Michael Hanselmann
    @param success: Whether the transfer was successful
432 033a1d00 Michael Hanselmann
    @type message: string
433 033a1d00 Michael Hanselmann
    @param message: Error message
434 033a1d00 Michael Hanselmann

435 033a1d00 Michael Hanselmann
    """
436 033a1d00 Michael Hanselmann
    assert self.success is None
437 033a1d00 Michael Hanselmann
438 033a1d00 Michael Hanselmann
    self.success = success
439 033a1d00 Michael Hanselmann
    self.final_message = message
440 033a1d00 Michael Hanselmann
441 033a1d00 Michael Hanselmann
    if success:
442 033a1d00 Michael Hanselmann
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
443 033a1d00 Michael Hanselmann
                   self.node_name)
444 033a1d00 Michael Hanselmann
    elif self._daemon_name:
445 033a1d00 Michael Hanselmann
      self._lu.LogWarning("%s %r on %s failed: %s",
446 033a1d00 Michael Hanselmann
                          self.MODE_TEXT, self._daemon_name, self.node_name,
447 033a1d00 Michael Hanselmann
                          message)
448 033a1d00 Michael Hanselmann
    else:
449 033a1d00 Michael Hanselmann
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
450 033a1d00 Michael Hanselmann
                          self.node_name, message)
451 033a1d00 Michael Hanselmann
452 033a1d00 Michael Hanselmann
    self._cbs.ReportFinished(self, self._private)
453 033a1d00 Michael Hanselmann
454 033a1d00 Michael Hanselmann
  def _Finalize(self):
455 033a1d00 Michael Hanselmann
    """Makes the RPC call to finalize this import/export.
456 033a1d00 Michael Hanselmann

457 033a1d00 Michael Hanselmann
    """
458 033a1d00 Michael Hanselmann
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
459 033a1d00 Michael Hanselmann
460 033a1d00 Michael Hanselmann
  def Finalize(self, error=None):
461 033a1d00 Michael Hanselmann
    """Finalizes this import/export.
462 033a1d00 Michael Hanselmann

463 033a1d00 Michael Hanselmann
    """
464 033a1d00 Michael Hanselmann
    if self._daemon_name:
465 033a1d00 Michael Hanselmann
      logging.info("Finalizing %s %r on %s",
466 033a1d00 Michael Hanselmann
                   self.MODE_TEXT, self._daemon_name, self.node_name)
467 033a1d00 Michael Hanselmann
468 033a1d00 Michael Hanselmann
      result = self._Finalize()
469 033a1d00 Michael Hanselmann
      if result.fail_msg:
470 033a1d00 Michael Hanselmann
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
471 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
472 033a1d00 Michael Hanselmann
                            self.node_name, result.fail_msg)
473 033a1d00 Michael Hanselmann
        return False
474 033a1d00 Michael Hanselmann
475 033a1d00 Michael Hanselmann
      # Daemon is no longer running
476 033a1d00 Michael Hanselmann
      self._daemon_name = None
477 033a1d00 Michael Hanselmann
      self._ts_cleanup = time.time()
478 033a1d00 Michael Hanselmann
479 033a1d00 Michael Hanselmann
    if error:
480 033a1d00 Michael Hanselmann
      self._ReportFinished(False, error)
481 033a1d00 Michael Hanselmann
482 033a1d00 Michael Hanselmann
    return True
483 033a1d00 Michael Hanselmann
484 033a1d00 Michael Hanselmann
485 033a1d00 Michael Hanselmann
class DiskImport(_DiskImportExportBase):
486 033a1d00 Michael Hanselmann
  MODE_TEXT = "import"
487 033a1d00 Michael Hanselmann
488 eb630f50 Michael Hanselmann
  def __init__(self, lu, node_name, opts, instance,
489 033a1d00 Michael Hanselmann
               dest, dest_args, timeouts, cbs, private=None):
490 033a1d00 Michael Hanselmann
    """Initializes this class.
491 033a1d00 Michael Hanselmann

492 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
493 033a1d00 Michael Hanselmann
    @type node_name: string
494 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
495 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
496 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
497 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
498 033a1d00 Michael Hanselmann
    @param instance: Instance object
499 033a1d00 Michael Hanselmann
    @param dest: I/O destination
500 033a1d00 Michael Hanselmann
    @param dest_args: I/O arguments
501 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
502 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
503 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
504 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
505 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
506 033a1d00 Michael Hanselmann

507 033a1d00 Michael Hanselmann
    """
508 eb630f50 Michael Hanselmann
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
509 033a1d00 Michael Hanselmann
                                   instance, timeouts, cbs, private)
510 033a1d00 Michael Hanselmann
    self._dest = dest
511 033a1d00 Michael Hanselmann
    self._dest_args = dest_args
512 033a1d00 Michael Hanselmann
513 033a1d00 Michael Hanselmann
    # Timestamps
514 033a1d00 Michael Hanselmann
    self._ts_listening = None
515 033a1d00 Michael Hanselmann
516 033a1d00 Michael Hanselmann
  @property
517 033a1d00 Michael Hanselmann
  def listen_port(self):
518 033a1d00 Michael Hanselmann
    """Returns the port the daemon is listening on.
519 033a1d00 Michael Hanselmann

520 033a1d00 Michael Hanselmann
    """
521 033a1d00 Michael Hanselmann
    if self._daemon:
522 033a1d00 Michael Hanselmann
      return self._daemon.listen_port
523 033a1d00 Michael Hanselmann
524 033a1d00 Michael Hanselmann
    return None
525 033a1d00 Michael Hanselmann
526 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
527 033a1d00 Michael Hanselmann
    """Starts the import daemon.
528 033a1d00 Michael Hanselmann

529 033a1d00 Michael Hanselmann
    """
530 eb630f50 Michael Hanselmann
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
531 eb630f50 Michael Hanselmann
                                          self._instance,
532 033a1d00 Michael Hanselmann
                                          self._dest, self._dest_args)
533 033a1d00 Michael Hanselmann
534 033a1d00 Michael Hanselmann
  def CheckListening(self):
535 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
536 033a1d00 Michael Hanselmann

537 033a1d00 Michael Hanselmann
    @rtype: bool
538 033a1d00 Michael Hanselmann
    @return: Whether the daemon is listening
539 033a1d00 Michael Hanselmann

540 033a1d00 Michael Hanselmann
    """
541 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
542 033a1d00 Michael Hanselmann
543 033a1d00 Michael Hanselmann
    if self._ts_listening is not None:
544 033a1d00 Michael Hanselmann
      return True
545 033a1d00 Michael Hanselmann
546 033a1d00 Michael Hanselmann
    port = self._daemon.listen_port
547 033a1d00 Michael Hanselmann
    if port is not None:
548 033a1d00 Michael Hanselmann
      self._ts_listening = time.time()
549 033a1d00 Michael Hanselmann
550 033a1d00 Michael Hanselmann
      logging.debug("Import %r on %s is now listening on port %s",
551 033a1d00 Michael Hanselmann
                    self._daemon_name, self.node_name, port)
552 033a1d00 Michael Hanselmann
553 033a1d00 Michael Hanselmann
      self._cbs.ReportListening(self, self._private)
554 033a1d00 Michael Hanselmann
555 033a1d00 Michael Hanselmann
      return True
556 033a1d00 Michael Hanselmann
557 033a1d00 Michael Hanselmann
    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
558 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not listening after %s seconds" %
559 033a1d00 Michael Hanselmann
                               self._timeouts.listen)
560 033a1d00 Michael Hanselmann
561 033a1d00 Michael Hanselmann
    return False
562 033a1d00 Michael Hanselmann
563 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
564 033a1d00 Michael Hanselmann
    """Returns the time since we started listening.
565 033a1d00 Michael Hanselmann

566 033a1d00 Michael Hanselmann
    """
567 033a1d00 Michael Hanselmann
    assert self._ts_listening is not None, \
568 033a1d00 Michael Hanselmann
           ("Checking whether an import is connected is only useful"
569 033a1d00 Michael Hanselmann
            " once it's been listening")
570 033a1d00 Michael Hanselmann
571 033a1d00 Michael Hanselmann
    return self._ts_listening
572 033a1d00 Michael Hanselmann
573 033a1d00 Michael Hanselmann
574 033a1d00 Michael Hanselmann
class DiskExport(_DiskImportExportBase):
575 033a1d00 Michael Hanselmann
  MODE_TEXT = "export"
576 033a1d00 Michael Hanselmann
577 eb630f50 Michael Hanselmann
  def __init__(self, lu, node_name, opts,
578 033a1d00 Michael Hanselmann
               dest_host, dest_port, instance, source, source_args,
579 033a1d00 Michael Hanselmann
               timeouts, cbs, private=None):
580 033a1d00 Michael Hanselmann
    """Initializes this class.
581 033a1d00 Michael Hanselmann

582 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
583 033a1d00 Michael Hanselmann
    @type node_name: string
584 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
585 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
586 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
587 033a1d00 Michael Hanselmann
    @type dest_host: string
588 033a1d00 Michael Hanselmann
    @param dest_host: Destination host name or IP address
589 033a1d00 Michael Hanselmann
    @type dest_port: number
590 033a1d00 Michael Hanselmann
    @param dest_port: Destination port number
591 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
592 033a1d00 Michael Hanselmann
    @param instance: Instance object
593 033a1d00 Michael Hanselmann
    @param source: I/O source
594 033a1d00 Michael Hanselmann
    @param source_args: I/O source
595 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
596 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
597 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
598 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
599 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
600 033a1d00 Michael Hanselmann

601 033a1d00 Michael Hanselmann
    """
602 eb630f50 Michael Hanselmann
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
603 033a1d00 Michael Hanselmann
                                   instance, timeouts, cbs, private)
604 033a1d00 Michael Hanselmann
    self._dest_host = dest_host
605 033a1d00 Michael Hanselmann
    self._dest_port = dest_port
606 033a1d00 Michael Hanselmann
    self._source = source
607 033a1d00 Michael Hanselmann
    self._source_args = source_args
608 033a1d00 Michael Hanselmann
609 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
610 033a1d00 Michael Hanselmann
    """Starts the export daemon.
611 033a1d00 Michael Hanselmann

612 033a1d00 Michael Hanselmann
    """
613 eb630f50 Michael Hanselmann
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
614 033a1d00 Michael Hanselmann
                                          self._dest_host, self._dest_port,
615 033a1d00 Michael Hanselmann
                                          self._instance, self._source,
616 033a1d00 Michael Hanselmann
                                          self._source_args)
617 033a1d00 Michael Hanselmann
618 033a1d00 Michael Hanselmann
  def CheckListening(self):
619 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
620 033a1d00 Michael Hanselmann

621 033a1d00 Michael Hanselmann
    """
622 033a1d00 Michael Hanselmann
    # Only an import can be listening
623 033a1d00 Michael Hanselmann
    return True
624 033a1d00 Michael Hanselmann
625 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
626 033a1d00 Michael Hanselmann
    """Returns the time since the daemon started.
627 033a1d00 Michael Hanselmann

628 033a1d00 Michael Hanselmann
    """
629 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
630 033a1d00 Michael Hanselmann
631 033a1d00 Michael Hanselmann
    return self._ts_begin
632 033a1d00 Michael Hanselmann
633 033a1d00 Michael Hanselmann
634 1a2e7fe9 Michael Hanselmann
def FormatProgress(progress):
635 1a2e7fe9 Michael Hanselmann
  """Formats progress information for user consumption
636 1a2e7fe9 Michael Hanselmann

637 1a2e7fe9 Michael Hanselmann
  """
638 e6b8d02d Michael Hanselmann
  (mbytes, throughput, percent, eta) = progress
639 1a2e7fe9 Michael Hanselmann
640 1a2e7fe9 Michael Hanselmann
  parts = [
641 1a2e7fe9 Michael Hanselmann
    utils.FormatUnit(mbytes, "h"),
642 1a2e7fe9 Michael Hanselmann
643 1a2e7fe9 Michael Hanselmann
    # Not using FormatUnit as it doesn't support kilobytes
644 1a2e7fe9 Michael Hanselmann
    "%0.1f MiB/s" % throughput,
645 1a2e7fe9 Michael Hanselmann
    ]
646 1a2e7fe9 Michael Hanselmann
647 1a2e7fe9 Michael Hanselmann
  if percent is not None:
648 1a2e7fe9 Michael Hanselmann
    parts.append("%d%%" % percent)
649 1a2e7fe9 Michael Hanselmann
650 e6b8d02d Michael Hanselmann
  if eta is not None:
651 e6b8d02d Michael Hanselmann
    parts.append("ETA %s" % utils.FormatSeconds(eta))
652 1a2e7fe9 Michael Hanselmann
653 1a2e7fe9 Michael Hanselmann
  return utils.CommaJoin(parts)
654 1a2e7fe9 Michael Hanselmann
655 1a2e7fe9 Michael Hanselmann
656 033a1d00 Michael Hanselmann
class ImportExportLoop:
657 033a1d00 Michael Hanselmann
  MIN_DELAY = 1.0
658 033a1d00 Michael Hanselmann
  MAX_DELAY = 20.0
659 033a1d00 Michael Hanselmann
660 033a1d00 Michael Hanselmann
  def __init__(self, lu):
661 033a1d00 Michael Hanselmann
    """Initializes this class.
662 033a1d00 Michael Hanselmann

663 033a1d00 Michael Hanselmann
    """
664 033a1d00 Michael Hanselmann
    self._lu = lu
665 033a1d00 Michael Hanselmann
    self._queue = []
666 033a1d00 Michael Hanselmann
    self._pending_add = []
667 033a1d00 Michael Hanselmann
668 033a1d00 Michael Hanselmann
  def Add(self, diskie):
669 033a1d00 Michael Hanselmann
    """Adds an import/export object to the loop.
670 033a1d00 Michael Hanselmann

671 033a1d00 Michael Hanselmann
    @type diskie: Subclass of L{_DiskImportExportBase}
672 033a1d00 Michael Hanselmann
    @param diskie: Import/export object
673 033a1d00 Michael Hanselmann

674 033a1d00 Michael Hanselmann
    """
675 033a1d00 Michael Hanselmann
    assert diskie not in self._pending_add
676 033a1d00 Michael Hanselmann
    assert diskie.loop is None
677 033a1d00 Michael Hanselmann
678 033a1d00 Michael Hanselmann
    diskie.SetLoop(self)
679 033a1d00 Michael Hanselmann
680 033a1d00 Michael Hanselmann
    # Adding new objects to a staging list is necessary, otherwise the main
681 033a1d00 Michael Hanselmann
    # loop gets confused if callbacks modify the queue while the main loop is
682 033a1d00 Michael Hanselmann
    # iterating over it.
683 033a1d00 Michael Hanselmann
    self._pending_add.append(diskie)
684 033a1d00 Michael Hanselmann
685 033a1d00 Michael Hanselmann
  @staticmethod
686 033a1d00 Michael Hanselmann
  def _CollectDaemonStatus(lu, daemons):
687 033a1d00 Michael Hanselmann
    """Collects the status for all import/export daemons.
688 033a1d00 Michael Hanselmann

689 033a1d00 Michael Hanselmann
    """
690 033a1d00 Michael Hanselmann
    daemon_status = {}
691 033a1d00 Michael Hanselmann
692 033a1d00 Michael Hanselmann
    for node_name, names in daemons.iteritems():
693 033a1d00 Michael Hanselmann
      result = lu.rpc.call_impexp_status(node_name, names)
694 033a1d00 Michael Hanselmann
      if result.fail_msg:
695 033a1d00 Michael Hanselmann
        lu.LogWarning("Failed to get daemon status on %s: %s",
696 033a1d00 Michael Hanselmann
                      node_name, result.fail_msg)
697 033a1d00 Michael Hanselmann
        continue
698 033a1d00 Michael Hanselmann
699 033a1d00 Michael Hanselmann
      assert len(names) == len(result.payload)
700 033a1d00 Michael Hanselmann
701 033a1d00 Michael Hanselmann
      daemon_status[node_name] = dict(zip(names, result.payload))
702 033a1d00 Michael Hanselmann
703 033a1d00 Michael Hanselmann
    return daemon_status
704 033a1d00 Michael Hanselmann
705 033a1d00 Michael Hanselmann
  @staticmethod
706 033a1d00 Michael Hanselmann
  def _GetActiveDaemonNames(queue):
707 033a1d00 Michael Hanselmann
    """Gets the names of all active daemons.
708 033a1d00 Michael Hanselmann

709 033a1d00 Michael Hanselmann
    """
710 033a1d00 Michael Hanselmann
    result = {}
711 033a1d00 Michael Hanselmann
    for diskie in queue:
712 033a1d00 Michael Hanselmann
      if not diskie.active:
713 033a1d00 Michael Hanselmann
        continue
714 033a1d00 Michael Hanselmann
715 033a1d00 Michael Hanselmann
      try:
716 033a1d00 Michael Hanselmann
        # Start daemon if necessary
717 033a1d00 Michael Hanselmann
        daemon_name = diskie.CheckDaemon()
718 033a1d00 Michael Hanselmann
      except _ImportExportError, err:
719 033a1d00 Michael Hanselmann
        logging.exception("%s failed", diskie.MODE_TEXT)
720 033a1d00 Michael Hanselmann
        diskie.Finalize(error=str(err))
721 033a1d00 Michael Hanselmann
        continue
722 033a1d00 Michael Hanselmann
723 033a1d00 Michael Hanselmann
      result.setdefault(diskie.node_name, []).append(daemon_name)
724 033a1d00 Michael Hanselmann
725 033a1d00 Michael Hanselmann
    assert len(queue) >= len(result)
726 033a1d00 Michael Hanselmann
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
727 033a1d00 Michael Hanselmann
728 033a1d00 Michael Hanselmann
    logging.debug("daemons=%r", result)
729 033a1d00 Michael Hanselmann
730 033a1d00 Michael Hanselmann
    return result
731 033a1d00 Michael Hanselmann
732 033a1d00 Michael Hanselmann
  def _AddPendingToQueue(self):
733 033a1d00 Michael Hanselmann
    """Adds all pending import/export objects to the internal queue.
734 033a1d00 Michael Hanselmann

735 033a1d00 Michael Hanselmann
    """
736 033a1d00 Michael Hanselmann
    assert compat.all(diskie not in self._queue and diskie.loop == self
737 033a1d00 Michael Hanselmann
                      for diskie in self._pending_add)
738 033a1d00 Michael Hanselmann
739 033a1d00 Michael Hanselmann
    self._queue.extend(self._pending_add)
740 033a1d00 Michael Hanselmann
741 033a1d00 Michael Hanselmann
    del self._pending_add[:]
742 033a1d00 Michael Hanselmann
743 033a1d00 Michael Hanselmann
  def Run(self):
744 033a1d00 Michael Hanselmann
    """Utility main loop.
745 033a1d00 Michael Hanselmann

746 033a1d00 Michael Hanselmann
    """
747 033a1d00 Michael Hanselmann
    while True:
748 033a1d00 Michael Hanselmann
      self._AddPendingToQueue()
749 033a1d00 Michael Hanselmann
750 033a1d00 Michael Hanselmann
      # Collect all active daemon names
751 033a1d00 Michael Hanselmann
      daemons = self._GetActiveDaemonNames(self._queue)
752 033a1d00 Michael Hanselmann
      if not daemons:
753 033a1d00 Michael Hanselmann
        break
754 033a1d00 Michael Hanselmann
755 033a1d00 Michael Hanselmann
      # Collection daemon status data
756 033a1d00 Michael Hanselmann
      data = self._CollectDaemonStatus(self._lu, daemons)
757 033a1d00 Michael Hanselmann
758 033a1d00 Michael Hanselmann
      # Use data
759 033a1d00 Michael Hanselmann
      delay = self.MAX_DELAY
760 033a1d00 Michael Hanselmann
      for diskie in self._queue:
761 033a1d00 Michael Hanselmann
        if not diskie.active:
762 033a1d00 Michael Hanselmann
          continue
763 033a1d00 Michael Hanselmann
764 033a1d00 Michael Hanselmann
        try:
765 033a1d00 Michael Hanselmann
          try:
766 033a1d00 Michael Hanselmann
            all_daemon_data = data[diskie.node_name]
767 033a1d00 Michael Hanselmann
          except KeyError:
768 033a1d00 Michael Hanselmann
            result = diskie.SetDaemonData(False, None)
769 033a1d00 Michael Hanselmann
          else:
770 033a1d00 Michael Hanselmann
            result = \
771 033a1d00 Michael Hanselmann
              diskie.SetDaemonData(True,
772 033a1d00 Michael Hanselmann
                                   all_daemon_data[diskie.GetDaemonName()])
773 033a1d00 Michael Hanselmann
774 033a1d00 Michael Hanselmann
          if not result:
775 033a1d00 Michael Hanselmann
            # Daemon not yet ready, retry soon
776 033a1d00 Michael Hanselmann
            delay = min(3.0, delay)
777 033a1d00 Michael Hanselmann
            continue
778 033a1d00 Michael Hanselmann
779 033a1d00 Michael Hanselmann
          if diskie.CheckFinished():
780 033a1d00 Michael Hanselmann
            # Transfer finished
781 033a1d00 Michael Hanselmann
            diskie.Finalize()
782 033a1d00 Michael Hanselmann
            continue
783 033a1d00 Michael Hanselmann
784 033a1d00 Michael Hanselmann
          # Normal case: check again in 5 seconds
785 033a1d00 Michael Hanselmann
          delay = min(5.0, delay)
786 033a1d00 Michael Hanselmann
787 033a1d00 Michael Hanselmann
          if not diskie.CheckListening():
788 033a1d00 Michael Hanselmann
            # Not yet listening, retry soon
789 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
790 033a1d00 Michael Hanselmann
            continue
791 033a1d00 Michael Hanselmann
792 033a1d00 Michael Hanselmann
          if not diskie.CheckConnected():
793 033a1d00 Michael Hanselmann
            # Not yet connected, retry soon
794 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
795 033a1d00 Michael Hanselmann
            continue
796 033a1d00 Michael Hanselmann
797 033a1d00 Michael Hanselmann
        except _ImportExportError, err:
798 033a1d00 Michael Hanselmann
          logging.exception("%s failed", diskie.MODE_TEXT)
799 033a1d00 Michael Hanselmann
          diskie.Finalize(error=str(err))
800 033a1d00 Michael Hanselmann
801 403f5172 Guido Trotter
      if not compat.any(diskie.active for diskie in self._queue):
802 033a1d00 Michael Hanselmann
        break
803 033a1d00 Michael Hanselmann
804 033a1d00 Michael Hanselmann
      # Wait a bit
805 033a1d00 Michael Hanselmann
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
806 033a1d00 Michael Hanselmann
      logging.debug("Waiting for %ss", delay)
807 033a1d00 Michael Hanselmann
      time.sleep(delay)
808 033a1d00 Michael Hanselmann
809 033a1d00 Michael Hanselmann
  def FinalizeAll(self):
810 033a1d00 Michael Hanselmann
    """Finalizes all pending transfers.
811 033a1d00 Michael Hanselmann

812 033a1d00 Michael Hanselmann
    """
813 033a1d00 Michael Hanselmann
    success = True
814 033a1d00 Michael Hanselmann
815 033a1d00 Michael Hanselmann
    for diskie in self._queue:
816 033a1d00 Michael Hanselmann
      success = diskie.Finalize() and success
817 033a1d00 Michael Hanselmann
818 033a1d00 Michael Hanselmann
    return success
819 5d97d6dd Michael Hanselmann
820 5d97d6dd Michael Hanselmann
821 5d97d6dd Michael Hanselmann
class _TransferInstCbBase(ImportExportCbBase):
822 5d97d6dd Michael Hanselmann
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
823 d51ae04c Michael Hanselmann
               dest_node, dest_ip):
824 5d97d6dd Michael Hanselmann
    """Initializes this class.
825 5d97d6dd Michael Hanselmann

826 5d97d6dd Michael Hanselmann
    """
827 5d97d6dd Michael Hanselmann
    ImportExportCbBase.__init__(self)
828 5d97d6dd Michael Hanselmann
829 5d97d6dd Michael Hanselmann
    self.lu = lu
830 5d97d6dd Michael Hanselmann
    self.feedback_fn = feedback_fn
831 5d97d6dd Michael Hanselmann
    self.instance = instance
832 5d97d6dd Michael Hanselmann
    self.timeouts = timeouts
833 5d97d6dd Michael Hanselmann
    self.src_node = src_node
834 5d97d6dd Michael Hanselmann
    self.src_cbs = src_cbs
835 5d97d6dd Michael Hanselmann
    self.dest_node = dest_node
836 5d97d6dd Michael Hanselmann
    self.dest_ip = dest_ip
837 5d97d6dd Michael Hanselmann
838 5d97d6dd Michael Hanselmann
839 5d97d6dd Michael Hanselmann
class _TransferInstSourceCb(_TransferInstCbBase):
840 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
841 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
842 5d97d6dd Michael Hanselmann

843 5d97d6dd Michael Hanselmann
    """
844 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
845 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
846 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
847 5d97d6dd Michael Hanselmann
848 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is sending data on %s" %
849 5d97d6dd Michael Hanselmann
                     (dtp.data.name, ie.node_name))
850 5d97d6dd Michael Hanselmann
851 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, dtp):
852 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
853 1a2e7fe9 Michael Hanselmann

854 1a2e7fe9 Michael Hanselmann
    """
855 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
856 1a2e7fe9 Michael Hanselmann
    if not progress:
857 1a2e7fe9 Michael Hanselmann
      return
858 1a2e7fe9 Michael Hanselmann
859 1a2e7fe9 Michael Hanselmann
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
860 1a2e7fe9 Michael Hanselmann
861 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
862 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
863 5d97d6dd Michael Hanselmann

864 5d97d6dd Michael Hanselmann
    """
865 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
866 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
867 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
868 5d97d6dd Michael Hanselmann
869 5d97d6dd Michael Hanselmann
    if ie.success:
870 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished sending data" % dtp.data.name)
871 5d97d6dd Michael Hanselmann
    else:
872 c9300bb3 Iustin Pop
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
873 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
874 5d97d6dd Michael Hanselmann
875 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
876 5d97d6dd Michael Hanselmann
877 5d97d6dd Michael Hanselmann
    cb = dtp.data.finished_fn
878 5d97d6dd Michael Hanselmann
    if cb:
879 5d97d6dd Michael Hanselmann
      cb()
880 5d97d6dd Michael Hanselmann
881 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
882 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
883 5d97d6dd Michael Hanselmann
    if dtp.dest_import and not ie.success:
884 5d97d6dd Michael Hanselmann
      dtp.dest_import.Abort()
885 5d97d6dd Michael Hanselmann
886 5d97d6dd Michael Hanselmann
887 5d97d6dd Michael Hanselmann
class _TransferInstDestCb(_TransferInstCbBase):
888 5d97d6dd Michael Hanselmann
  def ReportListening(self, ie, dtp):
889 5d97d6dd Michael Hanselmann
    """Called when daemon started listening.
890 5d97d6dd Michael Hanselmann

891 5d97d6dd Michael Hanselmann
    """
892 5d97d6dd Michael Hanselmann
    assert self.src_cbs
893 5d97d6dd Michael Hanselmann
    assert dtp.src_export is None
894 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
895 d51ae04c Michael Hanselmann
    assert dtp.export_opts
896 5d97d6dd Michael Hanselmann
897 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
898 5d97d6dd Michael Hanselmann
899 5d97d6dd Michael Hanselmann
    # Start export on source node
900 d51ae04c Michael Hanselmann
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
901 a5310c2a Michael Hanselmann
                    self.dest_ip, ie.listen_port,
902 eb630f50 Michael Hanselmann
                    self.instance, dtp.data.src_io, dtp.data.src_ioargs,
903 5d97d6dd Michael Hanselmann
                    self.timeouts, self.src_cbs, private=dtp)
904 5d97d6dd Michael Hanselmann
    ie.loop.Add(de)
905 5d97d6dd Michael Hanselmann
906 5d97d6dd Michael Hanselmann
    dtp.src_export = de
907 5d97d6dd Michael Hanselmann
908 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
909 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
910 5d97d6dd Michael Hanselmann

911 5d97d6dd Michael Hanselmann
    """
912 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is receiving data on %s" %
913 5d97d6dd Michael Hanselmann
                     (dtp.data.name, self.dest_node))
914 5d97d6dd Michael Hanselmann
915 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
916 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
917 5d97d6dd Michael Hanselmann

918 5d97d6dd Michael Hanselmann
    """
919 5d97d6dd Michael Hanselmann
    if ie.success:
920 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
921 5d97d6dd Michael Hanselmann
    else:
922 c9300bb3 Iustin Pop
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
923 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
924 5d97d6dd Michael Hanselmann
925 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
926 5d97d6dd Michael Hanselmann
927 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
928 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
929 5d97d6dd Michael Hanselmann
    if dtp.src_export and not ie.success:
930 5d97d6dd Michael Hanselmann
      dtp.src_export.Abort()
931 5d97d6dd Michael Hanselmann
932 5d97d6dd Michael Hanselmann
933 5d97d6dd Michael Hanselmann
class DiskTransfer(object):
934 5d97d6dd Michael Hanselmann
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
935 5d97d6dd Michael Hanselmann
               finished_fn):
936 5d97d6dd Michael Hanselmann
    """Initializes this class.
937 5d97d6dd Michael Hanselmann

938 5d97d6dd Michael Hanselmann
    @type name: string
939 5d97d6dd Michael Hanselmann
    @param name: User-visible name for this transfer (e.g. "disk/0")
940 5d97d6dd Michael Hanselmann
    @param src_io: Source I/O type
941 5d97d6dd Michael Hanselmann
    @param src_ioargs: Source I/O arguments
942 5d97d6dd Michael Hanselmann
    @param dest_io: Destination I/O type
943 5d97d6dd Michael Hanselmann
    @param dest_ioargs: Destination I/O arguments
944 5d97d6dd Michael Hanselmann
    @type finished_fn: callable
945 5d97d6dd Michael Hanselmann
    @param finished_fn: Function called once transfer has finished
946 5d97d6dd Michael Hanselmann

947 5d97d6dd Michael Hanselmann
    """
948 5d97d6dd Michael Hanselmann
    self.name = name
949 5d97d6dd Michael Hanselmann
950 5d97d6dd Michael Hanselmann
    self.src_io = src_io
951 5d97d6dd Michael Hanselmann
    self.src_ioargs = src_ioargs
952 5d97d6dd Michael Hanselmann
953 5d97d6dd Michael Hanselmann
    self.dest_io = dest_io
954 5d97d6dd Michael Hanselmann
    self.dest_ioargs = dest_ioargs
955 5d97d6dd Michael Hanselmann
956 5d97d6dd Michael Hanselmann
    self.finished_fn = finished_fn
957 5d97d6dd Michael Hanselmann
958 5d97d6dd Michael Hanselmann
959 5d97d6dd Michael Hanselmann
class _DiskTransferPrivate(object):
960 d51ae04c Michael Hanselmann
  def __init__(self, data, success, export_opts):
961 5d97d6dd Michael Hanselmann
    """Initializes this class.
962 5d97d6dd Michael Hanselmann

963 5d97d6dd Michael Hanselmann
    @type data: L{DiskTransfer}
964 5d97d6dd Michael Hanselmann
    @type success: bool
965 5d97d6dd Michael Hanselmann

966 5d97d6dd Michael Hanselmann
    """
967 5d97d6dd Michael Hanselmann
    self.data = data
968 d51ae04c Michael Hanselmann
    self.success = success
969 d51ae04c Michael Hanselmann
    self.export_opts = export_opts
970 5d97d6dd Michael Hanselmann
971 5d97d6dd Michael Hanselmann
    self.src_export = None
972 5d97d6dd Michael Hanselmann
    self.dest_import = None
973 5d97d6dd Michael Hanselmann
974 5d97d6dd Michael Hanselmann
  def RecordResult(self, success):
975 5d97d6dd Michael Hanselmann
    """Updates the status.
976 5d97d6dd Michael Hanselmann

977 5d97d6dd Michael Hanselmann
    One failed part will cause the whole transfer to fail.
978 5d97d6dd Michael Hanselmann

979 5d97d6dd Michael Hanselmann
    """
980 5d97d6dd Michael Hanselmann
    self.success = self.success and success
981 5d97d6dd Michael Hanselmann
982 5d97d6dd Michael Hanselmann
983 d51ae04c Michael Hanselmann
def _GetInstDiskMagic(base, instance_name, index):
984 d51ae04c Michael Hanselmann
  """Computes the magic value for a disk export or import.
985 d51ae04c Michael Hanselmann

986 d51ae04c Michael Hanselmann
  @type base: string
987 d51ae04c Michael Hanselmann
  @param base: Random seed value (can be the same for all disks of a transfer)
988 d51ae04c Michael Hanselmann
  @type instance_name: string
989 d51ae04c Michael Hanselmann
  @param instance_name: Name of instance
990 d51ae04c Michael Hanselmann
  @type index: number
991 d51ae04c Michael Hanselmann
  @param index: Disk index
992 d51ae04c Michael Hanselmann

993 d51ae04c Michael Hanselmann
  """
994 d51ae04c Michael Hanselmann
  h = compat.sha1_hash()
995 d51ae04c Michael Hanselmann
  h.update(str(constants.RIE_VERSION))
996 d51ae04c Michael Hanselmann
  h.update(base)
997 d51ae04c Michael Hanselmann
  h.update(instance_name)
998 d51ae04c Michael Hanselmann
  h.update(str(index))
999 d51ae04c Michael Hanselmann
  return h.hexdigest()
1000 d51ae04c Michael Hanselmann
1001 d51ae04c Michael Hanselmann
1002 5d97d6dd Michael Hanselmann
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1003 5d97d6dd Michael Hanselmann
                         instance, all_transfers):
1004 5d97d6dd Michael Hanselmann
  """Transfers an instance's data from one node to another.
1005 5d97d6dd Michael Hanselmann

1006 5d97d6dd Michael Hanselmann
  @param lu: Logical unit instance
1007 5d97d6dd Michael Hanselmann
  @param feedback_fn: Feedback function
1008 5d97d6dd Michael Hanselmann
  @type src_node: string
1009 5d97d6dd Michael Hanselmann
  @param src_node: Source node name
1010 5d97d6dd Michael Hanselmann
  @type dest_node: string
1011 5d97d6dd Michael Hanselmann
  @param dest_node: Destination node name
1012 5d97d6dd Michael Hanselmann
  @type dest_ip: string
1013 5d97d6dd Michael Hanselmann
  @param dest_ip: IP address of destination node
1014 5d97d6dd Michael Hanselmann
  @type instance: L{objects.Instance}
1015 5d97d6dd Michael Hanselmann
  @param instance: Instance object
1016 5d97d6dd Michael Hanselmann
  @type all_transfers: list of L{DiskTransfer} instances
1017 5d97d6dd Michael Hanselmann
  @param all_transfers: List of all disk transfers to be made
1018 5d97d6dd Michael Hanselmann
  @rtype: list
1019 5d97d6dd Michael Hanselmann
  @return: List with a boolean (True=successful, False=failed) for success for
1020 5d97d6dd Michael Hanselmann
           each transfer
1021 5d97d6dd Michael Hanselmann

1022 5d97d6dd Michael Hanselmann
  """
1023 5bb95572 Michael Hanselmann
  # Disable compression for all moves as these are all within the same cluster
1024 5bb95572 Michael Hanselmann
  compress = constants.IEC_NONE
1025 a5310c2a Michael Hanselmann
1026 a5310c2a Michael Hanselmann
  logging.debug("Source node %s, destination node %s, compression '%s'",
1027 a5310c2a Michael Hanselmann
                src_node, dest_node, compress)
1028 a5310c2a Michael Hanselmann
1029 5d97d6dd Michael Hanselmann
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1030 5d97d6dd Michael Hanselmann
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1031 d51ae04c Michael Hanselmann
                                  src_node, None, dest_node, dest_ip)
1032 5d97d6dd Michael Hanselmann
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1033 d51ae04c Michael Hanselmann
                                 src_node, src_cbs, dest_node, dest_ip)
1034 5d97d6dd Michael Hanselmann
1035 5d97d6dd Michael Hanselmann
  all_dtp = []
1036 5d97d6dd Michael Hanselmann
1037 d51ae04c Michael Hanselmann
  base_magic = utils.GenerateSecret(6)
1038 d51ae04c Michael Hanselmann
1039 5d97d6dd Michael Hanselmann
  ieloop = ImportExportLoop(lu)
1040 5d97d6dd Michael Hanselmann
  try:
1041 d51ae04c Michael Hanselmann
    for idx, transfer in enumerate(all_transfers):
1042 5d97d6dd Michael Hanselmann
      if transfer:
1043 5d97d6dd Michael Hanselmann
        feedback_fn("Exporting %s from %s to %s" %
1044 5d97d6dd Michael Hanselmann
                    (transfer.name, src_node, dest_node))
1045 5d97d6dd Michael Hanselmann
1046 d51ae04c Michael Hanselmann
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1047 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1048 d51ae04c Michael Hanselmann
                                           compress=compress, magic=magic)
1049 d51ae04c Michael Hanselmann
1050 d51ae04c Michael Hanselmann
        dtp = _DiskTransferPrivate(transfer, True, opts)
1051 5d97d6dd Michael Hanselmann
1052 eb630f50 Michael Hanselmann
        di = DiskImport(lu, dest_node, opts, instance,
1053 5d97d6dd Michael Hanselmann
                        transfer.dest_io, transfer.dest_ioargs,
1054 5d97d6dd Michael Hanselmann
                        timeouts, dest_cbs, private=dtp)
1055 5d97d6dd Michael Hanselmann
        ieloop.Add(di)
1056 5d97d6dd Michael Hanselmann
1057 5d97d6dd Michael Hanselmann
        dtp.dest_import = di
1058 5d97d6dd Michael Hanselmann
      else:
1059 85b3901b Michael Hanselmann
        dtp = _DiskTransferPrivate(None, False, None)
1060 5d97d6dd Michael Hanselmann
1061 5d97d6dd Michael Hanselmann
      all_dtp.append(dtp)
1062 5d97d6dd Michael Hanselmann
1063 5d97d6dd Michael Hanselmann
    ieloop.Run()
1064 5d97d6dd Michael Hanselmann
  finally:
1065 5d97d6dd Michael Hanselmann
    ieloop.FinalizeAll()
1066 5d97d6dd Michael Hanselmann
1067 5d97d6dd Michael Hanselmann
  assert len(all_dtp) == len(all_transfers)
1068 403f5172 Guido Trotter
  assert compat.all((dtp.src_export is None or
1069 5d97d6dd Michael Hanselmann
                      dtp.src_export.success is not None) and
1070 5d97d6dd Michael Hanselmann
                     (dtp.dest_import is None or
1071 5d97d6dd Michael Hanselmann
                      dtp.dest_import.success is not None)
1072 403f5172 Guido Trotter
                     for dtp in all_dtp), \
1073 5d97d6dd Michael Hanselmann
         "Not all imports/exports are finalized"
1074 5d97d6dd Michael Hanselmann
1075 5d97d6dd Michael Hanselmann
  return [bool(dtp.success) for dtp in all_dtp]
1076 387794f8 Michael Hanselmann
1077 387794f8 Michael Hanselmann
1078 4a96f1d1 Michael Hanselmann
class _RemoteExportCb(ImportExportCbBase):
1079 4a96f1d1 Michael Hanselmann
  def __init__(self, feedback_fn, disk_count):
1080 4a96f1d1 Michael Hanselmann
    """Initializes this class.
1081 4a96f1d1 Michael Hanselmann

1082 4a96f1d1 Michael Hanselmann
    """
1083 4a96f1d1 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1084 4a96f1d1 Michael Hanselmann
    self._feedback_fn = feedback_fn
1085 4a96f1d1 Michael Hanselmann
    self._dresults = [None] * disk_count
1086 4a96f1d1 Michael Hanselmann
1087 4a96f1d1 Michael Hanselmann
  @property
1088 4a96f1d1 Michael Hanselmann
  def disk_results(self):
1089 4a96f1d1 Michael Hanselmann
    """Returns per-disk results.
1090 4a96f1d1 Michael Hanselmann

1091 4a96f1d1 Michael Hanselmann
    """
1092 4a96f1d1 Michael Hanselmann
    return self._dresults
1093 4a96f1d1 Michael Hanselmann
1094 4a96f1d1 Michael Hanselmann
  def ReportConnected(self, ie, private):
1095 4a96f1d1 Michael Hanselmann
    """Called when a connection has been established.
1096 4a96f1d1 Michael Hanselmann

1097 4a96f1d1 Michael Hanselmann
    """
1098 4a96f1d1 Michael Hanselmann
    (idx, _) = private
1099 4a96f1d1 Michael Hanselmann
1100 4a96f1d1 Michael Hanselmann
    self._feedback_fn("Disk %s is now sending data" % idx)
1101 4a96f1d1 Michael Hanselmann
1102 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
1103 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
1104 1a2e7fe9 Michael Hanselmann

1105 1a2e7fe9 Michael Hanselmann
    """
1106 1a2e7fe9 Michael Hanselmann
    (idx, _) = private
1107 1a2e7fe9 Michael Hanselmann
1108 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
1109 1a2e7fe9 Michael Hanselmann
    if not progress:
1110 1a2e7fe9 Michael Hanselmann
      return
1111 1a2e7fe9 Michael Hanselmann
1112 1a2e7fe9 Michael Hanselmann
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1113 1a2e7fe9 Michael Hanselmann
1114 4a96f1d1 Michael Hanselmann
  def ReportFinished(self, ie, private):
1115 4a96f1d1 Michael Hanselmann
    """Called when a transfer has finished.
1116 4a96f1d1 Michael Hanselmann

1117 4a96f1d1 Michael Hanselmann
    """
1118 4a96f1d1 Michael Hanselmann
    (idx, finished_fn) = private
1119 4a96f1d1 Michael Hanselmann
1120 4a96f1d1 Michael Hanselmann
    if ie.success:
1121 4a96f1d1 Michael Hanselmann
      self._feedback_fn("Disk %s finished sending data" % idx)
1122 4a96f1d1 Michael Hanselmann
    else:
1123 c9300bb3 Iustin Pop
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1124 4a96f1d1 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1125 4a96f1d1 Michael Hanselmann
1126 4a96f1d1 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1127 4a96f1d1 Michael Hanselmann
1128 4a96f1d1 Michael Hanselmann
    if finished_fn:
1129 4a96f1d1 Michael Hanselmann
      finished_fn()
1130 4a96f1d1 Michael Hanselmann
1131 4a96f1d1 Michael Hanselmann
1132 387794f8 Michael Hanselmann
class ExportInstanceHelper:
1133 387794f8 Michael Hanselmann
  def __init__(self, lu, feedback_fn, instance):
1134 387794f8 Michael Hanselmann
    """Initializes this class.
1135 387794f8 Michael Hanselmann

1136 387794f8 Michael Hanselmann
    @param lu: Logical unit instance
1137 387794f8 Michael Hanselmann
    @param feedback_fn: Feedback function
1138 387794f8 Michael Hanselmann
    @type instance: L{objects.Instance}
1139 387794f8 Michael Hanselmann
    @param instance: Instance object
1140 387794f8 Michael Hanselmann

1141 387794f8 Michael Hanselmann
    """
1142 387794f8 Michael Hanselmann
    self._lu = lu
1143 387794f8 Michael Hanselmann
    self._feedback_fn = feedback_fn
1144 387794f8 Michael Hanselmann
    self._instance = instance
1145 387794f8 Michael Hanselmann
1146 387794f8 Michael Hanselmann
    self._snap_disks = []
1147 387794f8 Michael Hanselmann
    self._removed_snaps = [False] * len(instance.disks)
1148 387794f8 Michael Hanselmann
1149 387794f8 Michael Hanselmann
  def CreateSnapshots(self):
1150 387794f8 Michael Hanselmann
    """Creates an LVM snapshot for every disk of the instance.
1151 387794f8 Michael Hanselmann

1152 387794f8 Michael Hanselmann
    """
1153 387794f8 Michael Hanselmann
    assert not self._snap_disks
1154 387794f8 Michael Hanselmann
1155 387794f8 Michael Hanselmann
    instance = self._instance
1156 387794f8 Michael Hanselmann
    src_node = instance.primary_node
1157 387794f8 Michael Hanselmann
1158 387794f8 Michael Hanselmann
    for idx, disk in enumerate(instance.disks):
1159 387794f8 Michael Hanselmann
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1160 387794f8 Michael Hanselmann
                        (idx, src_node))
1161 387794f8 Michael Hanselmann
1162 387794f8 Michael Hanselmann
      # result.payload will be a snapshot of an lvm leaf of the one we
1163 387794f8 Michael Hanselmann
      # passed
1164 387794f8 Michael Hanselmann
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1165 800ac399 Iustin Pop
      new_dev = False
1166 387794f8 Michael Hanselmann
      msg = result.fail_msg
1167 387794f8 Michael Hanselmann
      if msg:
1168 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1169 387794f8 Michael Hanselmann
                            idx, src_node, msg)
1170 800ac399 Iustin Pop
      elif (not isinstance(result.payload, (tuple, list)) or
1171 800ac399 Iustin Pop
            len(result.payload) != 2):
1172 800ac399 Iustin Pop
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1173 800ac399 Iustin Pop
                            " result '%s'", idx, src_node, result.payload)
1174 387794f8 Michael Hanselmann
      else:
1175 800ac399 Iustin Pop
        disk_id = tuple(result.payload)
1176 387794f8 Michael Hanselmann
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1177 387794f8 Michael Hanselmann
                               logical_id=disk_id, physical_id=disk_id,
1178 387794f8 Michael Hanselmann
                               iv_name=disk.iv_name)
1179 387794f8 Michael Hanselmann
1180 387794f8 Michael Hanselmann
      self._snap_disks.append(new_dev)
1181 387794f8 Michael Hanselmann
1182 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1183 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(instance.disks)
1184 387794f8 Michael Hanselmann
1185 387794f8 Michael Hanselmann
  def _RemoveSnapshot(self, disk_index):
1186 387794f8 Michael Hanselmann
    """Removes an LVM snapshot.
1187 387794f8 Michael Hanselmann

1188 387794f8 Michael Hanselmann
    @type disk_index: number
1189 387794f8 Michael Hanselmann
    @param disk_index: Index of the snapshot to be removed
1190 387794f8 Michael Hanselmann

1191 387794f8 Michael Hanselmann
    """
1192 387794f8 Michael Hanselmann
    disk = self._snap_disks[disk_index]
1193 387794f8 Michael Hanselmann
    if disk and not self._removed_snaps[disk_index]:
1194 387794f8 Michael Hanselmann
      src_node = self._instance.primary_node
1195 387794f8 Michael Hanselmann
1196 387794f8 Michael Hanselmann
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1197 387794f8 Michael Hanselmann
                        (disk_index, src_node))
1198 387794f8 Michael Hanselmann
1199 387794f8 Michael Hanselmann
      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1200 387794f8 Michael Hanselmann
      if result.fail_msg:
1201 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1202 387794f8 Michael Hanselmann
                            " %s: %s", disk_index, src_node, result.fail_msg)
1203 387794f8 Michael Hanselmann
      else:
1204 387794f8 Michael Hanselmann
        self._removed_snaps[disk_index] = True
1205 387794f8 Michael Hanselmann
1206 387794f8 Michael Hanselmann
  def LocalExport(self, dest_node):
1207 387794f8 Michael Hanselmann
    """Intra-cluster instance export.
1208 387794f8 Michael Hanselmann

1209 387794f8 Michael Hanselmann
    @type dest_node: L{objects.Node}
1210 387794f8 Michael Hanselmann
    @param dest_node: Destination node
1211 387794f8 Michael Hanselmann

1212 387794f8 Michael Hanselmann
    """
1213 387794f8 Michael Hanselmann
    instance = self._instance
1214 387794f8 Michael Hanselmann
    src_node = instance.primary_node
1215 387794f8 Michael Hanselmann
1216 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1217 387794f8 Michael Hanselmann
1218 387794f8 Michael Hanselmann
    transfers = []
1219 387794f8 Michael Hanselmann
1220 387794f8 Michael Hanselmann
    for idx, dev in enumerate(self._snap_disks):
1221 387794f8 Michael Hanselmann
      if not dev:
1222 387794f8 Michael Hanselmann
        transfers.append(None)
1223 387794f8 Michael Hanselmann
        continue
1224 387794f8 Michael Hanselmann
1225 387794f8 Michael Hanselmann
      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1226 387794f8 Michael Hanselmann
                            dev.physical_id[1])
1227 387794f8 Michael Hanselmann
1228 387794f8 Michael Hanselmann
      finished_fn = compat.partial(self._TransferFinished, idx)
1229 387794f8 Michael Hanselmann
1230 387794f8 Michael Hanselmann
      # FIXME: pass debug option from opcode to backend
1231 387794f8 Michael Hanselmann
      dt = DiskTransfer("snapshot/%s" % idx,
1232 387794f8 Michael Hanselmann
                        constants.IEIO_SCRIPT, (dev, idx),
1233 387794f8 Michael Hanselmann
                        constants.IEIO_FILE, (path, ),
1234 387794f8 Michael Hanselmann
                        finished_fn)
1235 387794f8 Michael Hanselmann
      transfers.append(dt)
1236 387794f8 Michael Hanselmann
1237 387794f8 Michael Hanselmann
    # Actually export data
1238 387794f8 Michael Hanselmann
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1239 387794f8 Michael Hanselmann
                                    src_node, dest_node.name,
1240 387794f8 Michael Hanselmann
                                    dest_node.secondary_ip,
1241 387794f8 Michael Hanselmann
                                    instance, transfers)
1242 387794f8 Michael Hanselmann
1243 387794f8 Michael Hanselmann
    assert len(dresults) == len(instance.disks)
1244 387794f8 Michael Hanselmann
1245 387794f8 Michael Hanselmann
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1246 387794f8 Michael Hanselmann
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1247 387794f8 Michael Hanselmann
                                               self._snap_disks)
1248 387794f8 Michael Hanselmann
    msg = result.fail_msg
1249 387794f8 Michael Hanselmann
    fin_resu = not msg
1250 387794f8 Michael Hanselmann
    if msg:
1251 387794f8 Michael Hanselmann
      self._lu.LogWarning("Could not finalize export for instance %s"
1252 387794f8 Michael Hanselmann
                          " on node %s: %s", instance.name, dest_node.name, msg)
1253 387794f8 Michael Hanselmann
1254 387794f8 Michael Hanselmann
    return (fin_resu, dresults)
1255 387794f8 Michael Hanselmann
1256 d51ae04c Michael Hanselmann
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1257 4a96f1d1 Michael Hanselmann
    """Inter-cluster instance export.
1258 4a96f1d1 Michael Hanselmann

1259 4a96f1d1 Michael Hanselmann
    @type disk_info: list
1260 4a96f1d1 Michael Hanselmann
    @param disk_info: Per-disk destination information
1261 d51ae04c Michael Hanselmann
    @type key_name: string
1262 d51ae04c Michael Hanselmann
    @param key_name: Name of X509 key to use
1263 d51ae04c Michael Hanselmann
    @type dest_ca_pem: string
1264 d51ae04c Michael Hanselmann
    @param dest_ca_pem: Destination X509 CA in PEM format
1265 4a96f1d1 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
1266 4a96f1d1 Michael Hanselmann
    @param timeouts: Timeouts for this import
1267 4a96f1d1 Michael Hanselmann

1268 4a96f1d1 Michael Hanselmann
    """
1269 4a96f1d1 Michael Hanselmann
    instance = self._instance
1270 4a96f1d1 Michael Hanselmann
1271 4a96f1d1 Michael Hanselmann
    assert len(disk_info) == len(instance.disks)
1272 4a96f1d1 Michael Hanselmann
1273 4a96f1d1 Michael Hanselmann
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1274 4a96f1d1 Michael Hanselmann
1275 4a96f1d1 Michael Hanselmann
    ieloop = ImportExportLoop(self._lu)
1276 4a96f1d1 Michael Hanselmann
    try:
1277 d51ae04c Michael Hanselmann
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1278 d51ae04c Michael Hanselmann
                                                           disk_info)):
1279 ba5619c2 Michael Hanselmann
        # Decide whether to use IPv6
1280 ba5619c2 Michael Hanselmann
        ipv6 = netutils.IP6Address.IsValid(host)
1281 ba5619c2 Michael Hanselmann
1282 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=key_name,
1283 d51ae04c Michael Hanselmann
                                           ca_pem=dest_ca_pem,
1284 ba5619c2 Michael Hanselmann
                                           magic=magic, ipv6=ipv6)
1285 d51ae04c Michael Hanselmann
1286 4a96f1d1 Michael Hanselmann
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1287 4a96f1d1 Michael Hanselmann
        finished_fn = compat.partial(self._TransferFinished, idx)
1288 4a96f1d1 Michael Hanselmann
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1289 eb630f50 Michael Hanselmann
                              opts, host, port, instance,
1290 4a96f1d1 Michael Hanselmann
                              constants.IEIO_SCRIPT, (dev, idx),
1291 4a96f1d1 Michael Hanselmann
                              timeouts, cbs, private=(idx, finished_fn)))
1292 4a96f1d1 Michael Hanselmann
1293 4a96f1d1 Michael Hanselmann
      ieloop.Run()
1294 4a96f1d1 Michael Hanselmann
    finally:
1295 4a96f1d1 Michael Hanselmann
      ieloop.FinalizeAll()
1296 4a96f1d1 Michael Hanselmann
1297 4a96f1d1 Michael Hanselmann
    return (True, cbs.disk_results)
1298 4a96f1d1 Michael Hanselmann
1299 387794f8 Michael Hanselmann
  def _TransferFinished(self, idx):
1300 387794f8 Michael Hanselmann
    """Called once a transfer has finished.
1301 387794f8 Michael Hanselmann

1302 387794f8 Michael Hanselmann
    @type idx: number
1303 387794f8 Michael Hanselmann
    @param idx: Disk index
1304 387794f8 Michael Hanselmann

1305 387794f8 Michael Hanselmann
    """
1306 387794f8 Michael Hanselmann
    logging.debug("Transfer %s finished", idx)
1307 387794f8 Michael Hanselmann
    self._RemoveSnapshot(idx)
1308 387794f8 Michael Hanselmann
1309 387794f8 Michael Hanselmann
  def Cleanup(self):
1310 387794f8 Michael Hanselmann
    """Remove all snapshots.
1311 387794f8 Michael Hanselmann

1312 387794f8 Michael Hanselmann
    """
1313 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(self._instance.disks)
1314 387794f8 Michael Hanselmann
    for idx in range(len(self._instance.disks)):
1315 387794f8 Michael Hanselmann
      self._RemoveSnapshot(idx)
1316 1410fa8d Michael Hanselmann
1317 1410fa8d Michael Hanselmann
1318 9bf56d77 Michael Hanselmann
class _RemoteImportCb(ImportExportCbBase):
1319 9bf56d77 Michael Hanselmann
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1320 9bf56d77 Michael Hanselmann
               external_address):
1321 9bf56d77 Michael Hanselmann
    """Initializes this class.
1322 9bf56d77 Michael Hanselmann

1323 9bf56d77 Michael Hanselmann
    @type cds: string
1324 9bf56d77 Michael Hanselmann
    @param cds: Cluster domain secret
1325 9bf56d77 Michael Hanselmann
    @type x509_cert_pem: string
1326 9bf56d77 Michael Hanselmann
    @param x509_cert_pem: CA used for signing import key
1327 9bf56d77 Michael Hanselmann
    @type disk_count: number
1328 9bf56d77 Michael Hanselmann
    @param disk_count: Number of disks
1329 9bf56d77 Michael Hanselmann
    @type external_address: string
1330 9bf56d77 Michael Hanselmann
    @param external_address: External address of destination node
1331 9bf56d77 Michael Hanselmann

1332 9bf56d77 Michael Hanselmann
    """
1333 9bf56d77 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1334 9bf56d77 Michael Hanselmann
    self._feedback_fn = feedback_fn
1335 9bf56d77 Michael Hanselmann
    self._cds = cds
1336 9bf56d77 Michael Hanselmann
    self._x509_cert_pem = x509_cert_pem
1337 9bf56d77 Michael Hanselmann
    self._disk_count = disk_count
1338 9bf56d77 Michael Hanselmann
    self._external_address = external_address
1339 9bf56d77 Michael Hanselmann
1340 9bf56d77 Michael Hanselmann
    self._dresults = [None] * disk_count
1341 9bf56d77 Michael Hanselmann
    self._daemon_port = [None] * disk_count
1342 9bf56d77 Michael Hanselmann
1343 9bf56d77 Michael Hanselmann
    self._salt = utils.GenerateSecret(8)
1344 9bf56d77 Michael Hanselmann
1345 9bf56d77 Michael Hanselmann
  @property
1346 9bf56d77 Michael Hanselmann
  def disk_results(self):
1347 9bf56d77 Michael Hanselmann
    """Returns per-disk results.
1348 9bf56d77 Michael Hanselmann

1349 9bf56d77 Michael Hanselmann
    """
1350 9bf56d77 Michael Hanselmann
    return self._dresults
1351 9bf56d77 Michael Hanselmann
1352 9bf56d77 Michael Hanselmann
  def _CheckAllListening(self):
1353 9bf56d77 Michael Hanselmann
    """Checks whether all daemons are listening.
1354 9bf56d77 Michael Hanselmann

1355 9bf56d77 Michael Hanselmann
    If all daemons are listening, the information is sent to the client.
1356 9bf56d77 Michael Hanselmann

1357 9bf56d77 Michael Hanselmann
    """
1358 9bf56d77 Michael Hanselmann
    if not compat.all(dp is not None for dp in self._daemon_port):
1359 9bf56d77 Michael Hanselmann
      return
1360 9bf56d77 Michael Hanselmann
1361 9bf56d77 Michael Hanselmann
    host = self._external_address
1362 9bf56d77 Michael Hanselmann
1363 9bf56d77 Michael Hanselmann
    disks = []
1364 d51ae04c Michael Hanselmann
    for idx, (port, magic) in enumerate(self._daemon_port):
1365 9bf56d77 Michael Hanselmann
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1366 d51ae04c Michael Hanselmann
                                               idx, host, port, magic))
1367 9bf56d77 Michael Hanselmann
1368 9bf56d77 Michael Hanselmann
    assert len(disks) == self._disk_count
1369 9bf56d77 Michael Hanselmann
1370 9bf56d77 Michael Hanselmann
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1371 9bf56d77 Michael Hanselmann
      "disks": disks,
1372 9bf56d77 Michael Hanselmann
      "x509_ca": self._x509_cert_pem,
1373 9bf56d77 Michael Hanselmann
      })
1374 9bf56d77 Michael Hanselmann
1375 9bf56d77 Michael Hanselmann
  def ReportListening(self, ie, private):
1376 9bf56d77 Michael Hanselmann
    """Called when daemon started listening.
1377 9bf56d77 Michael Hanselmann

1378 9bf56d77 Michael Hanselmann
    """
1379 9bf56d77 Michael Hanselmann
    (idx, ) = private
1380 9bf56d77 Michael Hanselmann
1381 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now listening" % idx)
1382 9bf56d77 Michael Hanselmann
1383 9bf56d77 Michael Hanselmann
    assert self._daemon_port[idx] is None
1384 9bf56d77 Michael Hanselmann
1385 d51ae04c Michael Hanselmann
    self._daemon_port[idx] = (ie.listen_port, ie.magic)
1386 9bf56d77 Michael Hanselmann
1387 9bf56d77 Michael Hanselmann
    self._CheckAllListening()
1388 9bf56d77 Michael Hanselmann
1389 9bf56d77 Michael Hanselmann
  def ReportConnected(self, ie, private):
1390 9bf56d77 Michael Hanselmann
    """Called when a connection has been established.
1391 9bf56d77 Michael Hanselmann

1392 9bf56d77 Michael Hanselmann
    """
1393 9bf56d77 Michael Hanselmann
    (idx, ) = private
1394 9bf56d77 Michael Hanselmann
1395 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now receiving data" % idx)
1396 9bf56d77 Michael Hanselmann
1397 9bf56d77 Michael Hanselmann
  def ReportFinished(self, ie, private):
1398 9bf56d77 Michael Hanselmann
    """Called when a transfer has finished.
1399 9bf56d77 Michael Hanselmann

1400 9bf56d77 Michael Hanselmann
    """
1401 9bf56d77 Michael Hanselmann
    (idx, ) = private
1402 9bf56d77 Michael Hanselmann
1403 9bf56d77 Michael Hanselmann
    # Daemon is certainly no longer listening
1404 9bf56d77 Michael Hanselmann
    self._daemon_port[idx] = None
1405 9bf56d77 Michael Hanselmann
1406 9bf56d77 Michael Hanselmann
    if ie.success:
1407 9bf56d77 Michael Hanselmann
      self._feedback_fn("Disk %s finished receiving data" % idx)
1408 9bf56d77 Michael Hanselmann
    else:
1409 9bf56d77 Michael Hanselmann
      self._feedback_fn(("Disk %s failed to receive data: %s"
1410 c9300bb3 Iustin Pop
                         " (recent output: %s)") %
1411 9bf56d77 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1412 9bf56d77 Michael Hanselmann
1413 9bf56d77 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1414 9bf56d77 Michael Hanselmann
1415 9bf56d77 Michael Hanselmann
1416 ba5619c2 Michael Hanselmann
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1417 ba5619c2 Michael Hanselmann
                 cds, timeouts):
1418 9bf56d77 Michael Hanselmann
  """Imports an instance from another cluster.
1419 9bf56d77 Michael Hanselmann

1420 9bf56d77 Michael Hanselmann
  @param lu: Logical unit instance
1421 9bf56d77 Michael Hanselmann
  @param feedback_fn: Feedback function
1422 9bf56d77 Michael Hanselmann
  @type instance: L{objects.Instance}
1423 9bf56d77 Michael Hanselmann
  @param instance: Instance object
1424 ba5619c2 Michael Hanselmann
  @type pnode: L{objects.Node}
1425 ba5619c2 Michael Hanselmann
  @param pnode: Primary node of instance as an object
1426 9bf56d77 Michael Hanselmann
  @type source_x509_ca: OpenSSL.crypto.X509
1427 9bf56d77 Michael Hanselmann
  @param source_x509_ca: Import source's X509 CA
1428 9bf56d77 Michael Hanselmann
  @type cds: string
1429 9bf56d77 Michael Hanselmann
  @param cds: Cluster domain secret
1430 9bf56d77 Michael Hanselmann
  @type timeouts: L{ImportExportTimeouts}
1431 9bf56d77 Michael Hanselmann
  @param timeouts: Timeouts for this import
1432 9bf56d77 Michael Hanselmann

1433 9bf56d77 Michael Hanselmann
  """
1434 9bf56d77 Michael Hanselmann
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1435 9bf56d77 Michael Hanselmann
                                                  source_x509_ca)
1436 9bf56d77 Michael Hanselmann
1437 d51ae04c Michael Hanselmann
  magic_base = utils.GenerateSecret(6)
1438 d51ae04c Michael Hanselmann
1439 ba5619c2 Michael Hanselmann
  # Decide whether to use IPv6
1440 ba5619c2 Michael Hanselmann
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1441 ba5619c2 Michael Hanselmann
1442 9bf56d77 Michael Hanselmann
  # Create crypto key
1443 9bf56d77 Michael Hanselmann
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1444 9bf56d77 Michael Hanselmann
                                        constants.RIE_CERT_VALIDITY)
1445 9bf56d77 Michael Hanselmann
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1446 9bf56d77 Michael Hanselmann
1447 9bf56d77 Michael Hanselmann
  (x509_key_name, x509_cert_pem) = result.payload
1448 9bf56d77 Michael Hanselmann
  try:
1449 9bf56d77 Michael Hanselmann
    # Load certificate
1450 9bf56d77 Michael Hanselmann
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1451 9bf56d77 Michael Hanselmann
                                                x509_cert_pem)
1452 9bf56d77 Michael Hanselmann
1453 9bf56d77 Michael Hanselmann
    # Sign certificate
1454 9bf56d77 Michael Hanselmann
    signed_x509_cert_pem = \
1455 9bf56d77 Michael Hanselmann
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1456 9bf56d77 Michael Hanselmann
1457 9bf56d77 Michael Hanselmann
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1458 ba5619c2 Michael Hanselmann
                          len(instance.disks), pnode.primary_ip)
1459 9bf56d77 Michael Hanselmann
1460 9bf56d77 Michael Hanselmann
    ieloop = ImportExportLoop(lu)
1461 9bf56d77 Michael Hanselmann
    try:
1462 9bf56d77 Michael Hanselmann
      for idx, dev in enumerate(instance.disks):
1463 d51ae04c Michael Hanselmann
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1464 d51ae04c Michael Hanselmann
1465 d51ae04c Michael Hanselmann
        # Import daemon options
1466 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=x509_key_name,
1467 d51ae04c Michael Hanselmann
                                           ca_pem=source_ca_pem,
1468 ba5619c2 Michael Hanselmann
                                           magic=magic, ipv6=ipv6)
1469 d51ae04c Michael Hanselmann
1470 eb630f50 Michael Hanselmann
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1471 9bf56d77 Michael Hanselmann
                              constants.IEIO_SCRIPT, (dev, idx),
1472 9bf56d77 Michael Hanselmann
                              timeouts, cbs, private=(idx, )))
1473 9bf56d77 Michael Hanselmann
1474 9bf56d77 Michael Hanselmann
      ieloop.Run()
1475 9bf56d77 Michael Hanselmann
    finally:
1476 9bf56d77 Michael Hanselmann
      ieloop.FinalizeAll()
1477 9bf56d77 Michael Hanselmann
  finally:
1478 9bf56d77 Michael Hanselmann
    # Remove crypto key and certificate
1479 9bf56d77 Michael Hanselmann
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1480 9bf56d77 Michael Hanselmann
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1481 9bf56d77 Michael Hanselmann
1482 9bf56d77 Michael Hanselmann
  return cbs.disk_results
1483 9bf56d77 Michael Hanselmann
1484 9bf56d77 Michael Hanselmann
1485 1410fa8d Michael Hanselmann
def _GetImportExportHandshakeMessage(version):
1486 1410fa8d Michael Hanselmann
  """Returns the handshake message for a RIE protocol version.
1487 1410fa8d Michael Hanselmann

1488 1410fa8d Michael Hanselmann
  @type version: number
1489 1410fa8d Michael Hanselmann

1490 1410fa8d Michael Hanselmann
  """
1491 1410fa8d Michael Hanselmann
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1492 1410fa8d Michael Hanselmann
1493 1410fa8d Michael Hanselmann
1494 1410fa8d Michael Hanselmann
def ComputeRemoteExportHandshake(cds):
1495 1410fa8d Michael Hanselmann
  """Computes the remote import/export handshake.
1496 1410fa8d Michael Hanselmann

1497 1410fa8d Michael Hanselmann
  @type cds: string
1498 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1499 1410fa8d Michael Hanselmann

1500 1410fa8d Michael Hanselmann
  """
1501 1410fa8d Michael Hanselmann
  salt = utils.GenerateSecret(8)
1502 1410fa8d Michael Hanselmann
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1503 1410fa8d Michael Hanselmann
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1504 1410fa8d Michael Hanselmann
1505 1410fa8d Michael Hanselmann
1506 1410fa8d Michael Hanselmann
def CheckRemoteExportHandshake(cds, handshake):
1507 1410fa8d Michael Hanselmann
  """Checks the handshake of a remote import/export.
1508 1410fa8d Michael Hanselmann

1509 1410fa8d Michael Hanselmann
  @type cds: string
1510 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1511 1410fa8d Michael Hanselmann
  @type handshake: sequence
1512 1410fa8d Michael Hanselmann
  @param handshake: Handshake sent by remote peer
1513 1410fa8d Michael Hanselmann

1514 1410fa8d Michael Hanselmann
  """
1515 1410fa8d Michael Hanselmann
  try:
1516 1410fa8d Michael Hanselmann
    (version, hmac_digest, hmac_salt) = handshake
1517 1410fa8d Michael Hanselmann
  except (TypeError, ValueError), err:
1518 1410fa8d Michael Hanselmann
    return "Invalid data: %s" % err
1519 1410fa8d Michael Hanselmann
1520 1410fa8d Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1521 1410fa8d Michael Hanselmann
                              hmac_digest, salt=hmac_salt):
1522 1410fa8d Michael Hanselmann
    return "Hash didn't match, clusters don't share the same domain secret"
1523 1410fa8d Michael Hanselmann
1524 1410fa8d Michael Hanselmann
  if version != constants.RIE_VERSION:
1525 1410fa8d Michael Hanselmann
    return ("Clusters don't have the same remote import/export protocol"
1526 1410fa8d Michael Hanselmann
            " (local=%s, remote=%s)" %
1527 1410fa8d Michael Hanselmann
            (constants.RIE_VERSION, version))
1528 1410fa8d Michael Hanselmann
1529 1410fa8d Michael Hanselmann
  return None
1530 4a96f1d1 Michael Hanselmann
1531 4a96f1d1 Michael Hanselmann
1532 d51ae04c Michael Hanselmann
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1533 4a96f1d1 Michael Hanselmann
  """Returns the hashed text for import/export disk information.
1534 4a96f1d1 Michael Hanselmann

1535 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1536 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1537 4a96f1d1 Michael Hanselmann
  @type host: string
1538 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1539 4a96f1d1 Michael Hanselmann
  @type port: number
1540 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1541 d51ae04c Michael Hanselmann
  @type magic: string
1542 d51ae04c Michael Hanselmann
  @param magic: Magic value
1543 4a96f1d1 Michael Hanselmann

1544 4a96f1d1 Michael Hanselmann
  """
1545 d51ae04c Michael Hanselmann
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1546 4a96f1d1 Michael Hanselmann
1547 4a96f1d1 Michael Hanselmann
1548 4a96f1d1 Michael Hanselmann
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1549 4a96f1d1 Michael Hanselmann
  """Verifies received disk information for an export.
1550 4a96f1d1 Michael Hanselmann

1551 4a96f1d1 Michael Hanselmann
  @type cds: string
1552 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1553 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1554 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1555 4a96f1d1 Michael Hanselmann
  @type disk_info: sequence
1556 4a96f1d1 Michael Hanselmann
  @param disk_info: Disk information sent by remote peer
1557 4a96f1d1 Michael Hanselmann

1558 4a96f1d1 Michael Hanselmann
  """
1559 4a96f1d1 Michael Hanselmann
  try:
1560 d51ae04c Michael Hanselmann
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1561 4a96f1d1 Michael Hanselmann
  except (TypeError, ValueError), err:
1562 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("Invalid data: %s" % err)
1563 4a96f1d1 Michael Hanselmann
1564 d51ae04c Michael Hanselmann
  if not (host and port and magic):
1565 d51ae04c Michael Hanselmann
    raise errors.GenericError("Missing destination host, port or magic")
1566 4a96f1d1 Michael Hanselmann
1567 d51ae04c Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1568 4a96f1d1 Michael Hanselmann
1569 4a96f1d1 Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1570 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("HMAC is wrong")
1571 4a96f1d1 Michael Hanselmann
1572 ba5619c2 Michael Hanselmann
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1573 ba5619c2 Michael Hanselmann
    destination = host
1574 ba5619c2 Michael Hanselmann
  else:
1575 ba5619c2 Michael Hanselmann
    destination = netutils.Hostname.GetNormalizedName(host)
1576 ba5619c2 Michael Hanselmann
1577 ba5619c2 Michael Hanselmann
  return (destination,
1578 d51ae04c Michael Hanselmann
          utils.ValidateServiceName(port),
1579 d51ae04c Michael Hanselmann
          magic)
1580 4a96f1d1 Michael Hanselmann
1581 4a96f1d1 Michael Hanselmann
1582 d51ae04c Michael Hanselmann
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1583 4a96f1d1 Michael Hanselmann
  """Computes the signed disk information for a remote import.
1584 4a96f1d1 Michael Hanselmann

1585 4a96f1d1 Michael Hanselmann
  @type cds: string
1586 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1587 4a96f1d1 Michael Hanselmann
  @type salt: string
1588 4a96f1d1 Michael Hanselmann
  @param salt: HMAC salt
1589 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1590 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1591 4a96f1d1 Michael Hanselmann
  @type host: string
1592 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1593 4a96f1d1 Michael Hanselmann
  @type port: number
1594 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1595 d51ae04c Michael Hanselmann
  @type magic: string
1596 d51ae04c Michael Hanselmann
  @param magic: Magic value
1597 4a96f1d1 Michael Hanselmann

1598 4a96f1d1 Michael Hanselmann
  """
1599 d51ae04c Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1600 4a96f1d1 Michael Hanselmann
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1601 d51ae04c Michael Hanselmann
  return (host, port, magic, hmac_digest, salt)