Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ acd65a16

History | View | Annotate | Download (41.9 kB)

1 033a1d00 Michael Hanselmann
#
2 033a1d00 Michael Hanselmann
#
3 033a1d00 Michael Hanselmann
4 033a1d00 Michael Hanselmann
# Copyright (C) 2010 Google Inc.
5 033a1d00 Michael Hanselmann
#
6 033a1d00 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 033a1d00 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 033a1d00 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 033a1d00 Michael Hanselmann
# (at your option) any later version.
10 033a1d00 Michael Hanselmann
#
11 033a1d00 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 033a1d00 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 033a1d00 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 033a1d00 Michael Hanselmann
# General Public License for more details.
15 033a1d00 Michael Hanselmann
#
16 033a1d00 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 033a1d00 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 033a1d00 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 033a1d00 Michael Hanselmann
# 02110-1301, USA.
20 033a1d00 Michael Hanselmann
21 033a1d00 Michael Hanselmann
22 033a1d00 Michael Hanselmann
"""Instance-related functions and classes for masterd.
23 033a1d00 Michael Hanselmann

24 033a1d00 Michael Hanselmann
"""
25 033a1d00 Michael Hanselmann
26 033a1d00 Michael Hanselmann
import logging
27 033a1d00 Michael Hanselmann
import time
28 4a96f1d1 Michael Hanselmann
import OpenSSL
29 033a1d00 Michael Hanselmann
30 033a1d00 Michael Hanselmann
from ganeti import constants
31 033a1d00 Michael Hanselmann
from ganeti import errors
32 033a1d00 Michael Hanselmann
from ganeti import compat
33 387794f8 Michael Hanselmann
from ganeti import utils
34 387794f8 Michael Hanselmann
from ganeti import objects
35 033a1d00 Michael Hanselmann
36 033a1d00 Michael Hanselmann
37 033a1d00 Michael Hanselmann
class _ImportExportError(Exception):
38 033a1d00 Michael Hanselmann
  """Local exception to report import/export errors.
39 033a1d00 Michael Hanselmann

40 033a1d00 Michael Hanselmann
  """
41 033a1d00 Michael Hanselmann
42 033a1d00 Michael Hanselmann
43 033a1d00 Michael Hanselmann
class ImportExportTimeouts(object):
44 033a1d00 Michael Hanselmann
  #: Time until daemon starts writing status file
45 033a1d00 Michael Hanselmann
  DEFAULT_READY_TIMEOUT = 10
46 033a1d00 Michael Hanselmann
47 033a1d00 Michael Hanselmann
  #: Length of time until errors cause hard failure
48 033a1d00 Michael Hanselmann
  DEFAULT_ERROR_TIMEOUT = 10
49 033a1d00 Michael Hanselmann
50 033a1d00 Michael Hanselmann
  #: Time after which daemon must be listening
51 033a1d00 Michael Hanselmann
  DEFAULT_LISTEN_TIMEOUT = 10
52 033a1d00 Michael Hanselmann
53 1a2e7fe9 Michael Hanselmann
  #: Progress update interval
54 1a2e7fe9 Michael Hanselmann
  DEFAULT_PROGRESS_INTERVAL = 60
55 1a2e7fe9 Michael Hanselmann
56 033a1d00 Michael Hanselmann
  __slots__ = [
57 033a1d00 Michael Hanselmann
    "error",
58 033a1d00 Michael Hanselmann
    "ready",
59 033a1d00 Michael Hanselmann
    "listen",
60 033a1d00 Michael Hanselmann
    "connect",
61 1a2e7fe9 Michael Hanselmann
    "progress",
62 033a1d00 Michael Hanselmann
    ]
63 033a1d00 Michael Hanselmann
64 033a1d00 Michael Hanselmann
  def __init__(self, connect,
65 033a1d00 Michael Hanselmann
               listen=DEFAULT_LISTEN_TIMEOUT,
66 033a1d00 Michael Hanselmann
               error=DEFAULT_ERROR_TIMEOUT,
67 1a2e7fe9 Michael Hanselmann
               ready=DEFAULT_READY_TIMEOUT,
68 1a2e7fe9 Michael Hanselmann
               progress=DEFAULT_PROGRESS_INTERVAL):
69 033a1d00 Michael Hanselmann
    """Initializes this class.
70 033a1d00 Michael Hanselmann

71 033a1d00 Michael Hanselmann
    @type connect: number
72 033a1d00 Michael Hanselmann
    @param connect: Timeout for establishing connection
73 033a1d00 Michael Hanselmann
    @type listen: number
74 033a1d00 Michael Hanselmann
    @param listen: Timeout for starting to listen for connections
75 033a1d00 Michael Hanselmann
    @type error: number
76 033a1d00 Michael Hanselmann
    @param error: Length of time until errors cause hard failure
77 033a1d00 Michael Hanselmann
    @type ready: number
78 033a1d00 Michael Hanselmann
    @param ready: Timeout for daemon to become ready
79 1a2e7fe9 Michael Hanselmann
    @type progress: number
80 1a2e7fe9 Michael Hanselmann
    @param progress: Progress update interval
81 033a1d00 Michael Hanselmann

82 033a1d00 Michael Hanselmann
    """
83 033a1d00 Michael Hanselmann
    self.error = error
84 033a1d00 Michael Hanselmann
    self.ready = ready
85 033a1d00 Michael Hanselmann
    self.listen = listen
86 033a1d00 Michael Hanselmann
    self.connect = connect
87 1a2e7fe9 Michael Hanselmann
    self.progress = progress
88 033a1d00 Michael Hanselmann
89 033a1d00 Michael Hanselmann
90 033a1d00 Michael Hanselmann
class ImportExportCbBase(object):
91 033a1d00 Michael Hanselmann
  """Callbacks for disk import/export.
92 033a1d00 Michael Hanselmann

93 033a1d00 Michael Hanselmann
  """
94 033a1d00 Michael Hanselmann
  def ReportListening(self, ie, private):
95 033a1d00 Michael Hanselmann
    """Called when daemon started listening.
96 033a1d00 Michael Hanselmann

97 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
98 033a1d00 Michael Hanselmann
    @param ie: Import/export object
99 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
100 033a1d00 Michael Hanselmann

101 033a1d00 Michael Hanselmann
    """
102 033a1d00 Michael Hanselmann
103 033a1d00 Michael Hanselmann
  def ReportConnected(self, ie, private):
104 033a1d00 Michael Hanselmann
    """Called when a connection has been established.
105 033a1d00 Michael Hanselmann

106 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
107 033a1d00 Michael Hanselmann
    @param ie: Import/export object
108 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
109 033a1d00 Michael Hanselmann

110 033a1d00 Michael Hanselmann
    """
111 033a1d00 Michael Hanselmann
112 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
113 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
114 1a2e7fe9 Michael Hanselmann

115 1a2e7fe9 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
116 1a2e7fe9 Michael Hanselmann
    @param ie: Import/export object
117 1a2e7fe9 Michael Hanselmann
    @param private: Private data passed to import/export object
118 1a2e7fe9 Michael Hanselmann

119 1a2e7fe9 Michael Hanselmann
    """
120 1a2e7fe9 Michael Hanselmann
121 033a1d00 Michael Hanselmann
  def ReportFinished(self, ie, private):
122 033a1d00 Michael Hanselmann
    """Called when a transfer has finished.
123 033a1d00 Michael Hanselmann

124 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
125 033a1d00 Michael Hanselmann
    @param ie: Import/export object
126 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
127 033a1d00 Michael Hanselmann

128 033a1d00 Michael Hanselmann
    """
129 033a1d00 Michael Hanselmann
130 033a1d00 Michael Hanselmann
131 033a1d00 Michael Hanselmann
def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
132 033a1d00 Michael Hanselmann
  """Checks whether a timeout has expired.
133 033a1d00 Michael Hanselmann

134 033a1d00 Michael Hanselmann
  """
135 033a1d00 Michael Hanselmann
  return _time_fn() > (epoch + timeout)
136 033a1d00 Michael Hanselmann
137 033a1d00 Michael Hanselmann
138 033a1d00 Michael Hanselmann
class _DiskImportExportBase(object):
139 033a1d00 Michael Hanselmann
  MODE_TEXT = None
140 033a1d00 Michael Hanselmann
141 eb630f50 Michael Hanselmann
  def __init__(self, lu, node_name, opts,
142 033a1d00 Michael Hanselmann
               instance, timeouts, cbs, private=None):
143 033a1d00 Michael Hanselmann
    """Initializes this class.
144 033a1d00 Michael Hanselmann

145 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
146 033a1d00 Michael Hanselmann
    @type node_name: string
147 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
148 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
149 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
150 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
151 033a1d00 Michael Hanselmann
    @param instance: Instance object
152 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
153 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
154 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
155 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
156 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
157 033a1d00 Michael Hanselmann

158 033a1d00 Michael Hanselmann
    """
159 033a1d00 Michael Hanselmann
    assert self.MODE_TEXT
160 033a1d00 Michael Hanselmann
161 033a1d00 Michael Hanselmann
    self._lu = lu
162 033a1d00 Michael Hanselmann
    self.node_name = node_name
163 eb630f50 Michael Hanselmann
    self._opts = opts
164 033a1d00 Michael Hanselmann
    self._instance = instance
165 033a1d00 Michael Hanselmann
    self._timeouts = timeouts
166 033a1d00 Michael Hanselmann
    self._cbs = cbs
167 033a1d00 Michael Hanselmann
    self._private = private
168 033a1d00 Michael Hanselmann
169 033a1d00 Michael Hanselmann
    # Parent loop
170 033a1d00 Michael Hanselmann
    self._loop = None
171 033a1d00 Michael Hanselmann
172 033a1d00 Michael Hanselmann
    # Timestamps
173 033a1d00 Michael Hanselmann
    self._ts_begin = None
174 033a1d00 Michael Hanselmann
    self._ts_connected = None
175 033a1d00 Michael Hanselmann
    self._ts_finished = None
176 033a1d00 Michael Hanselmann
    self._ts_cleanup = None
177 1a2e7fe9 Michael Hanselmann
    self._ts_last_progress = None
178 033a1d00 Michael Hanselmann
    self._ts_last_error = None
179 033a1d00 Michael Hanselmann
180 033a1d00 Michael Hanselmann
    # Transfer status
181 033a1d00 Michael Hanselmann
    self.success = None
182 033a1d00 Michael Hanselmann
    self.final_message = None
183 033a1d00 Michael Hanselmann
184 033a1d00 Michael Hanselmann
    # Daemon status
185 033a1d00 Michael Hanselmann
    self._daemon_name = None
186 033a1d00 Michael Hanselmann
    self._daemon = None
187 033a1d00 Michael Hanselmann
188 033a1d00 Michael Hanselmann
  @property
189 033a1d00 Michael Hanselmann
  def recent_output(self):
190 033a1d00 Michael Hanselmann
    """Returns the most recent output from the daemon.
191 033a1d00 Michael Hanselmann

192 033a1d00 Michael Hanselmann
    """
193 033a1d00 Michael Hanselmann
    if self._daemon:
194 033a1d00 Michael Hanselmann
      return self._daemon.recent_output
195 033a1d00 Michael Hanselmann
196 033a1d00 Michael Hanselmann
    return None
197 033a1d00 Michael Hanselmann
198 033a1d00 Michael Hanselmann
  @property
199 1a2e7fe9 Michael Hanselmann
  def progress(self):
200 1a2e7fe9 Michael Hanselmann
    """Returns transfer progress information.
201 1a2e7fe9 Michael Hanselmann

202 1a2e7fe9 Michael Hanselmann
    """
203 1a2e7fe9 Michael Hanselmann
    if not self._daemon:
204 1a2e7fe9 Michael Hanselmann
      return None
205 1a2e7fe9 Michael Hanselmann
206 1a2e7fe9 Michael Hanselmann
    return (self._daemon.progress_mbytes,
207 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_throughput,
208 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_percent,
209 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_eta)
210 1a2e7fe9 Michael Hanselmann
211 1a2e7fe9 Michael Hanselmann
  @property
212 033a1d00 Michael Hanselmann
  def active(self):
213 033a1d00 Michael Hanselmann
    """Determines whether this transport is still active.
214 033a1d00 Michael Hanselmann

215 033a1d00 Michael Hanselmann
    """
216 033a1d00 Michael Hanselmann
    return self.success is None
217 033a1d00 Michael Hanselmann
218 033a1d00 Michael Hanselmann
  @property
219 033a1d00 Michael Hanselmann
  def loop(self):
220 033a1d00 Michael Hanselmann
    """Returns parent loop.
221 033a1d00 Michael Hanselmann

222 033a1d00 Michael Hanselmann
    @rtype: L{ImportExportLoop}
223 033a1d00 Michael Hanselmann

224 033a1d00 Michael Hanselmann
    """
225 033a1d00 Michael Hanselmann
    return self._loop
226 033a1d00 Michael Hanselmann
227 033a1d00 Michael Hanselmann
  def SetLoop(self, loop):
228 033a1d00 Michael Hanselmann
    """Sets the parent loop.
229 033a1d00 Michael Hanselmann

230 033a1d00 Michael Hanselmann
    @type loop: L{ImportExportLoop}
231 033a1d00 Michael Hanselmann

232 033a1d00 Michael Hanselmann
    """
233 033a1d00 Michael Hanselmann
    if self._loop:
234 033a1d00 Michael Hanselmann
      raise errors.ProgrammerError("Loop can only be set once")
235 033a1d00 Michael Hanselmann
236 033a1d00 Michael Hanselmann
    self._loop = loop
237 033a1d00 Michael Hanselmann
238 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
239 033a1d00 Michael Hanselmann
    """Starts the import/export daemon.
240 033a1d00 Michael Hanselmann

241 033a1d00 Michael Hanselmann
    """
242 033a1d00 Michael Hanselmann
    raise NotImplementedError()
243 033a1d00 Michael Hanselmann
244 033a1d00 Michael Hanselmann
  def CheckDaemon(self):
245 033a1d00 Michael Hanselmann
    """Checks whether daemon has been started and if not, starts it.
246 033a1d00 Michael Hanselmann

247 033a1d00 Michael Hanselmann
    @rtype: string
248 033a1d00 Michael Hanselmann
    @return: Daemon name
249 033a1d00 Michael Hanselmann

250 033a1d00 Michael Hanselmann
    """
251 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
252 033a1d00 Michael Hanselmann
253 033a1d00 Michael Hanselmann
    if self._daemon_name is None:
254 033a1d00 Michael Hanselmann
      assert self._ts_begin is None
255 033a1d00 Michael Hanselmann
256 033a1d00 Michael Hanselmann
      result = self._StartDaemon()
257 033a1d00 Michael Hanselmann
      if result.fail_msg:
258 033a1d00 Michael Hanselmann
        raise _ImportExportError("Failed to start %s on %s: %s" %
259 033a1d00 Michael Hanselmann
                                 (self.MODE_TEXT, self.node_name,
260 033a1d00 Michael Hanselmann
                                  result.fail_msg))
261 033a1d00 Michael Hanselmann
262 033a1d00 Michael Hanselmann
      daemon_name = result.payload
263 033a1d00 Michael Hanselmann
264 033a1d00 Michael Hanselmann
      logging.info("Started %s %r on %s", self.MODE_TEXT, daemon_name,
265 033a1d00 Michael Hanselmann
                   self.node_name)
266 033a1d00 Michael Hanselmann
267 033a1d00 Michael Hanselmann
      self._ts_begin = time.time()
268 033a1d00 Michael Hanselmann
      self._daemon_name = daemon_name
269 033a1d00 Michael Hanselmann
270 033a1d00 Michael Hanselmann
    return self._daemon_name
271 033a1d00 Michael Hanselmann
272 033a1d00 Michael Hanselmann
  def GetDaemonName(self):
273 033a1d00 Michael Hanselmann
    """Returns the daemon name.
274 033a1d00 Michael Hanselmann

275 033a1d00 Michael Hanselmann
    """
276 033a1d00 Michael Hanselmann
    assert self._daemon_name, "Daemon has not been started"
277 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
278 033a1d00 Michael Hanselmann
    return self._daemon_name
279 033a1d00 Michael Hanselmann
280 033a1d00 Michael Hanselmann
  def Abort(self):
281 033a1d00 Michael Hanselmann
    """Sends SIGTERM to import/export daemon (if still active).
282 033a1d00 Michael Hanselmann

283 033a1d00 Michael Hanselmann
    """
284 033a1d00 Michael Hanselmann
    if self._daemon_name:
285 033a1d00 Michael Hanselmann
      self._lu.LogWarning("Aborting %s %r on %s",
286 033a1d00 Michael Hanselmann
                          self.MODE_TEXT, self._daemon_name, self.node_name)
287 033a1d00 Michael Hanselmann
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
288 033a1d00 Michael Hanselmann
      if result.fail_msg:
289 033a1d00 Michael Hanselmann
        self._lu.LogWarning("Failed to abort %s %r on %s: %s",
290 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
291 033a1d00 Michael Hanselmann
                            self.node_name, result.fail_msg)
292 033a1d00 Michael Hanselmann
        return False
293 033a1d00 Michael Hanselmann
294 033a1d00 Michael Hanselmann
    return True
295 033a1d00 Michael Hanselmann
296 033a1d00 Michael Hanselmann
  def _SetDaemonData(self, data):
297 033a1d00 Michael Hanselmann
    """Internal function for updating status daemon data.
298 033a1d00 Michael Hanselmann

299 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
300 033a1d00 Michael Hanselmann
    @param data: Daemon status data
301 033a1d00 Michael Hanselmann

302 033a1d00 Michael Hanselmann
    """
303 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
304 033a1d00 Michael Hanselmann
305 033a1d00 Michael Hanselmann
    if not data:
306 033a1d00 Michael Hanselmann
      if _TimeoutExpired(self._ts_begin, self._timeouts.ready):
307 033a1d00 Michael Hanselmann
        raise _ImportExportError("Didn't become ready after %s seconds" %
308 033a1d00 Michael Hanselmann
                                 self._timeouts.ready)
309 033a1d00 Michael Hanselmann
310 033a1d00 Michael Hanselmann
      return False
311 033a1d00 Michael Hanselmann
312 033a1d00 Michael Hanselmann
    self._daemon = data
313 033a1d00 Michael Hanselmann
314 033a1d00 Michael Hanselmann
    return True
315 033a1d00 Michael Hanselmann
316 033a1d00 Michael Hanselmann
  def SetDaemonData(self, success, data):
317 033a1d00 Michael Hanselmann
    """Updates daemon status data.
318 033a1d00 Michael Hanselmann

319 033a1d00 Michael Hanselmann
    @type success: bool
320 033a1d00 Michael Hanselmann
    @param success: Whether fetching data was successful or not
321 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
322 033a1d00 Michael Hanselmann
    @param data: Daemon status data
323 033a1d00 Michael Hanselmann

324 033a1d00 Michael Hanselmann
    """
325 033a1d00 Michael Hanselmann
    if not success:
326 033a1d00 Michael Hanselmann
      if self._ts_last_error is None:
327 033a1d00 Michael Hanselmann
        self._ts_last_error = time.time()
328 033a1d00 Michael Hanselmann
329 033a1d00 Michael Hanselmann
      elif _TimeoutExpired(self._ts_last_error, self._timeouts.error):
330 033a1d00 Michael Hanselmann
        raise _ImportExportError("Too many errors while updating data")
331 033a1d00 Michael Hanselmann
332 033a1d00 Michael Hanselmann
      return False
333 033a1d00 Michael Hanselmann
334 033a1d00 Michael Hanselmann
    self._ts_last_error = None
335 033a1d00 Michael Hanselmann
336 033a1d00 Michael Hanselmann
    return self._SetDaemonData(data)
337 033a1d00 Michael Hanselmann
338 033a1d00 Michael Hanselmann
  def CheckListening(self):
339 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
340 033a1d00 Michael Hanselmann

341 033a1d00 Michael Hanselmann
    """
342 033a1d00 Michael Hanselmann
    raise NotImplementedError()
343 033a1d00 Michael Hanselmann
344 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
345 033a1d00 Michael Hanselmann
    """Returns timeout to calculate connect timeout.
346 033a1d00 Michael Hanselmann

347 033a1d00 Michael Hanselmann
    """
348 033a1d00 Michael Hanselmann
    raise NotImplementedError()
349 033a1d00 Michael Hanselmann
350 033a1d00 Michael Hanselmann
  def CheckConnected(self):
351 033a1d00 Michael Hanselmann
    """Checks whether the daemon is connected.
352 033a1d00 Michael Hanselmann

353 033a1d00 Michael Hanselmann
    @rtype: bool
354 033a1d00 Michael Hanselmann
    @return: Whether the daemon is connected
355 033a1d00 Michael Hanselmann

356 033a1d00 Michael Hanselmann
    """
357 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
358 033a1d00 Michael Hanselmann
359 033a1d00 Michael Hanselmann
    if self._ts_connected is not None:
360 033a1d00 Michael Hanselmann
      return True
361 033a1d00 Michael Hanselmann
362 033a1d00 Michael Hanselmann
    if self._daemon.connected:
363 033a1d00 Michael Hanselmann
      self._ts_connected = time.time()
364 033a1d00 Michael Hanselmann
365 033a1d00 Michael Hanselmann
      # TODO: Log remote peer
366 033a1d00 Michael Hanselmann
      logging.debug("%s %r on %s is now connected",
367 033a1d00 Michael Hanselmann
                    self.MODE_TEXT, self._daemon_name, self.node_name)
368 033a1d00 Michael Hanselmann
369 033a1d00 Michael Hanselmann
      self._cbs.ReportConnected(self, self._private)
370 033a1d00 Michael Hanselmann
371 033a1d00 Michael Hanselmann
      return True
372 033a1d00 Michael Hanselmann
373 033a1d00 Michael Hanselmann
    if _TimeoutExpired(self._GetConnectedCheckEpoch(), self._timeouts.connect):
374 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not connected after %s seconds" %
375 033a1d00 Michael Hanselmann
                               self._timeouts.connect)
376 033a1d00 Michael Hanselmann
377 033a1d00 Michael Hanselmann
    return False
378 033a1d00 Michael Hanselmann
379 1a2e7fe9 Michael Hanselmann
  def _CheckProgress(self):
380 1a2e7fe9 Michael Hanselmann
    """Checks whether a progress update should be reported.
381 1a2e7fe9 Michael Hanselmann

382 1a2e7fe9 Michael Hanselmann
    """
383 1a2e7fe9 Michael Hanselmann
    if ((self._ts_last_progress is None or
384 1a2e7fe9 Michael Hanselmann
         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
385 1a2e7fe9 Michael Hanselmann
        self._daemon and
386 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_mbytes is not None and
387 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_throughput is not None):
388 1a2e7fe9 Michael Hanselmann
      self._cbs.ReportProgress(self, self._private)
389 1a2e7fe9 Michael Hanselmann
      self._ts_last_progress = time.time()
390 1a2e7fe9 Michael Hanselmann
391 033a1d00 Michael Hanselmann
  def CheckFinished(self):
392 033a1d00 Michael Hanselmann
    """Checks whether the daemon exited.
393 033a1d00 Michael Hanselmann

394 033a1d00 Michael Hanselmann
    @rtype: bool
395 033a1d00 Michael Hanselmann
    @return: Whether the transfer is finished
396 033a1d00 Michael Hanselmann

397 033a1d00 Michael Hanselmann
    """
398 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
399 033a1d00 Michael Hanselmann
400 033a1d00 Michael Hanselmann
    if self._ts_finished:
401 033a1d00 Michael Hanselmann
      return True
402 033a1d00 Michael Hanselmann
403 033a1d00 Michael Hanselmann
    if self._daemon.exit_status is None:
404 1a2e7fe9 Michael Hanselmann
      # TODO: Adjust delay for ETA expiring soon
405 1a2e7fe9 Michael Hanselmann
      self._CheckProgress()
406 033a1d00 Michael Hanselmann
      return False
407 033a1d00 Michael Hanselmann
408 033a1d00 Michael Hanselmann
    self._ts_finished = time.time()
409 033a1d00 Michael Hanselmann
410 033a1d00 Michael Hanselmann
    self._ReportFinished(self._daemon.exit_status == 0,
411 033a1d00 Michael Hanselmann
                         self._daemon.error_message)
412 033a1d00 Michael Hanselmann
413 033a1d00 Michael Hanselmann
    return True
414 033a1d00 Michael Hanselmann
415 033a1d00 Michael Hanselmann
  def _ReportFinished(self, success, message):
416 033a1d00 Michael Hanselmann
    """Transfer is finished or daemon exited.
417 033a1d00 Michael Hanselmann

418 033a1d00 Michael Hanselmann
    @type success: bool
419 033a1d00 Michael Hanselmann
    @param success: Whether the transfer was successful
420 033a1d00 Michael Hanselmann
    @type message: string
421 033a1d00 Michael Hanselmann
    @param message: Error message
422 033a1d00 Michael Hanselmann

423 033a1d00 Michael Hanselmann
    """
424 033a1d00 Michael Hanselmann
    assert self.success is None
425 033a1d00 Michael Hanselmann
426 033a1d00 Michael Hanselmann
    self.success = success
427 033a1d00 Michael Hanselmann
    self.final_message = message
428 033a1d00 Michael Hanselmann
429 033a1d00 Michael Hanselmann
    if success:
430 033a1d00 Michael Hanselmann
      logging.info("%s %r on %s succeeded", self.MODE_TEXT, self._daemon_name,
431 033a1d00 Michael Hanselmann
                   self.node_name)
432 033a1d00 Michael Hanselmann
    elif self._daemon_name:
433 033a1d00 Michael Hanselmann
      self._lu.LogWarning("%s %r on %s failed: %s",
434 033a1d00 Michael Hanselmann
                          self.MODE_TEXT, self._daemon_name, self.node_name,
435 033a1d00 Michael Hanselmann
                          message)
436 033a1d00 Michael Hanselmann
    else:
437 033a1d00 Michael Hanselmann
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
438 033a1d00 Michael Hanselmann
                          self.node_name, message)
439 033a1d00 Michael Hanselmann
440 033a1d00 Michael Hanselmann
    self._cbs.ReportFinished(self, self._private)
441 033a1d00 Michael Hanselmann
442 033a1d00 Michael Hanselmann
  def _Finalize(self):
443 033a1d00 Michael Hanselmann
    """Makes the RPC call to finalize this import/export.
444 033a1d00 Michael Hanselmann

445 033a1d00 Michael Hanselmann
    """
446 033a1d00 Michael Hanselmann
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
447 033a1d00 Michael Hanselmann
448 033a1d00 Michael Hanselmann
  def Finalize(self, error=None):
449 033a1d00 Michael Hanselmann
    """Finalizes this import/export.
450 033a1d00 Michael Hanselmann

451 033a1d00 Michael Hanselmann
    """
452 033a1d00 Michael Hanselmann
    if self._daemon_name:
453 033a1d00 Michael Hanselmann
      logging.info("Finalizing %s %r on %s",
454 033a1d00 Michael Hanselmann
                   self.MODE_TEXT, self._daemon_name, self.node_name)
455 033a1d00 Michael Hanselmann
456 033a1d00 Michael Hanselmann
      result = self._Finalize()
457 033a1d00 Michael Hanselmann
      if result.fail_msg:
458 033a1d00 Michael Hanselmann
        self._lu.LogWarning("Failed to finalize %s %r on %s: %s",
459 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
460 033a1d00 Michael Hanselmann
                            self.node_name, result.fail_msg)
461 033a1d00 Michael Hanselmann
        return False
462 033a1d00 Michael Hanselmann
463 033a1d00 Michael Hanselmann
      # Daemon is no longer running
464 033a1d00 Michael Hanselmann
      self._daemon_name = None
465 033a1d00 Michael Hanselmann
      self._ts_cleanup = time.time()
466 033a1d00 Michael Hanselmann
467 033a1d00 Michael Hanselmann
    if error:
468 033a1d00 Michael Hanselmann
      self._ReportFinished(False, error)
469 033a1d00 Michael Hanselmann
470 033a1d00 Michael Hanselmann
    return True
471 033a1d00 Michael Hanselmann
472 033a1d00 Michael Hanselmann
473 033a1d00 Michael Hanselmann
class DiskImport(_DiskImportExportBase):
474 033a1d00 Michael Hanselmann
  MODE_TEXT = "import"
475 033a1d00 Michael Hanselmann
476 eb630f50 Michael Hanselmann
  def __init__(self, lu, node_name, opts, instance,
477 033a1d00 Michael Hanselmann
               dest, dest_args, timeouts, cbs, private=None):
478 033a1d00 Michael Hanselmann
    """Initializes this class.
479 033a1d00 Michael Hanselmann

480 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
481 033a1d00 Michael Hanselmann
    @type node_name: string
482 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
483 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
484 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
485 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
486 033a1d00 Michael Hanselmann
    @param instance: Instance object
487 033a1d00 Michael Hanselmann
    @param dest: I/O destination
488 033a1d00 Michael Hanselmann
    @param dest_args: I/O arguments
489 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
490 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
491 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
492 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
493 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
494 033a1d00 Michael Hanselmann

495 033a1d00 Michael Hanselmann
    """
496 eb630f50 Michael Hanselmann
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
497 033a1d00 Michael Hanselmann
                                   instance, timeouts, cbs, private)
498 033a1d00 Michael Hanselmann
    self._dest = dest
499 033a1d00 Michael Hanselmann
    self._dest_args = dest_args
500 033a1d00 Michael Hanselmann
501 033a1d00 Michael Hanselmann
    # Timestamps
502 033a1d00 Michael Hanselmann
    self._ts_listening = None
503 033a1d00 Michael Hanselmann
504 033a1d00 Michael Hanselmann
  @property
505 033a1d00 Michael Hanselmann
  def listen_port(self):
506 033a1d00 Michael Hanselmann
    """Returns the port the daemon is listening on.
507 033a1d00 Michael Hanselmann

508 033a1d00 Michael Hanselmann
    """
509 033a1d00 Michael Hanselmann
    if self._daemon:
510 033a1d00 Michael Hanselmann
      return self._daemon.listen_port
511 033a1d00 Michael Hanselmann
512 033a1d00 Michael Hanselmann
    return None
513 033a1d00 Michael Hanselmann
514 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
515 033a1d00 Michael Hanselmann
    """Starts the import daemon.
516 033a1d00 Michael Hanselmann

517 033a1d00 Michael Hanselmann
    """
518 eb630f50 Michael Hanselmann
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
519 eb630f50 Michael Hanselmann
                                          self._instance,
520 033a1d00 Michael Hanselmann
                                          self._dest, self._dest_args)
521 033a1d00 Michael Hanselmann
522 033a1d00 Michael Hanselmann
  def CheckListening(self):
523 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
524 033a1d00 Michael Hanselmann

525 033a1d00 Michael Hanselmann
    @rtype: bool
526 033a1d00 Michael Hanselmann
    @return: Whether the daemon is listening
527 033a1d00 Michael Hanselmann

528 033a1d00 Michael Hanselmann
    """
529 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
530 033a1d00 Michael Hanselmann
531 033a1d00 Michael Hanselmann
    if self._ts_listening is not None:
532 033a1d00 Michael Hanselmann
      return True
533 033a1d00 Michael Hanselmann
534 033a1d00 Michael Hanselmann
    port = self._daemon.listen_port
535 033a1d00 Michael Hanselmann
    if port is not None:
536 033a1d00 Michael Hanselmann
      self._ts_listening = time.time()
537 033a1d00 Michael Hanselmann
538 033a1d00 Michael Hanselmann
      logging.debug("Import %r on %s is now listening on port %s",
539 033a1d00 Michael Hanselmann
                    self._daemon_name, self.node_name, port)
540 033a1d00 Michael Hanselmann
541 033a1d00 Michael Hanselmann
      self._cbs.ReportListening(self, self._private)
542 033a1d00 Michael Hanselmann
543 033a1d00 Michael Hanselmann
      return True
544 033a1d00 Michael Hanselmann
545 033a1d00 Michael Hanselmann
    if _TimeoutExpired(self._ts_begin, self._timeouts.listen):
546 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not listening after %s seconds" %
547 033a1d00 Michael Hanselmann
                               self._timeouts.listen)
548 033a1d00 Michael Hanselmann
549 033a1d00 Michael Hanselmann
    return False
550 033a1d00 Michael Hanselmann
551 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
552 033a1d00 Michael Hanselmann
    """Returns the time since we started listening.
553 033a1d00 Michael Hanselmann

554 033a1d00 Michael Hanselmann
    """
555 033a1d00 Michael Hanselmann
    assert self._ts_listening is not None, \
556 033a1d00 Michael Hanselmann
           ("Checking whether an import is connected is only useful"
557 033a1d00 Michael Hanselmann
            " once it's been listening")
558 033a1d00 Michael Hanselmann
559 033a1d00 Michael Hanselmann
    return self._ts_listening
560 033a1d00 Michael Hanselmann
561 033a1d00 Michael Hanselmann
562 033a1d00 Michael Hanselmann
class DiskExport(_DiskImportExportBase):
563 033a1d00 Michael Hanselmann
  MODE_TEXT = "export"
564 033a1d00 Michael Hanselmann
565 eb630f50 Michael Hanselmann
  def __init__(self, lu, node_name, opts,
566 033a1d00 Michael Hanselmann
               dest_host, dest_port, instance, source, source_args,
567 033a1d00 Michael Hanselmann
               timeouts, cbs, private=None):
568 033a1d00 Michael Hanselmann
    """Initializes this class.
569 033a1d00 Michael Hanselmann

570 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
571 033a1d00 Michael Hanselmann
    @type node_name: string
572 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
573 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
574 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
575 033a1d00 Michael Hanselmann
    @type dest_host: string
576 033a1d00 Michael Hanselmann
    @param dest_host: Destination host name or IP address
577 033a1d00 Michael Hanselmann
    @type dest_port: number
578 033a1d00 Michael Hanselmann
    @param dest_port: Destination port number
579 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
580 033a1d00 Michael Hanselmann
    @param instance: Instance object
581 033a1d00 Michael Hanselmann
    @param source: I/O source
582 033a1d00 Michael Hanselmann
    @param source_args: I/O source
583 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
584 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
585 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
586 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
587 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
588 033a1d00 Michael Hanselmann

589 033a1d00 Michael Hanselmann
    """
590 eb630f50 Michael Hanselmann
    _DiskImportExportBase.__init__(self, lu, node_name, opts,
591 033a1d00 Michael Hanselmann
                                   instance, timeouts, cbs, private)
592 033a1d00 Michael Hanselmann
    self._dest_host = dest_host
593 033a1d00 Michael Hanselmann
    self._dest_port = dest_port
594 033a1d00 Michael Hanselmann
    self._source = source
595 033a1d00 Michael Hanselmann
    self._source_args = source_args
596 033a1d00 Michael Hanselmann
597 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
598 033a1d00 Michael Hanselmann
    """Starts the export daemon.
599 033a1d00 Michael Hanselmann

600 033a1d00 Michael Hanselmann
    """
601 eb630f50 Michael Hanselmann
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
602 033a1d00 Michael Hanselmann
                                          self._dest_host, self._dest_port,
603 033a1d00 Michael Hanselmann
                                          self._instance, self._source,
604 033a1d00 Michael Hanselmann
                                          self._source_args)
605 033a1d00 Michael Hanselmann
606 033a1d00 Michael Hanselmann
  def CheckListening(self):
607 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
608 033a1d00 Michael Hanselmann

609 033a1d00 Michael Hanselmann
    """
610 033a1d00 Michael Hanselmann
    # Only an import can be listening
611 033a1d00 Michael Hanselmann
    return True
612 033a1d00 Michael Hanselmann
613 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
614 033a1d00 Michael Hanselmann
    """Returns the time since the daemon started.
615 033a1d00 Michael Hanselmann

616 033a1d00 Michael Hanselmann
    """
617 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
618 033a1d00 Michael Hanselmann
619 033a1d00 Michael Hanselmann
    return self._ts_begin
620 033a1d00 Michael Hanselmann
621 033a1d00 Michael Hanselmann
622 1a2e7fe9 Michael Hanselmann
def FormatProgress(progress):
623 1a2e7fe9 Michael Hanselmann
  """Formats progress information for user consumption
624 1a2e7fe9 Michael Hanselmann

625 1a2e7fe9 Michael Hanselmann
  """
626 e6b8d02d Michael Hanselmann
  (mbytes, throughput, percent, eta) = progress
627 1a2e7fe9 Michael Hanselmann
628 1a2e7fe9 Michael Hanselmann
  parts = [
629 1a2e7fe9 Michael Hanselmann
    utils.FormatUnit(mbytes, "h"),
630 1a2e7fe9 Michael Hanselmann
631 1a2e7fe9 Michael Hanselmann
    # Not using FormatUnit as it doesn't support kilobytes
632 1a2e7fe9 Michael Hanselmann
    "%0.1f MiB/s" % throughput,
633 1a2e7fe9 Michael Hanselmann
    ]
634 1a2e7fe9 Michael Hanselmann
635 1a2e7fe9 Michael Hanselmann
  if percent is not None:
636 1a2e7fe9 Michael Hanselmann
    parts.append("%d%%" % percent)
637 1a2e7fe9 Michael Hanselmann
638 e6b8d02d Michael Hanselmann
  if eta is not None:
639 e6b8d02d Michael Hanselmann
    parts.append("ETA %s" % utils.FormatSeconds(eta))
640 1a2e7fe9 Michael Hanselmann
641 1a2e7fe9 Michael Hanselmann
  return utils.CommaJoin(parts)
642 1a2e7fe9 Michael Hanselmann
643 1a2e7fe9 Michael Hanselmann
644 033a1d00 Michael Hanselmann
class ImportExportLoop:
645 033a1d00 Michael Hanselmann
  MIN_DELAY = 1.0
646 033a1d00 Michael Hanselmann
  MAX_DELAY = 20.0
647 033a1d00 Michael Hanselmann
648 033a1d00 Michael Hanselmann
  def __init__(self, lu):
649 033a1d00 Michael Hanselmann
    """Initializes this class.
650 033a1d00 Michael Hanselmann

651 033a1d00 Michael Hanselmann
    """
652 033a1d00 Michael Hanselmann
    self._lu = lu
653 033a1d00 Michael Hanselmann
    self._queue = []
654 033a1d00 Michael Hanselmann
    self._pending_add = []
655 033a1d00 Michael Hanselmann
656 033a1d00 Michael Hanselmann
  def Add(self, diskie):
657 033a1d00 Michael Hanselmann
    """Adds an import/export object to the loop.
658 033a1d00 Michael Hanselmann

659 033a1d00 Michael Hanselmann
    @type diskie: Subclass of L{_DiskImportExportBase}
660 033a1d00 Michael Hanselmann
    @param diskie: Import/export object
661 033a1d00 Michael Hanselmann

662 033a1d00 Michael Hanselmann
    """
663 033a1d00 Michael Hanselmann
    assert diskie not in self._pending_add
664 033a1d00 Michael Hanselmann
    assert diskie.loop is None
665 033a1d00 Michael Hanselmann
666 033a1d00 Michael Hanselmann
    diskie.SetLoop(self)
667 033a1d00 Michael Hanselmann
668 033a1d00 Michael Hanselmann
    # Adding new objects to a staging list is necessary, otherwise the main
669 033a1d00 Michael Hanselmann
    # loop gets confused if callbacks modify the queue while the main loop is
670 033a1d00 Michael Hanselmann
    # iterating over it.
671 033a1d00 Michael Hanselmann
    self._pending_add.append(diskie)
672 033a1d00 Michael Hanselmann
673 033a1d00 Michael Hanselmann
  @staticmethod
674 033a1d00 Michael Hanselmann
  def _CollectDaemonStatus(lu, daemons):
675 033a1d00 Michael Hanselmann
    """Collects the status for all import/export daemons.
676 033a1d00 Michael Hanselmann

677 033a1d00 Michael Hanselmann
    """
678 033a1d00 Michael Hanselmann
    daemon_status = {}
679 033a1d00 Michael Hanselmann
680 033a1d00 Michael Hanselmann
    for node_name, names in daemons.iteritems():
681 033a1d00 Michael Hanselmann
      result = lu.rpc.call_impexp_status(node_name, names)
682 033a1d00 Michael Hanselmann
      if result.fail_msg:
683 033a1d00 Michael Hanselmann
        lu.LogWarning("Failed to get daemon status on %s: %s",
684 033a1d00 Michael Hanselmann
                      node_name, result.fail_msg)
685 033a1d00 Michael Hanselmann
        continue
686 033a1d00 Michael Hanselmann
687 033a1d00 Michael Hanselmann
      assert len(names) == len(result.payload)
688 033a1d00 Michael Hanselmann
689 033a1d00 Michael Hanselmann
      daemon_status[node_name] = dict(zip(names, result.payload))
690 033a1d00 Michael Hanselmann
691 033a1d00 Michael Hanselmann
    return daemon_status
692 033a1d00 Michael Hanselmann
693 033a1d00 Michael Hanselmann
  @staticmethod
694 033a1d00 Michael Hanselmann
  def _GetActiveDaemonNames(queue):
695 033a1d00 Michael Hanselmann
    """Gets the names of all active daemons.
696 033a1d00 Michael Hanselmann

697 033a1d00 Michael Hanselmann
    """
698 033a1d00 Michael Hanselmann
    result = {}
699 033a1d00 Michael Hanselmann
    for diskie in queue:
700 033a1d00 Michael Hanselmann
      if not diskie.active:
701 033a1d00 Michael Hanselmann
        continue
702 033a1d00 Michael Hanselmann
703 033a1d00 Michael Hanselmann
      try:
704 033a1d00 Michael Hanselmann
        # Start daemon if necessary
705 033a1d00 Michael Hanselmann
        daemon_name = diskie.CheckDaemon()
706 033a1d00 Michael Hanselmann
      except _ImportExportError, err:
707 033a1d00 Michael Hanselmann
        logging.exception("%s failed", diskie.MODE_TEXT)
708 033a1d00 Michael Hanselmann
        diskie.Finalize(error=str(err))
709 033a1d00 Michael Hanselmann
        continue
710 033a1d00 Michael Hanselmann
711 033a1d00 Michael Hanselmann
      result.setdefault(diskie.node_name, []).append(daemon_name)
712 033a1d00 Michael Hanselmann
713 033a1d00 Michael Hanselmann
    assert len(queue) >= len(result)
714 033a1d00 Michael Hanselmann
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
715 033a1d00 Michael Hanselmann
716 033a1d00 Michael Hanselmann
    logging.debug("daemons=%r", result)
717 033a1d00 Michael Hanselmann
718 033a1d00 Michael Hanselmann
    return result
719 033a1d00 Michael Hanselmann
720 033a1d00 Michael Hanselmann
  def _AddPendingToQueue(self):
721 033a1d00 Michael Hanselmann
    """Adds all pending import/export objects to the internal queue.
722 033a1d00 Michael Hanselmann

723 033a1d00 Michael Hanselmann
    """
724 033a1d00 Michael Hanselmann
    assert compat.all(diskie not in self._queue and diskie.loop == self
725 033a1d00 Michael Hanselmann
                      for diskie in self._pending_add)
726 033a1d00 Michael Hanselmann
727 033a1d00 Michael Hanselmann
    self._queue.extend(self._pending_add)
728 033a1d00 Michael Hanselmann
729 033a1d00 Michael Hanselmann
    del self._pending_add[:]
730 033a1d00 Michael Hanselmann
731 033a1d00 Michael Hanselmann
  def Run(self):
732 033a1d00 Michael Hanselmann
    """Utility main loop.
733 033a1d00 Michael Hanselmann

734 033a1d00 Michael Hanselmann
    """
735 033a1d00 Michael Hanselmann
    while True:
736 033a1d00 Michael Hanselmann
      self._AddPendingToQueue()
737 033a1d00 Michael Hanselmann
738 033a1d00 Michael Hanselmann
      # Collect all active daemon names
739 033a1d00 Michael Hanselmann
      daemons = self._GetActiveDaemonNames(self._queue)
740 033a1d00 Michael Hanselmann
      if not daemons:
741 033a1d00 Michael Hanselmann
        break
742 033a1d00 Michael Hanselmann
743 033a1d00 Michael Hanselmann
      # Collection daemon status data
744 033a1d00 Michael Hanselmann
      data = self._CollectDaemonStatus(self._lu, daemons)
745 033a1d00 Michael Hanselmann
746 033a1d00 Michael Hanselmann
      # Use data
747 033a1d00 Michael Hanselmann
      delay = self.MAX_DELAY
748 033a1d00 Michael Hanselmann
      for diskie in self._queue:
749 033a1d00 Michael Hanselmann
        if not diskie.active:
750 033a1d00 Michael Hanselmann
          continue
751 033a1d00 Michael Hanselmann
752 033a1d00 Michael Hanselmann
        try:
753 033a1d00 Michael Hanselmann
          try:
754 033a1d00 Michael Hanselmann
            all_daemon_data = data[diskie.node_name]
755 033a1d00 Michael Hanselmann
          except KeyError:
756 033a1d00 Michael Hanselmann
            result = diskie.SetDaemonData(False, None)
757 033a1d00 Michael Hanselmann
          else:
758 033a1d00 Michael Hanselmann
            result = \
759 033a1d00 Michael Hanselmann
              diskie.SetDaemonData(True,
760 033a1d00 Michael Hanselmann
                                   all_daemon_data[diskie.GetDaemonName()])
761 033a1d00 Michael Hanselmann
762 033a1d00 Michael Hanselmann
          if not result:
763 033a1d00 Michael Hanselmann
            # Daemon not yet ready, retry soon
764 033a1d00 Michael Hanselmann
            delay = min(3.0, delay)
765 033a1d00 Michael Hanselmann
            continue
766 033a1d00 Michael Hanselmann
767 033a1d00 Michael Hanselmann
          if diskie.CheckFinished():
768 033a1d00 Michael Hanselmann
            # Transfer finished
769 033a1d00 Michael Hanselmann
            diskie.Finalize()
770 033a1d00 Michael Hanselmann
            continue
771 033a1d00 Michael Hanselmann
772 033a1d00 Michael Hanselmann
          # Normal case: check again in 5 seconds
773 033a1d00 Michael Hanselmann
          delay = min(5.0, delay)
774 033a1d00 Michael Hanselmann
775 033a1d00 Michael Hanselmann
          if not diskie.CheckListening():
776 033a1d00 Michael Hanselmann
            # Not yet listening, retry soon
777 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
778 033a1d00 Michael Hanselmann
            continue
779 033a1d00 Michael Hanselmann
780 033a1d00 Michael Hanselmann
          if not diskie.CheckConnected():
781 033a1d00 Michael Hanselmann
            # Not yet connected, retry soon
782 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
783 033a1d00 Michael Hanselmann
            continue
784 033a1d00 Michael Hanselmann
785 033a1d00 Michael Hanselmann
        except _ImportExportError, err:
786 033a1d00 Michael Hanselmann
          logging.exception("%s failed", diskie.MODE_TEXT)
787 033a1d00 Michael Hanselmann
          diskie.Finalize(error=str(err))
788 033a1d00 Michael Hanselmann
789 033a1d00 Michael Hanselmann
      if not compat.any([diskie.active for diskie in self._queue]):
790 033a1d00 Michael Hanselmann
        break
791 033a1d00 Michael Hanselmann
792 033a1d00 Michael Hanselmann
      # Wait a bit
793 033a1d00 Michael Hanselmann
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
794 033a1d00 Michael Hanselmann
      logging.debug("Waiting for %ss", delay)
795 033a1d00 Michael Hanselmann
      time.sleep(delay)
796 033a1d00 Michael Hanselmann
797 033a1d00 Michael Hanselmann
  def FinalizeAll(self):
798 033a1d00 Michael Hanselmann
    """Finalizes all pending transfers.
799 033a1d00 Michael Hanselmann

800 033a1d00 Michael Hanselmann
    """
801 033a1d00 Michael Hanselmann
    success = True
802 033a1d00 Michael Hanselmann
803 033a1d00 Michael Hanselmann
    for diskie in self._queue:
804 033a1d00 Michael Hanselmann
      success = diskie.Finalize() and success
805 033a1d00 Michael Hanselmann
806 033a1d00 Michael Hanselmann
    return success
807 5d97d6dd Michael Hanselmann
808 5d97d6dd Michael Hanselmann
809 5d97d6dd Michael Hanselmann
class _TransferInstCbBase(ImportExportCbBase):
810 5d97d6dd Michael Hanselmann
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
811 a5310c2a Michael Hanselmann
               dest_node, dest_ip, export_opts):
812 5d97d6dd Michael Hanselmann
    """Initializes this class.
813 5d97d6dd Michael Hanselmann

814 5d97d6dd Michael Hanselmann
    """
815 5d97d6dd Michael Hanselmann
    ImportExportCbBase.__init__(self)
816 5d97d6dd Michael Hanselmann
817 5d97d6dd Michael Hanselmann
    self.lu = lu
818 5d97d6dd Michael Hanselmann
    self.feedback_fn = feedback_fn
819 5d97d6dd Michael Hanselmann
    self.instance = instance
820 5d97d6dd Michael Hanselmann
    self.timeouts = timeouts
821 5d97d6dd Michael Hanselmann
    self.src_node = src_node
822 5d97d6dd Michael Hanselmann
    self.src_cbs = src_cbs
823 5d97d6dd Michael Hanselmann
    self.dest_node = dest_node
824 5d97d6dd Michael Hanselmann
    self.dest_ip = dest_ip
825 a5310c2a Michael Hanselmann
    self.export_opts = export_opts
826 5d97d6dd Michael Hanselmann
827 5d97d6dd Michael Hanselmann
828 5d97d6dd Michael Hanselmann
class _TransferInstSourceCb(_TransferInstCbBase):
829 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
830 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
831 5d97d6dd Michael Hanselmann

832 5d97d6dd Michael Hanselmann
    """
833 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
834 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
835 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
836 5d97d6dd Michael Hanselmann
837 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is sending data on %s" %
838 5d97d6dd Michael Hanselmann
                     (dtp.data.name, ie.node_name))
839 5d97d6dd Michael Hanselmann
840 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, dtp):
841 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
842 1a2e7fe9 Michael Hanselmann

843 1a2e7fe9 Michael Hanselmann
    """
844 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
845 1a2e7fe9 Michael Hanselmann
    if not progress:
846 1a2e7fe9 Michael Hanselmann
      return
847 1a2e7fe9 Michael Hanselmann
848 1a2e7fe9 Michael Hanselmann
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
849 1a2e7fe9 Michael Hanselmann
850 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
851 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
852 5d97d6dd Michael Hanselmann

853 5d97d6dd Michael Hanselmann
    """
854 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
855 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
856 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
857 5d97d6dd Michael Hanselmann
858 5d97d6dd Michael Hanselmann
    if ie.success:
859 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished sending data" % dtp.data.name)
860 5d97d6dd Michael Hanselmann
    else:
861 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
862 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
863 5d97d6dd Michael Hanselmann
864 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
865 5d97d6dd Michael Hanselmann
866 5d97d6dd Michael Hanselmann
    cb = dtp.data.finished_fn
867 5d97d6dd Michael Hanselmann
    if cb:
868 5d97d6dd Michael Hanselmann
      cb()
869 5d97d6dd Michael Hanselmann
870 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
871 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
872 5d97d6dd Michael Hanselmann
    if dtp.dest_import and not ie.success:
873 5d97d6dd Michael Hanselmann
      dtp.dest_import.Abort()
874 5d97d6dd Michael Hanselmann
875 5d97d6dd Michael Hanselmann
876 5d97d6dd Michael Hanselmann
class _TransferInstDestCb(_TransferInstCbBase):
877 5d97d6dd Michael Hanselmann
  def ReportListening(self, ie, dtp):
878 5d97d6dd Michael Hanselmann
    """Called when daemon started listening.
879 5d97d6dd Michael Hanselmann

880 5d97d6dd Michael Hanselmann
    """
881 5d97d6dd Michael Hanselmann
    assert self.src_cbs
882 5d97d6dd Michael Hanselmann
    assert dtp.src_export is None
883 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
884 5d97d6dd Michael Hanselmann
885 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
886 5d97d6dd Michael Hanselmann
887 5d97d6dd Michael Hanselmann
    # Start export on source node
888 a5310c2a Michael Hanselmann
    de = DiskExport(self.lu, self.src_node, self.export_opts,
889 a5310c2a Michael Hanselmann
                    self.dest_ip, ie.listen_port,
890 eb630f50 Michael Hanselmann
                    self.instance, dtp.data.src_io, dtp.data.src_ioargs,
891 5d97d6dd Michael Hanselmann
                    self.timeouts, self.src_cbs, private=dtp)
892 5d97d6dd Michael Hanselmann
    ie.loop.Add(de)
893 5d97d6dd Michael Hanselmann
894 5d97d6dd Michael Hanselmann
    dtp.src_export = de
895 5d97d6dd Michael Hanselmann
896 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
897 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
898 5d97d6dd Michael Hanselmann

899 5d97d6dd Michael Hanselmann
    """
900 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is receiving data on %s" %
901 5d97d6dd Michael Hanselmann
                     (dtp.data.name, self.dest_node))
902 5d97d6dd Michael Hanselmann
903 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
904 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
905 5d97d6dd Michael Hanselmann

906 5d97d6dd Michael Hanselmann
    """
907 5d97d6dd Michael Hanselmann
    if ie.success:
908 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
909 5d97d6dd Michael Hanselmann
    else:
910 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
911 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
912 5d97d6dd Michael Hanselmann
913 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
914 5d97d6dd Michael Hanselmann
915 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
916 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
917 5d97d6dd Michael Hanselmann
    if dtp.src_export and not ie.success:
918 5d97d6dd Michael Hanselmann
      dtp.src_export.Abort()
919 5d97d6dd Michael Hanselmann
920 5d97d6dd Michael Hanselmann
921 5d97d6dd Michael Hanselmann
class DiskTransfer(object):
922 5d97d6dd Michael Hanselmann
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
923 5d97d6dd Michael Hanselmann
               finished_fn):
924 5d97d6dd Michael Hanselmann
    """Initializes this class.
925 5d97d6dd Michael Hanselmann

926 5d97d6dd Michael Hanselmann
    @type name: string
927 5d97d6dd Michael Hanselmann
    @param name: User-visible name for this transfer (e.g. "disk/0")
928 5d97d6dd Michael Hanselmann
    @param src_io: Source I/O type
929 5d97d6dd Michael Hanselmann
    @param src_ioargs: Source I/O arguments
930 5d97d6dd Michael Hanselmann
    @param dest_io: Destination I/O type
931 5d97d6dd Michael Hanselmann
    @param dest_ioargs: Destination I/O arguments
932 5d97d6dd Michael Hanselmann
    @type finished_fn: callable
933 5d97d6dd Michael Hanselmann
    @param finished_fn: Function called once transfer has finished
934 5d97d6dd Michael Hanselmann

935 5d97d6dd Michael Hanselmann
    """
936 5d97d6dd Michael Hanselmann
    self.name = name
937 5d97d6dd Michael Hanselmann
938 5d97d6dd Michael Hanselmann
    self.src_io = src_io
939 5d97d6dd Michael Hanselmann
    self.src_ioargs = src_ioargs
940 5d97d6dd Michael Hanselmann
941 5d97d6dd Michael Hanselmann
    self.dest_io = dest_io
942 5d97d6dd Michael Hanselmann
    self.dest_ioargs = dest_ioargs
943 5d97d6dd Michael Hanselmann
944 5d97d6dd Michael Hanselmann
    self.finished_fn = finished_fn
945 5d97d6dd Michael Hanselmann
946 5d97d6dd Michael Hanselmann
947 5d97d6dd Michael Hanselmann
class _DiskTransferPrivate(object):
948 5d97d6dd Michael Hanselmann
  def __init__(self, data, success):
949 5d97d6dd Michael Hanselmann
    """Initializes this class.
950 5d97d6dd Michael Hanselmann

951 5d97d6dd Michael Hanselmann
    @type data: L{DiskTransfer}
952 5d97d6dd Michael Hanselmann
    @type success: bool
953 5d97d6dd Michael Hanselmann

954 5d97d6dd Michael Hanselmann
    """
955 5d97d6dd Michael Hanselmann
    self.data = data
956 5d97d6dd Michael Hanselmann
957 5d97d6dd Michael Hanselmann
    self.src_export = None
958 5d97d6dd Michael Hanselmann
    self.dest_import = None
959 5d97d6dd Michael Hanselmann
960 5d97d6dd Michael Hanselmann
    self.success = success
961 5d97d6dd Michael Hanselmann
962 5d97d6dd Michael Hanselmann
  def RecordResult(self, success):
963 5d97d6dd Michael Hanselmann
    """Updates the status.
964 5d97d6dd Michael Hanselmann

965 5d97d6dd Michael Hanselmann
    One failed part will cause the whole transfer to fail.
966 5d97d6dd Michael Hanselmann

967 5d97d6dd Michael Hanselmann
    """
968 5d97d6dd Michael Hanselmann
    self.success = self.success and success
969 5d97d6dd Michael Hanselmann
970 5d97d6dd Michael Hanselmann
971 5d97d6dd Michael Hanselmann
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
972 5d97d6dd Michael Hanselmann
                         instance, all_transfers):
973 5d97d6dd Michael Hanselmann
  """Transfers an instance's data from one node to another.
974 5d97d6dd Michael Hanselmann

975 5d97d6dd Michael Hanselmann
  @param lu: Logical unit instance
976 5d97d6dd Michael Hanselmann
  @param feedback_fn: Feedback function
977 5d97d6dd Michael Hanselmann
  @type src_node: string
978 5d97d6dd Michael Hanselmann
  @param src_node: Source node name
979 5d97d6dd Michael Hanselmann
  @type dest_node: string
980 5d97d6dd Michael Hanselmann
  @param dest_node: Destination node name
981 5d97d6dd Michael Hanselmann
  @type dest_ip: string
982 5d97d6dd Michael Hanselmann
  @param dest_ip: IP address of destination node
983 5d97d6dd Michael Hanselmann
  @type instance: L{objects.Instance}
984 5d97d6dd Michael Hanselmann
  @param instance: Instance object
985 5d97d6dd Michael Hanselmann
  @type all_transfers: list of L{DiskTransfer} instances
986 5d97d6dd Michael Hanselmann
  @param all_transfers: List of all disk transfers to be made
987 5d97d6dd Michael Hanselmann
  @rtype: list
988 5d97d6dd Michael Hanselmann
  @return: List with a boolean (True=successful, False=failed) for success for
989 5d97d6dd Michael Hanselmann
           each transfer
990 5d97d6dd Michael Hanselmann

991 5d97d6dd Michael Hanselmann
  """
992 a5310c2a Michael Hanselmann
  # Compress only if transfer is to another node
993 a5310c2a Michael Hanselmann
  if src_node == dest_node:
994 a5310c2a Michael Hanselmann
    compress = constants.IEC_NONE
995 a5310c2a Michael Hanselmann
  else:
996 a5310c2a Michael Hanselmann
    compress = constants.IEC_GZIP
997 a5310c2a Michael Hanselmann
998 a5310c2a Michael Hanselmann
  logging.debug("Source node %s, destination node %s, compression '%s'",
999 a5310c2a Michael Hanselmann
                src_node, dest_node, compress)
1000 a5310c2a Michael Hanselmann
1001 a5310c2a Michael Hanselmann
  opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1002 a5310c2a Michael Hanselmann
                                     compress=compress)
1003 a5310c2a Michael Hanselmann
1004 5d97d6dd Michael Hanselmann
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1005 5d97d6dd Michael Hanselmann
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1006 a5310c2a Michael Hanselmann
                                  src_node, None, dest_node, dest_ip, opts)
1007 5d97d6dd Michael Hanselmann
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1008 a5310c2a Michael Hanselmann
                                 src_node, src_cbs, dest_node, dest_ip, opts)
1009 5d97d6dd Michael Hanselmann
1010 5d97d6dd Michael Hanselmann
  all_dtp = []
1011 5d97d6dd Michael Hanselmann
1012 5d97d6dd Michael Hanselmann
  ieloop = ImportExportLoop(lu)
1013 5d97d6dd Michael Hanselmann
  try:
1014 5d97d6dd Michael Hanselmann
    for transfer in all_transfers:
1015 5d97d6dd Michael Hanselmann
      if transfer:
1016 5d97d6dd Michael Hanselmann
        feedback_fn("Exporting %s from %s to %s" %
1017 5d97d6dd Michael Hanselmann
                    (transfer.name, src_node, dest_node))
1018 5d97d6dd Michael Hanselmann
1019 5d97d6dd Michael Hanselmann
        dtp = _DiskTransferPrivate(transfer, True)
1020 5d97d6dd Michael Hanselmann
1021 eb630f50 Michael Hanselmann
        di = DiskImport(lu, dest_node, opts, instance,
1022 5d97d6dd Michael Hanselmann
                        transfer.dest_io, transfer.dest_ioargs,
1023 5d97d6dd Michael Hanselmann
                        timeouts, dest_cbs, private=dtp)
1024 5d97d6dd Michael Hanselmann
        ieloop.Add(di)
1025 5d97d6dd Michael Hanselmann
1026 5d97d6dd Michael Hanselmann
        dtp.dest_import = di
1027 5d97d6dd Michael Hanselmann
      else:
1028 5d97d6dd Michael Hanselmann
        dtp = _DiskTransferPrivate(None, False)
1029 5d97d6dd Michael Hanselmann
1030 5d97d6dd Michael Hanselmann
      all_dtp.append(dtp)
1031 5d97d6dd Michael Hanselmann
1032 5d97d6dd Michael Hanselmann
    ieloop.Run()
1033 5d97d6dd Michael Hanselmann
  finally:
1034 5d97d6dd Michael Hanselmann
    ieloop.FinalizeAll()
1035 5d97d6dd Michael Hanselmann
1036 5d97d6dd Michael Hanselmann
  assert len(all_dtp) == len(all_transfers)
1037 5d97d6dd Michael Hanselmann
  assert compat.all([(dtp.src_export is None or
1038 5d97d6dd Michael Hanselmann
                      dtp.src_export.success is not None) and
1039 5d97d6dd Michael Hanselmann
                     (dtp.dest_import is None or
1040 5d97d6dd Michael Hanselmann
                      dtp.dest_import.success is not None)
1041 5d97d6dd Michael Hanselmann
                     for dtp in all_dtp]), \
1042 5d97d6dd Michael Hanselmann
         "Not all imports/exports are finalized"
1043 5d97d6dd Michael Hanselmann
1044 5d97d6dd Michael Hanselmann
  return [bool(dtp.success) for dtp in all_dtp]
1045 387794f8 Michael Hanselmann
1046 387794f8 Michael Hanselmann
1047 4a96f1d1 Michael Hanselmann
class _RemoteExportCb(ImportExportCbBase):
1048 4a96f1d1 Michael Hanselmann
  def __init__(self, feedback_fn, disk_count):
1049 4a96f1d1 Michael Hanselmann
    """Initializes this class.
1050 4a96f1d1 Michael Hanselmann

1051 4a96f1d1 Michael Hanselmann
    """
1052 4a96f1d1 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1053 4a96f1d1 Michael Hanselmann
    self._feedback_fn = feedback_fn
1054 4a96f1d1 Michael Hanselmann
    self._dresults = [None] * disk_count
1055 4a96f1d1 Michael Hanselmann
1056 4a96f1d1 Michael Hanselmann
  @property
1057 4a96f1d1 Michael Hanselmann
  def disk_results(self):
1058 4a96f1d1 Michael Hanselmann
    """Returns per-disk results.
1059 4a96f1d1 Michael Hanselmann

1060 4a96f1d1 Michael Hanselmann
    """
1061 4a96f1d1 Michael Hanselmann
    return self._dresults
1062 4a96f1d1 Michael Hanselmann
1063 4a96f1d1 Michael Hanselmann
  def ReportConnected(self, ie, private):
1064 4a96f1d1 Michael Hanselmann
    """Called when a connection has been established.
1065 4a96f1d1 Michael Hanselmann

1066 4a96f1d1 Michael Hanselmann
    """
1067 4a96f1d1 Michael Hanselmann
    (idx, _) = private
1068 4a96f1d1 Michael Hanselmann
1069 4a96f1d1 Michael Hanselmann
    self._feedback_fn("Disk %s is now sending data" % idx)
1070 4a96f1d1 Michael Hanselmann
1071 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
1072 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
1073 1a2e7fe9 Michael Hanselmann

1074 1a2e7fe9 Michael Hanselmann
    """
1075 1a2e7fe9 Michael Hanselmann
    (idx, _) = private
1076 1a2e7fe9 Michael Hanselmann
1077 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
1078 1a2e7fe9 Michael Hanselmann
    if not progress:
1079 1a2e7fe9 Michael Hanselmann
      return
1080 1a2e7fe9 Michael Hanselmann
1081 1a2e7fe9 Michael Hanselmann
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1082 1a2e7fe9 Michael Hanselmann
1083 4a96f1d1 Michael Hanselmann
  def ReportFinished(self, ie, private):
1084 4a96f1d1 Michael Hanselmann
    """Called when a transfer has finished.
1085 4a96f1d1 Michael Hanselmann

1086 4a96f1d1 Michael Hanselmann
    """
1087 4a96f1d1 Michael Hanselmann
    (idx, finished_fn) = private
1088 4a96f1d1 Michael Hanselmann
1089 4a96f1d1 Michael Hanselmann
    if ie.success:
1090 4a96f1d1 Michael Hanselmann
      self._feedback_fn("Disk %s finished sending data" % idx)
1091 4a96f1d1 Michael Hanselmann
    else:
1092 4a96f1d1 Michael Hanselmann
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %r)" %
1093 4a96f1d1 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1094 4a96f1d1 Michael Hanselmann
1095 4a96f1d1 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1096 4a96f1d1 Michael Hanselmann
1097 4a96f1d1 Michael Hanselmann
    if finished_fn:
1098 4a96f1d1 Michael Hanselmann
      finished_fn()
1099 4a96f1d1 Michael Hanselmann
1100 4a96f1d1 Michael Hanselmann
1101 387794f8 Michael Hanselmann
class ExportInstanceHelper:
1102 387794f8 Michael Hanselmann
  def __init__(self, lu, feedback_fn, instance):
1103 387794f8 Michael Hanselmann
    """Initializes this class.
1104 387794f8 Michael Hanselmann

1105 387794f8 Michael Hanselmann
    @param lu: Logical unit instance
1106 387794f8 Michael Hanselmann
    @param feedback_fn: Feedback function
1107 387794f8 Michael Hanselmann
    @type instance: L{objects.Instance}
1108 387794f8 Michael Hanselmann
    @param instance: Instance object
1109 387794f8 Michael Hanselmann

1110 387794f8 Michael Hanselmann
    """
1111 387794f8 Michael Hanselmann
    self._lu = lu
1112 387794f8 Michael Hanselmann
    self._feedback_fn = feedback_fn
1113 387794f8 Michael Hanselmann
    self._instance = instance
1114 387794f8 Michael Hanselmann
1115 387794f8 Michael Hanselmann
    self._snap_disks = []
1116 387794f8 Michael Hanselmann
    self._removed_snaps = [False] * len(instance.disks)
1117 387794f8 Michael Hanselmann
1118 387794f8 Michael Hanselmann
  def CreateSnapshots(self):
1119 387794f8 Michael Hanselmann
    """Creates an LVM snapshot for every disk of the instance.
1120 387794f8 Michael Hanselmann

1121 387794f8 Michael Hanselmann
    """
1122 387794f8 Michael Hanselmann
    assert not self._snap_disks
1123 387794f8 Michael Hanselmann
1124 387794f8 Michael Hanselmann
    instance = self._instance
1125 387794f8 Michael Hanselmann
    src_node = instance.primary_node
1126 387794f8 Michael Hanselmann
1127 387794f8 Michael Hanselmann
    vgname = self._lu.cfg.GetVGName()
1128 387794f8 Michael Hanselmann
1129 387794f8 Michael Hanselmann
    for idx, disk in enumerate(instance.disks):
1130 387794f8 Michael Hanselmann
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1131 387794f8 Michael Hanselmann
                        (idx, src_node))
1132 387794f8 Michael Hanselmann
1133 387794f8 Michael Hanselmann
      # result.payload will be a snapshot of an lvm leaf of the one we
1134 387794f8 Michael Hanselmann
      # passed
1135 387794f8 Michael Hanselmann
      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
1136 387794f8 Michael Hanselmann
      msg = result.fail_msg
1137 387794f8 Michael Hanselmann
      if msg:
1138 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1139 387794f8 Michael Hanselmann
                            idx, src_node, msg)
1140 387794f8 Michael Hanselmann
        new_dev = False
1141 387794f8 Michael Hanselmann
      else:
1142 387794f8 Michael Hanselmann
        disk_id = (vgname, result.payload)
1143 387794f8 Michael Hanselmann
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1144 387794f8 Michael Hanselmann
                               logical_id=disk_id, physical_id=disk_id,
1145 387794f8 Michael Hanselmann
                               iv_name=disk.iv_name)
1146 387794f8 Michael Hanselmann
1147 387794f8 Michael Hanselmann
      self._snap_disks.append(new_dev)
1148 387794f8 Michael Hanselmann
1149 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1150 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(instance.disks)
1151 387794f8 Michael Hanselmann
1152 387794f8 Michael Hanselmann
  def _RemoveSnapshot(self, disk_index):
1153 387794f8 Michael Hanselmann
    """Removes an LVM snapshot.
1154 387794f8 Michael Hanselmann

1155 387794f8 Michael Hanselmann
    @type disk_index: number
1156 387794f8 Michael Hanselmann
    @param disk_index: Index of the snapshot to be removed
1157 387794f8 Michael Hanselmann

1158 387794f8 Michael Hanselmann
    """
1159 387794f8 Michael Hanselmann
    disk = self._snap_disks[disk_index]
1160 387794f8 Michael Hanselmann
    if disk and not self._removed_snaps[disk_index]:
1161 387794f8 Michael Hanselmann
      src_node = self._instance.primary_node
1162 387794f8 Michael Hanselmann
1163 387794f8 Michael Hanselmann
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1164 387794f8 Michael Hanselmann
                        (disk_index, src_node))
1165 387794f8 Michael Hanselmann
1166 387794f8 Michael Hanselmann
      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1167 387794f8 Michael Hanselmann
      if result.fail_msg:
1168 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1169 387794f8 Michael Hanselmann
                            " %s: %s", disk_index, src_node, result.fail_msg)
1170 387794f8 Michael Hanselmann
      else:
1171 387794f8 Michael Hanselmann
        self._removed_snaps[disk_index] = True
1172 387794f8 Michael Hanselmann
1173 387794f8 Michael Hanselmann
  def LocalExport(self, dest_node):
1174 387794f8 Michael Hanselmann
    """Intra-cluster instance export.
1175 387794f8 Michael Hanselmann

1176 387794f8 Michael Hanselmann
    @type dest_node: L{objects.Node}
1177 387794f8 Michael Hanselmann
    @param dest_node: Destination node
1178 387794f8 Michael Hanselmann

1179 387794f8 Michael Hanselmann
    """
1180 387794f8 Michael Hanselmann
    instance = self._instance
1181 387794f8 Michael Hanselmann
    src_node = instance.primary_node
1182 387794f8 Michael Hanselmann
1183 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1184 387794f8 Michael Hanselmann
1185 387794f8 Michael Hanselmann
    transfers = []
1186 387794f8 Michael Hanselmann
1187 387794f8 Michael Hanselmann
    for idx, dev in enumerate(self._snap_disks):
1188 387794f8 Michael Hanselmann
      if not dev:
1189 387794f8 Michael Hanselmann
        transfers.append(None)
1190 387794f8 Michael Hanselmann
        continue
1191 387794f8 Michael Hanselmann
1192 387794f8 Michael Hanselmann
      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
1193 387794f8 Michael Hanselmann
                            dev.physical_id[1])
1194 387794f8 Michael Hanselmann
1195 387794f8 Michael Hanselmann
      finished_fn = compat.partial(self._TransferFinished, idx)
1196 387794f8 Michael Hanselmann
1197 387794f8 Michael Hanselmann
      # FIXME: pass debug option from opcode to backend
1198 387794f8 Michael Hanselmann
      dt = DiskTransfer("snapshot/%s" % idx,
1199 387794f8 Michael Hanselmann
                        constants.IEIO_SCRIPT, (dev, idx),
1200 387794f8 Michael Hanselmann
                        constants.IEIO_FILE, (path, ),
1201 387794f8 Michael Hanselmann
                        finished_fn)
1202 387794f8 Michael Hanselmann
      transfers.append(dt)
1203 387794f8 Michael Hanselmann
1204 387794f8 Michael Hanselmann
    # Actually export data
1205 387794f8 Michael Hanselmann
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1206 387794f8 Michael Hanselmann
                                    src_node, dest_node.name,
1207 387794f8 Michael Hanselmann
                                    dest_node.secondary_ip,
1208 387794f8 Michael Hanselmann
                                    instance, transfers)
1209 387794f8 Michael Hanselmann
1210 387794f8 Michael Hanselmann
    assert len(dresults) == len(instance.disks)
1211 387794f8 Michael Hanselmann
1212 387794f8 Michael Hanselmann
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1213 387794f8 Michael Hanselmann
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1214 387794f8 Michael Hanselmann
                                               self._snap_disks)
1215 387794f8 Michael Hanselmann
    msg = result.fail_msg
1216 387794f8 Michael Hanselmann
    fin_resu = not msg
1217 387794f8 Michael Hanselmann
    if msg:
1218 387794f8 Michael Hanselmann
      self._lu.LogWarning("Could not finalize export for instance %s"
1219 387794f8 Michael Hanselmann
                          " on node %s: %s", instance.name, dest_node.name, msg)
1220 387794f8 Michael Hanselmann
1221 387794f8 Michael Hanselmann
    return (fin_resu, dresults)
1222 387794f8 Michael Hanselmann
1223 eb630f50 Michael Hanselmann
  def RemoteExport(self, opts, disk_info, timeouts):
1224 4a96f1d1 Michael Hanselmann
    """Inter-cluster instance export.
1225 4a96f1d1 Michael Hanselmann

1226 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
1227 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
1228 4a96f1d1 Michael Hanselmann
    @type disk_info: list
1229 4a96f1d1 Michael Hanselmann
    @param disk_info: Per-disk destination information
1230 4a96f1d1 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
1231 4a96f1d1 Michael Hanselmann
    @param timeouts: Timeouts for this import
1232 4a96f1d1 Michael Hanselmann

1233 4a96f1d1 Michael Hanselmann
    """
1234 4a96f1d1 Michael Hanselmann
    instance = self._instance
1235 4a96f1d1 Michael Hanselmann
1236 4a96f1d1 Michael Hanselmann
    assert len(disk_info) == len(instance.disks)
1237 4a96f1d1 Michael Hanselmann
1238 4a96f1d1 Michael Hanselmann
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1239 4a96f1d1 Michael Hanselmann
1240 4a96f1d1 Michael Hanselmann
    ieloop = ImportExportLoop(self._lu)
1241 4a96f1d1 Michael Hanselmann
    try:
1242 acd65a16 Michael Hanselmann
      for idx, (dev, (host, port)) in enumerate(zip(instance.disks,
1243 acd65a16 Michael Hanselmann
                                                    disk_info)):
1244 4a96f1d1 Michael Hanselmann
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1245 4a96f1d1 Michael Hanselmann
        finished_fn = compat.partial(self._TransferFinished, idx)
1246 4a96f1d1 Michael Hanselmann
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1247 eb630f50 Michael Hanselmann
                              opts, host, port, instance,
1248 4a96f1d1 Michael Hanselmann
                              constants.IEIO_SCRIPT, (dev, idx),
1249 4a96f1d1 Michael Hanselmann
                              timeouts, cbs, private=(idx, finished_fn)))
1250 4a96f1d1 Michael Hanselmann
1251 4a96f1d1 Michael Hanselmann
      ieloop.Run()
1252 4a96f1d1 Michael Hanselmann
    finally:
1253 4a96f1d1 Michael Hanselmann
      ieloop.FinalizeAll()
1254 4a96f1d1 Michael Hanselmann
1255 4a96f1d1 Michael Hanselmann
    return (True, cbs.disk_results)
1256 4a96f1d1 Michael Hanselmann
1257 387794f8 Michael Hanselmann
  def _TransferFinished(self, idx):
1258 387794f8 Michael Hanselmann
    """Called once a transfer has finished.
1259 387794f8 Michael Hanselmann

1260 387794f8 Michael Hanselmann
    @type idx: number
1261 387794f8 Michael Hanselmann
    @param idx: Disk index
1262 387794f8 Michael Hanselmann

1263 387794f8 Michael Hanselmann
    """
1264 387794f8 Michael Hanselmann
    logging.debug("Transfer %s finished", idx)
1265 387794f8 Michael Hanselmann
    self._RemoveSnapshot(idx)
1266 387794f8 Michael Hanselmann
1267 387794f8 Michael Hanselmann
  def Cleanup(self):
1268 387794f8 Michael Hanselmann
    """Remove all snapshots.
1269 387794f8 Michael Hanselmann

1270 387794f8 Michael Hanselmann
    """
1271 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(self._instance.disks)
1272 387794f8 Michael Hanselmann
    for idx in range(len(self._instance.disks)):
1273 387794f8 Michael Hanselmann
      self._RemoveSnapshot(idx)
1274 1410fa8d Michael Hanselmann
1275 1410fa8d Michael Hanselmann
1276 9bf56d77 Michael Hanselmann
class _RemoteImportCb(ImportExportCbBase):
1277 9bf56d77 Michael Hanselmann
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1278 9bf56d77 Michael Hanselmann
               external_address):
1279 9bf56d77 Michael Hanselmann
    """Initializes this class.
1280 9bf56d77 Michael Hanselmann

1281 9bf56d77 Michael Hanselmann
    @type cds: string
1282 9bf56d77 Michael Hanselmann
    @param cds: Cluster domain secret
1283 9bf56d77 Michael Hanselmann
    @type x509_cert_pem: string
1284 9bf56d77 Michael Hanselmann
    @param x509_cert_pem: CA used for signing import key
1285 9bf56d77 Michael Hanselmann
    @type disk_count: number
1286 9bf56d77 Michael Hanselmann
    @param disk_count: Number of disks
1287 9bf56d77 Michael Hanselmann
    @type external_address: string
1288 9bf56d77 Michael Hanselmann
    @param external_address: External address of destination node
1289 9bf56d77 Michael Hanselmann

1290 9bf56d77 Michael Hanselmann
    """
1291 9bf56d77 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1292 9bf56d77 Michael Hanselmann
    self._feedback_fn = feedback_fn
1293 9bf56d77 Michael Hanselmann
    self._cds = cds
1294 9bf56d77 Michael Hanselmann
    self._x509_cert_pem = x509_cert_pem
1295 9bf56d77 Michael Hanselmann
    self._disk_count = disk_count
1296 9bf56d77 Michael Hanselmann
    self._external_address = external_address
1297 9bf56d77 Michael Hanselmann
1298 9bf56d77 Michael Hanselmann
    self._dresults = [None] * disk_count
1299 9bf56d77 Michael Hanselmann
    self._daemon_port = [None] * disk_count
1300 9bf56d77 Michael Hanselmann
1301 9bf56d77 Michael Hanselmann
    self._salt = utils.GenerateSecret(8)
1302 9bf56d77 Michael Hanselmann
1303 9bf56d77 Michael Hanselmann
  @property
1304 9bf56d77 Michael Hanselmann
  def disk_results(self):
1305 9bf56d77 Michael Hanselmann
    """Returns per-disk results.
1306 9bf56d77 Michael Hanselmann

1307 9bf56d77 Michael Hanselmann
    """
1308 9bf56d77 Michael Hanselmann
    return self._dresults
1309 9bf56d77 Michael Hanselmann
1310 9bf56d77 Michael Hanselmann
  def _CheckAllListening(self):
1311 9bf56d77 Michael Hanselmann
    """Checks whether all daemons are listening.
1312 9bf56d77 Michael Hanselmann

1313 9bf56d77 Michael Hanselmann
    If all daemons are listening, the information is sent to the client.
1314 9bf56d77 Michael Hanselmann

1315 9bf56d77 Michael Hanselmann
    """
1316 9bf56d77 Michael Hanselmann
    if not compat.all(dp is not None for dp in self._daemon_port):
1317 9bf56d77 Michael Hanselmann
      return
1318 9bf56d77 Michael Hanselmann
1319 9bf56d77 Michael Hanselmann
    host = self._external_address
1320 9bf56d77 Michael Hanselmann
1321 9bf56d77 Michael Hanselmann
    disks = []
1322 9bf56d77 Michael Hanselmann
    for idx, port in enumerate(self._daemon_port):
1323 9bf56d77 Michael Hanselmann
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1324 9bf56d77 Michael Hanselmann
                                               idx, host, port))
1325 9bf56d77 Michael Hanselmann
1326 9bf56d77 Michael Hanselmann
    assert len(disks) == self._disk_count
1327 9bf56d77 Michael Hanselmann
1328 9bf56d77 Michael Hanselmann
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1329 9bf56d77 Michael Hanselmann
      "disks": disks,
1330 9bf56d77 Michael Hanselmann
      "x509_ca": self._x509_cert_pem,
1331 9bf56d77 Michael Hanselmann
      })
1332 9bf56d77 Michael Hanselmann
1333 9bf56d77 Michael Hanselmann
  def ReportListening(self, ie, private):
1334 9bf56d77 Michael Hanselmann
    """Called when daemon started listening.
1335 9bf56d77 Michael Hanselmann

1336 9bf56d77 Michael Hanselmann
    """
1337 9bf56d77 Michael Hanselmann
    (idx, ) = private
1338 9bf56d77 Michael Hanselmann
1339 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now listening" % idx)
1340 9bf56d77 Michael Hanselmann
1341 9bf56d77 Michael Hanselmann
    assert self._daemon_port[idx] is None
1342 9bf56d77 Michael Hanselmann
1343 9bf56d77 Michael Hanselmann
    self._daemon_port[idx] = ie.listen_port
1344 9bf56d77 Michael Hanselmann
1345 9bf56d77 Michael Hanselmann
    self._CheckAllListening()
1346 9bf56d77 Michael Hanselmann
1347 9bf56d77 Michael Hanselmann
  def ReportConnected(self, ie, private):
1348 9bf56d77 Michael Hanselmann
    """Called when a connection has been established.
1349 9bf56d77 Michael Hanselmann

1350 9bf56d77 Michael Hanselmann
    """
1351 9bf56d77 Michael Hanselmann
    (idx, ) = private
1352 9bf56d77 Michael Hanselmann
1353 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now receiving data" % idx)
1354 9bf56d77 Michael Hanselmann
1355 9bf56d77 Michael Hanselmann
  def ReportFinished(self, ie, private):
1356 9bf56d77 Michael Hanselmann
    """Called when a transfer has finished.
1357 9bf56d77 Michael Hanselmann

1358 9bf56d77 Michael Hanselmann
    """
1359 9bf56d77 Michael Hanselmann
    (idx, ) = private
1360 9bf56d77 Michael Hanselmann
1361 9bf56d77 Michael Hanselmann
    # Daemon is certainly no longer listening
1362 9bf56d77 Michael Hanselmann
    self._daemon_port[idx] = None
1363 9bf56d77 Michael Hanselmann
1364 9bf56d77 Michael Hanselmann
    if ie.success:
1365 9bf56d77 Michael Hanselmann
      self._feedback_fn("Disk %s finished receiving data" % idx)
1366 9bf56d77 Michael Hanselmann
    else:
1367 9bf56d77 Michael Hanselmann
      self._feedback_fn(("Disk %s failed to receive data: %s"
1368 9bf56d77 Michael Hanselmann
                         " (recent output: %r)") %
1369 9bf56d77 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1370 9bf56d77 Michael Hanselmann
1371 9bf56d77 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1372 9bf56d77 Michael Hanselmann
1373 9bf56d77 Michael Hanselmann
1374 9bf56d77 Michael Hanselmann
def RemoteImport(lu, feedback_fn, instance, source_x509_ca, cds, timeouts):
1375 9bf56d77 Michael Hanselmann
  """Imports an instance from another cluster.
1376 9bf56d77 Michael Hanselmann

1377 9bf56d77 Michael Hanselmann
  @param lu: Logical unit instance
1378 9bf56d77 Michael Hanselmann
  @param feedback_fn: Feedback function
1379 9bf56d77 Michael Hanselmann
  @type instance: L{objects.Instance}
1380 9bf56d77 Michael Hanselmann
  @param instance: Instance object
1381 9bf56d77 Michael Hanselmann
  @type source_x509_ca: OpenSSL.crypto.X509
1382 9bf56d77 Michael Hanselmann
  @param source_x509_ca: Import source's X509 CA
1383 9bf56d77 Michael Hanselmann
  @type cds: string
1384 9bf56d77 Michael Hanselmann
  @param cds: Cluster domain secret
1385 9bf56d77 Michael Hanselmann
  @type timeouts: L{ImportExportTimeouts}
1386 9bf56d77 Michael Hanselmann
  @param timeouts: Timeouts for this import
1387 9bf56d77 Michael Hanselmann

1388 9bf56d77 Michael Hanselmann
  """
1389 9bf56d77 Michael Hanselmann
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1390 9bf56d77 Michael Hanselmann
                                                  source_x509_ca)
1391 9bf56d77 Michael Hanselmann
1392 9bf56d77 Michael Hanselmann
  # Create crypto key
1393 9bf56d77 Michael Hanselmann
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1394 9bf56d77 Michael Hanselmann
                                        constants.RIE_CERT_VALIDITY)
1395 9bf56d77 Michael Hanselmann
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1396 9bf56d77 Michael Hanselmann
1397 9bf56d77 Michael Hanselmann
  (x509_key_name, x509_cert_pem) = result.payload
1398 9bf56d77 Michael Hanselmann
  try:
1399 9bf56d77 Michael Hanselmann
    # Load certificate
1400 9bf56d77 Michael Hanselmann
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1401 9bf56d77 Michael Hanselmann
                                                x509_cert_pem)
1402 9bf56d77 Michael Hanselmann
1403 eb630f50 Michael Hanselmann
    # Import daemon options
1404 eb630f50 Michael Hanselmann
    opts = objects.ImportExportOptions(key_name=x509_key_name,
1405 eb630f50 Michael Hanselmann
                                       ca_pem=source_ca_pem)
1406 eb630f50 Michael Hanselmann
1407 9bf56d77 Michael Hanselmann
    # Sign certificate
1408 9bf56d77 Michael Hanselmann
    signed_x509_cert_pem = \
1409 9bf56d77 Michael Hanselmann
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1410 9bf56d77 Michael Hanselmann
1411 9bf56d77 Michael Hanselmann
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1412 9bf56d77 Michael Hanselmann
                          len(instance.disks), instance.primary_node)
1413 9bf56d77 Michael Hanselmann
1414 9bf56d77 Michael Hanselmann
    ieloop = ImportExportLoop(lu)
1415 9bf56d77 Michael Hanselmann
    try:
1416 9bf56d77 Michael Hanselmann
      for idx, dev in enumerate(instance.disks):
1417 eb630f50 Michael Hanselmann
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1418 9bf56d77 Michael Hanselmann
                              constants.IEIO_SCRIPT, (dev, idx),
1419 9bf56d77 Michael Hanselmann
                              timeouts, cbs, private=(idx, )))
1420 9bf56d77 Michael Hanselmann
1421 9bf56d77 Michael Hanselmann
      ieloop.Run()
1422 9bf56d77 Michael Hanselmann
    finally:
1423 9bf56d77 Michael Hanselmann
      ieloop.FinalizeAll()
1424 9bf56d77 Michael Hanselmann
  finally:
1425 9bf56d77 Michael Hanselmann
    # Remove crypto key and certificate
1426 9bf56d77 Michael Hanselmann
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1427 9bf56d77 Michael Hanselmann
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1428 9bf56d77 Michael Hanselmann
1429 9bf56d77 Michael Hanselmann
  return cbs.disk_results
1430 9bf56d77 Michael Hanselmann
1431 9bf56d77 Michael Hanselmann
1432 1410fa8d Michael Hanselmann
def _GetImportExportHandshakeMessage(version):
1433 1410fa8d Michael Hanselmann
  """Returns the handshake message for a RIE protocol version.
1434 1410fa8d Michael Hanselmann

1435 1410fa8d Michael Hanselmann
  @type version: number
1436 1410fa8d Michael Hanselmann

1437 1410fa8d Michael Hanselmann
  """
1438 1410fa8d Michael Hanselmann
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1439 1410fa8d Michael Hanselmann
1440 1410fa8d Michael Hanselmann
1441 1410fa8d Michael Hanselmann
def ComputeRemoteExportHandshake(cds):
1442 1410fa8d Michael Hanselmann
  """Computes the remote import/export handshake.
1443 1410fa8d Michael Hanselmann

1444 1410fa8d Michael Hanselmann
  @type cds: string
1445 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1446 1410fa8d Michael Hanselmann

1447 1410fa8d Michael Hanselmann
  """
1448 1410fa8d Michael Hanselmann
  salt = utils.GenerateSecret(8)
1449 1410fa8d Michael Hanselmann
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1450 1410fa8d Michael Hanselmann
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1451 1410fa8d Michael Hanselmann
1452 1410fa8d Michael Hanselmann
1453 1410fa8d Michael Hanselmann
def CheckRemoteExportHandshake(cds, handshake):
1454 1410fa8d Michael Hanselmann
  """Checks the handshake of a remote import/export.
1455 1410fa8d Michael Hanselmann

1456 1410fa8d Michael Hanselmann
  @type cds: string
1457 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1458 1410fa8d Michael Hanselmann
  @type handshake: sequence
1459 1410fa8d Michael Hanselmann
  @param handshake: Handshake sent by remote peer
1460 1410fa8d Michael Hanselmann

1461 1410fa8d Michael Hanselmann
  """
1462 1410fa8d Michael Hanselmann
  try:
1463 1410fa8d Michael Hanselmann
    (version, hmac_digest, hmac_salt) = handshake
1464 1410fa8d Michael Hanselmann
  except (TypeError, ValueError), err:
1465 1410fa8d Michael Hanselmann
    return "Invalid data: %s" % err
1466 1410fa8d Michael Hanselmann
1467 1410fa8d Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1468 1410fa8d Michael Hanselmann
                              hmac_digest, salt=hmac_salt):
1469 1410fa8d Michael Hanselmann
    return "Hash didn't match, clusters don't share the same domain secret"
1470 1410fa8d Michael Hanselmann
1471 1410fa8d Michael Hanselmann
  if version != constants.RIE_VERSION:
1472 1410fa8d Michael Hanselmann
    return ("Clusters don't have the same remote import/export protocol"
1473 1410fa8d Michael Hanselmann
            " (local=%s, remote=%s)" %
1474 1410fa8d Michael Hanselmann
            (constants.RIE_VERSION, version))
1475 1410fa8d Michael Hanselmann
1476 1410fa8d Michael Hanselmann
  return None
1477 4a96f1d1 Michael Hanselmann
1478 4a96f1d1 Michael Hanselmann
1479 4a96f1d1 Michael Hanselmann
def _GetRieDiskInfoMessage(disk_index, host, port):
1480 4a96f1d1 Michael Hanselmann
  """Returns the hashed text for import/export disk information.
1481 4a96f1d1 Michael Hanselmann

1482 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1483 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1484 4a96f1d1 Michael Hanselmann
  @type host: string
1485 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1486 4a96f1d1 Michael Hanselmann
  @type port: number
1487 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1488 4a96f1d1 Michael Hanselmann

1489 4a96f1d1 Michael Hanselmann
  """
1490 4a96f1d1 Michael Hanselmann
  return "%s:%s:%s" % (disk_index, host, port)
1491 4a96f1d1 Michael Hanselmann
1492 4a96f1d1 Michael Hanselmann
1493 4a96f1d1 Michael Hanselmann
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1494 4a96f1d1 Michael Hanselmann
  """Verifies received disk information for an export.
1495 4a96f1d1 Michael Hanselmann

1496 4a96f1d1 Michael Hanselmann
  @type cds: string
1497 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1498 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1499 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1500 4a96f1d1 Michael Hanselmann
  @type disk_info: sequence
1501 4a96f1d1 Michael Hanselmann
  @param disk_info: Disk information sent by remote peer
1502 4a96f1d1 Michael Hanselmann

1503 4a96f1d1 Michael Hanselmann
  """
1504 4a96f1d1 Michael Hanselmann
  try:
1505 4a96f1d1 Michael Hanselmann
    (host, port, hmac_digest, hmac_salt) = disk_info
1506 4a96f1d1 Michael Hanselmann
  except (TypeError, ValueError), err:
1507 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("Invalid data: %s" % err)
1508 4a96f1d1 Michael Hanselmann
1509 4a96f1d1 Michael Hanselmann
  if not (host and port):
1510 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("Missing destination host or port")
1511 4a96f1d1 Michael Hanselmann
1512 4a96f1d1 Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port)
1513 4a96f1d1 Michael Hanselmann
1514 4a96f1d1 Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1515 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("HMAC is wrong")
1516 4a96f1d1 Michael Hanselmann
1517 acd65a16 Michael Hanselmann
  return (utils.HostInfo.NormalizeName(host),
1518 acd65a16 Michael Hanselmann
          utils.ValidateServiceName(port))
1519 4a96f1d1 Michael Hanselmann
1520 4a96f1d1 Michael Hanselmann
1521 4a96f1d1 Michael Hanselmann
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port):
1522 4a96f1d1 Michael Hanselmann
  """Computes the signed disk information for a remote import.
1523 4a96f1d1 Michael Hanselmann

1524 4a96f1d1 Michael Hanselmann
  @type cds: string
1525 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1526 4a96f1d1 Michael Hanselmann
  @type salt: string
1527 4a96f1d1 Michael Hanselmann
  @param salt: HMAC salt
1528 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1529 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1530 4a96f1d1 Michael Hanselmann
  @type host: string
1531 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1532 4a96f1d1 Michael Hanselmann
  @type port: number
1533 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1534 4a96f1d1 Michael Hanselmann

1535 4a96f1d1 Michael Hanselmann
  """
1536 4a96f1d1 Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port)
1537 4a96f1d1 Michael Hanselmann
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1538 4a96f1d1 Michael Hanselmann
  return (host, port, hmac_digest, salt)