Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ f7f03738

History | View | Annotate | Download (46 kB)

1 033a1d00 Michael Hanselmann
#
2 033a1d00 Michael Hanselmann
#
3 033a1d00 Michael Hanselmann
4 c9300bb3 Iustin Pop
# Copyright (C) 2010, 2011 Google Inc.
5 033a1d00 Michael Hanselmann
#
6 033a1d00 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 033a1d00 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 033a1d00 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 033a1d00 Michael Hanselmann
# (at your option) any later version.
10 033a1d00 Michael Hanselmann
#
11 033a1d00 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 033a1d00 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 033a1d00 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 033a1d00 Michael Hanselmann
# General Public License for more details.
15 033a1d00 Michael Hanselmann
#
16 033a1d00 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 033a1d00 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 033a1d00 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 033a1d00 Michael Hanselmann
# 02110-1301, USA.
20 033a1d00 Michael Hanselmann
21 033a1d00 Michael Hanselmann
22 033a1d00 Michael Hanselmann
"""Instance-related functions and classes for masterd.
23 033a1d00 Michael Hanselmann

24 033a1d00 Michael Hanselmann
"""
25 033a1d00 Michael Hanselmann
26 033a1d00 Michael Hanselmann
import logging
27 033a1d00 Michael Hanselmann
import time
28 4a96f1d1 Michael Hanselmann
import OpenSSL
29 033a1d00 Michael Hanselmann
30 033a1d00 Michael Hanselmann
from ganeti import constants
31 033a1d00 Michael Hanselmann
from ganeti import errors
32 033a1d00 Michael Hanselmann
from ganeti import compat
33 387794f8 Michael Hanselmann
from ganeti import utils
34 387794f8 Michael Hanselmann
from ganeti import objects
35 a744b676 Manuel Franceschini
from ganeti import netutils
36 9c492c2d Michael Hanselmann
from ganeti import pathutils
37 033a1d00 Michael Hanselmann
38 033a1d00 Michael Hanselmann
39 033a1d00 Michael Hanselmann
class _ImportExportError(Exception):
40 033a1d00 Michael Hanselmann
  """Local exception to report import/export errors.
41 033a1d00 Michael Hanselmann

42 033a1d00 Michael Hanselmann
  """
43 033a1d00 Michael Hanselmann
44 033a1d00 Michael Hanselmann
45 033a1d00 Michael Hanselmann
class ImportExportTimeouts(object):
46 033a1d00 Michael Hanselmann
  #: Time until daemon starts writing status file
47 033a1d00 Michael Hanselmann
  DEFAULT_READY_TIMEOUT = 10
48 033a1d00 Michael Hanselmann
49 033a1d00 Michael Hanselmann
  #: Length of time until errors cause hard failure
50 033a1d00 Michael Hanselmann
  DEFAULT_ERROR_TIMEOUT = 10
51 033a1d00 Michael Hanselmann
52 033a1d00 Michael Hanselmann
  #: Time after which daemon must be listening
53 033a1d00 Michael Hanselmann
  DEFAULT_LISTEN_TIMEOUT = 10
54 033a1d00 Michael Hanselmann
55 1a2e7fe9 Michael Hanselmann
  #: Progress update interval
56 1a2e7fe9 Michael Hanselmann
  DEFAULT_PROGRESS_INTERVAL = 60
57 1a2e7fe9 Michael Hanselmann
58 033a1d00 Michael Hanselmann
  __slots__ = [
59 033a1d00 Michael Hanselmann
    "error",
60 033a1d00 Michael Hanselmann
    "ready",
61 033a1d00 Michael Hanselmann
    "listen",
62 033a1d00 Michael Hanselmann
    "connect",
63 1a2e7fe9 Michael Hanselmann
    "progress",
64 033a1d00 Michael Hanselmann
    ]
65 033a1d00 Michael Hanselmann
66 033a1d00 Michael Hanselmann
  def __init__(self, connect,
67 033a1d00 Michael Hanselmann
               listen=DEFAULT_LISTEN_TIMEOUT,
68 033a1d00 Michael Hanselmann
               error=DEFAULT_ERROR_TIMEOUT,
69 1a2e7fe9 Michael Hanselmann
               ready=DEFAULT_READY_TIMEOUT,
70 1a2e7fe9 Michael Hanselmann
               progress=DEFAULT_PROGRESS_INTERVAL):
71 033a1d00 Michael Hanselmann
    """Initializes this class.
72 033a1d00 Michael Hanselmann

73 033a1d00 Michael Hanselmann
    @type connect: number
74 033a1d00 Michael Hanselmann
    @param connect: Timeout for establishing connection
75 033a1d00 Michael Hanselmann
    @type listen: number
76 033a1d00 Michael Hanselmann
    @param listen: Timeout for starting to listen for connections
77 033a1d00 Michael Hanselmann
    @type error: number
78 033a1d00 Michael Hanselmann
    @param error: Length of time until errors cause hard failure
79 033a1d00 Michael Hanselmann
    @type ready: number
80 033a1d00 Michael Hanselmann
    @param ready: Timeout for daemon to become ready
81 1a2e7fe9 Michael Hanselmann
    @type progress: number
82 1a2e7fe9 Michael Hanselmann
    @param progress: Progress update interval
83 033a1d00 Michael Hanselmann

84 033a1d00 Michael Hanselmann
    """
85 033a1d00 Michael Hanselmann
    self.error = error
86 033a1d00 Michael Hanselmann
    self.ready = ready
87 033a1d00 Michael Hanselmann
    self.listen = listen
88 033a1d00 Michael Hanselmann
    self.connect = connect
89 1a2e7fe9 Michael Hanselmann
    self.progress = progress
90 033a1d00 Michael Hanselmann
91 033a1d00 Michael Hanselmann
92 033a1d00 Michael Hanselmann
class ImportExportCbBase(object):
93 033a1d00 Michael Hanselmann
  """Callbacks for disk import/export.
94 033a1d00 Michael Hanselmann

95 033a1d00 Michael Hanselmann
  """
96 5e26c4d9 Iustin Pop
  def ReportListening(self, ie, private, component):
97 033a1d00 Michael Hanselmann
    """Called when daemon started listening.
98 033a1d00 Michael Hanselmann

99 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
100 033a1d00 Michael Hanselmann
    @param ie: Import/export object
101 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
102 5e26c4d9 Iustin Pop
    @param component: transfer component name
103 033a1d00 Michael Hanselmann

104 033a1d00 Michael Hanselmann
    """
105 033a1d00 Michael Hanselmann
106 033a1d00 Michael Hanselmann
  def ReportConnected(self, ie, private):
107 033a1d00 Michael Hanselmann
    """Called when a connection has been established.
108 033a1d00 Michael Hanselmann

109 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
110 033a1d00 Michael Hanselmann
    @param ie: Import/export object
111 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
112 033a1d00 Michael Hanselmann

113 033a1d00 Michael Hanselmann
    """
114 033a1d00 Michael Hanselmann
115 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
116 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
117 1a2e7fe9 Michael Hanselmann

118 1a2e7fe9 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
119 1a2e7fe9 Michael Hanselmann
    @param ie: Import/export object
120 1a2e7fe9 Michael Hanselmann
    @param private: Private data passed to import/export object
121 1a2e7fe9 Michael Hanselmann

122 1a2e7fe9 Michael Hanselmann
    """
123 1a2e7fe9 Michael Hanselmann
124 033a1d00 Michael Hanselmann
  def ReportFinished(self, ie, private):
125 033a1d00 Michael Hanselmann
    """Called when a transfer has finished.
126 033a1d00 Michael Hanselmann

127 033a1d00 Michael Hanselmann
    @type ie: Subclass of L{_DiskImportExportBase}
128 033a1d00 Michael Hanselmann
    @param ie: Import/export object
129 033a1d00 Michael Hanselmann
    @param private: Private data passed to import/export object
130 033a1d00 Michael Hanselmann

131 033a1d00 Michael Hanselmann
    """
132 033a1d00 Michael Hanselmann
133 033a1d00 Michael Hanselmann
134 033a1d00 Michael Hanselmann
class _DiskImportExportBase(object):
135 033a1d00 Michael Hanselmann
  MODE_TEXT = None
136 033a1d00 Michael Hanselmann
137 eb630f50 Michael Hanselmann
  def __init__(self, lu, node_name, opts,
138 5e26c4d9 Iustin Pop
               instance, component, timeouts, cbs, private=None):
139 033a1d00 Michael Hanselmann
    """Initializes this class.
140 033a1d00 Michael Hanselmann

141 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
142 033a1d00 Michael Hanselmann
    @type node_name: string
143 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
144 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
145 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
146 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
147 033a1d00 Michael Hanselmann
    @param instance: Instance object
148 5e26c4d9 Iustin Pop
    @type component: string
149 5e26c4d9 Iustin Pop
    @param component: which part of the instance is being imported
150 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
151 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
152 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
153 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
154 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
155 033a1d00 Michael Hanselmann

156 033a1d00 Michael Hanselmann
    """
157 033a1d00 Michael Hanselmann
    assert self.MODE_TEXT
158 033a1d00 Michael Hanselmann
159 033a1d00 Michael Hanselmann
    self._lu = lu
160 033a1d00 Michael Hanselmann
    self.node_name = node_name
161 4478301b Michael Hanselmann
    self._opts = opts.Copy()
162 033a1d00 Michael Hanselmann
    self._instance = instance
163 5e26c4d9 Iustin Pop
    self._component = component
164 033a1d00 Michael Hanselmann
    self._timeouts = timeouts
165 033a1d00 Michael Hanselmann
    self._cbs = cbs
166 033a1d00 Michael Hanselmann
    self._private = private
167 033a1d00 Michael Hanselmann
168 4478301b Michael Hanselmann
    # Set master daemon's timeout in options for import/export daemon
169 4478301b Michael Hanselmann
    assert self._opts.connect_timeout is None
170 4478301b Michael Hanselmann
    self._opts.connect_timeout = timeouts.connect
171 4478301b Michael Hanselmann
172 033a1d00 Michael Hanselmann
    # Parent loop
173 033a1d00 Michael Hanselmann
    self._loop = None
174 033a1d00 Michael Hanselmann
175 033a1d00 Michael Hanselmann
    # Timestamps
176 033a1d00 Michael Hanselmann
    self._ts_begin = None
177 033a1d00 Michael Hanselmann
    self._ts_connected = None
178 033a1d00 Michael Hanselmann
    self._ts_finished = None
179 033a1d00 Michael Hanselmann
    self._ts_cleanup = None
180 1a2e7fe9 Michael Hanselmann
    self._ts_last_progress = None
181 033a1d00 Michael Hanselmann
    self._ts_last_error = None
182 033a1d00 Michael Hanselmann
183 033a1d00 Michael Hanselmann
    # Transfer status
184 033a1d00 Michael Hanselmann
    self.success = None
185 033a1d00 Michael Hanselmann
    self.final_message = None
186 033a1d00 Michael Hanselmann
187 033a1d00 Michael Hanselmann
    # Daemon status
188 033a1d00 Michael Hanselmann
    self._daemon_name = None
189 033a1d00 Michael Hanselmann
    self._daemon = None
190 033a1d00 Michael Hanselmann
191 033a1d00 Michael Hanselmann
  @property
192 033a1d00 Michael Hanselmann
  def recent_output(self):
193 033a1d00 Michael Hanselmann
    """Returns the most recent output from the daemon.
194 033a1d00 Michael Hanselmann

195 033a1d00 Michael Hanselmann
    """
196 033a1d00 Michael Hanselmann
    if self._daemon:
197 c9300bb3 Iustin Pop
      return "\n".join(self._daemon.recent_output)
198 033a1d00 Michael Hanselmann
199 033a1d00 Michael Hanselmann
    return None
200 033a1d00 Michael Hanselmann
201 033a1d00 Michael Hanselmann
  @property
202 1a2e7fe9 Michael Hanselmann
  def progress(self):
203 1a2e7fe9 Michael Hanselmann
    """Returns transfer progress information.
204 1a2e7fe9 Michael Hanselmann

205 1a2e7fe9 Michael Hanselmann
    """
206 1a2e7fe9 Michael Hanselmann
    if not self._daemon:
207 1a2e7fe9 Michael Hanselmann
      return None
208 1a2e7fe9 Michael Hanselmann
209 1a2e7fe9 Michael Hanselmann
    return (self._daemon.progress_mbytes,
210 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_throughput,
211 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_percent,
212 1a2e7fe9 Michael Hanselmann
            self._daemon.progress_eta)
213 1a2e7fe9 Michael Hanselmann
214 1a2e7fe9 Michael Hanselmann
  @property
215 af1d39b1 Michael Hanselmann
  def magic(self):
216 af1d39b1 Michael Hanselmann
    """Returns the magic value for this import/export.
217 af1d39b1 Michael Hanselmann

218 af1d39b1 Michael Hanselmann
    """
219 af1d39b1 Michael Hanselmann
    return self._opts.magic
220 af1d39b1 Michael Hanselmann
221 af1d39b1 Michael Hanselmann
  @property
222 033a1d00 Michael Hanselmann
  def active(self):
223 033a1d00 Michael Hanselmann
    """Determines whether this transport is still active.
224 033a1d00 Michael Hanselmann

225 033a1d00 Michael Hanselmann
    """
226 033a1d00 Michael Hanselmann
    return self.success is None
227 033a1d00 Michael Hanselmann
228 033a1d00 Michael Hanselmann
  @property
229 033a1d00 Michael Hanselmann
  def loop(self):
230 033a1d00 Michael Hanselmann
    """Returns parent loop.
231 033a1d00 Michael Hanselmann

232 033a1d00 Michael Hanselmann
    @rtype: L{ImportExportLoop}
233 033a1d00 Michael Hanselmann

234 033a1d00 Michael Hanselmann
    """
235 033a1d00 Michael Hanselmann
    return self._loop
236 033a1d00 Michael Hanselmann
237 033a1d00 Michael Hanselmann
  def SetLoop(self, loop):
238 033a1d00 Michael Hanselmann
    """Sets the parent loop.
239 033a1d00 Michael Hanselmann

240 033a1d00 Michael Hanselmann
    @type loop: L{ImportExportLoop}
241 033a1d00 Michael Hanselmann

242 033a1d00 Michael Hanselmann
    """
243 033a1d00 Michael Hanselmann
    if self._loop:
244 033a1d00 Michael Hanselmann
      raise errors.ProgrammerError("Loop can only be set once")
245 033a1d00 Michael Hanselmann
246 033a1d00 Michael Hanselmann
    self._loop = loop
247 033a1d00 Michael Hanselmann
248 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
249 033a1d00 Michael Hanselmann
    """Starts the import/export daemon.
250 033a1d00 Michael Hanselmann

251 033a1d00 Michael Hanselmann
    """
252 033a1d00 Michael Hanselmann
    raise NotImplementedError()
253 033a1d00 Michael Hanselmann
254 033a1d00 Michael Hanselmann
  def CheckDaemon(self):
255 033a1d00 Michael Hanselmann
    """Checks whether daemon has been started and if not, starts it.
256 033a1d00 Michael Hanselmann

257 033a1d00 Michael Hanselmann
    @rtype: string
258 033a1d00 Michael Hanselmann
    @return: Daemon name
259 033a1d00 Michael Hanselmann

260 033a1d00 Michael Hanselmann
    """
261 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
262 033a1d00 Michael Hanselmann
263 033a1d00 Michael Hanselmann
    if self._daemon_name is None:
264 033a1d00 Michael Hanselmann
      assert self._ts_begin is None
265 033a1d00 Michael Hanselmann
266 033a1d00 Michael Hanselmann
      result = self._StartDaemon()
267 033a1d00 Michael Hanselmann
      if result.fail_msg:
268 033a1d00 Michael Hanselmann
        raise _ImportExportError("Failed to start %s on %s: %s" %
269 033a1d00 Michael Hanselmann
                                 (self.MODE_TEXT, self.node_name,
270 033a1d00 Michael Hanselmann
                                  result.fail_msg))
271 033a1d00 Michael Hanselmann
272 033a1d00 Michael Hanselmann
      daemon_name = result.payload
273 033a1d00 Michael Hanselmann
274 194e8648 Iustin Pop
      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
275 033a1d00 Michael Hanselmann
                   self.node_name)
276 033a1d00 Michael Hanselmann
277 033a1d00 Michael Hanselmann
      self._ts_begin = time.time()
278 033a1d00 Michael Hanselmann
      self._daemon_name = daemon_name
279 033a1d00 Michael Hanselmann
280 033a1d00 Michael Hanselmann
    return self._daemon_name
281 033a1d00 Michael Hanselmann
282 033a1d00 Michael Hanselmann
  def GetDaemonName(self):
283 033a1d00 Michael Hanselmann
    """Returns the daemon name.
284 033a1d00 Michael Hanselmann

285 033a1d00 Michael Hanselmann
    """
286 033a1d00 Michael Hanselmann
    assert self._daemon_name, "Daemon has not been started"
287 033a1d00 Michael Hanselmann
    assert self._ts_cleanup is None
288 033a1d00 Michael Hanselmann
    return self._daemon_name
289 033a1d00 Michael Hanselmann
290 033a1d00 Michael Hanselmann
  def Abort(self):
291 033a1d00 Michael Hanselmann
    """Sends SIGTERM to import/export daemon (if still active).
292 033a1d00 Michael Hanselmann

293 033a1d00 Michael Hanselmann
    """
294 033a1d00 Michael Hanselmann
    if self._daemon_name:
295 194e8648 Iustin Pop
      self._lu.LogWarning("Aborting %s '%s' on %s",
296 033a1d00 Michael Hanselmann
                          self.MODE_TEXT, self._daemon_name, self.node_name)
297 033a1d00 Michael Hanselmann
      result = self._lu.rpc.call_impexp_abort(self.node_name, self._daemon_name)
298 033a1d00 Michael Hanselmann
      if result.fail_msg:
299 194e8648 Iustin Pop
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
300 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
301 033a1d00 Michael Hanselmann
                            self.node_name, result.fail_msg)
302 033a1d00 Michael Hanselmann
        return False
303 033a1d00 Michael Hanselmann
304 033a1d00 Michael Hanselmann
    return True
305 033a1d00 Michael Hanselmann
306 033a1d00 Michael Hanselmann
  def _SetDaemonData(self, data):
307 033a1d00 Michael Hanselmann
    """Internal function for updating status daemon data.
308 033a1d00 Michael Hanselmann

309 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
310 033a1d00 Michael Hanselmann
    @param data: Daemon status data
311 033a1d00 Michael Hanselmann

312 033a1d00 Michael Hanselmann
    """
313 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
314 033a1d00 Michael Hanselmann
315 033a1d00 Michael Hanselmann
    if not data:
316 f8326fca Andrea Spadaccini
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
317 033a1d00 Michael Hanselmann
        raise _ImportExportError("Didn't become ready after %s seconds" %
318 033a1d00 Michael Hanselmann
                                 self._timeouts.ready)
319 033a1d00 Michael Hanselmann
320 033a1d00 Michael Hanselmann
      return False
321 033a1d00 Michael Hanselmann
322 033a1d00 Michael Hanselmann
    self._daemon = data
323 033a1d00 Michael Hanselmann
324 033a1d00 Michael Hanselmann
    return True
325 033a1d00 Michael Hanselmann
326 033a1d00 Michael Hanselmann
  def SetDaemonData(self, success, data):
327 033a1d00 Michael Hanselmann
    """Updates daemon status data.
328 033a1d00 Michael Hanselmann

329 033a1d00 Michael Hanselmann
    @type success: bool
330 033a1d00 Michael Hanselmann
    @param success: Whether fetching data was successful or not
331 033a1d00 Michael Hanselmann
    @type data: L{objects.ImportExportStatus}
332 033a1d00 Michael Hanselmann
    @param data: Daemon status data
333 033a1d00 Michael Hanselmann

334 033a1d00 Michael Hanselmann
    """
335 033a1d00 Michael Hanselmann
    if not success:
336 033a1d00 Michael Hanselmann
      if self._ts_last_error is None:
337 033a1d00 Michael Hanselmann
        self._ts_last_error = time.time()
338 033a1d00 Michael Hanselmann
339 f8326fca Andrea Spadaccini
      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
340 033a1d00 Michael Hanselmann
        raise _ImportExportError("Too many errors while updating data")
341 033a1d00 Michael Hanselmann
342 033a1d00 Michael Hanselmann
      return False
343 033a1d00 Michael Hanselmann
344 033a1d00 Michael Hanselmann
    self._ts_last_error = None
345 033a1d00 Michael Hanselmann
346 033a1d00 Michael Hanselmann
    return self._SetDaemonData(data)
347 033a1d00 Michael Hanselmann
348 033a1d00 Michael Hanselmann
  def CheckListening(self):
349 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
350 033a1d00 Michael Hanselmann

351 033a1d00 Michael Hanselmann
    """
352 033a1d00 Michael Hanselmann
    raise NotImplementedError()
353 033a1d00 Michael Hanselmann
354 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
355 033a1d00 Michael Hanselmann
    """Returns timeout to calculate connect timeout.
356 033a1d00 Michael Hanselmann

357 033a1d00 Michael Hanselmann
    """
358 033a1d00 Michael Hanselmann
    raise NotImplementedError()
359 033a1d00 Michael Hanselmann
360 033a1d00 Michael Hanselmann
  def CheckConnected(self):
361 033a1d00 Michael Hanselmann
    """Checks whether the daemon is connected.
362 033a1d00 Michael Hanselmann

363 033a1d00 Michael Hanselmann
    @rtype: bool
364 033a1d00 Michael Hanselmann
    @return: Whether the daemon is connected
365 033a1d00 Michael Hanselmann

366 033a1d00 Michael Hanselmann
    """
367 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
368 033a1d00 Michael Hanselmann
369 033a1d00 Michael Hanselmann
    if self._ts_connected is not None:
370 033a1d00 Michael Hanselmann
      return True
371 033a1d00 Michael Hanselmann
372 033a1d00 Michael Hanselmann
    if self._daemon.connected:
373 033a1d00 Michael Hanselmann
      self._ts_connected = time.time()
374 033a1d00 Michael Hanselmann
375 033a1d00 Michael Hanselmann
      # TODO: Log remote peer
376 194e8648 Iustin Pop
      logging.debug("%s '%s' on %s is now connected",
377 033a1d00 Michael Hanselmann
                    self.MODE_TEXT, self._daemon_name, self.node_name)
378 033a1d00 Michael Hanselmann
379 033a1d00 Michael Hanselmann
      self._cbs.ReportConnected(self, self._private)
380 033a1d00 Michael Hanselmann
381 033a1d00 Michael Hanselmann
      return True
382 033a1d00 Michael Hanselmann
383 f8326fca Andrea Spadaccini
    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
384 f8326fca Andrea Spadaccini
                            self._timeouts.connect):
385 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not connected after %s seconds" %
386 033a1d00 Michael Hanselmann
                               self._timeouts.connect)
387 033a1d00 Michael Hanselmann
388 033a1d00 Michael Hanselmann
    return False
389 033a1d00 Michael Hanselmann
390 1a2e7fe9 Michael Hanselmann
  def _CheckProgress(self):
391 1a2e7fe9 Michael Hanselmann
    """Checks whether a progress update should be reported.
392 1a2e7fe9 Michael Hanselmann

393 1a2e7fe9 Michael Hanselmann
    """
394 1a2e7fe9 Michael Hanselmann
    if ((self._ts_last_progress is None or
395 f8326fca Andrea Spadaccini
        utils.TimeoutExpired(self._ts_last_progress,
396 f8326fca Andrea Spadaccini
                             self._timeouts.progress)) and
397 1a2e7fe9 Michael Hanselmann
        self._daemon and
398 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_mbytes is not None and
399 1a2e7fe9 Michael Hanselmann
        self._daemon.progress_throughput is not None):
400 1a2e7fe9 Michael Hanselmann
      self._cbs.ReportProgress(self, self._private)
401 1a2e7fe9 Michael Hanselmann
      self._ts_last_progress = time.time()
402 1a2e7fe9 Michael Hanselmann
403 033a1d00 Michael Hanselmann
  def CheckFinished(self):
404 033a1d00 Michael Hanselmann
    """Checks whether the daemon exited.
405 033a1d00 Michael Hanselmann

406 033a1d00 Michael Hanselmann
    @rtype: bool
407 033a1d00 Michael Hanselmann
    @return: Whether the transfer is finished
408 033a1d00 Michael Hanselmann

409 033a1d00 Michael Hanselmann
    """
410 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
411 033a1d00 Michael Hanselmann
412 033a1d00 Michael Hanselmann
    if self._ts_finished:
413 033a1d00 Michael Hanselmann
      return True
414 033a1d00 Michael Hanselmann
415 033a1d00 Michael Hanselmann
    if self._daemon.exit_status is None:
416 1a2e7fe9 Michael Hanselmann
      # TODO: Adjust delay for ETA expiring soon
417 1a2e7fe9 Michael Hanselmann
      self._CheckProgress()
418 033a1d00 Michael Hanselmann
      return False
419 033a1d00 Michael Hanselmann
420 033a1d00 Michael Hanselmann
    self._ts_finished = time.time()
421 033a1d00 Michael Hanselmann
422 033a1d00 Michael Hanselmann
    self._ReportFinished(self._daemon.exit_status == 0,
423 033a1d00 Michael Hanselmann
                         self._daemon.error_message)
424 033a1d00 Michael Hanselmann
425 033a1d00 Michael Hanselmann
    return True
426 033a1d00 Michael Hanselmann
427 033a1d00 Michael Hanselmann
  def _ReportFinished(self, success, message):
428 033a1d00 Michael Hanselmann
    """Transfer is finished or daemon exited.
429 033a1d00 Michael Hanselmann

430 033a1d00 Michael Hanselmann
    @type success: bool
431 033a1d00 Michael Hanselmann
    @param success: Whether the transfer was successful
432 033a1d00 Michael Hanselmann
    @type message: string
433 033a1d00 Michael Hanselmann
    @param message: Error message
434 033a1d00 Michael Hanselmann

435 033a1d00 Michael Hanselmann
    """
436 033a1d00 Michael Hanselmann
    assert self.success is None
437 033a1d00 Michael Hanselmann
438 033a1d00 Michael Hanselmann
    self.success = success
439 033a1d00 Michael Hanselmann
    self.final_message = message
440 033a1d00 Michael Hanselmann
441 033a1d00 Michael Hanselmann
    if success:
442 194e8648 Iustin Pop
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
443 194e8648 Iustin Pop
                   self._daemon_name, self.node_name)
444 033a1d00 Michael Hanselmann
    elif self._daemon_name:
445 194e8648 Iustin Pop
      self._lu.LogWarning("%s '%s' on %s failed: %s",
446 033a1d00 Michael Hanselmann
                          self.MODE_TEXT, self._daemon_name, self.node_name,
447 033a1d00 Michael Hanselmann
                          message)
448 033a1d00 Michael Hanselmann
    else:
449 033a1d00 Michael Hanselmann
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
450 033a1d00 Michael Hanselmann
                          self.node_name, message)
451 033a1d00 Michael Hanselmann
452 033a1d00 Michael Hanselmann
    self._cbs.ReportFinished(self, self._private)
453 033a1d00 Michael Hanselmann
454 033a1d00 Michael Hanselmann
  def _Finalize(self):
455 033a1d00 Michael Hanselmann
    """Makes the RPC call to finalize this import/export.
456 033a1d00 Michael Hanselmann

457 033a1d00 Michael Hanselmann
    """
458 033a1d00 Michael Hanselmann
    return self._lu.rpc.call_impexp_cleanup(self.node_name, self._daemon_name)
459 033a1d00 Michael Hanselmann
460 033a1d00 Michael Hanselmann
  def Finalize(self, error=None):
461 033a1d00 Michael Hanselmann
    """Finalizes this import/export.
462 033a1d00 Michael Hanselmann

463 033a1d00 Michael Hanselmann
    """
464 033a1d00 Michael Hanselmann
    if self._daemon_name:
465 194e8648 Iustin Pop
      logging.info("Finalizing %s '%s' on %s",
466 033a1d00 Michael Hanselmann
                   self.MODE_TEXT, self._daemon_name, self.node_name)
467 033a1d00 Michael Hanselmann
468 033a1d00 Michael Hanselmann
      result = self._Finalize()
469 033a1d00 Michael Hanselmann
      if result.fail_msg:
470 194e8648 Iustin Pop
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
471 033a1d00 Michael Hanselmann
                            self.MODE_TEXT, self._daemon_name,
472 033a1d00 Michael Hanselmann
                            self.node_name, result.fail_msg)
473 033a1d00 Michael Hanselmann
        return False
474 033a1d00 Michael Hanselmann
475 033a1d00 Michael Hanselmann
      # Daemon is no longer running
476 033a1d00 Michael Hanselmann
      self._daemon_name = None
477 033a1d00 Michael Hanselmann
      self._ts_cleanup = time.time()
478 033a1d00 Michael Hanselmann
479 033a1d00 Michael Hanselmann
    if error:
480 033a1d00 Michael Hanselmann
      self._ReportFinished(False, error)
481 033a1d00 Michael Hanselmann
482 033a1d00 Michael Hanselmann
    return True
483 033a1d00 Michael Hanselmann
484 033a1d00 Michael Hanselmann
485 033a1d00 Michael Hanselmann
class DiskImport(_DiskImportExportBase):
486 033a1d00 Michael Hanselmann
  MODE_TEXT = "import"
487 033a1d00 Michael Hanselmann
488 5e26c4d9 Iustin Pop
  def __init__(self, lu, node_name, opts, instance, component,
489 033a1d00 Michael Hanselmann
               dest, dest_args, timeouts, cbs, private=None):
490 033a1d00 Michael Hanselmann
    """Initializes this class.
491 033a1d00 Michael Hanselmann

492 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
493 033a1d00 Michael Hanselmann
    @type node_name: string
494 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
495 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
496 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
497 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
498 033a1d00 Michael Hanselmann
    @param instance: Instance object
499 5e26c4d9 Iustin Pop
    @type component: string
500 5e26c4d9 Iustin Pop
    @param component: which part of the instance is being imported
501 033a1d00 Michael Hanselmann
    @param dest: I/O destination
502 033a1d00 Michael Hanselmann
    @param dest_args: I/O arguments
503 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
504 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
505 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
506 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
507 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
508 033a1d00 Michael Hanselmann

509 033a1d00 Michael Hanselmann
    """
510 5e26c4d9 Iustin Pop
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
511 5e26c4d9 Iustin Pop
                                   component, timeouts, cbs, private)
512 033a1d00 Michael Hanselmann
    self._dest = dest
513 033a1d00 Michael Hanselmann
    self._dest_args = dest_args
514 033a1d00 Michael Hanselmann
515 033a1d00 Michael Hanselmann
    # Timestamps
516 033a1d00 Michael Hanselmann
    self._ts_listening = None
517 033a1d00 Michael Hanselmann
518 033a1d00 Michael Hanselmann
  @property
519 033a1d00 Michael Hanselmann
  def listen_port(self):
520 033a1d00 Michael Hanselmann
    """Returns the port the daemon is listening on.
521 033a1d00 Michael Hanselmann

522 033a1d00 Michael Hanselmann
    """
523 033a1d00 Michael Hanselmann
    if self._daemon:
524 033a1d00 Michael Hanselmann
      return self._daemon.listen_port
525 033a1d00 Michael Hanselmann
526 033a1d00 Michael Hanselmann
    return None
527 033a1d00 Michael Hanselmann
528 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
529 033a1d00 Michael Hanselmann
    """Starts the import daemon.
530 033a1d00 Michael Hanselmann

531 033a1d00 Michael Hanselmann
    """
532 eb630f50 Michael Hanselmann
    return self._lu.rpc.call_import_start(self.node_name, self._opts,
533 6613661a Iustin Pop
                                          self._instance, self._component,
534 b8c160c1 Michael Hanselmann
                                          (self._dest, self._dest_args))
535 033a1d00 Michael Hanselmann
536 033a1d00 Michael Hanselmann
  def CheckListening(self):
537 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
538 033a1d00 Michael Hanselmann

539 033a1d00 Michael Hanselmann
    @rtype: bool
540 033a1d00 Michael Hanselmann
    @return: Whether the daemon is listening
541 033a1d00 Michael Hanselmann

542 033a1d00 Michael Hanselmann
    """
543 033a1d00 Michael Hanselmann
    assert self._daemon, "Daemon status missing"
544 033a1d00 Michael Hanselmann
545 033a1d00 Michael Hanselmann
    if self._ts_listening is not None:
546 033a1d00 Michael Hanselmann
      return True
547 033a1d00 Michael Hanselmann
548 033a1d00 Michael Hanselmann
    port = self._daemon.listen_port
549 033a1d00 Michael Hanselmann
    if port is not None:
550 033a1d00 Michael Hanselmann
      self._ts_listening = time.time()
551 033a1d00 Michael Hanselmann
552 194e8648 Iustin Pop
      logging.debug("Import '%s' on %s is now listening on port %s",
553 033a1d00 Michael Hanselmann
                    self._daemon_name, self.node_name, port)
554 033a1d00 Michael Hanselmann
555 5e26c4d9 Iustin Pop
      self._cbs.ReportListening(self, self._private, self._component)
556 033a1d00 Michael Hanselmann
557 033a1d00 Michael Hanselmann
      return True
558 033a1d00 Michael Hanselmann
559 f8326fca Andrea Spadaccini
    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
560 033a1d00 Michael Hanselmann
      raise _ImportExportError("Not listening after %s seconds" %
561 033a1d00 Michael Hanselmann
                               self._timeouts.listen)
562 033a1d00 Michael Hanselmann
563 033a1d00 Michael Hanselmann
    return False
564 033a1d00 Michael Hanselmann
565 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
566 033a1d00 Michael Hanselmann
    """Returns the time since we started listening.
567 033a1d00 Michael Hanselmann

568 033a1d00 Michael Hanselmann
    """
569 033a1d00 Michael Hanselmann
    assert self._ts_listening is not None, \
570 033a1d00 Michael Hanselmann
           ("Checking whether an import is connected is only useful"
571 033a1d00 Michael Hanselmann
            " once it's been listening")
572 033a1d00 Michael Hanselmann
573 033a1d00 Michael Hanselmann
    return self._ts_listening
574 033a1d00 Michael Hanselmann
575 033a1d00 Michael Hanselmann
576 033a1d00 Michael Hanselmann
class DiskExport(_DiskImportExportBase):
577 033a1d00 Michael Hanselmann
  MODE_TEXT = "export"
578 033a1d00 Michael Hanselmann
579 5e26c4d9 Iustin Pop
  def __init__(self, lu, node_name, opts, dest_host, dest_port,
580 5e26c4d9 Iustin Pop
               instance, component, source, source_args,
581 033a1d00 Michael Hanselmann
               timeouts, cbs, private=None):
582 033a1d00 Michael Hanselmann
    """Initializes this class.
583 033a1d00 Michael Hanselmann

584 033a1d00 Michael Hanselmann
    @param lu: Logical unit instance
585 033a1d00 Michael Hanselmann
    @type node_name: string
586 033a1d00 Michael Hanselmann
    @param node_name: Node name for import
587 eb630f50 Michael Hanselmann
    @type opts: L{objects.ImportExportOptions}
588 eb630f50 Michael Hanselmann
    @param opts: Import/export daemon options
589 033a1d00 Michael Hanselmann
    @type dest_host: string
590 033a1d00 Michael Hanselmann
    @param dest_host: Destination host name or IP address
591 033a1d00 Michael Hanselmann
    @type dest_port: number
592 033a1d00 Michael Hanselmann
    @param dest_port: Destination port number
593 033a1d00 Michael Hanselmann
    @type instance: L{objects.Instance}
594 033a1d00 Michael Hanselmann
    @param instance: Instance object
595 5e26c4d9 Iustin Pop
    @type component: string
596 5e26c4d9 Iustin Pop
    @param component: which part of the instance is being imported
597 033a1d00 Michael Hanselmann
    @param source: I/O source
598 033a1d00 Michael Hanselmann
    @param source_args: I/O source
599 033a1d00 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
600 033a1d00 Michael Hanselmann
    @param timeouts: Timeouts for this import
601 033a1d00 Michael Hanselmann
    @type cbs: L{ImportExportCbBase}
602 033a1d00 Michael Hanselmann
    @param cbs: Callbacks
603 033a1d00 Michael Hanselmann
    @param private: Private data for callback functions
604 033a1d00 Michael Hanselmann

605 033a1d00 Michael Hanselmann
    """
606 5e26c4d9 Iustin Pop
    _DiskImportExportBase.__init__(self, lu, node_name, opts, instance,
607 5e26c4d9 Iustin Pop
                                   component, timeouts, cbs, private)
608 033a1d00 Michael Hanselmann
    self._dest_host = dest_host
609 033a1d00 Michael Hanselmann
    self._dest_port = dest_port
610 033a1d00 Michael Hanselmann
    self._source = source
611 033a1d00 Michael Hanselmann
    self._source_args = source_args
612 033a1d00 Michael Hanselmann
613 033a1d00 Michael Hanselmann
  def _StartDaemon(self):
614 033a1d00 Michael Hanselmann
    """Starts the export daemon.
615 033a1d00 Michael Hanselmann

616 033a1d00 Michael Hanselmann
    """
617 eb630f50 Michael Hanselmann
    return self._lu.rpc.call_export_start(self.node_name, self._opts,
618 033a1d00 Michael Hanselmann
                                          self._dest_host, self._dest_port,
619 6613661a Iustin Pop
                                          self._instance, self._component,
620 b8c160c1 Michael Hanselmann
                                          (self._source, self._source_args))
621 033a1d00 Michael Hanselmann
622 033a1d00 Michael Hanselmann
  def CheckListening(self):
623 033a1d00 Michael Hanselmann
    """Checks whether the daemon is listening.
624 033a1d00 Michael Hanselmann

625 033a1d00 Michael Hanselmann
    """
626 033a1d00 Michael Hanselmann
    # Only an import can be listening
627 033a1d00 Michael Hanselmann
    return True
628 033a1d00 Michael Hanselmann
629 033a1d00 Michael Hanselmann
  def _GetConnectedCheckEpoch(self):
630 033a1d00 Michael Hanselmann
    """Returns the time since the daemon started.
631 033a1d00 Michael Hanselmann

632 033a1d00 Michael Hanselmann
    """
633 033a1d00 Michael Hanselmann
    assert self._ts_begin is not None
634 033a1d00 Michael Hanselmann
635 033a1d00 Michael Hanselmann
    return self._ts_begin
636 033a1d00 Michael Hanselmann
637 033a1d00 Michael Hanselmann
638 1a2e7fe9 Michael Hanselmann
def FormatProgress(progress):
639 1a2e7fe9 Michael Hanselmann
  """Formats progress information for user consumption
640 1a2e7fe9 Michael Hanselmann

641 1a2e7fe9 Michael Hanselmann
  """
642 e6b8d02d Michael Hanselmann
  (mbytes, throughput, percent, eta) = progress
643 1a2e7fe9 Michael Hanselmann
644 1a2e7fe9 Michael Hanselmann
  parts = [
645 1a2e7fe9 Michael Hanselmann
    utils.FormatUnit(mbytes, "h"),
646 1a2e7fe9 Michael Hanselmann
647 1a2e7fe9 Michael Hanselmann
    # Not using FormatUnit as it doesn't support kilobytes
648 1a2e7fe9 Michael Hanselmann
    "%0.1f MiB/s" % throughput,
649 1a2e7fe9 Michael Hanselmann
    ]
650 1a2e7fe9 Michael Hanselmann
651 1a2e7fe9 Michael Hanselmann
  if percent is not None:
652 1a2e7fe9 Michael Hanselmann
    parts.append("%d%%" % percent)
653 1a2e7fe9 Michael Hanselmann
654 e6b8d02d Michael Hanselmann
  if eta is not None:
655 e6b8d02d Michael Hanselmann
    parts.append("ETA %s" % utils.FormatSeconds(eta))
656 1a2e7fe9 Michael Hanselmann
657 1a2e7fe9 Michael Hanselmann
  return utils.CommaJoin(parts)
658 1a2e7fe9 Michael Hanselmann
659 1a2e7fe9 Michael Hanselmann
660 033a1d00 Michael Hanselmann
class ImportExportLoop:
661 033a1d00 Michael Hanselmann
  MIN_DELAY = 1.0
662 033a1d00 Michael Hanselmann
  MAX_DELAY = 20.0
663 033a1d00 Michael Hanselmann
664 033a1d00 Michael Hanselmann
  def __init__(self, lu):
665 033a1d00 Michael Hanselmann
    """Initializes this class.
666 033a1d00 Michael Hanselmann

667 033a1d00 Michael Hanselmann
    """
668 033a1d00 Michael Hanselmann
    self._lu = lu
669 033a1d00 Michael Hanselmann
    self._queue = []
670 033a1d00 Michael Hanselmann
    self._pending_add = []
671 033a1d00 Michael Hanselmann
672 033a1d00 Michael Hanselmann
  def Add(self, diskie):
673 033a1d00 Michael Hanselmann
    """Adds an import/export object to the loop.
674 033a1d00 Michael Hanselmann

675 033a1d00 Michael Hanselmann
    @type diskie: Subclass of L{_DiskImportExportBase}
676 033a1d00 Michael Hanselmann
    @param diskie: Import/export object
677 033a1d00 Michael Hanselmann

678 033a1d00 Michael Hanselmann
    """
679 033a1d00 Michael Hanselmann
    assert diskie not in self._pending_add
680 033a1d00 Michael Hanselmann
    assert diskie.loop is None
681 033a1d00 Michael Hanselmann
682 033a1d00 Michael Hanselmann
    diskie.SetLoop(self)
683 033a1d00 Michael Hanselmann
684 033a1d00 Michael Hanselmann
    # Adding new objects to a staging list is necessary, otherwise the main
685 033a1d00 Michael Hanselmann
    # loop gets confused if callbacks modify the queue while the main loop is
686 033a1d00 Michael Hanselmann
    # iterating over it.
687 033a1d00 Michael Hanselmann
    self._pending_add.append(diskie)
688 033a1d00 Michael Hanselmann
689 033a1d00 Michael Hanselmann
  @staticmethod
690 033a1d00 Michael Hanselmann
  def _CollectDaemonStatus(lu, daemons):
691 033a1d00 Michael Hanselmann
    """Collects the status for all import/export daemons.
692 033a1d00 Michael Hanselmann

693 033a1d00 Michael Hanselmann
    """
694 033a1d00 Michael Hanselmann
    daemon_status = {}
695 033a1d00 Michael Hanselmann
696 033a1d00 Michael Hanselmann
    for node_name, names in daemons.iteritems():
697 033a1d00 Michael Hanselmann
      result = lu.rpc.call_impexp_status(node_name, names)
698 033a1d00 Michael Hanselmann
      if result.fail_msg:
699 033a1d00 Michael Hanselmann
        lu.LogWarning("Failed to get daemon status on %s: %s",
700 033a1d00 Michael Hanselmann
                      node_name, result.fail_msg)
701 033a1d00 Michael Hanselmann
        continue
702 033a1d00 Michael Hanselmann
703 033a1d00 Michael Hanselmann
      assert len(names) == len(result.payload)
704 033a1d00 Michael Hanselmann
705 033a1d00 Michael Hanselmann
      daemon_status[node_name] = dict(zip(names, result.payload))
706 033a1d00 Michael Hanselmann
707 033a1d00 Michael Hanselmann
    return daemon_status
708 033a1d00 Michael Hanselmann
709 033a1d00 Michael Hanselmann
  @staticmethod
710 033a1d00 Michael Hanselmann
  def _GetActiveDaemonNames(queue):
711 033a1d00 Michael Hanselmann
    """Gets the names of all active daemons.
712 033a1d00 Michael Hanselmann

713 033a1d00 Michael Hanselmann
    """
714 033a1d00 Michael Hanselmann
    result = {}
715 033a1d00 Michael Hanselmann
    for diskie in queue:
716 033a1d00 Michael Hanselmann
      if not diskie.active:
717 033a1d00 Michael Hanselmann
        continue
718 033a1d00 Michael Hanselmann
719 033a1d00 Michael Hanselmann
      try:
720 033a1d00 Michael Hanselmann
        # Start daemon if necessary
721 033a1d00 Michael Hanselmann
        daemon_name = diskie.CheckDaemon()
722 033a1d00 Michael Hanselmann
      except _ImportExportError, err:
723 033a1d00 Michael Hanselmann
        logging.exception("%s failed", diskie.MODE_TEXT)
724 033a1d00 Michael Hanselmann
        diskie.Finalize(error=str(err))
725 033a1d00 Michael Hanselmann
        continue
726 033a1d00 Michael Hanselmann
727 033a1d00 Michael Hanselmann
      result.setdefault(diskie.node_name, []).append(daemon_name)
728 033a1d00 Michael Hanselmann
729 033a1d00 Michael Hanselmann
    assert len(queue) >= len(result)
730 033a1d00 Michael Hanselmann
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
731 033a1d00 Michael Hanselmann
732 033a1d00 Michael Hanselmann
    logging.debug("daemons=%r", result)
733 033a1d00 Michael Hanselmann
734 033a1d00 Michael Hanselmann
    return result
735 033a1d00 Michael Hanselmann
736 033a1d00 Michael Hanselmann
  def _AddPendingToQueue(self):
737 033a1d00 Michael Hanselmann
    """Adds all pending import/export objects to the internal queue.
738 033a1d00 Michael Hanselmann

739 033a1d00 Michael Hanselmann
    """
740 033a1d00 Michael Hanselmann
    assert compat.all(diskie not in self._queue and diskie.loop == self
741 033a1d00 Michael Hanselmann
                      for diskie in self._pending_add)
742 033a1d00 Michael Hanselmann
743 033a1d00 Michael Hanselmann
    self._queue.extend(self._pending_add)
744 033a1d00 Michael Hanselmann
745 033a1d00 Michael Hanselmann
    del self._pending_add[:]
746 033a1d00 Michael Hanselmann
747 033a1d00 Michael Hanselmann
  def Run(self):
748 033a1d00 Michael Hanselmann
    """Utility main loop.
749 033a1d00 Michael Hanselmann

750 033a1d00 Michael Hanselmann
    """
751 033a1d00 Michael Hanselmann
    while True:
752 033a1d00 Michael Hanselmann
      self._AddPendingToQueue()
753 033a1d00 Michael Hanselmann
754 033a1d00 Michael Hanselmann
      # Collect all active daemon names
755 033a1d00 Michael Hanselmann
      daemons = self._GetActiveDaemonNames(self._queue)
756 033a1d00 Michael Hanselmann
      if not daemons:
757 033a1d00 Michael Hanselmann
        break
758 033a1d00 Michael Hanselmann
759 033a1d00 Michael Hanselmann
      # Collection daemon status data
760 033a1d00 Michael Hanselmann
      data = self._CollectDaemonStatus(self._lu, daemons)
761 033a1d00 Michael Hanselmann
762 033a1d00 Michael Hanselmann
      # Use data
763 033a1d00 Michael Hanselmann
      delay = self.MAX_DELAY
764 033a1d00 Michael Hanselmann
      for diskie in self._queue:
765 033a1d00 Michael Hanselmann
        if not diskie.active:
766 033a1d00 Michael Hanselmann
          continue
767 033a1d00 Michael Hanselmann
768 033a1d00 Michael Hanselmann
        try:
769 033a1d00 Michael Hanselmann
          try:
770 033a1d00 Michael Hanselmann
            all_daemon_data = data[diskie.node_name]
771 033a1d00 Michael Hanselmann
          except KeyError:
772 033a1d00 Michael Hanselmann
            result = diskie.SetDaemonData(False, None)
773 033a1d00 Michael Hanselmann
          else:
774 033a1d00 Michael Hanselmann
            result = \
775 033a1d00 Michael Hanselmann
              diskie.SetDaemonData(True,
776 033a1d00 Michael Hanselmann
                                   all_daemon_data[diskie.GetDaemonName()])
777 033a1d00 Michael Hanselmann
778 033a1d00 Michael Hanselmann
          if not result:
779 033a1d00 Michael Hanselmann
            # Daemon not yet ready, retry soon
780 033a1d00 Michael Hanselmann
            delay = min(3.0, delay)
781 033a1d00 Michael Hanselmann
            continue
782 033a1d00 Michael Hanselmann
783 033a1d00 Michael Hanselmann
          if diskie.CheckFinished():
784 033a1d00 Michael Hanselmann
            # Transfer finished
785 033a1d00 Michael Hanselmann
            diskie.Finalize()
786 033a1d00 Michael Hanselmann
            continue
787 033a1d00 Michael Hanselmann
788 033a1d00 Michael Hanselmann
          # Normal case: check again in 5 seconds
789 033a1d00 Michael Hanselmann
          delay = min(5.0, delay)
790 033a1d00 Michael Hanselmann
791 033a1d00 Michael Hanselmann
          if not diskie.CheckListening():
792 033a1d00 Michael Hanselmann
            # Not yet listening, retry soon
793 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
794 033a1d00 Michael Hanselmann
            continue
795 033a1d00 Michael Hanselmann
796 033a1d00 Michael Hanselmann
          if not diskie.CheckConnected():
797 033a1d00 Michael Hanselmann
            # Not yet connected, retry soon
798 033a1d00 Michael Hanselmann
            delay = min(1.0, delay)
799 033a1d00 Michael Hanselmann
            continue
800 033a1d00 Michael Hanselmann
801 033a1d00 Michael Hanselmann
        except _ImportExportError, err:
802 033a1d00 Michael Hanselmann
          logging.exception("%s failed", diskie.MODE_TEXT)
803 033a1d00 Michael Hanselmann
          diskie.Finalize(error=str(err))
804 033a1d00 Michael Hanselmann
805 403f5172 Guido Trotter
      if not compat.any(diskie.active for diskie in self._queue):
806 033a1d00 Michael Hanselmann
        break
807 033a1d00 Michael Hanselmann
808 033a1d00 Michael Hanselmann
      # Wait a bit
809 033a1d00 Michael Hanselmann
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
810 033a1d00 Michael Hanselmann
      logging.debug("Waiting for %ss", delay)
811 033a1d00 Michael Hanselmann
      time.sleep(delay)
812 033a1d00 Michael Hanselmann
813 033a1d00 Michael Hanselmann
  def FinalizeAll(self):
814 033a1d00 Michael Hanselmann
    """Finalizes all pending transfers.
815 033a1d00 Michael Hanselmann

816 033a1d00 Michael Hanselmann
    """
817 033a1d00 Michael Hanselmann
    success = True
818 033a1d00 Michael Hanselmann
819 033a1d00 Michael Hanselmann
    for diskie in self._queue:
820 033a1d00 Michael Hanselmann
      success = diskie.Finalize() and success
821 033a1d00 Michael Hanselmann
822 033a1d00 Michael Hanselmann
    return success
823 5d97d6dd Michael Hanselmann
824 5d97d6dd Michael Hanselmann
825 5d97d6dd Michael Hanselmann
class _TransferInstCbBase(ImportExportCbBase):
826 5d97d6dd Michael Hanselmann
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node, src_cbs,
827 d51ae04c Michael Hanselmann
               dest_node, dest_ip):
828 5d97d6dd Michael Hanselmann
    """Initializes this class.
829 5d97d6dd Michael Hanselmann

830 5d97d6dd Michael Hanselmann
    """
831 5d97d6dd Michael Hanselmann
    ImportExportCbBase.__init__(self)
832 5d97d6dd Michael Hanselmann
833 5d97d6dd Michael Hanselmann
    self.lu = lu
834 5d97d6dd Michael Hanselmann
    self.feedback_fn = feedback_fn
835 5d97d6dd Michael Hanselmann
    self.instance = instance
836 5d97d6dd Michael Hanselmann
    self.timeouts = timeouts
837 5d97d6dd Michael Hanselmann
    self.src_node = src_node
838 5d97d6dd Michael Hanselmann
    self.src_cbs = src_cbs
839 5d97d6dd Michael Hanselmann
    self.dest_node = dest_node
840 5d97d6dd Michael Hanselmann
    self.dest_ip = dest_ip
841 5d97d6dd Michael Hanselmann
842 5d97d6dd Michael Hanselmann
843 5d97d6dd Michael Hanselmann
class _TransferInstSourceCb(_TransferInstCbBase):
844 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
845 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
846 5d97d6dd Michael Hanselmann

847 5d97d6dd Michael Hanselmann
    """
848 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
849 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
850 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
851 5d97d6dd Michael Hanselmann
852 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is sending data on %s" %
853 5d97d6dd Michael Hanselmann
                     (dtp.data.name, ie.node_name))
854 5d97d6dd Michael Hanselmann
855 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, dtp):
856 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
857 1a2e7fe9 Michael Hanselmann

858 1a2e7fe9 Michael Hanselmann
    """
859 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
860 1a2e7fe9 Michael Hanselmann
    if not progress:
861 1a2e7fe9 Michael Hanselmann
      return
862 1a2e7fe9 Michael Hanselmann
863 1a2e7fe9 Michael Hanselmann
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
864 1a2e7fe9 Michael Hanselmann
865 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
866 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
867 5d97d6dd Michael Hanselmann

868 5d97d6dd Michael Hanselmann
    """
869 5d97d6dd Michael Hanselmann
    assert self.src_cbs is None
870 5d97d6dd Michael Hanselmann
    assert dtp.src_export == ie
871 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
872 5d97d6dd Michael Hanselmann
873 5d97d6dd Michael Hanselmann
    if ie.success:
874 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished sending data" % dtp.data.name)
875 5d97d6dd Michael Hanselmann
    else:
876 c9300bb3 Iustin Pop
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
877 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
878 5d97d6dd Michael Hanselmann
879 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
880 5d97d6dd Michael Hanselmann
881 5d97d6dd Michael Hanselmann
    cb = dtp.data.finished_fn
882 5d97d6dd Michael Hanselmann
    if cb:
883 5d97d6dd Michael Hanselmann
      cb()
884 5d97d6dd Michael Hanselmann
885 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
886 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
887 5d97d6dd Michael Hanselmann
    if dtp.dest_import and not ie.success:
888 5d97d6dd Michael Hanselmann
      dtp.dest_import.Abort()
889 5d97d6dd Michael Hanselmann
890 5d97d6dd Michael Hanselmann
891 5d97d6dd Michael Hanselmann
class _TransferInstDestCb(_TransferInstCbBase):
892 5e26c4d9 Iustin Pop
  def ReportListening(self, ie, dtp, component):
893 5d97d6dd Michael Hanselmann
    """Called when daemon started listening.
894 5d97d6dd Michael Hanselmann

895 5d97d6dd Michael Hanselmann
    """
896 5d97d6dd Michael Hanselmann
    assert self.src_cbs
897 5d97d6dd Michael Hanselmann
    assert dtp.src_export is None
898 5d97d6dd Michael Hanselmann
    assert dtp.dest_import
899 d51ae04c Michael Hanselmann
    assert dtp.export_opts
900 5d97d6dd Michael Hanselmann
901 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
902 5d97d6dd Michael Hanselmann
903 5d97d6dd Michael Hanselmann
    # Start export on source node
904 d51ae04c Michael Hanselmann
    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
905 5e26c4d9 Iustin Pop
                    self.dest_ip, ie.listen_port, self.instance,
906 5e26c4d9 Iustin Pop
                    component, dtp.data.src_io, dtp.data.src_ioargs,
907 5d97d6dd Michael Hanselmann
                    self.timeouts, self.src_cbs, private=dtp)
908 5d97d6dd Michael Hanselmann
    ie.loop.Add(de)
909 5d97d6dd Michael Hanselmann
910 5d97d6dd Michael Hanselmann
    dtp.src_export = de
911 5d97d6dd Michael Hanselmann
912 5d97d6dd Michael Hanselmann
  def ReportConnected(self, ie, dtp):
913 5d97d6dd Michael Hanselmann
    """Called when a connection has been established.
914 5d97d6dd Michael Hanselmann

915 5d97d6dd Michael Hanselmann
    """
916 5d97d6dd Michael Hanselmann
    self.feedback_fn("%s is receiving data on %s" %
917 5d97d6dd Michael Hanselmann
                     (dtp.data.name, self.dest_node))
918 5d97d6dd Michael Hanselmann
919 5d97d6dd Michael Hanselmann
  def ReportFinished(self, ie, dtp):
920 5d97d6dd Michael Hanselmann
    """Called when a transfer has finished.
921 5d97d6dd Michael Hanselmann

922 5d97d6dd Michael Hanselmann
    """
923 5d97d6dd Michael Hanselmann
    if ie.success:
924 5d97d6dd Michael Hanselmann
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
925 5d97d6dd Michael Hanselmann
    else:
926 c9300bb3 Iustin Pop
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
927 5d97d6dd Michael Hanselmann
                       (dtp.data.name, ie.final_message, ie.recent_output))
928 5d97d6dd Michael Hanselmann
929 5d97d6dd Michael Hanselmann
    dtp.RecordResult(ie.success)
930 5d97d6dd Michael Hanselmann
931 5d97d6dd Michael Hanselmann
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
932 5d97d6dd Michael Hanselmann
    # give the daemon a moment to sort things out
933 5d97d6dd Michael Hanselmann
    if dtp.src_export and not ie.success:
934 5d97d6dd Michael Hanselmann
      dtp.src_export.Abort()
935 5d97d6dd Michael Hanselmann
936 5d97d6dd Michael Hanselmann
937 5d97d6dd Michael Hanselmann
class DiskTransfer(object):
938 5d97d6dd Michael Hanselmann
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
939 5d97d6dd Michael Hanselmann
               finished_fn):
940 5d97d6dd Michael Hanselmann
    """Initializes this class.
941 5d97d6dd Michael Hanselmann

942 5d97d6dd Michael Hanselmann
    @type name: string
943 5d97d6dd Michael Hanselmann
    @param name: User-visible name for this transfer (e.g. "disk/0")
944 5d97d6dd Michael Hanselmann
    @param src_io: Source I/O type
945 5d97d6dd Michael Hanselmann
    @param src_ioargs: Source I/O arguments
946 5d97d6dd Michael Hanselmann
    @param dest_io: Destination I/O type
947 5d97d6dd Michael Hanselmann
    @param dest_ioargs: Destination I/O arguments
948 5d97d6dd Michael Hanselmann
    @type finished_fn: callable
949 5d97d6dd Michael Hanselmann
    @param finished_fn: Function called once transfer has finished
950 5d97d6dd Michael Hanselmann

951 5d97d6dd Michael Hanselmann
    """
952 5d97d6dd Michael Hanselmann
    self.name = name
953 5d97d6dd Michael Hanselmann
954 5d97d6dd Michael Hanselmann
    self.src_io = src_io
955 5d97d6dd Michael Hanselmann
    self.src_ioargs = src_ioargs
956 5d97d6dd Michael Hanselmann
957 5d97d6dd Michael Hanselmann
    self.dest_io = dest_io
958 5d97d6dd Michael Hanselmann
    self.dest_ioargs = dest_ioargs
959 5d97d6dd Michael Hanselmann
960 5d97d6dd Michael Hanselmann
    self.finished_fn = finished_fn
961 5d97d6dd Michael Hanselmann
962 5d97d6dd Michael Hanselmann
963 5d97d6dd Michael Hanselmann
class _DiskTransferPrivate(object):
964 d51ae04c Michael Hanselmann
  def __init__(self, data, success, export_opts):
965 5d97d6dd Michael Hanselmann
    """Initializes this class.
966 5d97d6dd Michael Hanselmann

967 5d97d6dd Michael Hanselmann
    @type data: L{DiskTransfer}
968 5d97d6dd Michael Hanselmann
    @type success: bool
969 5d97d6dd Michael Hanselmann

970 5d97d6dd Michael Hanselmann
    """
971 5d97d6dd Michael Hanselmann
    self.data = data
972 d51ae04c Michael Hanselmann
    self.success = success
973 d51ae04c Michael Hanselmann
    self.export_opts = export_opts
974 5d97d6dd Michael Hanselmann
975 5d97d6dd Michael Hanselmann
    self.src_export = None
976 5d97d6dd Michael Hanselmann
    self.dest_import = None
977 5d97d6dd Michael Hanselmann
978 5d97d6dd Michael Hanselmann
  def RecordResult(self, success):
979 5d97d6dd Michael Hanselmann
    """Updates the status.
980 5d97d6dd Michael Hanselmann

981 5d97d6dd Michael Hanselmann
    One failed part will cause the whole transfer to fail.
982 5d97d6dd Michael Hanselmann

983 5d97d6dd Michael Hanselmann
    """
984 5d97d6dd Michael Hanselmann
    self.success = self.success and success
985 5d97d6dd Michael Hanselmann
986 5d97d6dd Michael Hanselmann
987 d51ae04c Michael Hanselmann
def _GetInstDiskMagic(base, instance_name, index):
988 d51ae04c Michael Hanselmann
  """Computes the magic value for a disk export or import.
989 d51ae04c Michael Hanselmann

990 d51ae04c Michael Hanselmann
  @type base: string
991 d51ae04c Michael Hanselmann
  @param base: Random seed value (can be the same for all disks of a transfer)
992 d51ae04c Michael Hanselmann
  @type instance_name: string
993 d51ae04c Michael Hanselmann
  @param instance_name: Name of instance
994 d51ae04c Michael Hanselmann
  @type index: number
995 d51ae04c Michael Hanselmann
  @param index: Disk index
996 d51ae04c Michael Hanselmann

997 d51ae04c Michael Hanselmann
  """
998 d51ae04c Michael Hanselmann
  h = compat.sha1_hash()
999 d51ae04c Michael Hanselmann
  h.update(str(constants.RIE_VERSION))
1000 d51ae04c Michael Hanselmann
  h.update(base)
1001 d51ae04c Michael Hanselmann
  h.update(instance_name)
1002 d51ae04c Michael Hanselmann
  h.update(str(index))
1003 d51ae04c Michael Hanselmann
  return h.hexdigest()
1004 d51ae04c Michael Hanselmann
1005 d51ae04c Michael Hanselmann
1006 5d97d6dd Michael Hanselmann
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
1007 5d97d6dd Michael Hanselmann
                         instance, all_transfers):
1008 5d97d6dd Michael Hanselmann
  """Transfers an instance's data from one node to another.
1009 5d97d6dd Michael Hanselmann

1010 5d97d6dd Michael Hanselmann
  @param lu: Logical unit instance
1011 5d97d6dd Michael Hanselmann
  @param feedback_fn: Feedback function
1012 5d97d6dd Michael Hanselmann
  @type src_node: string
1013 5d97d6dd Michael Hanselmann
  @param src_node: Source node name
1014 5d97d6dd Michael Hanselmann
  @type dest_node: string
1015 5d97d6dd Michael Hanselmann
  @param dest_node: Destination node name
1016 5d97d6dd Michael Hanselmann
  @type dest_ip: string
1017 5d97d6dd Michael Hanselmann
  @param dest_ip: IP address of destination node
1018 5d97d6dd Michael Hanselmann
  @type instance: L{objects.Instance}
1019 5d97d6dd Michael Hanselmann
  @param instance: Instance object
1020 5d97d6dd Michael Hanselmann
  @type all_transfers: list of L{DiskTransfer} instances
1021 5d97d6dd Michael Hanselmann
  @param all_transfers: List of all disk transfers to be made
1022 5d97d6dd Michael Hanselmann
  @rtype: list
1023 5d97d6dd Michael Hanselmann
  @return: List with a boolean (True=successful, False=failed) for success for
1024 5d97d6dd Michael Hanselmann
           each transfer
1025 5d97d6dd Michael Hanselmann

1026 5d97d6dd Michael Hanselmann
  """
1027 5bb95572 Michael Hanselmann
  # Disable compression for all moves as these are all within the same cluster
1028 5bb95572 Michael Hanselmann
  compress = constants.IEC_NONE
1029 a5310c2a Michael Hanselmann
1030 a5310c2a Michael Hanselmann
  logging.debug("Source node %s, destination node %s, compression '%s'",
1031 a5310c2a Michael Hanselmann
                src_node, dest_node, compress)
1032 a5310c2a Michael Hanselmann
1033 5d97d6dd Michael Hanselmann
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1034 5d97d6dd Michael Hanselmann
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1035 d51ae04c Michael Hanselmann
                                  src_node, None, dest_node, dest_ip)
1036 5d97d6dd Michael Hanselmann
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1037 d51ae04c Michael Hanselmann
                                 src_node, src_cbs, dest_node, dest_ip)
1038 5d97d6dd Michael Hanselmann
1039 5d97d6dd Michael Hanselmann
  all_dtp = []
1040 5d97d6dd Michael Hanselmann
1041 d51ae04c Michael Hanselmann
  base_magic = utils.GenerateSecret(6)
1042 d51ae04c Michael Hanselmann
1043 5d97d6dd Michael Hanselmann
  ieloop = ImportExportLoop(lu)
1044 5d97d6dd Michael Hanselmann
  try:
1045 d51ae04c Michael Hanselmann
    for idx, transfer in enumerate(all_transfers):
1046 5d97d6dd Michael Hanselmann
      if transfer:
1047 5d97d6dd Michael Hanselmann
        feedback_fn("Exporting %s from %s to %s" %
1048 5d97d6dd Michael Hanselmann
                    (transfer.name, src_node, dest_node))
1049 5d97d6dd Michael Hanselmann
1050 d51ae04c Michael Hanselmann
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1051 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1052 d51ae04c Michael Hanselmann
                                           compress=compress, magic=magic)
1053 d51ae04c Michael Hanselmann
1054 d51ae04c Michael Hanselmann
        dtp = _DiskTransferPrivate(transfer, True, opts)
1055 5d97d6dd Michael Hanselmann
1056 5e26c4d9 Iustin Pop
        di = DiskImport(lu, dest_node, opts, instance, "disk%d" % idx,
1057 5d97d6dd Michael Hanselmann
                        transfer.dest_io, transfer.dest_ioargs,
1058 5d97d6dd Michael Hanselmann
                        timeouts, dest_cbs, private=dtp)
1059 5d97d6dd Michael Hanselmann
        ieloop.Add(di)
1060 5d97d6dd Michael Hanselmann
1061 5d97d6dd Michael Hanselmann
        dtp.dest_import = di
1062 5d97d6dd Michael Hanselmann
      else:
1063 85b3901b Michael Hanselmann
        dtp = _DiskTransferPrivate(None, False, None)
1064 5d97d6dd Michael Hanselmann
1065 5d97d6dd Michael Hanselmann
      all_dtp.append(dtp)
1066 5d97d6dd Michael Hanselmann
1067 5d97d6dd Michael Hanselmann
    ieloop.Run()
1068 5d97d6dd Michael Hanselmann
  finally:
1069 5d97d6dd Michael Hanselmann
    ieloop.FinalizeAll()
1070 5d97d6dd Michael Hanselmann
1071 5d97d6dd Michael Hanselmann
  assert len(all_dtp) == len(all_transfers)
1072 403f5172 Guido Trotter
  assert compat.all((dtp.src_export is None or
1073 5d97d6dd Michael Hanselmann
                      dtp.src_export.success is not None) and
1074 5d97d6dd Michael Hanselmann
                     (dtp.dest_import is None or
1075 5d97d6dd Michael Hanselmann
                      dtp.dest_import.success is not None)
1076 403f5172 Guido Trotter
                     for dtp in all_dtp), \
1077 5d97d6dd Michael Hanselmann
         "Not all imports/exports are finalized"
1078 5d97d6dd Michael Hanselmann
1079 5d97d6dd Michael Hanselmann
  return [bool(dtp.success) for dtp in all_dtp]
1080 387794f8 Michael Hanselmann
1081 387794f8 Michael Hanselmann
1082 4a96f1d1 Michael Hanselmann
class _RemoteExportCb(ImportExportCbBase):
1083 4a96f1d1 Michael Hanselmann
  def __init__(self, feedback_fn, disk_count):
1084 4a96f1d1 Michael Hanselmann
    """Initializes this class.
1085 4a96f1d1 Michael Hanselmann

1086 4a96f1d1 Michael Hanselmann
    """
1087 4a96f1d1 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1088 4a96f1d1 Michael Hanselmann
    self._feedback_fn = feedback_fn
1089 4a96f1d1 Michael Hanselmann
    self._dresults = [None] * disk_count
1090 4a96f1d1 Michael Hanselmann
1091 4a96f1d1 Michael Hanselmann
  @property
1092 4a96f1d1 Michael Hanselmann
  def disk_results(self):
1093 4a96f1d1 Michael Hanselmann
    """Returns per-disk results.
1094 4a96f1d1 Michael Hanselmann

1095 4a96f1d1 Michael Hanselmann
    """
1096 4a96f1d1 Michael Hanselmann
    return self._dresults
1097 4a96f1d1 Michael Hanselmann
1098 4a96f1d1 Michael Hanselmann
  def ReportConnected(self, ie, private):
1099 4a96f1d1 Michael Hanselmann
    """Called when a connection has been established.
1100 4a96f1d1 Michael Hanselmann

1101 4a96f1d1 Michael Hanselmann
    """
1102 4a96f1d1 Michael Hanselmann
    (idx, _) = private
1103 4a96f1d1 Michael Hanselmann
1104 4a96f1d1 Michael Hanselmann
    self._feedback_fn("Disk %s is now sending data" % idx)
1105 4a96f1d1 Michael Hanselmann
1106 1a2e7fe9 Michael Hanselmann
  def ReportProgress(self, ie, private):
1107 1a2e7fe9 Michael Hanselmann
    """Called when new progress information should be reported.
1108 1a2e7fe9 Michael Hanselmann

1109 1a2e7fe9 Michael Hanselmann
    """
1110 1a2e7fe9 Michael Hanselmann
    (idx, _) = private
1111 1a2e7fe9 Michael Hanselmann
1112 1a2e7fe9 Michael Hanselmann
    progress = ie.progress
1113 1a2e7fe9 Michael Hanselmann
    if not progress:
1114 1a2e7fe9 Michael Hanselmann
      return
1115 1a2e7fe9 Michael Hanselmann
1116 1a2e7fe9 Michael Hanselmann
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1117 1a2e7fe9 Michael Hanselmann
1118 4a96f1d1 Michael Hanselmann
  def ReportFinished(self, ie, private):
1119 4a96f1d1 Michael Hanselmann
    """Called when a transfer has finished.
1120 4a96f1d1 Michael Hanselmann

1121 4a96f1d1 Michael Hanselmann
    """
1122 4a96f1d1 Michael Hanselmann
    (idx, finished_fn) = private
1123 4a96f1d1 Michael Hanselmann
1124 4a96f1d1 Michael Hanselmann
    if ie.success:
1125 4a96f1d1 Michael Hanselmann
      self._feedback_fn("Disk %s finished sending data" % idx)
1126 4a96f1d1 Michael Hanselmann
    else:
1127 c9300bb3 Iustin Pop
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1128 4a96f1d1 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1129 4a96f1d1 Michael Hanselmann
1130 4a96f1d1 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1131 4a96f1d1 Michael Hanselmann
1132 4a96f1d1 Michael Hanselmann
    if finished_fn:
1133 4a96f1d1 Michael Hanselmann
      finished_fn()
1134 4a96f1d1 Michael Hanselmann
1135 4a96f1d1 Michael Hanselmann
1136 387794f8 Michael Hanselmann
class ExportInstanceHelper:
1137 387794f8 Michael Hanselmann
  def __init__(self, lu, feedback_fn, instance):
1138 387794f8 Michael Hanselmann
    """Initializes this class.
1139 387794f8 Michael Hanselmann

1140 387794f8 Michael Hanselmann
    @param lu: Logical unit instance
1141 387794f8 Michael Hanselmann
    @param feedback_fn: Feedback function
1142 387794f8 Michael Hanselmann
    @type instance: L{objects.Instance}
1143 387794f8 Michael Hanselmann
    @param instance: Instance object
1144 387794f8 Michael Hanselmann

1145 387794f8 Michael Hanselmann
    """
1146 387794f8 Michael Hanselmann
    self._lu = lu
1147 387794f8 Michael Hanselmann
    self._feedback_fn = feedback_fn
1148 387794f8 Michael Hanselmann
    self._instance = instance
1149 387794f8 Michael Hanselmann
1150 387794f8 Michael Hanselmann
    self._snap_disks = []
1151 387794f8 Michael Hanselmann
    self._removed_snaps = [False] * len(instance.disks)
1152 387794f8 Michael Hanselmann
1153 387794f8 Michael Hanselmann
  def CreateSnapshots(self):
1154 387794f8 Michael Hanselmann
    """Creates an LVM snapshot for every disk of the instance.
1155 387794f8 Michael Hanselmann

1156 387794f8 Michael Hanselmann
    """
1157 387794f8 Michael Hanselmann
    assert not self._snap_disks
1158 387794f8 Michael Hanselmann
1159 387794f8 Michael Hanselmann
    instance = self._instance
1160 387794f8 Michael Hanselmann
    src_node = instance.primary_node
1161 387794f8 Michael Hanselmann
1162 387794f8 Michael Hanselmann
    for idx, disk in enumerate(instance.disks):
1163 387794f8 Michael Hanselmann
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1164 387794f8 Michael Hanselmann
                        (idx, src_node))
1165 387794f8 Michael Hanselmann
1166 387794f8 Michael Hanselmann
      # result.payload will be a snapshot of an lvm leaf of the one we
1167 387794f8 Michael Hanselmann
      # passed
1168 62bfbc7d René Nussbaumer
      result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
1169 800ac399 Iustin Pop
      new_dev = False
1170 387794f8 Michael Hanselmann
      msg = result.fail_msg
1171 387794f8 Michael Hanselmann
      if msg:
1172 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1173 387794f8 Michael Hanselmann
                            idx, src_node, msg)
1174 800ac399 Iustin Pop
      elif (not isinstance(result.payload, (tuple, list)) or
1175 800ac399 Iustin Pop
            len(result.payload) != 2):
1176 800ac399 Iustin Pop
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1177 800ac399 Iustin Pop
                            " result '%s'", idx, src_node, result.payload)
1178 387794f8 Michael Hanselmann
      else:
1179 800ac399 Iustin Pop
        disk_id = tuple(result.payload)
1180 bc5d0215 Andrea Spadaccini
        disk_params = constants.DISK_LD_DEFAULTS[constants.LD_LV].copy()
1181 387794f8 Michael Hanselmann
        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
1182 387794f8 Michael Hanselmann
                               logical_id=disk_id, physical_id=disk_id,
1183 bc5d0215 Andrea Spadaccini
                               iv_name=disk.iv_name,
1184 bc5d0215 Andrea Spadaccini
                               params=disk_params)
1185 387794f8 Michael Hanselmann
1186 387794f8 Michael Hanselmann
      self._snap_disks.append(new_dev)
1187 387794f8 Michael Hanselmann
1188 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1189 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(instance.disks)
1190 387794f8 Michael Hanselmann
1191 387794f8 Michael Hanselmann
  def _RemoveSnapshot(self, disk_index):
1192 387794f8 Michael Hanselmann
    """Removes an LVM snapshot.
1193 387794f8 Michael Hanselmann

1194 387794f8 Michael Hanselmann
    @type disk_index: number
1195 387794f8 Michael Hanselmann
    @param disk_index: Index of the snapshot to be removed
1196 387794f8 Michael Hanselmann

1197 387794f8 Michael Hanselmann
    """
1198 387794f8 Michael Hanselmann
    disk = self._snap_disks[disk_index]
1199 387794f8 Michael Hanselmann
    if disk and not self._removed_snaps[disk_index]:
1200 387794f8 Michael Hanselmann
      src_node = self._instance.primary_node
1201 387794f8 Michael Hanselmann
1202 387794f8 Michael Hanselmann
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1203 387794f8 Michael Hanselmann
                        (disk_index, src_node))
1204 387794f8 Michael Hanselmann
1205 387794f8 Michael Hanselmann
      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
1206 387794f8 Michael Hanselmann
      if result.fail_msg:
1207 387794f8 Michael Hanselmann
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1208 387794f8 Michael Hanselmann
                            " %s: %s", disk_index, src_node, result.fail_msg)
1209 387794f8 Michael Hanselmann
      else:
1210 387794f8 Michael Hanselmann
        self._removed_snaps[disk_index] = True
1211 387794f8 Michael Hanselmann
1212 387794f8 Michael Hanselmann
  def LocalExport(self, dest_node):
1213 387794f8 Michael Hanselmann
    """Intra-cluster instance export.
1214 387794f8 Michael Hanselmann

1215 387794f8 Michael Hanselmann
    @type dest_node: L{objects.Node}
1216 387794f8 Michael Hanselmann
    @param dest_node: Destination node
1217 387794f8 Michael Hanselmann

1218 387794f8 Michael Hanselmann
    """
1219 387794f8 Michael Hanselmann
    instance = self._instance
1220 387794f8 Michael Hanselmann
    src_node = instance.primary_node
1221 387794f8 Michael Hanselmann
1222 387794f8 Michael Hanselmann
    assert len(self._snap_disks) == len(instance.disks)
1223 387794f8 Michael Hanselmann
1224 387794f8 Michael Hanselmann
    transfers = []
1225 387794f8 Michael Hanselmann
1226 387794f8 Michael Hanselmann
    for idx, dev in enumerate(self._snap_disks):
1227 387794f8 Michael Hanselmann
      if not dev:
1228 387794f8 Michael Hanselmann
        transfers.append(None)
1229 387794f8 Michael Hanselmann
        continue
1230 387794f8 Michael Hanselmann
1231 9c492c2d Michael Hanselmann
      path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
1232 387794f8 Michael Hanselmann
                            dev.physical_id[1])
1233 387794f8 Michael Hanselmann
1234 387794f8 Michael Hanselmann
      finished_fn = compat.partial(self._TransferFinished, idx)
1235 387794f8 Michael Hanselmann
1236 387794f8 Michael Hanselmann
      # FIXME: pass debug option from opcode to backend
1237 387794f8 Michael Hanselmann
      dt = DiskTransfer("snapshot/%s" % idx,
1238 387794f8 Michael Hanselmann
                        constants.IEIO_SCRIPT, (dev, idx),
1239 387794f8 Michael Hanselmann
                        constants.IEIO_FILE, (path, ),
1240 387794f8 Michael Hanselmann
                        finished_fn)
1241 387794f8 Michael Hanselmann
      transfers.append(dt)
1242 387794f8 Michael Hanselmann
1243 387794f8 Michael Hanselmann
    # Actually export data
1244 387794f8 Michael Hanselmann
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1245 387794f8 Michael Hanselmann
                                    src_node, dest_node.name,
1246 387794f8 Michael Hanselmann
                                    dest_node.secondary_ip,
1247 387794f8 Michael Hanselmann
                                    instance, transfers)
1248 387794f8 Michael Hanselmann
1249 387794f8 Michael Hanselmann
    assert len(dresults) == len(instance.disks)
1250 387794f8 Michael Hanselmann
1251 387794f8 Michael Hanselmann
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1252 387794f8 Michael Hanselmann
    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
1253 387794f8 Michael Hanselmann
                                               self._snap_disks)
1254 387794f8 Michael Hanselmann
    msg = result.fail_msg
1255 387794f8 Michael Hanselmann
    fin_resu = not msg
1256 387794f8 Michael Hanselmann
    if msg:
1257 387794f8 Michael Hanselmann
      self._lu.LogWarning("Could not finalize export for instance %s"
1258 387794f8 Michael Hanselmann
                          " on node %s: %s", instance.name, dest_node.name, msg)
1259 387794f8 Michael Hanselmann
1260 387794f8 Michael Hanselmann
    return (fin_resu, dresults)
1261 387794f8 Michael Hanselmann
1262 d51ae04c Michael Hanselmann
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1263 4a96f1d1 Michael Hanselmann
    """Inter-cluster instance export.
1264 4a96f1d1 Michael Hanselmann

1265 4a96f1d1 Michael Hanselmann
    @type disk_info: list
1266 4a96f1d1 Michael Hanselmann
    @param disk_info: Per-disk destination information
1267 d51ae04c Michael Hanselmann
    @type key_name: string
1268 d51ae04c Michael Hanselmann
    @param key_name: Name of X509 key to use
1269 d51ae04c Michael Hanselmann
    @type dest_ca_pem: string
1270 d51ae04c Michael Hanselmann
    @param dest_ca_pem: Destination X509 CA in PEM format
1271 4a96f1d1 Michael Hanselmann
    @type timeouts: L{ImportExportTimeouts}
1272 4a96f1d1 Michael Hanselmann
    @param timeouts: Timeouts for this import
1273 4a96f1d1 Michael Hanselmann

1274 4a96f1d1 Michael Hanselmann
    """
1275 4a96f1d1 Michael Hanselmann
    instance = self._instance
1276 4a96f1d1 Michael Hanselmann
1277 4a96f1d1 Michael Hanselmann
    assert len(disk_info) == len(instance.disks)
1278 4a96f1d1 Michael Hanselmann
1279 4a96f1d1 Michael Hanselmann
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1280 4a96f1d1 Michael Hanselmann
1281 4a96f1d1 Michael Hanselmann
    ieloop = ImportExportLoop(self._lu)
1282 4a96f1d1 Michael Hanselmann
    try:
1283 d51ae04c Michael Hanselmann
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1284 d51ae04c Michael Hanselmann
                                                           disk_info)):
1285 ba5619c2 Michael Hanselmann
        # Decide whether to use IPv6
1286 ba5619c2 Michael Hanselmann
        ipv6 = netutils.IP6Address.IsValid(host)
1287 ba5619c2 Michael Hanselmann
1288 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=key_name,
1289 d51ae04c Michael Hanselmann
                                           ca_pem=dest_ca_pem,
1290 ba5619c2 Michael Hanselmann
                                           magic=magic, ipv6=ipv6)
1291 d51ae04c Michael Hanselmann
1292 4a96f1d1 Michael Hanselmann
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1293 4a96f1d1 Michael Hanselmann
        finished_fn = compat.partial(self._TransferFinished, idx)
1294 4a96f1d1 Michael Hanselmann
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1295 5e26c4d9 Iustin Pop
                              opts, host, port, instance, "disk%d" % idx,
1296 4a96f1d1 Michael Hanselmann
                              constants.IEIO_SCRIPT, (dev, idx),
1297 4a96f1d1 Michael Hanselmann
                              timeouts, cbs, private=(idx, finished_fn)))
1298 4a96f1d1 Michael Hanselmann
1299 4a96f1d1 Michael Hanselmann
      ieloop.Run()
1300 4a96f1d1 Michael Hanselmann
    finally:
1301 4a96f1d1 Michael Hanselmann
      ieloop.FinalizeAll()
1302 4a96f1d1 Michael Hanselmann
1303 4a96f1d1 Michael Hanselmann
    return (True, cbs.disk_results)
1304 4a96f1d1 Michael Hanselmann
1305 387794f8 Michael Hanselmann
  def _TransferFinished(self, idx):
1306 387794f8 Michael Hanselmann
    """Called once a transfer has finished.
1307 387794f8 Michael Hanselmann

1308 387794f8 Michael Hanselmann
    @type idx: number
1309 387794f8 Michael Hanselmann
    @param idx: Disk index
1310 387794f8 Michael Hanselmann

1311 387794f8 Michael Hanselmann
    """
1312 387794f8 Michael Hanselmann
    logging.debug("Transfer %s finished", idx)
1313 387794f8 Michael Hanselmann
    self._RemoveSnapshot(idx)
1314 387794f8 Michael Hanselmann
1315 387794f8 Michael Hanselmann
  def Cleanup(self):
1316 387794f8 Michael Hanselmann
    """Remove all snapshots.
1317 387794f8 Michael Hanselmann

1318 387794f8 Michael Hanselmann
    """
1319 387794f8 Michael Hanselmann
    assert len(self._removed_snaps) == len(self._instance.disks)
1320 387794f8 Michael Hanselmann
    for idx in range(len(self._instance.disks)):
1321 387794f8 Michael Hanselmann
      self._RemoveSnapshot(idx)
1322 1410fa8d Michael Hanselmann
1323 1410fa8d Michael Hanselmann
1324 9bf56d77 Michael Hanselmann
class _RemoteImportCb(ImportExportCbBase):
1325 9bf56d77 Michael Hanselmann
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1326 9bf56d77 Michael Hanselmann
               external_address):
1327 9bf56d77 Michael Hanselmann
    """Initializes this class.
1328 9bf56d77 Michael Hanselmann

1329 9bf56d77 Michael Hanselmann
    @type cds: string
1330 9bf56d77 Michael Hanselmann
    @param cds: Cluster domain secret
1331 9bf56d77 Michael Hanselmann
    @type x509_cert_pem: string
1332 9bf56d77 Michael Hanselmann
    @param x509_cert_pem: CA used for signing import key
1333 9bf56d77 Michael Hanselmann
    @type disk_count: number
1334 9bf56d77 Michael Hanselmann
    @param disk_count: Number of disks
1335 9bf56d77 Michael Hanselmann
    @type external_address: string
1336 9bf56d77 Michael Hanselmann
    @param external_address: External address of destination node
1337 9bf56d77 Michael Hanselmann

1338 9bf56d77 Michael Hanselmann
    """
1339 9bf56d77 Michael Hanselmann
    ImportExportCbBase.__init__(self)
1340 9bf56d77 Michael Hanselmann
    self._feedback_fn = feedback_fn
1341 9bf56d77 Michael Hanselmann
    self._cds = cds
1342 9bf56d77 Michael Hanselmann
    self._x509_cert_pem = x509_cert_pem
1343 9bf56d77 Michael Hanselmann
    self._disk_count = disk_count
1344 9bf56d77 Michael Hanselmann
    self._external_address = external_address
1345 9bf56d77 Michael Hanselmann
1346 9bf56d77 Michael Hanselmann
    self._dresults = [None] * disk_count
1347 9bf56d77 Michael Hanselmann
    self._daemon_port = [None] * disk_count
1348 9bf56d77 Michael Hanselmann
1349 9bf56d77 Michael Hanselmann
    self._salt = utils.GenerateSecret(8)
1350 9bf56d77 Michael Hanselmann
1351 9bf56d77 Michael Hanselmann
  @property
1352 9bf56d77 Michael Hanselmann
  def disk_results(self):
1353 9bf56d77 Michael Hanselmann
    """Returns per-disk results.
1354 9bf56d77 Michael Hanselmann

1355 9bf56d77 Michael Hanselmann
    """
1356 9bf56d77 Michael Hanselmann
    return self._dresults
1357 9bf56d77 Michael Hanselmann
1358 9bf56d77 Michael Hanselmann
  def _CheckAllListening(self):
1359 9bf56d77 Michael Hanselmann
    """Checks whether all daemons are listening.
1360 9bf56d77 Michael Hanselmann

1361 9bf56d77 Michael Hanselmann
    If all daemons are listening, the information is sent to the client.
1362 9bf56d77 Michael Hanselmann

1363 9bf56d77 Michael Hanselmann
    """
1364 9bf56d77 Michael Hanselmann
    if not compat.all(dp is not None for dp in self._daemon_port):
1365 9bf56d77 Michael Hanselmann
      return
1366 9bf56d77 Michael Hanselmann
1367 9bf56d77 Michael Hanselmann
    host = self._external_address
1368 9bf56d77 Michael Hanselmann
1369 9bf56d77 Michael Hanselmann
    disks = []
1370 d51ae04c Michael Hanselmann
    for idx, (port, magic) in enumerate(self._daemon_port):
1371 9bf56d77 Michael Hanselmann
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1372 d51ae04c Michael Hanselmann
                                               idx, host, port, magic))
1373 9bf56d77 Michael Hanselmann
1374 9bf56d77 Michael Hanselmann
    assert len(disks) == self._disk_count
1375 9bf56d77 Michael Hanselmann
1376 9bf56d77 Michael Hanselmann
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1377 9bf56d77 Michael Hanselmann
      "disks": disks,
1378 9bf56d77 Michael Hanselmann
      "x509_ca": self._x509_cert_pem,
1379 9bf56d77 Michael Hanselmann
      })
1380 9bf56d77 Michael Hanselmann
1381 5e26c4d9 Iustin Pop
  def ReportListening(self, ie, private, _):
1382 9bf56d77 Michael Hanselmann
    """Called when daemon started listening.
1383 9bf56d77 Michael Hanselmann

1384 9bf56d77 Michael Hanselmann
    """
1385 9bf56d77 Michael Hanselmann
    (idx, ) = private
1386 9bf56d77 Michael Hanselmann
1387 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now listening" % idx)
1388 9bf56d77 Michael Hanselmann
1389 9bf56d77 Michael Hanselmann
    assert self._daemon_port[idx] is None
1390 9bf56d77 Michael Hanselmann
1391 d51ae04c Michael Hanselmann
    self._daemon_port[idx] = (ie.listen_port, ie.magic)
1392 9bf56d77 Michael Hanselmann
1393 9bf56d77 Michael Hanselmann
    self._CheckAllListening()
1394 9bf56d77 Michael Hanselmann
1395 9bf56d77 Michael Hanselmann
  def ReportConnected(self, ie, private):
1396 9bf56d77 Michael Hanselmann
    """Called when a connection has been established.
1397 9bf56d77 Michael Hanselmann

1398 9bf56d77 Michael Hanselmann
    """
1399 9bf56d77 Michael Hanselmann
    (idx, ) = private
1400 9bf56d77 Michael Hanselmann
1401 9bf56d77 Michael Hanselmann
    self._feedback_fn("Disk %s is now receiving data" % idx)
1402 9bf56d77 Michael Hanselmann
1403 9bf56d77 Michael Hanselmann
  def ReportFinished(self, ie, private):
1404 9bf56d77 Michael Hanselmann
    """Called when a transfer has finished.
1405 9bf56d77 Michael Hanselmann

1406 9bf56d77 Michael Hanselmann
    """
1407 9bf56d77 Michael Hanselmann
    (idx, ) = private
1408 9bf56d77 Michael Hanselmann
1409 9bf56d77 Michael Hanselmann
    # Daemon is certainly no longer listening
1410 9bf56d77 Michael Hanselmann
    self._daemon_port[idx] = None
1411 9bf56d77 Michael Hanselmann
1412 9bf56d77 Michael Hanselmann
    if ie.success:
1413 9bf56d77 Michael Hanselmann
      self._feedback_fn("Disk %s finished receiving data" % idx)
1414 9bf56d77 Michael Hanselmann
    else:
1415 9bf56d77 Michael Hanselmann
      self._feedback_fn(("Disk %s failed to receive data: %s"
1416 c9300bb3 Iustin Pop
                         " (recent output: %s)") %
1417 9bf56d77 Michael Hanselmann
                        (idx, ie.final_message, ie.recent_output))
1418 9bf56d77 Michael Hanselmann
1419 9bf56d77 Michael Hanselmann
    self._dresults[idx] = bool(ie.success)
1420 9bf56d77 Michael Hanselmann
1421 9bf56d77 Michael Hanselmann
1422 ba5619c2 Michael Hanselmann
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1423 ba5619c2 Michael Hanselmann
                 cds, timeouts):
1424 9bf56d77 Michael Hanselmann
  """Imports an instance from another cluster.
1425 9bf56d77 Michael Hanselmann

1426 9bf56d77 Michael Hanselmann
  @param lu: Logical unit instance
1427 9bf56d77 Michael Hanselmann
  @param feedback_fn: Feedback function
1428 9bf56d77 Michael Hanselmann
  @type instance: L{objects.Instance}
1429 9bf56d77 Michael Hanselmann
  @param instance: Instance object
1430 ba5619c2 Michael Hanselmann
  @type pnode: L{objects.Node}
1431 ba5619c2 Michael Hanselmann
  @param pnode: Primary node of instance as an object
1432 9bf56d77 Michael Hanselmann
  @type source_x509_ca: OpenSSL.crypto.X509
1433 9bf56d77 Michael Hanselmann
  @param source_x509_ca: Import source's X509 CA
1434 9bf56d77 Michael Hanselmann
  @type cds: string
1435 9bf56d77 Michael Hanselmann
  @param cds: Cluster domain secret
1436 9bf56d77 Michael Hanselmann
  @type timeouts: L{ImportExportTimeouts}
1437 9bf56d77 Michael Hanselmann
  @param timeouts: Timeouts for this import
1438 9bf56d77 Michael Hanselmann

1439 9bf56d77 Michael Hanselmann
  """
1440 9bf56d77 Michael Hanselmann
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1441 9bf56d77 Michael Hanselmann
                                                  source_x509_ca)
1442 9bf56d77 Michael Hanselmann
1443 d51ae04c Michael Hanselmann
  magic_base = utils.GenerateSecret(6)
1444 d51ae04c Michael Hanselmann
1445 ba5619c2 Michael Hanselmann
  # Decide whether to use IPv6
1446 ba5619c2 Michael Hanselmann
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1447 ba5619c2 Michael Hanselmann
1448 9bf56d77 Michael Hanselmann
  # Create crypto key
1449 9bf56d77 Michael Hanselmann
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1450 9bf56d77 Michael Hanselmann
                                        constants.RIE_CERT_VALIDITY)
1451 9bf56d77 Michael Hanselmann
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1452 9bf56d77 Michael Hanselmann
1453 9bf56d77 Michael Hanselmann
  (x509_key_name, x509_cert_pem) = result.payload
1454 9bf56d77 Michael Hanselmann
  try:
1455 9bf56d77 Michael Hanselmann
    # Load certificate
1456 9bf56d77 Michael Hanselmann
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1457 9bf56d77 Michael Hanselmann
                                                x509_cert_pem)
1458 9bf56d77 Michael Hanselmann
1459 9bf56d77 Michael Hanselmann
    # Sign certificate
1460 9bf56d77 Michael Hanselmann
    signed_x509_cert_pem = \
1461 9bf56d77 Michael Hanselmann
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1462 9bf56d77 Michael Hanselmann
1463 9bf56d77 Michael Hanselmann
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1464 ba5619c2 Michael Hanselmann
                          len(instance.disks), pnode.primary_ip)
1465 9bf56d77 Michael Hanselmann
1466 9bf56d77 Michael Hanselmann
    ieloop = ImportExportLoop(lu)
1467 9bf56d77 Michael Hanselmann
    try:
1468 9bf56d77 Michael Hanselmann
      for idx, dev in enumerate(instance.disks):
1469 d51ae04c Michael Hanselmann
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1470 d51ae04c Michael Hanselmann
1471 d51ae04c Michael Hanselmann
        # Import daemon options
1472 d51ae04c Michael Hanselmann
        opts = objects.ImportExportOptions(key_name=x509_key_name,
1473 d51ae04c Michael Hanselmann
                                           ca_pem=source_ca_pem,
1474 ba5619c2 Michael Hanselmann
                                           magic=magic, ipv6=ipv6)
1475 d51ae04c Michael Hanselmann
1476 eb630f50 Michael Hanselmann
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1477 5e26c4d9 Iustin Pop
                              "disk%d" % idx,
1478 9bf56d77 Michael Hanselmann
                              constants.IEIO_SCRIPT, (dev, idx),
1479 9bf56d77 Michael Hanselmann
                              timeouts, cbs, private=(idx, )))
1480 9bf56d77 Michael Hanselmann
1481 9bf56d77 Michael Hanselmann
      ieloop.Run()
1482 9bf56d77 Michael Hanselmann
    finally:
1483 9bf56d77 Michael Hanselmann
      ieloop.FinalizeAll()
1484 9bf56d77 Michael Hanselmann
  finally:
1485 9bf56d77 Michael Hanselmann
    # Remove crypto key and certificate
1486 9bf56d77 Michael Hanselmann
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1487 9bf56d77 Michael Hanselmann
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1488 9bf56d77 Michael Hanselmann
1489 9bf56d77 Michael Hanselmann
  return cbs.disk_results
1490 9bf56d77 Michael Hanselmann
1491 9bf56d77 Michael Hanselmann
1492 1410fa8d Michael Hanselmann
def _GetImportExportHandshakeMessage(version):
1493 1410fa8d Michael Hanselmann
  """Returns the handshake message for a RIE protocol version.
1494 1410fa8d Michael Hanselmann

1495 1410fa8d Michael Hanselmann
  @type version: number
1496 1410fa8d Michael Hanselmann

1497 1410fa8d Michael Hanselmann
  """
1498 1410fa8d Michael Hanselmann
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1499 1410fa8d Michael Hanselmann
1500 1410fa8d Michael Hanselmann
1501 1410fa8d Michael Hanselmann
def ComputeRemoteExportHandshake(cds):
1502 1410fa8d Michael Hanselmann
  """Computes the remote import/export handshake.
1503 1410fa8d Michael Hanselmann

1504 1410fa8d Michael Hanselmann
  @type cds: string
1505 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1506 1410fa8d Michael Hanselmann

1507 1410fa8d Michael Hanselmann
  """
1508 1410fa8d Michael Hanselmann
  salt = utils.GenerateSecret(8)
1509 1410fa8d Michael Hanselmann
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1510 1410fa8d Michael Hanselmann
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1511 1410fa8d Michael Hanselmann
1512 1410fa8d Michael Hanselmann
1513 1410fa8d Michael Hanselmann
def CheckRemoteExportHandshake(cds, handshake):
1514 1410fa8d Michael Hanselmann
  """Checks the handshake of a remote import/export.
1515 1410fa8d Michael Hanselmann

1516 1410fa8d Michael Hanselmann
  @type cds: string
1517 1410fa8d Michael Hanselmann
  @param cds: Cluster domain secret
1518 1410fa8d Michael Hanselmann
  @type handshake: sequence
1519 1410fa8d Michael Hanselmann
  @param handshake: Handshake sent by remote peer
1520 1410fa8d Michael Hanselmann

1521 1410fa8d Michael Hanselmann
  """
1522 1410fa8d Michael Hanselmann
  try:
1523 1410fa8d Michael Hanselmann
    (version, hmac_digest, hmac_salt) = handshake
1524 1410fa8d Michael Hanselmann
  except (TypeError, ValueError), err:
1525 1410fa8d Michael Hanselmann
    return "Invalid data: %s" % err
1526 1410fa8d Michael Hanselmann
1527 1410fa8d Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1528 1410fa8d Michael Hanselmann
                              hmac_digest, salt=hmac_salt):
1529 1410fa8d Michael Hanselmann
    return "Hash didn't match, clusters don't share the same domain secret"
1530 1410fa8d Michael Hanselmann
1531 1410fa8d Michael Hanselmann
  if version != constants.RIE_VERSION:
1532 1410fa8d Michael Hanselmann
    return ("Clusters don't have the same remote import/export protocol"
1533 1410fa8d Michael Hanselmann
            " (local=%s, remote=%s)" %
1534 1410fa8d Michael Hanselmann
            (constants.RIE_VERSION, version))
1535 1410fa8d Michael Hanselmann
1536 1410fa8d Michael Hanselmann
  return None
1537 4a96f1d1 Michael Hanselmann
1538 4a96f1d1 Michael Hanselmann
1539 d51ae04c Michael Hanselmann
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1540 4a96f1d1 Michael Hanselmann
  """Returns the hashed text for import/export disk information.
1541 4a96f1d1 Michael Hanselmann

1542 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1543 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1544 4a96f1d1 Michael Hanselmann
  @type host: string
1545 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1546 4a96f1d1 Michael Hanselmann
  @type port: number
1547 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1548 d51ae04c Michael Hanselmann
  @type magic: string
1549 d51ae04c Michael Hanselmann
  @param magic: Magic value
1550 4a96f1d1 Michael Hanselmann

1551 4a96f1d1 Michael Hanselmann
  """
1552 d51ae04c Michael Hanselmann
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1553 4a96f1d1 Michael Hanselmann
1554 4a96f1d1 Michael Hanselmann
1555 4a96f1d1 Michael Hanselmann
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1556 4a96f1d1 Michael Hanselmann
  """Verifies received disk information for an export.
1557 4a96f1d1 Michael Hanselmann

1558 4a96f1d1 Michael Hanselmann
  @type cds: string
1559 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1560 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1561 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1562 4a96f1d1 Michael Hanselmann
  @type disk_info: sequence
1563 4a96f1d1 Michael Hanselmann
  @param disk_info: Disk information sent by remote peer
1564 4a96f1d1 Michael Hanselmann

1565 4a96f1d1 Michael Hanselmann
  """
1566 4a96f1d1 Michael Hanselmann
  try:
1567 d51ae04c Michael Hanselmann
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1568 4a96f1d1 Michael Hanselmann
  except (TypeError, ValueError), err:
1569 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("Invalid data: %s" % err)
1570 4a96f1d1 Michael Hanselmann
1571 d51ae04c Michael Hanselmann
  if not (host and port and magic):
1572 d51ae04c Michael Hanselmann
    raise errors.GenericError("Missing destination host, port or magic")
1573 4a96f1d1 Michael Hanselmann
1574 d51ae04c Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1575 4a96f1d1 Michael Hanselmann
1576 4a96f1d1 Michael Hanselmann
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1577 4a96f1d1 Michael Hanselmann
    raise errors.GenericError("HMAC is wrong")
1578 4a96f1d1 Michael Hanselmann
1579 ba5619c2 Michael Hanselmann
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1580 ba5619c2 Michael Hanselmann
    destination = host
1581 ba5619c2 Michael Hanselmann
  else:
1582 ba5619c2 Michael Hanselmann
    destination = netutils.Hostname.GetNormalizedName(host)
1583 ba5619c2 Michael Hanselmann
1584 ba5619c2 Michael Hanselmann
  return (destination,
1585 d51ae04c Michael Hanselmann
          utils.ValidateServiceName(port),
1586 d51ae04c Michael Hanselmann
          magic)
1587 4a96f1d1 Michael Hanselmann
1588 4a96f1d1 Michael Hanselmann
1589 d51ae04c Michael Hanselmann
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1590 4a96f1d1 Michael Hanselmann
  """Computes the signed disk information for a remote import.
1591 4a96f1d1 Michael Hanselmann

1592 4a96f1d1 Michael Hanselmann
  @type cds: string
1593 4a96f1d1 Michael Hanselmann
  @param cds: Cluster domain secret
1594 4a96f1d1 Michael Hanselmann
  @type salt: string
1595 4a96f1d1 Michael Hanselmann
  @param salt: HMAC salt
1596 4a96f1d1 Michael Hanselmann
  @type disk_index: number
1597 4a96f1d1 Michael Hanselmann
  @param disk_index: Index of disk (included in hash)
1598 4a96f1d1 Michael Hanselmann
  @type host: string
1599 4a96f1d1 Michael Hanselmann
  @param host: Hostname
1600 4a96f1d1 Michael Hanselmann
  @type port: number
1601 4a96f1d1 Michael Hanselmann
  @param port: Daemon port
1602 d51ae04c Michael Hanselmann
  @type magic: string
1603 d51ae04c Michael Hanselmann
  @param magic: Magic value
1604 4a96f1d1 Michael Hanselmann

1605 4a96f1d1 Michael Hanselmann
  """
1606 d51ae04c Michael Hanselmann
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1607 4a96f1d1 Michael Hanselmann
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1608 d51ae04c Michael Hanselmann
  return (host, port, magic, hmac_digest, salt)
1609 0c77c331 René Nussbaumer
1610 0c77c331 René Nussbaumer
1611 0c77c331 René Nussbaumer
def CalculateGroupIPolicy(cluster, group):
1612 0c77c331 René Nussbaumer
  """Calculate instance policy for group.
1613 0c77c331 René Nussbaumer

1614 0c77c331 René Nussbaumer
  """
1615 0c77c331 René Nussbaumer
  return cluster.SimpleFillIPolicy(group.ipolicy)
1616 0c77c331 René Nussbaumer
1617 0c77c331 René Nussbaumer
1618 0c77c331 René Nussbaumer
def ComputeDiskSize(disk_template, disks):
1619 0c77c331 René Nussbaumer
  """Compute disk size requirements according to disk template
1620 0c77c331 René Nussbaumer

1621 0c77c331 René Nussbaumer
  """
1622 0c77c331 René Nussbaumer
  # Required free disk space as a function of disk and swap space
1623 0c77c331 René Nussbaumer
  req_size_dict = {
1624 0c77c331 René Nussbaumer
    constants.DT_DISKLESS: None,
1625 0c77c331 René Nussbaumer
    constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
1626 0c77c331 René Nussbaumer
    # 128 MB are added for drbd metadata for each disk
1627 0c77c331 René Nussbaumer
    constants.DT_DRBD8:
1628 0c77c331 René Nussbaumer
      sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE for d in disks),
1629 0c77c331 René Nussbaumer
    constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1630 0c77c331 René Nussbaumer
    constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1631 0c77c331 René Nussbaumer
    constants.DT_BLOCK: 0,
1632 0c77c331 René Nussbaumer
    constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
1633 376631d1 Constantinos Venetsanopoulos
    constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks),
1634 0c77c331 René Nussbaumer
  }
1635 0c77c331 René Nussbaumer
1636 0c77c331 René Nussbaumer
  if disk_template not in req_size_dict:
1637 0c77c331 René Nussbaumer
    raise errors.ProgrammerError("Disk template '%s' size requirement"
1638 0c77c331 René Nussbaumer
                                 " is unknown" % disk_template)
1639 0c77c331 René Nussbaumer
1640 0c77c331 René Nussbaumer
  return req_size_dict[disk_template]