Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ f3aebf6f

History | View | Annotate | Download (47.1 kB)

1 033a1d00 Michael Hanselmann
#
2 033a1d00 Michael Hanselmann
#
3 033a1d00 Michael Hanselmann
4 c9300bb3 Iustin Pop
# Copyright (C) 2010, 2011 Google Inc.
5 033a1d00 Michael Hanselmann
#
6 033a1d00 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 033a1d00 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 033a1d00 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 033a1d00 Michael Hanselmann
# (at your option) any later version.
10 033a1d00 Michael Hanselmann
#
11 033a1d00 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 033a1d00 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 033a1d00 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 033a1d00 Michael Hanselmann
# General Public License for more details.
15 033a1d00 Michael Hanselmann
#
16 033a1d00 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 033a1d00 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 033a1d00 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 033a1d00 Michael Hanselmann
# 02110-1301, USA.
20 033a1d00 Michael Hanselmann
21 033a1d00 Michael Hanselmann
22 033a1d00 Michael Hanselmann
"""Instance-related functions and classes for masterd.
23 033a1d00 Michael Hanselmann

24 033a1d00 Michael Hanselmann
"""
25 033a1d00 Michael Hanselmann
26 033a1d00 Michael Hanselmann
import logging
27 033a1d00 Michael Hanselmann
import time
28 4a96f1d1 Michael Hanselmann
import OpenSSL
29 033a1d00 Michael Hanselmann
30 033a1d00 Michael Hanselmann
from ganeti import constants
31 033a1d00 Michael Hanselmann
from ganeti import errors
32 033a1d00 Michael Hanselmann
from ganeti import compat
33 387794f8 Michael Hanselmann
from ganeti import utils
34 387794f8 Michael Hanselmann
from ganeti import objects
35 a744b676 Manuel Franceschini
from ganeti import netutils
36 9c492c2d Michael Hanselmann
from ganeti import pathutils
37 033a1d00 Michael Hanselmann
38 033a1d00 Michael Hanselmann
39 033a1d00 Michael Hanselmann
class _ImportExportError(Exception):
40 033a1d00 Michael Hanselmann
  """Local exception to report import/export errors.
41 033a1d00 Michael Hanselmann

42 033a1d00 Michael Hanselmann
  """
43 033a1d00 Michael Hanselmann
44 033a1d00 Michael Hanselmann
45 033a1d00 Michael Hanselmann
class ImportExportTimeouts(object):
46 033a1d00 Michael Hanselmann
  #: Time until daemon starts writing status file
47 033a1d00 Michael Hanselmann
  DEFAULT_READY_TIMEOUT = 10
48 033a1d00 Michael Hanselmann
49 033a1d00 Michael Hanselmann
  #: Length of time until errors cause hard failure
50 033a1d00 Michael Hanselmann
  DEFAULT_ERROR_TIMEOUT = 10
51 033a1d00 Michael Hanselmann
52 033a1d00 Michael Hanselmann
  #: Time after which daemon must be listening
53 033a1d00 Michael Hanselmann
  DEFAULT_LISTEN_TIMEOUT = 10
54 033a1d00 Michael Hanselmann
55 1a2e7fe9 Michael Hanselmann
  #: Progress update interval
56 1a2e7fe9 Michael Hanselmann
  DEFAULT_PROGRESS_INTERVAL = 60
57 1a2e7fe9 Michael Hanselmann
58 033a1d00 Michael Hanselmann
  __slots__ = [
59 033a1d00 Michael Hanselmann
    "error",
60 033a1d00 Michael Hanselmann
    "ready",
61 033a1d00 Michael Hanselmann
    "listen",
62 033a1d00 Michael Hanselmann
    "connect",
63 1a2e7fe9 Michael Hanselmann
    "progress",
64 033a1d00 Michael Hanselmann
    ]
65 033a1d00 Michael Hanselmann
66 033a1d00 Michael Hanselmann
  def __init__(self, connect,
67 033a1d00 Michael Hanselmann
               listen=DEFAULT_LISTEN_TIMEOUT,
68 033a1d00 Michael Hanselmann
               error=DEFAULT_ERROR_TIMEOUT,
69 1a2e7fe9 Michael Hanselmann
               ready=DEFAULT_READY_TIMEOUT,
70 1a2e7fe9 Michael Hanselmann
               progress=DEFAULT_PROGRESS_INTERVAL):
71 033a1d00 Michael Hanselmann
    """Initializes this class.
72 033a1d00 Michael Hanselmann

73 033a1d00 Michael Hanselmann
    @type connect: number
74 033a1d00 Michael Hanselmann
    @param connect: Timeout for establishing connection
75 033a1d00 Michael Hanselmann
    @type listen: number
76 033a1d00 Michael Hanselmann
    @param listen: Timeout for starting to listen for connections
77 033a1d00 Michael Hanselmann
    @type error: number
78 033a1d00 Michael Hanselmann
    @param error: Length of time until errors cause hard failure
79 033a1d00 Michael Hanselmann
    @type ready: number
80 033a1d00 Michael Hanselmann
    @param ready: Timeout for daemon to become ready
81 1a2e7fe9 Michael Hanselmann
    @type progress: number
82 1a2e7fe9 Michael Hanselmann
    @param progress: Progress update interval
83 033a1d00 Michael Hanselmann

84 033a1d00 Michael Hanselmann
    """
85 033a1d00 Michael Hanselmann
    self.error = error
86 033a1d00 Michael Hanselmann
    self.ready = ready
87 033a1d00 Michael Hanselmann
    self.listen = listen
88 033a1d00 Michael Hanselmann
    self.connect = connect
89 1a2e7fe9 Michael Hanselmann
    self.progress = progress
90 033a1d00 Michael Hanselmann
91 033a1d00 Michael Hanselmann
92 033a1d00 Michael Hanselmann
class ImportExportCbBase(object):
93 033a1d00 Michael Hanselmann
  """Callbacks for disk import/export.
94 033a1d00 Michael Hanselmann

95 033a1d00 Michael Hanselmann
  """
96 5e26c4d9 Iustin Pop
  def ReportListening(self, ie, private, component):
97 033a1d00 Michael Hanselmann
    """Called when daemon started listening.
98 033a1d00 Michael Hanselmann

99 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
100 033a1d00 Michael Hanselmann
    @param ie: Import/export object
101 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
102 5e26c4d9 Iustin Pop
    @param component: transfer component name
103 033a1d00 Michael Hanselmann

104 033a1d00 Michael Hanselmann
    """
105 033a1d00 Michael Hanselmann
106 033a1d00 Michael Hanselmann
  def ReportConnected(self, ie, private):
107 033a1d00 Michael Hanselmann
    """Called when a connection has been established.
108 033a1d00 Michael Hanselmann

109 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
110 033a1d00 Michael Hanselmann
    @param ie: Import/export object
111 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
112 033a1d00 Michael Hanselmann

113 033a1d00 Michael Hanselmann
    """
114 033a1d00 Michael Hanselmann
115 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
116 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
117 1a2e7fe9 Michael Hanselmann

118 1a2e7fe9 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
119 1a2e7fe9 Michael Hanselmann
    @param ie: Import/export object
120 1a2e7fe9 Michael Hanselmann
    @param private: Private data passed to import/export object
121 1a2e7fe9 Michael Hanselmann

122 1a2e7fe9 Michael Hanselmann
    """
123 1a2e7fe9 Michael Hanselmann
124 033a1d00 Michael Hanselmann
  def ReportFinished(self, ie, private):
125 033a1d00 Michael Hanselmann
    """Called when a transfer has finished.
126 033a1d00 Michael Hanselmann

127 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
128 033a1d00 Michael Hanselmann
    @param ie: Import/export object
129 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
130 033a1d00 Michael Hanselmann

131 033a1d00 Michael Hanselmann
    """
132 033a1d00 Michael Hanselmann
133 033a1d00 Michael Hanselmann
134 033a1d00 Michael Hanselmann
class _DiskImportExportBase(object):
135 033a1d00 Michael Hanselmann
  MODE_TEXT = None
136 033a1d00 Michael Hanselmann
137 1c3231aa Thomas Thrainer
  def __init__(self, lu, node_uuid, opts,
138 5e26c4d9 Iustin Pop
               instance, component, timeouts, cbs, private=None):
139 033a1d00 Michael Hanselmann
    """Initializes this class.
140 033a1d00 Michael Hanselmann

141 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
142 1c3231aa Thomas Thrainer
    @type node_uuid: string
143 1c3231aa Thomas Thrainer
    @param node_uuid: Node UUID for import
144 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
145 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
146 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
147 033a1d00 Michael Hanselmann
    @param instance: Instance object
148 5e26c4d9 Iustin Pop
    @type component: string
149 5e26c4d9 Iustin Pop
    @param component: which part of the instance is being imported
150 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
151 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
152 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
153 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
154 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
155 033a1d00 Michael Hanselmann

156 033a1d00 Michael Hanselmann
    """
157 033a1d00 Michael Hanselmann
    assert self.MODE_TEXT
158 033a1d00 Michael Hanselmann
159 033a1d00 Michael Hanselmann
    self._lu = lu
160 1c3231aa Thomas Thrainer
    self.node_uuid = node_uuid
161 1c3231aa Thomas Thrainer
    self.node_name = lu.cfg.GetNodeName(node_uuid)
162 4478301b Michael Hanselmann
    self._opts = opts.Copy()
163 033a1d00 Michael Hanselmann
    self._instance = instance
164 5e26c4d9 Iustin Pop
    self._component = component
165 033a1d00 Michael Hanselmann
    self._timeouts = timeouts
166 033a1d00 Michael Hanselmann
    self._cbs = cbs
167 033a1d00 Michael Hanselmann
    self._private = private
168 033a1d00 Michael Hanselmann
169 4478301b Michael Hanselmann
    # Set master daemon's timeout in options for import/export daemon
170 4478301b Michael Hanselmann
    assert self._opts.connect_timeout is None
171 4478301b Michael Hanselmann
    self._opts.connect_timeout = timeouts.connect
172 4478301b Michael Hanselmann
173 033a1d00 Michael Hanselmann
    # Parent loop
174 033a1d00 Michael Hanselmann
    self._loop = None
175 033a1d00 Michael Hanselmann
176 033a1d00 Michael Hanselmann
    # Timestamps
177 033a1d00 Michael Hanselmann
    self._ts_begin = None
178 033a1d00 Michael Hanselmann
    self._ts_connected = None
179 033a1d00 Michael Hanselmann
    self._ts_finished = None
180 033a1d00 Michael Hanselmann
    self._ts_cleanup = None
181 1a2e7fe9 Michael Hanselmann
    self._ts_last_progress = None
182 033a1d00 Michael Hanselmann
    self._ts_last_error = None
183 033a1d00 Michael Hanselmann
184 033a1d00 Michael Hanselmann
    # Transfer status
185 033a1d00 Michael Hanselmann
    self.success = None
186 033a1d00 Michael Hanselmann
    self.final_message = None
187 033a1d00 Michael Hanselmann
188 033a1d00 Michael Hanselmann
    # Daemon status
189 033a1d00 Michael Hanselmann
    self._daemon_name = None
190 033a1d00 Michael Hanselmann
    self._daemon = None
191 033a1d00 Michael Hanselmann
192 033a1d00 Michael Hanselmann
  @property
193 033a1d00 Michael Hanselmann
  def recent_output(self):
194 033a1d00 Michael Hanselmann
    """Returns the most recent output from the daemon.
195 033a1d00 Michael Hanselmann

196 033a1d00 Michael Hanselmann
    """
197 033a1d00 Michael Hanselmann
    if self._daemon:
198 c9300bb3 Iustin Pop
      return "\n".join(self._daemon.recent_output)
199 033a1d00 Michael Hanselmann
200 033a1d00 Michael Hanselmann
    return None
201 033a1d00 Michael Hanselmann
202 033a1d00 Michael Hanselmann
  @property
203 1a2e7fe9 Michael Hanselmann
  def progress(self):
204 1a2e7fe9 Michael Hanselmann
    """Returns transfer progress information.
205 1a2e7fe9 Michael Hanselmann

206 1a2e7fe9 Michael Hanselmann
    """
207 1a2e7fe9 Michael Hanselmann
    if not self._daemon:
208 1a2e7fe9 Michael Hanselmann
      return None
209 1a2e7fe9 Michael Hanselmann
210 1a2e7fe9 Michael Hanselmann
    return (self._daemon.progress_mbytes,
211 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_throughput,
212 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_percent,
213 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_eta)
214 1a2e7fe9 Michael Hanselmann
215 1a2e7fe9 Michael Hanselmann
  @property
216 af1d39b1 Michael Hanselmann
  def magic(self):
217 af1d39b1 Michael Hanselmann
    """Returns the magic value for this import/export.
218 af1d39b1 Michael Hanselmann

219 af1d39b1 Michael Hanselmann
    """
220 af1d39b1 Michael Hanselmann
    return self._opts.magic
221 af1d39b1 Michael Hanselmann
222 af1d39b1 Michael Hanselmann
  @property
223 033a1d00 Michael Hanselmann
  def active(self):
224 033a1d00 Michael Hanselmann
    """Determines whether this transport is still active.
225 033a1d00 Michael Hanselmann

226 033a1d00 Michael Hanselmann
    """
227 033a1d00 Michael Hanselmann
    return self.success is None
228 033a1d00 Michael Hanselmann
229 033a1d00 Michael Hanselmann
  @property
230 033a1d00 Michael Hanselmann
  def loop(self):
231 033a1d00 Michael Hanselmann
    """Returns parent loop.
232 033a1d00 Michael Hanselmann

233 033a1d00 Michael Hanselmann
    @rtype: L{ImportExportLoop}
234 033a1d00 Michael Hanselmann

235 033a1d00 Michael Hanselmann
    """
236 033a1d00 Michael Hanselmann
    return self._loop
237 033a1d00 Michael Hanselmann
238 033a1d00 Michael Hanselmann
  def SetLoop(self, loop):
239 033a1d00 Michael Hanselmann
    """Sets the parent loop.
240 033a1d00 Michael Hanselmann

241 033a1d00 Michael Hanselmann
    @type loop: L{ImportExportLoop}
242 033a1d00 Michael Hanselmann

243 033a1d00 Michael Hanselmann
    """
244 033a1d00 Michael Hanselmann
    if self._loop:
245 033a1d00 Michael Hanselmann
      raise errors.ProgrammerError("Loop can only be set once")
246 033a1d00 Michael Hanselmann
247 033a1d00 Michael Hanselmann
    self._loop = loop
248 033a1d00 Michael Hanselmann
249 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
250 033a1d00 Michael Hanselmann
    """Starts the import/export daemon.
251 033a1d00 Michael Hanselmann

252 033a1d00 Michael Hanselmann
    """
253 033a1d00 Michael Hanselmann
    raise NotImplementedError()
254 033a1d00 Michael Hanselmann
255 033a1d00 Michael Hanselmann
  def CheckDaemon(self):
256 033a1d00 Michael Hanselmann
    """Checks whether daemon has been started and if not, starts it.
257 033a1d00 Michael Hanselmann

258 033a1d00 Michael Hanselmann
    @rtype: string
259 033a1d00 Michael Hanselmann
    @return: Daemon name
260 033a1d00 Michael Hanselmann

261 033a1d00 Michael Hanselmann
    """
262 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
263 033a1d00 Michael Hanselmann
264 033a1d00 Michael Hanselmann
    if self._daemon_name is None:
265 033a1d00 Michael Hanselmann
      assert self._ts_begin is None
266 033a1d00 Michael Hanselmann
267 033a1d00 Michael Hanselmann
      result = self._StartDaemon()
268 033a1d00 Michael Hanselmann
      if result.fail_msg:
269 033a1d00 Michael Hanselmann
        raise _ImportExportError("Failed to start %s on %s: %s" %
270 033a1d00 Michael Hanselmann
                                 (self.MODE_TEXT, self.node_name,
271 033a1d00 Michael Hanselmann
                                  result.fail_msg))
272 033a1d00 Michael Hanselmann
273 033a1d00 Michael Hanselmann
      daemon_name = result.payload
274 033a1d00 Michael Hanselmann
275 194e8648 Iustin Pop
      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
276 033a1d00 Michael Hanselmann
                   self.node_name)
277 033a1d00 Michael Hanselmann
278 033a1d00 Michael Hanselmann
      self._ts_begin = time.time()
279 033a1d00 Michael Hanselmann
      self._daemon_name = daemon_name
280 033a1d00 Michael Hanselmann
281 033a1d00 Michael Hanselmann
    return self._daemon_name
282 033a1d00 Michael Hanselmann
283 033a1d00 Michael Hanselmann
  def GetDaemonName(self):
284 033a1d00 Michael Hanselmann
    """Returns the daemon name.
285 033a1d00 Michael Hanselmann

286 033a1d00 Michael Hanselmann
    """
287 033a1d00 Michael Hanselmann
    assert self._daemon_name, "Daemon has not been started"
288 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
289 033a1d00 Michael Hanselmann
    return self._daemon_name
290 033a1d00 Michael Hanselmann
291 033a1d00 Michael Hanselmann
  def Abort(self):
292 033a1d00 Michael Hanselmann
    """Sends SIGTERM to import/export daemon (if still active).
293 033a1d00 Michael Hanselmann

294 033a1d00 Michael Hanselmann
    """
295 033a1d00 Michael Hanselmann
    if self._daemon_name:
296 194e8648 Iustin Pop
      self._lu.LogWarning("Aborting %s '%s' on %s",
297 1c3231aa Thomas Thrainer
                          self.MODE_TEXT, self._daemon_name, self.node_uuid)
298 1c3231aa Thomas Thrainer
      result = self._lu.rpc.call_impexp_abort(self.node_uuid, self._daemon_name)
299 033a1d00 Michael Hanselmann
      if result.fail_msg:
300 194e8648 Iustin Pop
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
301 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
302 1c3231aa Thomas Thrainer
                            self.node_uuid, result.fail_msg)
303 033a1d00 Michael Hanselmann
        return False
304 033a1d00 Michael Hanselmann
305 033a1d00 Michael Hanselmann
    return True
306 033a1d00 Michael Hanselmann
307 033a1d00 Michael Hanselmann
  def _SetDaemonData(self, data):
308 033a1d00 Michael Hanselmann
    """Internal function for updating status daemon data.
309 033a1d00 Michael Hanselmann

310 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
311 033a1d00 Michael Hanselmann
    @param data: Daemon status data
312 033a1d00 Michael Hanselmann

313 033a1d00 Michael Hanselmann
    """
314 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
315 033a1d00 Michael Hanselmann
316 033a1d00 Michael Hanselmann
    if not data:
317 f8326fca Andrea Spadaccini
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
318 033a1d00 Michael Hanselmann
        raise _ImportExportError("Didn't become ready after %s seconds" %
319 033a1d00 Michael Hanselmann
                                 self._timeouts.ready)
320 033a1d00 Michael Hanselmann
321 033a1d00 Michael Hanselmann
      return False
322 033a1d00 Michael Hanselmann
323 033a1d00 Michael Hanselmann
    self._daemon = data
324 033a1d00 Michael Hanselmann
325 033a1d00 Michael Hanselmann
    return True
326 033a1d00 Michael Hanselmann
327 033a1d00 Michael Hanselmann
  def SetDaemonData(self, success, data):
328 033a1d00 Michael Hanselmann
    """Updates daemon status data.
329 033a1d00 Michael Hanselmann

330 033a1d00 Michael Hanselmann
    @type success: bool
331 033a1d00 Michael Hanselmann
    @param success: Whether fetching data was successful or not
332 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
333 033a1d00 Michael Hanselmann
    @param data: Daemon status data
334 033a1d00 Michael Hanselmann

335 033a1d00 Michael Hanselmann
    """
336 033a1d00 Michael Hanselmann
    if not success:
337 033a1d00 Michael Hanselmann
      if self._ts_last_error is None:
338 033a1d00 Michael Hanselmann
        self._ts_last_error = time.time()
339 033a1d00 Michael Hanselmann
340 f8326fca Andrea Spadaccini
      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
341 033a1d00 Michael Hanselmann
        raise _ImportExportError("Too many errors while updating data")
342 033a1d00 Michael Hanselmann
343 033a1d00 Michael Hanselmann
      return False
344 033a1d00 Michael Hanselmann
345 033a1d00 Michael Hanselmann
    self._ts_last_error = None
346 033a1d00 Michael Hanselmann
347 033a1d00 Michael Hanselmann
    return self._SetDaemonData(data)
348 033a1d00 Michael Hanselmann
349 033a1d00 Michael Hanselmann
  def CheckListening(self):
350 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
351 033a1d00 Michael Hanselmann

352 033a1d00 Michael Hanselmann
    """
353 033a1d00 Michael Hanselmann
    raise NotImplementedError()
354 033a1d00 Michael Hanselmann
355 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
356 033a1d00 Michael Hanselmann
    """Returns timeout to calculate connect timeout.
357 033a1d00 Michael Hanselmann

358 033a1d00 Michael Hanselmann
    """
359 033a1d00 Michael Hanselmann
    raise NotImplementedError()
360 033a1d00 Michael Hanselmann
361 033a1d00 Michael Hanselmann
  def CheckConnected(self):
362 033a1d00 Michael Hanselmann
    """Checks whether the daemon is connected.
363 033a1d00 Michael Hanselmann

364 033a1d00 Michael Hanselmann
    @rtype: bool
365 033a1d00 Michael Hanselmann
    @return: Whether the daemon is connected
366 033a1d00 Michael Hanselmann

367 033a1d00 Michael Hanselmann
    """
368 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
369 033a1d00 Michael Hanselmann
370 033a1d00 Michael Hanselmann
    if self._ts_connected is not None:
371 033a1d00 Michael Hanselmann
      return True
372 033a1d00 Michael Hanselmann
373 033a1d00 Michael Hanselmann
    if self._daemon.connected:
374 033a1d00 Michael Hanselmann
      self._ts_connected = time.time()
375 033a1d00 Michael Hanselmann
376 033a1d00 Michael Hanselmann
      # TODO: Log remote peer
377 194e8648 Iustin Pop
      logging.debug("%s '%s' on %s is now connected",
378 1c3231aa Thomas Thrainer
                    self.MODE_TEXT, self._daemon_name, self.node_uuid)
379 033a1d00 Michael Hanselmann
380 033a1d00 Michael Hanselmann
      self._cbs.ReportConnected(self, self._private)
381 033a1d00 Michael Hanselmann
382 033a1d00 Michael Hanselmann
      return True
383 033a1d00 Michael Hanselmann
384 f8326fca Andrea Spadaccini
    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
385 f8326fca Andrea Spadaccini
                            self._timeouts.connect):
386 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not connected after %s seconds" %
387 033a1d00 Michael Hanselmann
                               self._timeouts.connect)
388 033a1d00 Michael Hanselmann
389 033a1d00 Michael Hanselmann
    return False
390 033a1d00 Michael Hanselmann
391 1a2e7fe9 Michael Hanselmann
  def _CheckProgress(self):
392 1a2e7fe9 Michael Hanselmann
    """Checks whether a progress update should be reported.
393 1a2e7fe9 Michael Hanselmann

394 1a2e7fe9 Michael Hanselmann
    """
395 1a2e7fe9 Michael Hanselmann
    if ((self._ts_last_progress is None or
396 f8326fca Andrea Spadaccini
        utils.TimeoutExpired(self._ts_last_progress,
397 f8326fca Andrea Spadaccini
                             self._timeouts.progress)) and
398 1a2e7fe9 Michael Hanselmann
        self._daemon and
399 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_mbytes is not None and
400 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_throughput is not None):
401 1a2e7fe9 Michael Hanselmann
      self._cbs.ReportProgress(self, self._private)
402 1a2e7fe9 Michael Hanselmann
      self._ts_last_progress = time.time()
403 1a2e7fe9 Michael Hanselmann
404 033a1d00 Michael Hanselmann
  def CheckFinished(self):
405 033a1d00 Michael Hanselmann
    """Checks whether the daemon exited.
406 033a1d00 Michael Hanselmann

407 033a1d00 Michael Hanselmann
    @rtype: bool
408 033a1d00 Michael Hanselmann
    @return: Whether the transfer is finished
409 033a1d00 Michael Hanselmann

410 033a1d00 Michael Hanselmann
    """
411 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
412 033a1d00 Michael Hanselmann
413 033a1d00 Michael Hanselmann
    if self._ts_finished:
414 033a1d00 Michael Hanselmann
      return True
415 033a1d00 Michael Hanselmann
416 033a1d00 Michael Hanselmann
    if self._daemon.exit_status is None:
417 1a2e7fe9 Michael Hanselmann
      # TODO: Adjust delay for ETA expiring soon
418 1a2e7fe9 Michael Hanselmann
      self._CheckProgress()
419 033a1d00 Michael Hanselmann
      return False
420 033a1d00 Michael Hanselmann
421 033a1d00 Michael Hanselmann
    self._ts_finished = time.time()
422 033a1d00 Michael Hanselmann
423 033a1d00 Michael Hanselmann
    self._ReportFinished(self._daemon.exit_status == 0,
424 033a1d00 Michael Hanselmann
                         self._daemon.error_message)
425 033a1d00 Michael Hanselmann
426 033a1d00 Michael Hanselmann
    return True
427 033a1d00 Michael Hanselmann
428 033a1d00 Michael Hanselmann
  def _ReportFinished(self, success, message):
429 033a1d00 Michael Hanselmann
    """Transfer is finished or daemon exited.
430 033a1d00 Michael Hanselmann

431 033a1d00 Michael Hanselmann
    @type success: bool
432 033a1d00 Michael Hanselmann
    @param success: Whether the transfer was successful
433 033a1d00 Michael Hanselmann
    @type message: string
434 033a1d00 Michael Hanselmann
    @param message: Error message
435 033a1d00 Michael Hanselmann

436 033a1d00 Michael Hanselmann
    """
437 033a1d00 Michael Hanselmann
    assert self.success is None
438 033a1d00 Michael Hanselmann
439 033a1d00 Michael Hanselmann
    self.success = success
440 033a1d00 Michael Hanselmann
    self.final_message = message
441 033a1d00 Michael Hanselmann
442 033a1d00 Michael Hanselmann
    if success:
443 194e8648 Iustin Pop
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
444 1c3231aa Thomas Thrainer
                   self._daemon_name, self.node_uuid)
445 033a1d00 Michael Hanselmann
    elif self._daemon_name:
446 194e8648 Iustin Pop
      self._lu.LogWarning("%s '%s' on %s failed: %s",
447 1c3231aa Thomas Thrainer
                          self.MODE_TEXT, self._daemon_name,
448 1c3231aa Thomas Thrainer
                          self._lu.cfg.GetNodeName(self.node_uuid),
449 033a1d00 Michael Hanselmann
                          message)
450 033a1d00 Michael Hanselmann
    else:
451 033a1d00 Michael Hanselmann
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
452 1c3231aa Thomas Thrainer
                          self._lu.cfg.GetNodeName(self.node_uuid), message)
453 033a1d00 Michael Hanselmann
454 033a1d00 Michael Hanselmann
    self._cbs.ReportFinished(self, self._private)
455 033a1d00 Michael Hanselmann
456 033a1d00 Michael Hanselmann
  def _Finalize(self):
457 033a1d00 Michael Hanselmann
    """Makes the RPC call to finalize this import/export.
458 033a1d00 Michael Hanselmann

459 033a1d00 Michael Hanselmann
    """
460 1c3231aa Thomas Thrainer
    return self._lu.rpc.call_impexp_cleanup(self.node_uuid, self._daemon_name)
461 033a1d00 Michael Hanselmann
462 033a1d00 Michael Hanselmann
  def Finalize(self, error=None):
463 033a1d00 Michael Hanselmann
    """Finalizes this import/export.
464 033a1d00 Michael Hanselmann

465 033a1d00 Michael Hanselmann
    """
466 033a1d00 Michael Hanselmann
    if self._daemon_name:
467 194e8648 Iustin Pop
      logging.info("Finalizing %s '%s' on %s",
468 1c3231aa Thomas Thrainer
                   self.MODE_TEXT, self._daemon_name, self.node_uuid)
469 033a1d00 Michael Hanselmann
470 033a1d00 Michael Hanselmann
      result = self._Finalize()
471 033a1d00 Michael Hanselmann
      if result.fail_msg:
472 194e8648 Iustin Pop
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
473 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
474 1c3231aa Thomas Thrainer
                            self.node_uuid, result.fail_msg)
475 033a1d00 Michael Hanselmann
        return False
476 033a1d00 Michael Hanselmann
477 033a1d00 Michael Hanselmann
      # Daemon is no longer running
478 033a1d00 Michael Hanselmann
      self._daemon_name = None
479 033a1d00 Michael Hanselmann
      self._ts_cleanup = time.time()
480 033a1d00 Michael Hanselmann
481 033a1d00 Michael Hanselmann
    if error:
482 033a1d00 Michael Hanselmann
      self._ReportFinished(False, error)
483 033a1d00 Michael Hanselmann
484 033a1d00 Michael Hanselmann
    return True
485 033a1d00 Michael Hanselmann
486 033a1d00 Michael Hanselmann
487 033a1d00 Michael Hanselmann
class DiskImport(_DiskImportExportBase):
488 033a1d00 Michael Hanselmann
  MODE_TEXT = "import"
489 033a1d00 Michael Hanselmann
490 1c3231aa Thomas Thrainer
  def __init__(self, lu, node_uuid, opts, instance, component,
491 033a1d00 Michael Hanselmann
               dest, dest_args, timeouts, cbs, private=None):
492 033a1d00 Michael Hanselmann
    """Initializes this class.
493 033a1d00 Michael Hanselmann

494 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
495 1c3231aa Thomas Thrainer
    @type node_uuid: string
496 1c3231aa Thomas Thrainer
    @param node_uuid: Node name for import
497 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
498 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
499 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
500 033a1d00 Michael Hanselmann
    @param instance: Instance object
501 5e26c4d9 Iustin Pop
    @type component: string
502 5e26c4d9 Iustin Pop
    @param component: which part of the instance is being imported
503 033a1d00 Michael Hanselmann
    @param dest: I/O destination
504 033a1d00 Michael Hanselmann
    @param dest_args: I/O arguments
505 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
506 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
507 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
508 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
509 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
510 033a1d00 Michael Hanselmann

511 033a1d00 Michael Hanselmann
    """
512 1c3231aa Thomas Thrainer
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
513 5e26c4d9 Iustin Pop
                                   component, timeouts, cbs, private)
514 033a1d00 Michael Hanselmann
    self._dest = dest
515 033a1d00 Michael Hanselmann
    self._dest_args = dest_args
516 033a1d00 Michael Hanselmann
517 033a1d00 Michael Hanselmann
    # Timestamps
518 033a1d00 Michael Hanselmann
    self._ts_listening = None
519 033a1d00 Michael Hanselmann
520 033a1d00 Michael Hanselmann
  @property
521 033a1d00 Michael Hanselmann
  def listen_port(self):
522 033a1d00 Michael Hanselmann
    """Returns the port the daemon is listening on.
523 033a1d00 Michael Hanselmann

524 033a1d00 Michael Hanselmann
    """
525 033a1d00 Michael Hanselmann
    if self._daemon:
526 033a1d00 Michael Hanselmann
      return self._daemon.listen_port
527 033a1d00 Michael Hanselmann
528 033a1d00 Michael Hanselmann
    return None
529 033a1d00 Michael Hanselmann
530 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
531 033a1d00 Michael Hanselmann
    """Starts the import daemon.
532 033a1d00 Michael Hanselmann

533 033a1d00 Michael Hanselmann
    """
534 1c3231aa Thomas Thrainer
    return self._lu.rpc.call_import_start(self.node_uuid, self._opts,
535 6613661a Iustin Pop
                                          self._instance, self._component,
536 b8c160c1 Michael Hanselmann
                                          (self._dest, self._dest_args))
537 033a1d00 Michael Hanselmann
538 033a1d00 Michael Hanselmann
  def CheckListening(self):
539 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
540 033a1d00 Michael Hanselmann

541 033a1d00 Michael Hanselmann
    @rtype: bool
542 033a1d00 Michael Hanselmann
    @return: Whether the daemon is listening
543 033a1d00 Michael Hanselmann

544 033a1d00 Michael Hanselmann
    """
545 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
546 033a1d00 Michael Hanselmann
547 033a1d00 Michael Hanselmann
    if self._ts_listening is not None:
548 033a1d00 Michael Hanselmann
      return True
549 033a1d00 Michael Hanselmann
550 033a1d00 Michael Hanselmann
    port = self._daemon.listen_port
551 033a1d00 Michael Hanselmann
    if port is not None:
552 033a1d00 Michael Hanselmann
      self._ts_listening = time.time()
553 033a1d00 Michael Hanselmann
554 194e8648 Iustin Pop
      logging.debug("Import '%s' on %s is now listening on port %s",
555 1c3231aa Thomas Thrainer
                    self._daemon_name, self.node_uuid, port)
556 033a1d00 Michael Hanselmann
557 5e26c4d9 Iustin Pop
      self._cbs.ReportListening(self, self._private, self._component)
558 033a1d00 Michael Hanselmann
559 033a1d00 Michael Hanselmann
      return True
560 033a1d00 Michael Hanselmann
561 f8326fca Andrea Spadaccini
    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
562 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not listening after %s seconds" %
563 033a1d00 Michael Hanselmann
                               self._timeouts.listen)
564 033a1d00 Michael Hanselmann
565 033a1d00 Michael Hanselmann
    return False
566 033a1d00 Michael Hanselmann
567 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
568 033a1d00 Michael Hanselmann
    """Returns the time since we started listening.
569 033a1d00 Michael Hanselmann

570 033a1d00 Michael Hanselmann
    """
571 033a1d00 Michael Hanselmann
    assert self._ts_listening is not None, \
572 033a1d00 Michael Hanselmann
           ("Checking whether an import is connected is only useful"
573 033a1d00 Michael Hanselmann
            " once it's been listening")
574 033a1d00 Michael Hanselmann
575 033a1d00 Michael Hanselmann
    return self._ts_listening
576 033a1d00 Michael Hanselmann
577 033a1d00 Michael Hanselmann
578 033a1d00 Michael Hanselmann
class DiskExport(_DiskImportExportBase):
579 033a1d00 Michael Hanselmann
  MODE_TEXT = "export"
580 033a1d00 Michael Hanselmann
581 1c3231aa Thomas Thrainer
  def __init__(self, lu, node_uuid, opts, dest_host, dest_port,
582 5e26c4d9 Iustin Pop
               instance, component, source, source_args,
583 033a1d00 Michael Hanselmann
               timeouts, cbs, private=None):
584 033a1d00 Michael Hanselmann
    """Initializes this class.
585 033a1d00 Michael Hanselmann

586 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
587 1c3231aa Thomas Thrainer
    @type node_uuid: string
588 1c3231aa Thomas Thrainer
    @param node_uuid: Node UUID for import
589 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
590 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
591 033a1d00 Michael Hanselmann
    @type dest_host: string
592 033a1d00 Michael Hanselmann
    @param dest_host: Destination host name or IP address
593 033a1d00 Michael Hanselmann
    @type dest_port: number
594 033a1d00 Michael Hanselmann
    @param dest_port: Destination port number
595 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
596 033a1d00 Michael Hanselmann
    @param instance: Instance object
597 5e26c4d9 Iustin Pop
    @type component: string
598 5e26c4d9 Iustin Pop
    @param component: which part of the instance is being imported
599 033a1d00 Michael Hanselmann
    @param source: I/O source
600 033a1d00 Michael Hanselmann
    @param source_args: I/O source
601 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
602 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
603 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
604 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
605 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
606 033a1d00 Michael Hanselmann

607 033a1d00 Michael Hanselmann
    """
608 1c3231aa Thomas Thrainer
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
609 5e26c4d9 Iustin Pop
                                   component, timeouts, cbs, private)
610 033a1d00 Michael Hanselmann
    self._dest_host = dest_host
611 033a1d00 Michael Hanselmann
    self._dest_port = dest_port
612 033a1d00 Michael Hanselmann
    self._source = source
613 033a1d00 Michael Hanselmann
    self._source_args = source_args
614 033a1d00 Michael Hanselmann
615 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
616 033a1d00 Michael Hanselmann
    """Starts the export daemon.
617 033a1d00 Michael Hanselmann

618 033a1d00 Michael Hanselmann
    """
619 1c3231aa Thomas Thrainer
    return self._lu.rpc.call_export_start(self.node_uuid, self._opts,
620 033a1d00 Michael Hanselmann
                                          self._dest_host, self._dest_port,
621 6613661a Iustin Pop
                                          self._instance, self._component,
622 b8c160c1 Michael Hanselmann
                                          (self._source, self._source_args))
623 033a1d00 Michael Hanselmann
624 033a1d00 Michael Hanselmann
  def CheckListening(self):
625 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
626 033a1d00 Michael Hanselmann

627 033a1d00 Michael Hanselmann
    """
628 033a1d00 Michael Hanselmann
    # Only an import can be listening
629 033a1d00 Michael Hanselmann
    return True
630 033a1d00 Michael Hanselmann
631 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
632 033a1d00 Michael Hanselmann
    """Returns the time since the daemon started.
633 033a1d00 Michael Hanselmann

634 033a1d00 Michael Hanselmann
    """
635 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
636 033a1d00 Michael Hanselmann
637 033a1d00 Michael Hanselmann
    return self._ts_begin
638 033a1d00 Michael Hanselmann
639 033a1d00 Michael Hanselmann
640 1a2e7fe9 Michael Hanselmann
def FormatProgress(progress):
641 1a2e7fe9 Michael Hanselmann
  """Formats progress information for user consumption
642 1a2e7fe9 Michael Hanselmann

643 1a2e7fe9 Michael Hanselmann
  """
644 e6b8d02d Michael Hanselmann
  (mbytes, throughput, percent, eta) = progress
645 1a2e7fe9 Michael Hanselmann
646 1a2e7fe9 Michael Hanselmann
  parts = [
647 1a2e7fe9 Michael Hanselmann
    utils.FormatUnit(mbytes, "h"),
648 1a2e7fe9 Michael Hanselmann
649 1a2e7fe9 Michael Hanselmann
    # Not using FormatUnit as it doesn't support kilobytes
650 1a2e7fe9 Michael Hanselmann
    "%0.1f MiB/s" % throughput,
651 1a2e7fe9 Michael Hanselmann
    ]
652 1a2e7fe9 Michael Hanselmann
653 1a2e7fe9 Michael Hanselmann
  if percent is not None:
654 1a2e7fe9 Michael Hanselmann
    parts.append("%d%%" % percent)
655 1a2e7fe9 Michael Hanselmann
656 e6b8d02d Michael Hanselmann
  if eta is not None:
657 e6b8d02d Michael Hanselmann
    parts.append("ETA %s" % utils.FormatSeconds(eta))
658 1a2e7fe9 Michael Hanselmann
659 1a2e7fe9 Michael Hanselmann
  return utils.CommaJoin(parts)
660 1a2e7fe9 Michael Hanselmann
661 1a2e7fe9 Michael Hanselmann
662 033a1d00 Michael Hanselmann
class ImportExportLoop:
663 033a1d00 Michael Hanselmann
  MIN_DELAY = 1.0
664 033a1d00 Michael Hanselmann
  MAX_DELAY = 20.0
665 033a1d00 Michael Hanselmann
666 033a1d00 Michael Hanselmann
  def __init__(self, lu):
667 033a1d00 Michael Hanselmann
    """Initializes this class.
668 033a1d00 Michael Hanselmann

669 033a1d00 Michael Hanselmann
    """
670 033a1d00 Michael Hanselmann
    self._lu = lu
671 033a1d00 Michael Hanselmann
    self._queue = []
672 033a1d00 Michael Hanselmann
    self._pending_add = []
673 033a1d00 Michael Hanselmann
674 033a1d00 Michael Hanselmann
  def Add(self, diskie):
675 033a1d00 Michael Hanselmann
    """Adds an import/export object to the loop.
676 033a1d00 Michael Hanselmann

677 033a1d00 Michael Hanselmann
    @type diskie: Subclass of L{_DiskImportExportBase}
678 033a1d00 Michael Hanselmann
    @param diskie: Import/export object
679 033a1d00 Michael Hanselmann

680 033a1d00 Michael Hanselmann
    """
681 033a1d00 Michael Hanselmann
    assert diskie not in self._pending_add
682 033a1d00 Michael Hanselmann
    assert diskie.loop is None
683 033a1d00 Michael Hanselmann
684 033a1d00 Michael Hanselmann
    diskie.SetLoop(self)
685 033a1d00 Michael Hanselmann
686 033a1d00 Michael Hanselmann
    # Adding new objects to a staging list is necessary, otherwise the main
687 033a1d00 Michael Hanselmann
    # loop gets confused if callbacks modify the queue while the main loop is
688 033a1d00 Michael Hanselmann
    # iterating over it.
689 033a1d00 Michael Hanselmann
    self._pending_add.append(diskie)
690 033a1d00 Michael Hanselmann
691 033a1d00 Michael Hanselmann
  @staticmethod
692 033a1d00 Michael Hanselmann
  def _CollectDaemonStatus(lu, daemons):
693 033a1d00 Michael Hanselmann
    """Collects the status for all import/export daemons.
694 033a1d00 Michael Hanselmann

695 033a1d00 Michael Hanselmann
    """
696 033a1d00 Michael Hanselmann
    daemon_status = {}
697 033a1d00 Michael Hanselmann
698 033a1d00 Michael Hanselmann
    for node_name, names in daemons.iteritems():
699 033a1d00 Michael Hanselmann
      result = lu.rpc.call_impexp_status(node_name, names)
700 033a1d00 Michael Hanselmann
      if result.fail_msg:
701 033a1d00 Michael Hanselmann
        lu.LogWarning("Failed to get daemon status on %s: %s",
702 033a1d00 Michael Hanselmann
                      node_name, result.fail_msg)
703 033a1d00 Michael Hanselmann
        continue
704 033a1d00 Michael Hanselmann
705 033a1d00 Michael Hanselmann
      assert len(names) == len(result.payload)
706 033a1d00 Michael Hanselmann
707 033a1d00 Michael Hanselmann
      daemon_status[node_name] = dict(zip(names, result.payload))
708 033a1d00 Michael Hanselmann
709 033a1d00 Michael Hanselmann
    return daemon_status
710 033a1d00 Michael Hanselmann
711 033a1d00 Michael Hanselmann
  @staticmethod
712 033a1d00 Michael Hanselmann
  def _GetActiveDaemonNames(queue):
713 033a1d00 Michael Hanselmann
    """Gets the names of all active daemons.
714 033a1d00 Michael Hanselmann

715 033a1d00 Michael Hanselmann
    """
716 033a1d00 Michael Hanselmann
    result = {}
717 033a1d00 Michael Hanselmann
    for diskie in queue:
718 033a1d00 Michael Hanselmann
      if not diskie.active:
719 033a1d00 Michael Hanselmann
        continue
720 033a1d00 Michael Hanselmann
721 033a1d00 Michael Hanselmann
      try:
722 033a1d00 Michael Hanselmann
        # Start daemon if necessary
723 033a1d00 Michael Hanselmann
        daemon_name = diskie.CheckDaemon()
724 033a1d00 Michael Hanselmann
      except _ImportExportError, err:
725 033a1d00 Michael Hanselmann
        logging.exception("%s failed", diskie.MODE_TEXT)
726 033a1d00 Michael Hanselmann
        diskie.Finalize(error=str(err))
727 033a1d00 Michael Hanselmann
        continue
728 033a1d00 Michael Hanselmann
729 033a1d00 Michael Hanselmann
      result.setdefault(diskie.node_name, []).append(daemon_name)
730 033a1d00 Michael Hanselmann
731 033a1d00 Michael Hanselmann
    assert len(queue) >= len(result)
732 033a1d00 Michael Hanselmann
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
733 033a1d00 Michael Hanselmann
734 033a1d00 Michael Hanselmann
    logging.debug("daemons=%r", result)
735 033a1d00 Michael Hanselmann
736 033a1d00 Michael Hanselmann
    return result
737 033a1d00 Michael Hanselmann
738 033a1d00 Michael Hanselmann
  def _AddPendingToQueue(self):
739 033a1d00 Michael Hanselmann
    """Adds all pending import/export objects to the internal queue.
740 033a1d00 Michael Hanselmann

741 033a1d00 Michael Hanselmann
    """
742 033a1d00 Michael Hanselmann
    assert compat.all(diskie not in self._queue and diskie.loop == self
743 033a1d00 Michael Hanselmann
                      for diskie in self._pending_add)
744 033a1d00 Michael Hanselmann
745 033a1d00 Michael Hanselmann
    self._queue.extend(self._pending_add)
746 033a1d00 Michael Hanselmann
747 033a1d00 Michael Hanselmann
    del self._pending_add[:]
748 033a1d00 Michael Hanselmann
749 033a1d00 Michael Hanselmann
  def Run(self):
750 033a1d00 Michael Hanselmann
    """Utility main loop.
751 033a1d00 Michael Hanselmann

752 033a1d00 Michael Hanselmann
    """
753 033a1d00 Michael Hanselmann
    while True:
754 033a1d00 Michael Hanselmann
      self._AddPendingToQueue()
755 033a1d00 Michael Hanselmann
756 033a1d00 Michael Hanselmann
      # Collect all active daemon names
757 033a1d00 Michael Hanselmann
      daemons = self._GetActiveDaemonNames(self._queue)
758 033a1d00 Michael Hanselmann
      if not daemons:
759 033a1d00 Michael Hanselmann
        break
760 033a1d00 Michael Hanselmann
761 033a1d00 Michael Hanselmann
      # Collection daemon status data
762 033a1d00 Michael Hanselmann
      data = self._CollectDaemonStatus(self._lu, daemons)
763 033a1d00 Michael Hanselmann
764 033a1d00 Michael Hanselmann
      # Use data
765 033a1d00 Michael Hanselmann
      delay = self.MAX_DELAY
766 033a1d00 Michael Hanselmann
      for diskie in self._queue:
767 033a1d00 Michael Hanselmann
        if not diskie.active:
768 033a1d00 Michael Hanselmann
          continue
769 033a1d00 Michael Hanselmann
770 033a1d00 Michael Hanselmann
        try:
771 033a1d00 Michael Hanselmann
          try:
772 033a1d00 Michael Hanselmann
            all_daemon_data = data[diskie.node_name]
773 033a1d00 Michael Hanselmann
          except KeyError:
774 033a1d00 Michael Hanselmann
            result = diskie.SetDaemonData(False, None)
775 033a1d00 Michael Hanselmann
          else:
776 033a1d00 Michael Hanselmann
            result = \
777 033a1d00 Michael Hanselmann
              diskie.SetDaemonData(True,
778 033a1d00 Michael Hanselmann
                                   all_daemon_data[diskie.GetDaemonName()])
779 033a1d00 Michael Hanselmann
780 033a1d00 Michael Hanselmann
          if not result:
781 033a1d00 Michael Hanselmann
            # Daemon not yet ready, retry soon
782 033a1d00 Michael Hanselmann
            delay = min(3.0, delay)
783 033a1d00 Michael Hanselmann
            continue
784 033a1d00 Michael Hanselmann
785 033a1d00 Michael Hanselmann
          if diskie.CheckFinished():
786 033a1d00 Michael Hanselmann
            # Transfer finished
787 033a1d00 Michael Hanselmann
            diskie.Finalize()
788 033a1d00 Michael Hanselmann
            continue
789 033a1d00 Michael Hanselmann
790 033a1d00 Michael Hanselmann
          # Normal case: check again in 5 seconds
791 033a1d00 Michael Hanselmann
          delay = min(5.0, delay)
792 033a1d00 Michael Hanselmann
793 033a1d00 Michael Hanselmann
          if not diskie.CheckListening():
794 033a1d00 Michael Hanselmann
            # Not yet listening, retry soon
795 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
796 033a1d00 Michael Hanselmann
            continue
797 033a1d00 Michael Hanselmann
798 033a1d00 Michael Hanselmann
          if not diskie.CheckConnected():
799 033a1d00 Michael Hanselmann
            # Not yet connected, retry soon
800 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
801 033a1d00 Michael Hanselmann
            continue
802 033a1d00 Michael Hanselmann
803 033a1d00 Michael Hanselmann
        except _ImportExportError, err:
804 033a1d00 Michael Hanselmann
          logging.exception("%s failed", diskie.MODE_TEXT)
805 033a1d00 Michael Hanselmann
          diskie.Finalize(error=str(err))
806 033a1d00 Michael Hanselmann
807 403f5172 Guido Trotter
      if not compat.any(diskie.active for diskie in self._queue):
808 033a1d00 Michael Hanselmann
        break
809 033a1d00 Michael Hanselmann
810 033a1d00 Michael Hanselmann
      # Wait a bit
811 033a1d00 Michael Hanselmann
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
812 033a1d00 Michael Hanselmann
      logging.debug("Waiting for %ss", delay)
813 033a1d00 Michael Hanselmann
      time.sleep(delay)
814 033a1d00 Michael Hanselmann
815 033a1d00 Michael Hanselmann
  def FinalizeAll(self):
816 033a1d00 Michael Hanselmann
    """Finalizes all pending transfers.
817 033a1d00 Michael Hanselmann

818 033a1d00 Michael Hanselmann
    """
819 033a1d00 Michael Hanselmann
    success = True
820 033a1d00 Michael Hanselmann
821 033a1d00 Michael Hanselmann
    for diskie in self._queue:
822 033a1d00 Michael Hanselmann
      success = diskie.Finalize() and success
823 033a1d00 Michael Hanselmann
824 033a1d00 Michael Hanselmann
    return success
825 5d97d6dd Michael Hanselmann
826 5d97d6dd Michael Hanselmann
827 5d97d6dd Michael Hanselmann
class _TransferInstCbBase(ImportExportCbBase):
828 1c3231aa Thomas Thrainer
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node_uuid,
829 1c3231aa Thomas Thrainer
               src_cbs, dest_node_uuid, dest_ip):
830 5d97d6dd Michael Hanselmann
    """Initializes this class.
831 5d97d6dd Michael Hanselmann

832 5d97d6dd Michael Hanselmann
    """
833 5d97d6dd Michael Hanselmann
    ImportExportCbBase.__init__(self)
834 5d97d6dd Michael Hanselmann
835 5d97d6dd Michael Hanselmann
    self.lu = lu
836 5d97d6dd Michael Hanselmann
    self.feedback_fn = feedback_fn
837 5d97d6dd Michael Hanselmann
    self.instance = instance
838 5d97d6dd Michael Hanselmann
    self.timeouts = timeouts
839 1c3231aa Thomas Thrainer
    self.src_node_uuid = src_node_uuid
840 5d97d6dd Michael Hanselmann
    self.src_cbs = src_cbs
841 1c3231aa Thomas Thrainer
    self.dest_node_uuid = dest_node_uuid
842 5d97d6dd Michael Hanselmann
    self.dest_ip = dest_ip
843 5d97d6dd Michael Hanselmann
844 5d97d6dd Michael Hanselmann
845 5d97d6dd Michael Hanselmann
class _TransferInstSourceCb(_TransferInstCbBase):
846 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
847 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
848 5d97d6dd Michael Hanselmann

849 5d97d6dd Michael Hanselmann
    """
850 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
851 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
852 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
853 5d97d6dd Michael Hanselmann
854 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is sending data on %s" %
855 5d97d6dd Michael Hanselmann
                     (dtp.data.name, ie.node_name))
856 5d97d6dd Michael Hanselmann
857 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, dtp):
858 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
859 1a2e7fe9 Michael Hanselmann

860 1a2e7fe9 Michael Hanselmann
    """
861 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
862 1a2e7fe9 Michael Hanselmann
    if not progress:
863 1a2e7fe9 Michael Hanselmann
      return
864 1a2e7fe9 Michael Hanselmann
865 1a2e7fe9 Michael Hanselmann
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
866 1a2e7fe9 Michael Hanselmann
867 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
868 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
869 5d97d6dd Michael Hanselmann

870 5d97d6dd Michael Hanselmann
    """
871 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
872 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
873 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
874 5d97d6dd Michael Hanselmann
875 5d97d6dd Michael Hanselmann
    if ie.success:
876 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished sending data" % dtp.data.name)
877 5d97d6dd Michael Hanselmann
    else:
878 c9300bb3 Iustin Pop
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
879 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
880 5d97d6dd Michael Hanselmann
881 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
882 5d97d6dd Michael Hanselmann
883 5d97d6dd Michael Hanselmann
    cb = dtp.data.finished_fn
884 5d97d6dd Michael Hanselmann
    if cb:
885 5d97d6dd Michael Hanselmann
      cb()
886 5d97d6dd Michael Hanselmann
887 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
888 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
889 5d97d6dd Michael Hanselmann
    if dtp.dest_import and not ie.success:
890 5d97d6dd Michael Hanselmann
      dtp.dest_import.Abort()
891 5d97d6dd Michael Hanselmann
892 5d97d6dd Michael Hanselmann
893 5d97d6dd Michael Hanselmann
class _TransferInstDestCb(_TransferInstCbBase):
894 5e26c4d9 Iustin Pop
  def ReportListening(self, ie, dtp, component):
895 5d97d6dd Michael Hanselmann
    """Called when daemon started listening.
896 5d97d6dd Michael Hanselmann

897 5d97d6dd Michael Hanselmann
    """
898 5d97d6dd Michael Hanselmann
    assert self.src_cbs
899 5d97d6dd Michael Hanselmann
    assert dtp.src_export is None
900 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
901 d51ae04c Michael Hanselmann
    assert dtp.export_opts
902 5d97d6dd Michael Hanselmann
903 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
904 5d97d6dd Michael Hanselmann
905 5d97d6dd Michael Hanselmann
    # Start export on source node
906 1c3231aa Thomas Thrainer
    de = DiskExport(self.lu, self.src_node_uuid, dtp.export_opts,
907 5e26c4d9 Iustin Pop
                    self.dest_ip, ie.listen_port, self.instance,
908 5e26c4d9 Iustin Pop
                    component, dtp.data.src_io, dtp.data.src_ioargs,
909 5d97d6dd Michael Hanselmann
                    self.timeouts, self.src_cbs, private=dtp)
910 5d97d6dd Michael Hanselmann
    ie.loop.Add(de)
911 5d97d6dd Michael Hanselmann
912 5d97d6dd Michael Hanselmann
    dtp.src_export = de
913 5d97d6dd Michael Hanselmann
914 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
915 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
916 5d97d6dd Michael Hanselmann

917 5d97d6dd Michael Hanselmann
    """
918 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is receiving data on %s" %
919 1c3231aa Thomas Thrainer
                     (dtp.data.name,
920 1c3231aa Thomas Thrainer
                      self.lu.cfg.GetNodeName(self.dest_node_uuid)))
921 5d97d6dd Michael Hanselmann
922 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
923 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
924 5d97d6dd Michael Hanselmann

925 5d97d6dd Michael Hanselmann
    """
926 5d97d6dd Michael Hanselmann
    if ie.success:
927 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
928 5d97d6dd Michael Hanselmann
    else:
929 c9300bb3 Iustin Pop
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
930 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
931 5d97d6dd Michael Hanselmann
932 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
933 5d97d6dd Michael Hanselmann
934 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
935 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
936 5d97d6dd Michael Hanselmann
    if dtp.src_export and not ie.success:
937 5d97d6dd Michael Hanselmann
      dtp.src_export.Abort()
938 5d97d6dd Michael Hanselmann
939 5d97d6dd Michael Hanselmann
940 5d97d6dd Michael Hanselmann
class DiskTransfer(object):
941 5d97d6dd Michael Hanselmann
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
942 5d97d6dd Michael Hanselmann
               finished_fn):
943 5d97d6dd Michael Hanselmann
    """Initializes this class.
944 5d97d6dd Michael Hanselmann

945 5d97d6dd Michael Hanselmann
    @type name: string
946 5d97d6dd Michael Hanselmann
    @param name: User-visible name for this transfer (e.g. "disk/0")
947 5d97d6dd Michael Hanselmann
    @param src_io: Source I/O type
948 5d97d6dd Michael Hanselmann
    @param src_ioargs: Source I/O arguments
949 5d97d6dd Michael Hanselmann
    @param dest_io: Destination I/O type
950 5d97d6dd Michael Hanselmann
    @param dest_ioargs: Destination I/O arguments
951 5d97d6dd Michael Hanselmann
    @type finished_fn: callable
952 5d97d6dd Michael Hanselmann
    @param finished_fn: Function called once transfer has finished
953 5d97d6dd Michael Hanselmann

954 5d97d6dd Michael Hanselmann
    """
955 5d97d6dd Michael Hanselmann
    self.name = name
956 5d97d6dd Michael Hanselmann
957 5d97d6dd Michael Hanselmann
    self.src_io = src_io
958 5d97d6dd Michael Hanselmann
    self.src_ioargs = src_ioargs
959 5d97d6dd Michael Hanselmann
960 5d97d6dd Michael Hanselmann
    self.dest_io = dest_io
961 5d97d6dd Michael Hanselmann
    self.dest_ioargs = dest_ioargs
962 5d97d6dd Michael Hanselmann
963 5d97d6dd Michael Hanselmann
    self.finished_fn = finished_fn
964 5d97d6dd Michael Hanselmann
965 5d97d6dd Michael Hanselmann
966 5d97d6dd Michael Hanselmann
class _DiskTransferPrivate(object):
967 d51ae04c Michael Hanselmann
  def __init__(self, data, success, export_opts):
968 5d97d6dd Michael Hanselmann
    """Initializes this class.
969 5d97d6dd Michael Hanselmann

970 5d97d6dd Michael Hanselmann
    @type data: L{DiskTransfer}
971 5d97d6dd Michael Hanselmann
    @type success: bool
972 5d97d6dd Michael Hanselmann

973 5d97d6dd Michael Hanselmann
    """
974 5d97d6dd Michael Hanselmann
    self.data = data
975 d51ae04c Michael Hanselmann
    self.success = success
976 d51ae04c Michael Hanselmann
    self.export_opts = export_opts
977 5d97d6dd Michael Hanselmann
978 5d97d6dd Michael Hanselmann
    self.src_export = None
979 5d97d6dd Michael Hanselmann
    self.dest_import = None
980 5d97d6dd Michael Hanselmann
981 5d97d6dd Michael Hanselmann
  def RecordResult(self, success):
982 5d97d6dd Michael Hanselmann
    """Updates the status.
983 5d97d6dd Michael Hanselmann

984 5d97d6dd Michael Hanselmann
    One failed part will cause the whole transfer to fail.
985 5d97d6dd Michael Hanselmann

986 5d97d6dd Michael Hanselmann
    """
987 5d97d6dd Michael Hanselmann
    self.success = self.success and success
988 5d97d6dd Michael Hanselmann
989 5d97d6dd Michael Hanselmann
990 d51ae04c Michael Hanselmann
def _GetInstDiskMagic(base, instance_name, index):
991 d51ae04c Michael Hanselmann
  """Computes the magic value for a disk export or import.
992 d51ae04c Michael Hanselmann

993 d51ae04c Michael Hanselmann
  @type base: string
994 d51ae04c Michael Hanselmann
  @param base: Random seed value (can be the same for all disks of a transfer)
995 d51ae04c Michael Hanselmann
  @type instance_name: string
996 d51ae04c Michael Hanselmann
  @param instance_name: Name of instance
997 d51ae04c Michael Hanselmann
  @type index: number
998 d51ae04c Michael Hanselmann
  @param index: Disk index
999 d51ae04c Michael Hanselmann

1000 d51ae04c Michael Hanselmann
  """
1001 d51ae04c Michael Hanselmann
  h = compat.sha1_hash()
1002 d51ae04c Michael Hanselmann
  h.update(str(constants.RIE_VERSION))
1003 d51ae04c Michael Hanselmann
  h.update(base)
1004 d51ae04c Michael Hanselmann
  h.update(instance_name)
1005 d51ae04c Michael Hanselmann
  h.update(str(index))
1006 d51ae04c Michael Hanselmann
  return h.hexdigest()
1007 d51ae04c Michael Hanselmann
1008 d51ae04c Michael Hanselmann
1009 1c3231aa Thomas Thrainer
def TransferInstanceData(lu, feedback_fn, src_node_uuid, dest_node_uuid,
1010 f198cf91 Thomas Thrainer
                         dest_ip, compress, instance, all_transfers):
1011 5d97d6dd Michael Hanselmann
  """Transfers an instance's data from one node to another.
1012 5d97d6dd Michael Hanselmann

1013 5d97d6dd Michael Hanselmann
  @param lu: Logical unit instance
1014 5d97d6dd Michael Hanselmann
  @param feedback_fn: Feedback function
1015 1c3231aa Thomas Thrainer
  @type src_node_uuid: string
1016 1c3231aa Thomas Thrainer
  @param src_node_uuid: Source node UUID
1017 1c3231aa Thomas Thrainer
  @type dest_node_uuid: string
1018 1c3231aa Thomas Thrainer
  @param dest_node_uuid: Destination node UUID
1019 5d97d6dd Michael Hanselmann
  @type dest_ip: string
1020 5d97d6dd Michael Hanselmann
  @param dest_ip: IP address of destination node
1021 f198cf91 Thomas Thrainer
  @type compress: string
1022 f198cf91 Thomas Thrainer
  @param compress: one of L{constants.IEC_ALL}
1023 5d97d6dd Michael Hanselmann
  @type instance: L{objects.Instance}
1024 5d97d6dd Michael Hanselmann
  @param instance: Instance object
1025 5d97d6dd Michael Hanselmann
  @type all_transfers: list of L{DiskTransfer} instances
1026 5d97d6dd Michael Hanselmann
  @param all_transfers: List of all disk transfers to be made
1027 5d97d6dd Michael Hanselmann
  @rtype: list
1028 5d97d6dd Michael Hanselmann
  @return: List with a boolean (True=successful, False=failed) for success for
1029 5d97d6dd Michael Hanselmann
           each transfer
1030 5d97d6dd Michael Hanselmann

1031 5d97d6dd Michael Hanselmann
  """
1032 1c3231aa Thomas Thrainer
  src_node_name = lu.cfg.GetNodeName(src_node_uuid)
1033 1c3231aa Thomas Thrainer
  dest_node_name = lu.cfg.GetNodeName(dest_node_uuid)
1034 1c3231aa Thomas Thrainer
1035 a5310c2a Michael Hanselmann
  logging.debug("Source node %s, destination node %s, compression '%s'",
1036 1c3231aa Thomas Thrainer
                src_node_name, dest_node_name, compress)
1037 a5310c2a Michael Hanselmann
1038 5d97d6dd Michael Hanselmann
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1039 5d97d6dd Michael Hanselmann
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1040 1c3231aa Thomas Thrainer
                                  src_node_uuid, None, dest_node_uuid, dest_ip)
1041 5d97d6dd Michael Hanselmann
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1042 1c3231aa Thomas Thrainer
                                 src_node_uuid, src_cbs, dest_node_uuid,
1043 1c3231aa Thomas Thrainer
                                 dest_ip)
1044 5d97d6dd Michael Hanselmann
1045 5d97d6dd Michael Hanselmann
  all_dtp = []
1046 5d97d6dd Michael Hanselmann
1047 d51ae04c Michael Hanselmann
  base_magic = utils.GenerateSecret(6)
1048 d51ae04c Michael Hanselmann
1049 5d97d6dd Michael Hanselmann
  ieloop = ImportExportLoop(lu)
1050 5d97d6dd Michael Hanselmann
  try:
1051 d51ae04c Michael Hanselmann
    for idx, transfer in enumerate(all_transfers):
1052 5d97d6dd Michael Hanselmann
      if transfer:
1053 5d97d6dd Michael Hanselmann
        feedback_fn("Exporting %s from %s to %s" %
1054 1c3231aa Thomas Thrainer
                    (transfer.name, src_node_name, dest_node_name))
1055 5d97d6dd Michael Hanselmann
1056 d51ae04c Michael Hanselmann
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1057 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1058 d51ae04c Michael Hanselmann
                                           compress=compress, magic=magic)
1059 d51ae04c Michael Hanselmann
1060 d51ae04c Michael Hanselmann
        dtp = _DiskTransferPrivate(transfer, True, opts)
1061 5d97d6dd Michael Hanselmann
1062 1c3231aa Thomas Thrainer
        di = DiskImport(lu, dest_node_uuid, opts, instance, "disk%d" % idx,
1063 5d97d6dd Michael Hanselmann
                        transfer.dest_io, transfer.dest_ioargs,
1064 5d97d6dd Michael Hanselmann
                        timeouts, dest_cbs, private=dtp)
1065 5d97d6dd Michael Hanselmann
        ieloop.Add(di)
1066 5d97d6dd Michael Hanselmann
1067 5d97d6dd Michael Hanselmann
        dtp.dest_import = di
1068 5d97d6dd Michael Hanselmann
      else:
1069 85b3901b Michael Hanselmann
        dtp = _DiskTransferPrivate(None, False, None)
1070 5d97d6dd Michael Hanselmann
1071 5d97d6dd Michael Hanselmann
      all_dtp.append(dtp)
1072 5d97d6dd Michael Hanselmann
1073 5d97d6dd Michael Hanselmann
    ieloop.Run()
1074 5d97d6dd Michael Hanselmann
  finally:
1075 5d97d6dd Michael Hanselmann
    ieloop.FinalizeAll()
1076 5d97d6dd Michael Hanselmann
1077 5d97d6dd Michael Hanselmann
  assert len(all_dtp) == len(all_transfers)
1078 403f5172 Guido Trotter
  assert compat.all((dtp.src_export is None or
1079 5d97d6dd Michael Hanselmann
                      dtp.src_export.success is not None) and
1080 5d97d6dd Michael Hanselmann
                     (dtp.dest_import is None or
1081 5d97d6dd Michael Hanselmann
                      dtp.dest_import.success is not None)
1082 403f5172 Guido Trotter
                     for dtp in all_dtp), \
1083 5d97d6dd Michael Hanselmann
         "Not all imports/exports are finalized"
1084 5d97d6dd Michael Hanselmann
1085 5d97d6dd Michael Hanselmann
  return [bool(dtp.success) for dtp in all_dtp]
1086 387794f8 Michael Hanselmann
1087 387794f8 Michael Hanselmann
1088 4a96f1d1 Michael Hanselmann
class _RemoteExportCb(ImportExportCbBase):
1089 4a96f1d1 Michael Hanselmann
  def __init__(self, feedback_fn, disk_count):
1090 4a96f1d1 Michael Hanselmann
    """Initializes this class.
1091 4a96f1d1 Michael Hanselmann

1092 4a96f1d1 Michael Hanselmann
    """
1093 4a96f1d1 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1094 4a96f1d1 Michael Hanselmann
    self._feedback_fn = feedback_fn
1095 4a96f1d1 Michael Hanselmann
    self._dresults = [None] * disk_count
1096 4a96f1d1 Michael Hanselmann
1097 4a96f1d1 Michael Hanselmann
  @property
1098 4a96f1d1 Michael Hanselmann
  def disk_results(self):
1099 4a96f1d1 Michael Hanselmann
    """Returns per-disk results.
1100 4a96f1d1 Michael Hanselmann

1101 4a96f1d1 Michael Hanselmann
    """
1102 4a96f1d1 Michael Hanselmann
    return self._dresults
1103 4a96f1d1 Michael Hanselmann
1104 4a96f1d1 Michael Hanselmann
  def ReportConnected(self, ie, private):
1105 4a96f1d1 Michael Hanselmann
    """Called when a connection has been established.
1106 4a96f1d1 Michael Hanselmann

1107 4a96f1d1 Michael Hanselmann
    """
1108 4a96f1d1 Michael Hanselmann
    (idx, _) = private
1109 4a96f1d1 Michael Hanselmann
1110 4a96f1d1 Michael Hanselmann
    self._feedback_fn("Disk %s is now sending data" % idx)
1111 4a96f1d1 Michael Hanselmann
1112 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
1113 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
1114 1a2e7fe9 Michael Hanselmann

1115 1a2e7fe9 Michael Hanselmann
    """
1116 1a2e7fe9 Michael Hanselmann
    (idx, _) = private
1117 1a2e7fe9 Michael Hanselmann
1118 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
1119 1a2e7fe9 Michael Hanselmann
    if not progress:
1120 1a2e7fe9 Michael Hanselmann
      return
1121 1a2e7fe9 Michael Hanselmann
1122 1a2e7fe9 Michael Hanselmann
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1123 1a2e7fe9 Michael Hanselmann
1124 4a96f1d1 Michael Hanselmann
  def ReportFinished(self, ie, private):
1125 4a96f1d1 Michael Hanselmann
    """Called when a transfer has finished.
1126 4a96f1d1 Michael Hanselmann

1127 4a96f1d1 Michael Hanselmann
    """
1128 4a96f1d1 Michael Hanselmann
    (idx, finished_fn) = private
1129 4a96f1d1 Michael Hanselmann
1130 4a96f1d1 Michael Hanselmann
    if ie.success:
1131 4a96f1d1 Michael Hanselmann
      self._feedback_fn("Disk %s finished sending data" % idx)
1132 4a96f1d1 Michael Hanselmann
    else:
1133 c9300bb3 Iustin Pop
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1134 4a96f1d1 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1135 4a96f1d1 Michael Hanselmann
1136 4a96f1d1 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1137 4a96f1d1 Michael Hanselmann
1138 4a96f1d1 Michael Hanselmann
    if finished_fn:
1139 4a96f1d1 Michael Hanselmann
      finished_fn()
1140 4a96f1d1 Michael Hanselmann
1141 4a96f1d1 Michael Hanselmann
1142 387794f8 Michael Hanselmann
class ExportInstanceHelper:
1143 387794f8 Michael Hanselmann
  def __init__(self, lu, feedback_fn, instance):
1144 387794f8 Michael Hanselmann
    """Initializes this class.
1145 387794f8 Michael Hanselmann

1146 387794f8 Michael Hanselmann
    @param lu: Logical unit instance
1147 387794f8 Michael Hanselmann
    @param feedback_fn: Feedback function
1148 387794f8 Michael Hanselmann
    @type instance: L{objects.Instance}
1149 387794f8 Michael Hanselmann
    @param instance: Instance object
1150 387794f8 Michael Hanselmann

1151 387794f8 Michael Hanselmann
    """
1152 387794f8 Michael Hanselmann
    self._lu = lu
1153 387794f8 Michael Hanselmann
    self._feedback_fn = feedback_fn
1154 387794f8 Michael Hanselmann
    self._instance = instance
1155 387794f8 Michael Hanselmann
1156 387794f8 Michael Hanselmann
    self._snap_disks = []
1157 387794f8 Michael Hanselmann
    self._removed_snaps = [False] * len(instance.disks)
1158 387794f8 Michael Hanselmann
1159 387794f8 Michael Hanselmann
  def CreateSnapshots(self):
1160 387794f8 Michael Hanselmann
    """Creates an LVM snapshot for every disk of the instance.
1161 387794f8 Michael Hanselmann

1162 387794f8 Michael Hanselmann
    """
1163 387794f8 Michael Hanselmann
    assert not self._snap_disks
1164 387794f8 Michael Hanselmann
1165 387794f8 Michael Hanselmann
    instance = self._instance
1166 387794f8 Michael Hanselmann
    src_node = instance.primary_node
1167 76b920e6 Thomas Thrainer
    src_node_name = self._lu.cfg.GetNodeName(src_node)
1168 387794f8 Michael Hanselmann
1169 387794f8 Michael Hanselmann
    for idx, disk in enumerate(instance.disks):
1170 387794f8 Michael Hanselmann
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1171 76b920e6 Thomas Thrainer
                        (idx, src_node_name))
1172 387794f8 Michael Hanselmann
1173 387794f8 Michael Hanselmann
      # result.payload will be a snapshot of an lvm leaf of the one we
1174 387794f8 Michael Hanselmann
      # passed
1175 62bfbc7d René Nussbaumer
      result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
1176 800ac399 Iustin Pop
      new_dev = False
1177 387794f8 Michael Hanselmann
      msg = result.fail_msg
1178 387794f8 Michael Hanselmann
      if msg:
1179 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1180 76b920e6 Thomas Thrainer
                            idx, src_node_name, msg)
1181 800ac399 Iustin Pop
      elif (not isinstance(result.payload, (tuple, list)) or
1182 800ac399 Iustin Pop
            len(result.payload) != 2):
1183 800ac399 Iustin Pop
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1184 76b920e6 Thomas Thrainer
                            " result '%s'", idx, src_node_name, result.payload)
1185 387794f8 Michael Hanselmann
      else:
1186 800ac399 Iustin Pop
        disk_id = tuple(result.payload)
1187 6da90c0a Helga Velroyen
        disk_params = constants.DISK_LD_DEFAULTS[constants.DT_PLAIN].copy()
1188 cd3b4ff4 Helga Velroyen
        new_dev = objects.Disk(dev_type=constants.DT_PLAIN, size=disk.size,
1189 a57e502a Thomas Thrainer
                               logical_id=disk_id, iv_name=disk.iv_name,
1190 bc5d0215 Andrea Spadaccini
                               params=disk_params)
1191 387794f8 Michael Hanselmann
1192 387794f8 Michael Hanselmann
      self._snap_disks.append(new_dev)
1193 387794f8 Michael Hanselmann
1194 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1195 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(instance.disks)
1196 387794f8 Michael Hanselmann
1197 387794f8 Michael Hanselmann
  def _RemoveSnapshot(self, disk_index):
1198 387794f8 Michael Hanselmann
    """Removes an LVM snapshot.
1199 387794f8 Michael Hanselmann

1200 387794f8 Michael Hanselmann
    @type disk_index: number
1201 387794f8 Michael Hanselmann
    @param disk_index: Index of the snapshot to be removed
1202 387794f8 Michael Hanselmann

1203 387794f8 Michael Hanselmann
    """
1204 387794f8 Michael Hanselmann
    disk = self._snap_disks[disk_index]
1205 387794f8 Michael Hanselmann
    if disk and not self._removed_snaps[disk_index]:
1206 387794f8 Michael Hanselmann
      src_node = self._instance.primary_node
1207 76b920e6 Thomas Thrainer
      src_node_name = self._lu.cfg.GetNodeName(src_node)
1208 387794f8 Michael Hanselmann
1209 387794f8 Michael Hanselmann
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1210 76b920e6 Thomas Thrainer
                        (disk_index, src_node_name))
1211 387794f8 Michael Hanselmann
1212 0c3d9c7c Thomas Thrainer
      result = self._lu.rpc.call_blockdev_remove(src_node,
1213 0c3d9c7c Thomas Thrainer
                                                 (disk, self._instance))
1214 387794f8 Michael Hanselmann
      if result.fail_msg:
1215 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1216 76b920e6 Thomas Thrainer
                            " %s: %s", disk_index, src_node_name,
1217 76b920e6 Thomas Thrainer
                            result.fail_msg)
1218 387794f8 Michael Hanselmann
      else:
1219 387794f8 Michael Hanselmann
        self._removed_snaps[disk_index] = True
1220 387794f8 Michael Hanselmann
1221 896cc964 Thomas Thrainer
  def LocalExport(self, dest_node, compress):
1222 387794f8 Michael Hanselmann
    """Intra-cluster instance export.
1223 387794f8 Michael Hanselmann

1224 387794f8 Michael Hanselmann
    @type dest_node: L{objects.Node}
1225 387794f8 Michael Hanselmann
    @param dest_node: Destination node
1226 896cc964 Thomas Thrainer
    @type compress: string
1227 896cc964 Thomas Thrainer
    @param compress: one of L{constants.IEC_ALL}
1228 387794f8 Michael Hanselmann

1229 387794f8 Michael Hanselmann
    """
1230 387794f8 Michael Hanselmann
    instance = self._instance
1231 1c3231aa Thomas Thrainer
    src_node_uuid = instance.primary_node
1232 387794f8 Michael Hanselmann
1233 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1234 387794f8 Michael Hanselmann
1235 387794f8 Michael Hanselmann
    transfers = []
1236 387794f8 Michael Hanselmann
1237 387794f8 Michael Hanselmann
    for idx, dev in enumerate(self._snap_disks):
1238 387794f8 Michael Hanselmann
      if not dev:
1239 387794f8 Michael Hanselmann
        transfers.append(None)
1240 387794f8 Michael Hanselmann
        continue
1241 387794f8 Michael Hanselmann
1242 9c492c2d Michael Hanselmann
      path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
1243 a57e502a Thomas Thrainer
                            dev.logical_id[1])
1244 387794f8 Michael Hanselmann
1245 387794f8 Michael Hanselmann
      finished_fn = compat.partial(self._TransferFinished, idx)
1246 387794f8 Michael Hanselmann
1247 387794f8 Michael Hanselmann
      # FIXME: pass debug option from opcode to backend
1248 387794f8 Michael Hanselmann
      dt = DiskTransfer("snapshot/%s" % idx,
1249 0c3d9c7c Thomas Thrainer
                        constants.IEIO_SCRIPT, ((dev, instance), idx),
1250 387794f8 Michael Hanselmann
                        constants.IEIO_FILE, (path, ),
1251 387794f8 Michael Hanselmann
                        finished_fn)
1252 387794f8 Michael Hanselmann
      transfers.append(dt)
1253 387794f8 Michael Hanselmann
1254 387794f8 Michael Hanselmann
    # Actually export data
1255 387794f8 Michael Hanselmann
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1256 1c3231aa Thomas Thrainer
                                    src_node_uuid, dest_node.uuid,
1257 387794f8 Michael Hanselmann
                                    dest_node.secondary_ip,
1258 896cc964 Thomas Thrainer
                                    compress,
1259 387794f8 Michael Hanselmann
                                    instance, transfers)
1260 387794f8 Michael Hanselmann
1261 387794f8 Michael Hanselmann
    assert len(dresults) == len(instance.disks)
1262 387794f8 Michael Hanselmann
1263 387794f8 Michael Hanselmann
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1264 1c3231aa Thomas Thrainer
    result = self._lu.rpc.call_finalize_export(dest_node.uuid, instance,
1265 387794f8 Michael Hanselmann
                                               self._snap_disks)
1266 387794f8 Michael Hanselmann
    msg = result.fail_msg
1267 387794f8 Michael Hanselmann
    fin_resu = not msg
1268 387794f8 Michael Hanselmann
    if msg:
1269 387794f8 Michael Hanselmann
      self._lu.LogWarning("Could not finalize export for instance %s"
1270 387794f8 Michael Hanselmann
                          " on node %s: %s", instance.name, dest_node.name, msg)
1271 387794f8 Michael Hanselmann
1272 387794f8 Michael Hanselmann
    return (fin_resu, dresults)
1273 387794f8 Michael Hanselmann
1274 258de3fe Thomas Thrainer
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, compress, timeouts):
1275 4a96f1d1 Michael Hanselmann
    """Inter-cluster instance export.
1276 4a96f1d1 Michael Hanselmann

1277 4a96f1d1 Michael Hanselmann
    @type disk_info: list
1278 4a96f1d1 Michael Hanselmann
    @param disk_info: Per-disk destination information
1279 d51ae04c Michael Hanselmann
    @type key_name: string
1280 d51ae04c Michael Hanselmann
    @param key_name: Name of X509 key to use
1281 d51ae04c Michael Hanselmann
    @type dest_ca_pem: string
1282 d51ae04c Michael Hanselmann
    @param dest_ca_pem: Destination X509 CA in PEM format
1283 258de3fe Thomas Thrainer
    @type compress: string
1284 258de3fe Thomas Thrainer
    @param compress: one of L{constants.IEC_ALL}
1285 4a96f1d1 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
1286 4a96f1d1 Michael Hanselmann
    @param timeouts: Timeouts for this import
1287 4a96f1d1 Michael Hanselmann

1288 4a96f1d1 Michael Hanselmann
    """
1289 4a96f1d1 Michael Hanselmann
    instance = self._instance
1290 4a96f1d1 Michael Hanselmann
1291 4a96f1d1 Michael Hanselmann
    assert len(disk_info) == len(instance.disks)
1292 4a96f1d1 Michael Hanselmann
1293 4a96f1d1 Michael Hanselmann
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1294 4a96f1d1 Michael Hanselmann
1295 4a96f1d1 Michael Hanselmann
    ieloop = ImportExportLoop(self._lu)
1296 4a96f1d1 Michael Hanselmann
    try:
1297 d51ae04c Michael Hanselmann
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1298 d51ae04c Michael Hanselmann
                                                           disk_info)):
1299 ba5619c2 Michael Hanselmann
        # Decide whether to use IPv6
1300 ba5619c2 Michael Hanselmann
        ipv6 = netutils.IP6Address.IsValid(host)
1301 ba5619c2 Michael Hanselmann
1302 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=key_name,
1303 d51ae04c Michael Hanselmann
                                           ca_pem=dest_ca_pem,
1304 258de3fe Thomas Thrainer
                                           magic=magic,
1305 258de3fe Thomas Thrainer
                                           compress=compress,
1306 258de3fe Thomas Thrainer
                                           ipv6=ipv6)
1307 d51ae04c Michael Hanselmann
1308 4a96f1d1 Michael Hanselmann
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1309 4a96f1d1 Michael Hanselmann
        finished_fn = compat.partial(self._TransferFinished, idx)
1310 4a96f1d1 Michael Hanselmann
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1311 5e26c4d9 Iustin Pop
                              opts, host, port, instance, "disk%d" % idx,
1312 0c3d9c7c Thomas Thrainer
                              constants.IEIO_SCRIPT, ((dev, instance), idx),
1313 4a96f1d1 Michael Hanselmann
                              timeouts, cbs, private=(idx, finished_fn)))
1314 4a96f1d1 Michael Hanselmann
1315 4a96f1d1 Michael Hanselmann
      ieloop.Run()
1316 4a96f1d1 Michael Hanselmann
    finally:
1317 4a96f1d1 Michael Hanselmann
      ieloop.FinalizeAll()
1318 4a96f1d1 Michael Hanselmann
1319 4a96f1d1 Michael Hanselmann
    return (True, cbs.disk_results)
1320 4a96f1d1 Michael Hanselmann
1321 387794f8 Michael Hanselmann
  def _TransferFinished(self, idx):
1322 387794f8 Michael Hanselmann
    """Called once a transfer has finished.
1323 387794f8 Michael Hanselmann

1324 387794f8 Michael Hanselmann
    @type idx: number
1325 387794f8 Michael Hanselmann
    @param idx: Disk index
1326 387794f8 Michael Hanselmann

1327 387794f8 Michael Hanselmann
    """
1328 387794f8 Michael Hanselmann
    logging.debug("Transfer %s finished", idx)
1329 387794f8 Michael Hanselmann
    self._RemoveSnapshot(idx)
1330 387794f8 Michael Hanselmann
1331 387794f8 Michael Hanselmann
  def Cleanup(self):
1332 387794f8 Michael Hanselmann
    """Remove all snapshots.
1333 387794f8 Michael Hanselmann

1334 387794f8 Michael Hanselmann
    """
1335 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(self._instance.disks)
1336 387794f8 Michael Hanselmann
    for idx in range(len(self._instance.disks)):
1337 387794f8 Michael Hanselmann
      self._RemoveSnapshot(idx)
1338 1410fa8d Michael Hanselmann
1339 1410fa8d Michael Hanselmann
1340 9bf56d77 Michael Hanselmann
class _RemoteImportCb(ImportExportCbBase):
1341 9bf56d77 Michael Hanselmann
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1342 9bf56d77 Michael Hanselmann
               external_address):
1343 9bf56d77 Michael Hanselmann
    """Initializes this class.
1344 9bf56d77 Michael Hanselmann

1345 9bf56d77 Michael Hanselmann
    @type cds: string
1346 9bf56d77 Michael Hanselmann
    @param cds: Cluster domain secret
1347 9bf56d77 Michael Hanselmann
    @type x509_cert_pem: string
1348 9bf56d77 Michael Hanselmann
    @param x509_cert_pem: CA used for signing import key
1349 9bf56d77 Michael Hanselmann
    @type disk_count: number
1350 9bf56d77 Michael Hanselmann
    @param disk_count: Number of disks
1351 9bf56d77 Michael Hanselmann
    @type external_address: string
1352 9bf56d77 Michael Hanselmann
    @param external_address: External address of destination node
1353 9bf56d77 Michael Hanselmann

1354 9bf56d77 Michael Hanselmann
    """
1355 9bf56d77 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1356 9bf56d77 Michael Hanselmann
    self._feedback_fn = feedback_fn
1357 9bf56d77 Michael Hanselmann
    self._cds = cds
1358 9bf56d77 Michael Hanselmann
    self._x509_cert_pem = x509_cert_pem
1359 9bf56d77 Michael Hanselmann
    self._disk_count = disk_count
1360 9bf56d77 Michael Hanselmann
    self._external_address = external_address
1361 9bf56d77 Michael Hanselmann
1362 9bf56d77 Michael Hanselmann
    self._dresults = [None] * disk_count
1363 9bf56d77 Michael Hanselmann
    self._daemon_port = [None] * disk_count
1364 9bf56d77 Michael Hanselmann
1365 9bf56d77 Michael Hanselmann
    self._salt = utils.GenerateSecret(8)
1366 9bf56d77 Michael Hanselmann
1367 9bf56d77 Michael Hanselmann
  @property
1368 9bf56d77 Michael Hanselmann
  def disk_results(self):
1369 9bf56d77 Michael Hanselmann
    """Returns per-disk results.
1370 9bf56d77 Michael Hanselmann

1371 9bf56d77 Michael Hanselmann
    """
1372 9bf56d77 Michael Hanselmann
    return self._dresults
1373 9bf56d77 Michael Hanselmann
1374 9bf56d77 Michael Hanselmann
  def _CheckAllListening(self):
1375 9bf56d77 Michael Hanselmann
    """Checks whether all daemons are listening.
1376 9bf56d77 Michael Hanselmann

1377 9bf56d77 Michael Hanselmann
    If all daemons are listening, the information is sent to the client.
1378 9bf56d77 Michael Hanselmann

1379 9bf56d77 Michael Hanselmann
    """
1380 9bf56d77 Michael Hanselmann
    if not compat.all(dp is not None for dp in self._daemon_port):
1381 9bf56d77 Michael Hanselmann
      return
1382 9bf56d77 Michael Hanselmann
1383 9bf56d77 Michael Hanselmann
    host = self._external_address
1384 9bf56d77 Michael Hanselmann
1385 9bf56d77 Michael Hanselmann
    disks = []
1386 d51ae04c Michael Hanselmann
    for idx, (port, magic) in enumerate(self._daemon_port):
1387 9bf56d77 Michael Hanselmann
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1388 d51ae04c Michael Hanselmann
                                               idx, host, port, magic))
1389 9bf56d77 Michael Hanselmann
1390 9bf56d77 Michael Hanselmann
    assert len(disks) == self._disk_count
1391 9bf56d77 Michael Hanselmann
1392 9bf56d77 Michael Hanselmann
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1393 9bf56d77 Michael Hanselmann
      "disks": disks,
1394 9bf56d77 Michael Hanselmann
      "x509_ca": self._x509_cert_pem,
1395 9bf56d77 Michael Hanselmann
      })
1396 9bf56d77 Michael Hanselmann
1397 5e26c4d9 Iustin Pop
  def ReportListening(self, ie, private, _):
1398 9bf56d77 Michael Hanselmann
    """Called when daemon started listening.
1399 9bf56d77 Michael Hanselmann

1400 9bf56d77 Michael Hanselmann
    """
1401 9bf56d77 Michael Hanselmann
    (idx, ) = private
1402 9bf56d77 Michael Hanselmann
1403 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now listening" % idx)
1404 9bf56d77 Michael Hanselmann
1405 9bf56d77 Michael Hanselmann
    assert self._daemon_port[idx] is None
1406 9bf56d77 Michael Hanselmann
1407 d51ae04c Michael Hanselmann
    self._daemon_port[idx] = (ie.listen_port, ie.magic)
1408 9bf56d77 Michael Hanselmann
1409 9bf56d77 Michael Hanselmann
    self._CheckAllListening()
1410 9bf56d77 Michael Hanselmann
1411 9bf56d77 Michael Hanselmann
  def ReportConnected(self, ie, private):
1412 9bf56d77 Michael Hanselmann
    """Called when a connection has been established.
1413 9bf56d77 Michael Hanselmann

1414 9bf56d77 Michael Hanselmann
    """
1415 9bf56d77 Michael Hanselmann
    (idx, ) = private
1416 9bf56d77 Michael Hanselmann
1417 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now receiving data" % idx)
1418 9bf56d77 Michael Hanselmann
1419 9bf56d77 Michael Hanselmann
  def ReportFinished(self, ie, private):
1420 9bf56d77 Michael Hanselmann
    """Called when a transfer has finished.
1421 9bf56d77 Michael Hanselmann

1422 9bf56d77 Michael Hanselmann
    """
1423 9bf56d77 Michael Hanselmann
    (idx, ) = private
1424 9bf56d77 Michael Hanselmann
1425 9bf56d77 Michael Hanselmann
    # Daemon is certainly no longer listening
1426 9bf56d77 Michael Hanselmann
    self._daemon_port[idx] = None
1427 9bf56d77 Michael Hanselmann
1428 9bf56d77 Michael Hanselmann
    if ie.success:
1429 9bf56d77 Michael Hanselmann
      self._feedback_fn("Disk %s finished receiving data" % idx)
1430 9bf56d77 Michael Hanselmann
    else:
1431 9bf56d77 Michael Hanselmann
      self._feedback_fn(("Disk %s failed to receive data: %s"
1432 c9300bb3 Iustin Pop
                         " (recent output: %s)") %
1433 9bf56d77 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1434 9bf56d77 Michael Hanselmann
1435 9bf56d77 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1436 9bf56d77 Michael Hanselmann
1437 9bf56d77 Michael Hanselmann
1438 ba5619c2 Michael Hanselmann
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1439 88acff3f Thomas Thrainer
                 cds, compress, timeouts):
1440 9bf56d77 Michael Hanselmann
  """Imports an instance from another cluster.
1441 9bf56d77 Michael Hanselmann

1442 9bf56d77 Michael Hanselmann
  @param lu: Logical unit instance
1443 9bf56d77 Michael Hanselmann
  @param feedback_fn: Feedback function
1444 9bf56d77 Michael Hanselmann
  @type instance: L{objects.Instance}
1445 9bf56d77 Michael Hanselmann
  @param instance: Instance object
1446 ba5619c2 Michael Hanselmann
  @type pnode: L{objects.Node}
1447 ba5619c2 Michael Hanselmann
  @param pnode: Primary node of instance as an object
1448 9bf56d77 Michael Hanselmann
  @type source_x509_ca: OpenSSL.crypto.X509
1449 9bf56d77 Michael Hanselmann
  @param source_x509_ca: Import source's X509 CA
1450 9bf56d77 Michael Hanselmann
  @type cds: string
1451 9bf56d77 Michael Hanselmann
  @param cds: Cluster domain secret
1452 88acff3f Thomas Thrainer
  @type compress: string
1453 88acff3f Thomas Thrainer
  @param compress: one of L{constants.IEC_ALL}
1454 9bf56d77 Michael Hanselmann
  @type timeouts: L{ImportExportTimeouts}
1455 9bf56d77 Michael Hanselmann
  @param timeouts: Timeouts for this import
1456 9bf56d77 Michael Hanselmann

1457 9bf56d77 Michael Hanselmann
  """
1458 9bf56d77 Michael Hanselmann
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1459 9bf56d77 Michael Hanselmann
                                                  source_x509_ca)
1460 9bf56d77 Michael Hanselmann
1461 d51ae04c Michael Hanselmann
  magic_base = utils.GenerateSecret(6)
1462 d51ae04c Michael Hanselmann
1463 ba5619c2 Michael Hanselmann
  # Decide whether to use IPv6
1464 ba5619c2 Michael Hanselmann
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1465 ba5619c2 Michael Hanselmann
1466 9bf56d77 Michael Hanselmann
  # Create crypto key
1467 9bf56d77 Michael Hanselmann
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1468 9bf56d77 Michael Hanselmann
                                        constants.RIE_CERT_VALIDITY)
1469 9bf56d77 Michael Hanselmann
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1470 9bf56d77 Michael Hanselmann
1471 9bf56d77 Michael Hanselmann
  (x509_key_name, x509_cert_pem) = result.payload
1472 9bf56d77 Michael Hanselmann
  try:
1473 9bf56d77 Michael Hanselmann
    # Load certificate
1474 9bf56d77 Michael Hanselmann
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1475 9bf56d77 Michael Hanselmann
                                                x509_cert_pem)
1476 9bf56d77 Michael Hanselmann
1477 9bf56d77 Michael Hanselmann
    # Sign certificate
1478 9bf56d77 Michael Hanselmann
    signed_x509_cert_pem = \
1479 9bf56d77 Michael Hanselmann
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1480 9bf56d77 Michael Hanselmann
1481 9bf56d77 Michael Hanselmann
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1482 ba5619c2 Michael Hanselmann
                          len(instance.disks), pnode.primary_ip)
1483 9bf56d77 Michael Hanselmann
1484 9bf56d77 Michael Hanselmann
    ieloop = ImportExportLoop(lu)
1485 9bf56d77 Michael Hanselmann
    try:
1486 9bf56d77 Michael Hanselmann
      for idx, dev in enumerate(instance.disks):
1487 d51ae04c Michael Hanselmann
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1488 d51ae04c Michael Hanselmann
1489 d51ae04c Michael Hanselmann
        # Import daemon options
1490 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=x509_key_name,
1491 d51ae04c Michael Hanselmann
                                           ca_pem=source_ca_pem,
1492 88acff3f Thomas Thrainer
                                           magic=magic,
1493 88acff3f Thomas Thrainer
                                           compress=compress,
1494 88acff3f Thomas Thrainer
                                           ipv6=ipv6)
1495 d51ae04c Michael Hanselmann
1496 eb630f50 Michael Hanselmann
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1497 5e26c4d9 Iustin Pop
                              "disk%d" % idx,
1498 0c3d9c7c Thomas Thrainer
                              constants.IEIO_SCRIPT, ((dev, instance), idx),
1499 9bf56d77 Michael Hanselmann
                              timeouts, cbs, private=(idx, )))
1500 9bf56d77 Michael Hanselmann
1501 9bf56d77 Michael Hanselmann
      ieloop.Run()
1502 9bf56d77 Michael Hanselmann
    finally:
1503 9bf56d77 Michael Hanselmann
      ieloop.FinalizeAll()
1504 9bf56d77 Michael Hanselmann
  finally:
1505 9bf56d77 Michael Hanselmann
    # Remove crypto key and certificate
1506 9bf56d77 Michael Hanselmann
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1507 9bf56d77 Michael Hanselmann
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1508 9bf56d77 Michael Hanselmann
1509 9bf56d77 Michael Hanselmann
  return cbs.disk_results
1510 9bf56d77 Michael Hanselmann
1511 9bf56d77 Michael Hanselmann
1512 1410fa8d Michael Hanselmann
def _GetImportExportHandshakeMessage(version):
1513 1410fa8d Michael Hanselmann
  """Returns the handshake message for a RIE protocol version.
1514 1410fa8d Michael Hanselmann

1515 1410fa8d Michael Hanselmann
  @type version: number
1516 1410fa8d Michael Hanselmann

1517 1410fa8d Michael Hanselmann
  """
1518 1410fa8d Michael Hanselmann
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1519 1410fa8d Michael Hanselmann
1520 1410fa8d Michael Hanselmann
1521 1410fa8d Michael Hanselmann
def ComputeRemoteExportHandshake(cds):
1522 1410fa8d Michael Hanselmann
  """Computes the remote import/export handshake.
1523 1410fa8d Michael Hanselmann

1524 1410fa8d Michael Hanselmann
  @type cds: string
1525 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1526 1410fa8d Michael Hanselmann

1527 1410fa8d Michael Hanselmann
  """
1528 1410fa8d Michael Hanselmann
  salt = utils.GenerateSecret(8)
1529 1410fa8d Michael Hanselmann
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1530 1410fa8d Michael Hanselmann
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1531 1410fa8d Michael Hanselmann
1532 1410fa8d Michael Hanselmann
1533 1410fa8d Michael Hanselmann
def CheckRemoteExportHandshake(cds, handshake):
1534 1410fa8d Michael Hanselmann
  """Checks the handshake of a remote import/export.
1535 1410fa8d Michael Hanselmann

1536 1410fa8d Michael Hanselmann
  @type cds: string
1537 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1538 1410fa8d Michael Hanselmann
  @type handshake: sequence
1539 1410fa8d Michael Hanselmann
  @param handshake: Handshake sent by remote peer
1540 1410fa8d Michael Hanselmann

1541 1410fa8d Michael Hanselmann
  """
1542 1410fa8d Michael Hanselmann
  try:
1543 1410fa8d Michael Hanselmann
    (version, hmac_digest, hmac_salt) = handshake
1544 1410fa8d Michael Hanselmann
  except (TypeError, ValueError), err:
1545 1410fa8d Michael Hanselmann
    return "Invalid data: %s" % err
1546 1410fa8d Michael Hanselmann
1547 1410fa8d Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1548 1410fa8d Michael Hanselmann
                              hmac_digest, salt=hmac_salt):
1549 1410fa8d Michael Hanselmann
    return "Hash didn't match, clusters don't share the same domain secret"
1550 1410fa8d Michael Hanselmann
1551 1410fa8d Michael Hanselmann
  if version != constants.RIE_VERSION:
1552 1410fa8d Michael Hanselmann
    return ("Clusters don't have the same remote import/export protocol"
1553 1410fa8d Michael Hanselmann
            " (local=%s, remote=%s)" %
1554 1410fa8d Michael Hanselmann
            (constants.RIE_VERSION, version))
1555 1410fa8d Michael Hanselmann
1556 1410fa8d Michael Hanselmann
  return None
1557 4a96f1d1 Michael Hanselmann
1558 4a96f1d1 Michael Hanselmann
1559 d51ae04c Michael Hanselmann
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1560 4a96f1d1 Michael Hanselmann
  """Returns the hashed text for import/export disk information.
1561 4a96f1d1 Michael Hanselmann

1562 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1563 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1564 4a96f1d1 Michael Hanselmann
  @type host: string
1565 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1566 4a96f1d1 Michael Hanselmann
  @type port: number
1567 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1568 d51ae04c Michael Hanselmann
  @type magic: string
1569 d51ae04c Michael Hanselmann
  @param magic: Magic value
1570 4a96f1d1 Michael Hanselmann

1571 4a96f1d1 Michael Hanselmann
  """
1572 d51ae04c Michael Hanselmann
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1573 4a96f1d1 Michael Hanselmann
1574 4a96f1d1 Michael Hanselmann
1575 4a96f1d1 Michael Hanselmann
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1576 4a96f1d1 Michael Hanselmann
  """Verifies received disk information for an export.
1577 4a96f1d1 Michael Hanselmann

1578 4a96f1d1 Michael Hanselmann
  @type cds: string
1579 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1580 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1581 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1582 4a96f1d1 Michael Hanselmann
  @type disk_info: sequence
1583 4a96f1d1 Michael Hanselmann
  @param disk_info: Disk information sent by remote peer
1584 4a96f1d1 Michael Hanselmann

1585 4a96f1d1 Michael Hanselmann
  """
1586 4a96f1d1 Michael Hanselmann
  try:
1587 d51ae04c Michael Hanselmann
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1588 4a96f1d1 Michael Hanselmann
  except (TypeError, ValueError), err:
1589 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("Invalid data: %s" % err)
1590 4a96f1d1 Michael Hanselmann
1591 d51ae04c Michael Hanselmann
  if not (host and port and magic):
1592 d51ae04c Michael Hanselmann
    raise errors.GenericError("Missing destination host, port or magic")
1593 4a96f1d1 Michael Hanselmann
1594 d51ae04c Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1595 4a96f1d1 Michael Hanselmann
1596 4a96f1d1 Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1597 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("HMAC is wrong")
1598 4a96f1d1 Michael Hanselmann
1599 ba5619c2 Michael Hanselmann
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1600 ba5619c2 Michael Hanselmann
    destination = host
1601 ba5619c2 Michael Hanselmann
  else:
1602 ba5619c2 Michael Hanselmann
    destination = netutils.Hostname.GetNormalizedName(host)
1603 ba5619c2 Michael Hanselmann
1604 ba5619c2 Michael Hanselmann
  return (destination,
1605 d51ae04c Michael Hanselmann
          utils.ValidateServiceName(port),
1606 d51ae04c Michael Hanselmann
          magic)
1607 4a96f1d1 Michael Hanselmann
1608 4a96f1d1 Michael Hanselmann
1609 d51ae04c Michael Hanselmann
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1610 4a96f1d1 Michael Hanselmann
  """Computes the signed disk information for a remote import.
1611 4a96f1d1 Michael Hanselmann

1612 4a96f1d1 Michael Hanselmann
  @type cds: string
1613 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1614 4a96f1d1 Michael Hanselmann
  @type salt: string
1615 4a96f1d1 Michael Hanselmann
  @param salt: HMAC salt
1616 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1617 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1618 4a96f1d1 Michael Hanselmann
  @type host: string
1619 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1620 4a96f1d1 Michael Hanselmann
  @type port: number
1621 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1622 d51ae04c Michael Hanselmann
  @type magic: string
1623 d51ae04c Michael Hanselmann
  @param magic: Magic value
1624 4a96f1d1 Michael Hanselmann

1625 4a96f1d1 Michael Hanselmann
  """
1626 d51ae04c Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1627 4a96f1d1 Michael Hanselmann
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1628 d51ae04c Michael Hanselmann
  return (host, port, magic, hmac_digest, salt)
1629 0c77c331 René Nussbaumer
1630 0c77c331 René Nussbaumer
1631 0c77c331 René Nussbaumer
def CalculateGroupIPolicy(cluster, group):
1632 0c77c331 René Nussbaumer
  """Calculate instance policy for group.
1633 0c77c331 René Nussbaumer

1634 0c77c331 René Nussbaumer
  """
1635 0c77c331 René Nussbaumer
  return cluster.SimpleFillIPolicy(group.ipolicy)
1636 0c77c331 René Nussbaumer
1637 0c77c331 René Nussbaumer
1638 0c77c331 René Nussbaumer
def ComputeDiskSize(disk_template, disks):
1639 0c77c331 René Nussbaumer
  """Compute disk size requirements according to disk template
1640 0c77c331 René Nussbaumer

1641 0c77c331 René Nussbaumer
  """
1642 0c77c331 René Nussbaumer
  # Required free disk space as a function of disk and swap space
1643 0c77c331 René Nussbaumer
  req_size_dict = {
1644 9e946416 Klaus Aehlig
    constants.DT_DISKLESS: 0,
1645 0c77c331 René Nussbaumer
    constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
1646 0c77c331 René Nussbaumer
    # 128 MB are added for drbd metadata for each disk
1647 0c77c331 René Nussbaumer
    constants.DT_DRBD8:
1648 0c77c331 René Nussbaumer
      sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE for d in disks),
1649 0c77c331 René Nussbaumer
    constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1650 0c77c331 René Nussbaumer
    constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1651 8106dd64 Santi Raffa
    constants.DT_GLUSTER: sum(d[constants.IDISK_SIZE] for d in disks),
1652 0c77c331 René Nussbaumer
    constants.DT_BLOCK: 0,
1653 0c77c331 René Nussbaumer
    constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
1654 376631d1 Constantinos Venetsanopoulos
    constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks),
1655 0c77c331 René Nussbaumer
  }
1656 0c77c331 René Nussbaumer
1657 0c77c331 René Nussbaumer
  if disk_template not in req_size_dict:
1658 0c77c331 René Nussbaumer
    raise errors.ProgrammerError("Disk template '%s' size requirement"
1659 0c77c331 René Nussbaumer
                                 " is unknown" % disk_template)
1660 0c77c331 René Nussbaumer
1661 0c77c331 René Nussbaumer
  return req_size_dict[disk_template]