Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ 8106dd64

History | View | Annotate | Download (47.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35
from ganeti import netutils
36
from ganeti import pathutils
37

    
38

    
39
class _ImportExportError(Exception):
40
  """Local exception to report import/export errors.
41

42
  """
43

    
44

    
45
class ImportExportTimeouts(object):
46
  #: Time until daemon starts writing status file
47
  DEFAULT_READY_TIMEOUT = 10
48

    
49
  #: Length of time until errors cause hard failure
50
  DEFAULT_ERROR_TIMEOUT = 10
51

    
52
  #: Time after which daemon must be listening
53
  DEFAULT_LISTEN_TIMEOUT = 10
54

    
55
  #: Progress update interval
56
  DEFAULT_PROGRESS_INTERVAL = 60
57

    
58
  __slots__ = [
59
    "error",
60
    "ready",
61
    "listen",
62
    "connect",
63
    "progress",
64
    ]
65

    
66
  def __init__(self, connect,
67
               listen=DEFAULT_LISTEN_TIMEOUT,
68
               error=DEFAULT_ERROR_TIMEOUT,
69
               ready=DEFAULT_READY_TIMEOUT,
70
               progress=DEFAULT_PROGRESS_INTERVAL):
71
    """Initializes this class.
72

73
    @type connect: number
74
    @param connect: Timeout for establishing connection
75
    @type listen: number
76
    @param listen: Timeout for starting to listen for connections
77
    @type error: number
78
    @param error: Length of time until errors cause hard failure
79
    @type ready: number
80
    @param ready: Timeout for daemon to become ready
81
    @type progress: number
82
    @param progress: Progress update interval
83

84
    """
85
    self.error = error
86
    self.ready = ready
87
    self.listen = listen
88
    self.connect = connect
89
    self.progress = progress
90

    
91

    
92
class ImportExportCbBase(object):
93
  """Callbacks for disk import/export.
94

95
  """
96
  def ReportListening(self, ie, private, component):
97
    """Called when daemon started listening.
98

99
    @type ie: Subclass of L{_DiskImportExportBase}
100
    @param ie: Import/export object
101
    @param private: Private data passed to import/export object
102
    @param component: transfer component name
103

104
    """
105

    
106
  def ReportConnected(self, ie, private):
107
    """Called when a connection has been established.
108

109
    @type ie: Subclass of L{_DiskImportExportBase}
110
    @param ie: Import/export object
111
    @param private: Private data passed to import/export object
112

113
    """
114

    
115
  def ReportProgress(self, ie, private):
116
    """Called when new progress information should be reported.
117

118
    @type ie: Subclass of L{_DiskImportExportBase}
119
    @param ie: Import/export object
120
    @param private: Private data passed to import/export object
121

122
    """
123

    
124
  def ReportFinished(self, ie, private):
125
    """Called when a transfer has finished.
126

127
    @type ie: Subclass of L{_DiskImportExportBase}
128
    @param ie: Import/export object
129
    @param private: Private data passed to import/export object
130

131
    """
132

    
133

    
134
class _DiskImportExportBase(object):
135
  MODE_TEXT = None
136

    
137
  def __init__(self, lu, node_uuid, opts,
138
               instance, component, timeouts, cbs, private=None):
139
    """Initializes this class.
140

141
    @param lu: Logical unit instance
142
    @type node_uuid: string
143
    @param node_uuid: Node UUID for import
144
    @type opts: L{objects.ImportExportOptions}
145
    @param opts: Import/export daemon options
146
    @type instance: L{objects.Instance}
147
    @param instance: Instance object
148
    @type component: string
149
    @param component: which part of the instance is being imported
150
    @type timeouts: L{ImportExportTimeouts}
151
    @param timeouts: Timeouts for this import
152
    @type cbs: L{ImportExportCbBase}
153
    @param cbs: Callbacks
154
    @param private: Private data for callback functions
155

156
    """
157
    assert self.MODE_TEXT
158

    
159
    self._lu = lu
160
    self.node_uuid = node_uuid
161
    self.node_name = lu.cfg.GetNodeName(node_uuid)
162
    self._opts = opts.Copy()
163
    self._instance = instance
164
    self._component = component
165
    self._timeouts = timeouts
166
    self._cbs = cbs
167
    self._private = private
168

    
169
    # Set master daemon's timeout in options for import/export daemon
170
    assert self._opts.connect_timeout is None
171
    self._opts.connect_timeout = timeouts.connect
172

    
173
    # Parent loop
174
    self._loop = None
175

    
176
    # Timestamps
177
    self._ts_begin = None
178
    self._ts_connected = None
179
    self._ts_finished = None
180
    self._ts_cleanup = None
181
    self._ts_last_progress = None
182
    self._ts_last_error = None
183

    
184
    # Transfer status
185
    self.success = None
186
    self.final_message = None
187

    
188
    # Daemon status
189
    self._daemon_name = None
190
    self._daemon = None
191

    
192
  @property
193
  def recent_output(self):
194
    """Returns the most recent output from the daemon.
195

196
    """
197
    if self._daemon:
198
      return "\n".join(self._daemon.recent_output)
199

    
200
    return None
201

    
202
  @property
203
  def progress(self):
204
    """Returns transfer progress information.
205

206
    """
207
    if not self._daemon:
208
      return None
209

    
210
    return (self._daemon.progress_mbytes,
211
            self._daemon.progress_throughput,
212
            self._daemon.progress_percent,
213
            self._daemon.progress_eta)
214

    
215
  @property
216
  def magic(self):
217
    """Returns the magic value for this import/export.
218

219
    """
220
    return self._opts.magic
221

    
222
  @property
223
  def active(self):
224
    """Determines whether this transport is still active.
225

226
    """
227
    return self.success is None
228

    
229
  @property
230
  def loop(self):
231
    """Returns parent loop.
232

233
    @rtype: L{ImportExportLoop}
234

235
    """
236
    return self._loop
237

    
238
  def SetLoop(self, loop):
239
    """Sets the parent loop.
240

241
    @type loop: L{ImportExportLoop}
242

243
    """
244
    if self._loop:
245
      raise errors.ProgrammerError("Loop can only be set once")
246

    
247
    self._loop = loop
248

    
249
  def _StartDaemon(self):
250
    """Starts the import/export daemon.
251

252
    """
253
    raise NotImplementedError()
254

    
255
  def CheckDaemon(self):
256
    """Checks whether daemon has been started and if not, starts it.
257

258
    @rtype: string
259
    @return: Daemon name
260

261
    """
262
    assert self._ts_cleanup is None
263

    
264
    if self._daemon_name is None:
265
      assert self._ts_begin is None
266

    
267
      result = self._StartDaemon()
268
      if result.fail_msg:
269
        raise _ImportExportError("Failed to start %s on %s: %s" %
270
                                 (self.MODE_TEXT, self.node_name,
271
                                  result.fail_msg))
272

    
273
      daemon_name = result.payload
274

    
275
      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
276
                   self.node_name)
277

    
278
      self._ts_begin = time.time()
279
      self._daemon_name = daemon_name
280

    
281
    return self._daemon_name
282

    
283
  def GetDaemonName(self):
284
    """Returns the daemon name.
285

286
    """
287
    assert self._daemon_name, "Daemon has not been started"
288
    assert self._ts_cleanup is None
289
    return self._daemon_name
290

    
291
  def Abort(self):
292
    """Sends SIGTERM to import/export daemon (if still active).
293

294
    """
295
    if self._daemon_name:
296
      self._lu.LogWarning("Aborting %s '%s' on %s",
297
                          self.MODE_TEXT, self._daemon_name, self.node_uuid)
298
      result = self._lu.rpc.call_impexp_abort(self.node_uuid, self._daemon_name)
299
      if result.fail_msg:
300
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
301
                            self.MODE_TEXT, self._daemon_name,
302
                            self.node_uuid, result.fail_msg)
303
        return False
304

    
305
    return True
306

    
307
  def _SetDaemonData(self, data):
308
    """Internal function for updating status daemon data.
309

310
    @type data: L{objects.ImportExportStatus}
311
    @param data: Daemon status data
312

313
    """
314
    assert self._ts_begin is not None
315

    
316
    if not data:
317
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
318
        raise _ImportExportError("Didn't become ready after %s seconds" %
319
                                 self._timeouts.ready)
320

    
321
      return False
322

    
323
    self._daemon = data
324

    
325
    return True
326

    
327
  def SetDaemonData(self, success, data):
328
    """Updates daemon status data.
329

330
    @type success: bool
331
    @param success: Whether fetching data was successful or not
332
    @type data: L{objects.ImportExportStatus}
333
    @param data: Daemon status data
334

335
    """
336
    if not success:
337
      if self._ts_last_error is None:
338
        self._ts_last_error = time.time()
339

    
340
      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
341
        raise _ImportExportError("Too many errors while updating data")
342

    
343
      return False
344

    
345
    self._ts_last_error = None
346

    
347
    return self._SetDaemonData(data)
348

    
349
  def CheckListening(self):
350
    """Checks whether the daemon is listening.
351

352
    """
353
    raise NotImplementedError()
354

    
355
  def _GetConnectedCheckEpoch(self):
356
    """Returns timeout to calculate connect timeout.
357

358
    """
359
    raise NotImplementedError()
360

    
361
  def CheckConnected(self):
362
    """Checks whether the daemon is connected.
363

364
    @rtype: bool
365
    @return: Whether the daemon is connected
366

367
    """
368
    assert self._daemon, "Daemon status missing"
369

    
370
    if self._ts_connected is not None:
371
      return True
372

    
373
    if self._daemon.connected:
374
      self._ts_connected = time.time()
375

    
376
      # TODO: Log remote peer
377
      logging.debug("%s '%s' on %s is now connected",
378
                    self.MODE_TEXT, self._daemon_name, self.node_uuid)
379

    
380
      self._cbs.ReportConnected(self, self._private)
381

    
382
      return True
383

    
384
    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
385
                            self._timeouts.connect):
386
      raise _ImportExportError("Not connected after %s seconds" %
387
                               self._timeouts.connect)
388

    
389
    return False
390

    
391
  def _CheckProgress(self):
392
    """Checks whether a progress update should be reported.
393

394
    """
395
    if ((self._ts_last_progress is None or
396
        utils.TimeoutExpired(self._ts_last_progress,
397
                             self._timeouts.progress)) and
398
        self._daemon and
399
        self._daemon.progress_mbytes is not None and
400
        self._daemon.progress_throughput is not None):
401
      self._cbs.ReportProgress(self, self._private)
402
      self._ts_last_progress = time.time()
403

    
404
  def CheckFinished(self):
405
    """Checks whether the daemon exited.
406

407
    @rtype: bool
408
    @return: Whether the transfer is finished
409

410
    """
411
    assert self._daemon, "Daemon status missing"
412

    
413
    if self._ts_finished:
414
      return True
415

    
416
    if self._daemon.exit_status is None:
417
      # TODO: Adjust delay for ETA expiring soon
418
      self._CheckProgress()
419
      return False
420

    
421
    self._ts_finished = time.time()
422

    
423
    self._ReportFinished(self._daemon.exit_status == 0,
424
                         self._daemon.error_message)
425

    
426
    return True
427

    
428
  def _ReportFinished(self, success, message):
429
    """Transfer is finished or daemon exited.
430

431
    @type success: bool
432
    @param success: Whether the transfer was successful
433
    @type message: string
434
    @param message: Error message
435

436
    """
437
    assert self.success is None
438

    
439
    self.success = success
440
    self.final_message = message
441

    
442
    if success:
443
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
444
                   self._daemon_name, self.node_uuid)
445
    elif self._daemon_name:
446
      self._lu.LogWarning("%s '%s' on %s failed: %s",
447
                          self.MODE_TEXT, self._daemon_name,
448
                          self._lu.cfg.GetNodeName(self.node_uuid),
449
                          message)
450
    else:
451
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
452
                          self._lu.cfg.GetNodeName(self.node_uuid), message)
453

    
454
    self._cbs.ReportFinished(self, self._private)
455

    
456
  def _Finalize(self):
457
    """Makes the RPC call to finalize this import/export.
458

459
    """
460
    return self._lu.rpc.call_impexp_cleanup(self.node_uuid, self._daemon_name)
461

    
462
  def Finalize(self, error=None):
463
    """Finalizes this import/export.
464

465
    """
466
    if self._daemon_name:
467
      logging.info("Finalizing %s '%s' on %s",
468
                   self.MODE_TEXT, self._daemon_name, self.node_uuid)
469

    
470
      result = self._Finalize()
471
      if result.fail_msg:
472
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
473
                            self.MODE_TEXT, self._daemon_name,
474
                            self.node_uuid, result.fail_msg)
475
        return False
476

    
477
      # Daemon is no longer running
478
      self._daemon_name = None
479
      self._ts_cleanup = time.time()
480

    
481
    if error:
482
      self._ReportFinished(False, error)
483

    
484
    return True
485

    
486

    
487
class DiskImport(_DiskImportExportBase):
488
  MODE_TEXT = "import"
489

    
490
  def __init__(self, lu, node_uuid, opts, instance, component,
491
               dest, dest_args, timeouts, cbs, private=None):
492
    """Initializes this class.
493

494
    @param lu: Logical unit instance
495
    @type node_uuid: string
496
    @param node_uuid: Node name for import
497
    @type opts: L{objects.ImportExportOptions}
498
    @param opts: Import/export daemon options
499
    @type instance: L{objects.Instance}
500
    @param instance: Instance object
501
    @type component: string
502
    @param component: which part of the instance is being imported
503
    @param dest: I/O destination
504
    @param dest_args: I/O arguments
505
    @type timeouts: L{ImportExportTimeouts}
506
    @param timeouts: Timeouts for this import
507
    @type cbs: L{ImportExportCbBase}
508
    @param cbs: Callbacks
509
    @param private: Private data for callback functions
510

511
    """
512
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
513
                                   component, timeouts, cbs, private)
514
    self._dest = dest
515
    self._dest_args = dest_args
516

    
517
    # Timestamps
518
    self._ts_listening = None
519

    
520
  @property
521
  def listen_port(self):
522
    """Returns the port the daemon is listening on.
523

524
    """
525
    if self._daemon:
526
      return self._daemon.listen_port
527

    
528
    return None
529

    
530
  def _StartDaemon(self):
531
    """Starts the import daemon.
532

533
    """
534
    return self._lu.rpc.call_import_start(self.node_uuid, self._opts,
535
                                          self._instance, self._component,
536
                                          (self._dest, self._dest_args))
537

    
538
  def CheckListening(self):
539
    """Checks whether the daemon is listening.
540

541
    @rtype: bool
542
    @return: Whether the daemon is listening
543

544
    """
545
    assert self._daemon, "Daemon status missing"
546

    
547
    if self._ts_listening is not None:
548
      return True
549

    
550
    port = self._daemon.listen_port
551
    if port is not None:
552
      self._ts_listening = time.time()
553

    
554
      logging.debug("Import '%s' on %s is now listening on port %s",
555
                    self._daemon_name, self.node_uuid, port)
556

    
557
      self._cbs.ReportListening(self, self._private, self._component)
558

    
559
      return True
560

    
561
    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
562
      raise _ImportExportError("Not listening after %s seconds" %
563
                               self._timeouts.listen)
564

    
565
    return False
566

    
567
  def _GetConnectedCheckEpoch(self):
568
    """Returns the time since we started listening.
569

570
    """
571
    assert self._ts_listening is not None, \
572
           ("Checking whether an import is connected is only useful"
573
            " once it's been listening")
574

    
575
    return self._ts_listening
576

    
577

    
578
class DiskExport(_DiskImportExportBase):
579
  MODE_TEXT = "export"
580

    
581
  def __init__(self, lu, node_uuid, opts, dest_host, dest_port,
582
               instance, component, source, source_args,
583
               timeouts, cbs, private=None):
584
    """Initializes this class.
585

586
    @param lu: Logical unit instance
587
    @type node_uuid: string
588
    @param node_uuid: Node UUID for import
589
    @type opts: L{objects.ImportExportOptions}
590
    @param opts: Import/export daemon options
591
    @type dest_host: string
592
    @param dest_host: Destination host name or IP address
593
    @type dest_port: number
594
    @param dest_port: Destination port number
595
    @type instance: L{objects.Instance}
596
    @param instance: Instance object
597
    @type component: string
598
    @param component: which part of the instance is being imported
599
    @param source: I/O source
600
    @param source_args: I/O source
601
    @type timeouts: L{ImportExportTimeouts}
602
    @param timeouts: Timeouts for this import
603
    @type cbs: L{ImportExportCbBase}
604
    @param cbs: Callbacks
605
    @param private: Private data for callback functions
606

607
    """
608
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
609
                                   component, timeouts, cbs, private)
610
    self._dest_host = dest_host
611
    self._dest_port = dest_port
612
    self._source = source
613
    self._source_args = source_args
614

    
615
  def _StartDaemon(self):
616
    """Starts the export daemon.
617

618
    """
619
    return self._lu.rpc.call_export_start(self.node_uuid, self._opts,
620
                                          self._dest_host, self._dest_port,
621
                                          self._instance, self._component,
622
                                          (self._source, self._source_args))
623

    
624
  def CheckListening(self):
625
    """Checks whether the daemon is listening.
626

627
    """
628
    # Only an import can be listening
629
    return True
630

    
631
  def _GetConnectedCheckEpoch(self):
632
    """Returns the time since the daemon started.
633

634
    """
635
    assert self._ts_begin is not None
636

    
637
    return self._ts_begin
638

    
639

    
640
def FormatProgress(progress):
641
  """Formats progress information for user consumption
642

643
  """
644
  (mbytes, throughput, percent, eta) = progress
645

    
646
  parts = [
647
    utils.FormatUnit(mbytes, "h"),
648

    
649
    # Not using FormatUnit as it doesn't support kilobytes
650
    "%0.1f MiB/s" % throughput,
651
    ]
652

    
653
  if percent is not None:
654
    parts.append("%d%%" % percent)
655

    
656
  if eta is not None:
657
    parts.append("ETA %s" % utils.FormatSeconds(eta))
658

    
659
  return utils.CommaJoin(parts)
660

    
661

    
662
class ImportExportLoop:
663
  MIN_DELAY = 1.0
664
  MAX_DELAY = 20.0
665

    
666
  def __init__(self, lu):
667
    """Initializes this class.
668

669
    """
670
    self._lu = lu
671
    self._queue = []
672
    self._pending_add = []
673

    
674
  def Add(self, diskie):
675
    """Adds an import/export object to the loop.
676

677
    @type diskie: Subclass of L{_DiskImportExportBase}
678
    @param diskie: Import/export object
679

680
    """
681
    assert diskie not in self._pending_add
682
    assert diskie.loop is None
683

    
684
    diskie.SetLoop(self)
685

    
686
    # Adding new objects to a staging list is necessary, otherwise the main
687
    # loop gets confused if callbacks modify the queue while the main loop is
688
    # iterating over it.
689
    self._pending_add.append(diskie)
690

    
691
  @staticmethod
692
  def _CollectDaemonStatus(lu, daemons):
693
    """Collects the status for all import/export daemons.
694

695
    """
696
    daemon_status = {}
697

    
698
    for node_name, names in daemons.iteritems():
699
      result = lu.rpc.call_impexp_status(node_name, names)
700
      if result.fail_msg:
701
        lu.LogWarning("Failed to get daemon status on %s: %s",
702
                      node_name, result.fail_msg)
703
        continue
704

    
705
      assert len(names) == len(result.payload)
706

    
707
      daemon_status[node_name] = dict(zip(names, result.payload))
708

    
709
    return daemon_status
710

    
711
  @staticmethod
712
  def _GetActiveDaemonNames(queue):
713
    """Gets the names of all active daemons.
714

715
    """
716
    result = {}
717
    for diskie in queue:
718
      if not diskie.active:
719
        continue
720

    
721
      try:
722
        # Start daemon if necessary
723
        daemon_name = diskie.CheckDaemon()
724
      except _ImportExportError, err:
725
        logging.exception("%s failed", diskie.MODE_TEXT)
726
        diskie.Finalize(error=str(err))
727
        continue
728

    
729
      result.setdefault(diskie.node_name, []).append(daemon_name)
730

    
731
    assert len(queue) >= len(result)
732
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
733

    
734
    logging.debug("daemons=%r", result)
735

    
736
    return result
737

    
738
  def _AddPendingToQueue(self):
739
    """Adds all pending import/export objects to the internal queue.
740

741
    """
742
    assert compat.all(diskie not in self._queue and diskie.loop == self
743
                      for diskie in self._pending_add)
744

    
745
    self._queue.extend(self._pending_add)
746

    
747
    del self._pending_add[:]
748

    
749
  def Run(self):
750
    """Utility main loop.
751

752
    """
753
    while True:
754
      self._AddPendingToQueue()
755

    
756
      # Collect all active daemon names
757
      daemons = self._GetActiveDaemonNames(self._queue)
758
      if not daemons:
759
        break
760

    
761
      # Collection daemon status data
762
      data = self._CollectDaemonStatus(self._lu, daemons)
763

    
764
      # Use data
765
      delay = self.MAX_DELAY
766
      for diskie in self._queue:
767
        if not diskie.active:
768
          continue
769

    
770
        try:
771
          try:
772
            all_daemon_data = data[diskie.node_name]
773
          except KeyError:
774
            result = diskie.SetDaemonData(False, None)
775
          else:
776
            result = \
777
              diskie.SetDaemonData(True,
778
                                   all_daemon_data[diskie.GetDaemonName()])
779

    
780
          if not result:
781
            # Daemon not yet ready, retry soon
782
            delay = min(3.0, delay)
783
            continue
784

    
785
          if diskie.CheckFinished():
786
            # Transfer finished
787
            diskie.Finalize()
788
            continue
789

    
790
          # Normal case: check again in 5 seconds
791
          delay = min(5.0, delay)
792

    
793
          if not diskie.CheckListening():
794
            # Not yet listening, retry soon
795
            delay = min(1.0, delay)
796
            continue
797

    
798
          if not diskie.CheckConnected():
799
            # Not yet connected, retry soon
800
            delay = min(1.0, delay)
801
            continue
802

    
803
        except _ImportExportError, err:
804
          logging.exception("%s failed", diskie.MODE_TEXT)
805
          diskie.Finalize(error=str(err))
806

    
807
      if not compat.any(diskie.active for diskie in self._queue):
808
        break
809

    
810
      # Wait a bit
811
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
812
      logging.debug("Waiting for %ss", delay)
813
      time.sleep(delay)
814

    
815
  def FinalizeAll(self):
816
    """Finalizes all pending transfers.
817

818
    """
819
    success = True
820

    
821
    for diskie in self._queue:
822
      success = diskie.Finalize() and success
823

    
824
    return success
825

    
826

    
827
class _TransferInstCbBase(ImportExportCbBase):
828
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node_uuid,
829
               src_cbs, dest_node_uuid, dest_ip):
830
    """Initializes this class.
831

832
    """
833
    ImportExportCbBase.__init__(self)
834

    
835
    self.lu = lu
836
    self.feedback_fn = feedback_fn
837
    self.instance = instance
838
    self.timeouts = timeouts
839
    self.src_node_uuid = src_node_uuid
840
    self.src_cbs = src_cbs
841
    self.dest_node_uuid = dest_node_uuid
842
    self.dest_ip = dest_ip
843

    
844

    
845
class _TransferInstSourceCb(_TransferInstCbBase):
846
  def ReportConnected(self, ie, dtp):
847
    """Called when a connection has been established.
848

849
    """
850
    assert self.src_cbs is None
851
    assert dtp.src_export == ie
852
    assert dtp.dest_import
853

    
854
    self.feedback_fn("%s is sending data on %s" %
855
                     (dtp.data.name, ie.node_name))
856

    
857
  def ReportProgress(self, ie, dtp):
858
    """Called when new progress information should be reported.
859

860
    """
861
    progress = ie.progress
862
    if not progress:
863
      return
864

    
865
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
866

    
867
  def ReportFinished(self, ie, dtp):
868
    """Called when a transfer has finished.
869

870
    """
871
    assert self.src_cbs is None
872
    assert dtp.src_export == ie
873
    assert dtp.dest_import
874

    
875
    if ie.success:
876
      self.feedback_fn("%s finished sending data" % dtp.data.name)
877
    else:
878
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
879
                       (dtp.data.name, ie.final_message, ie.recent_output))
880

    
881
    dtp.RecordResult(ie.success)
882

    
883
    cb = dtp.data.finished_fn
884
    if cb:
885
      cb()
886

    
887
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
888
    # give the daemon a moment to sort things out
889
    if dtp.dest_import and not ie.success:
890
      dtp.dest_import.Abort()
891

    
892

    
893
class _TransferInstDestCb(_TransferInstCbBase):
894
  def ReportListening(self, ie, dtp, component):
895
    """Called when daemon started listening.
896

897
    """
898
    assert self.src_cbs
899
    assert dtp.src_export is None
900
    assert dtp.dest_import
901
    assert dtp.export_opts
902

    
903
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
904

    
905
    # Start export on source node
906
    de = DiskExport(self.lu, self.src_node_uuid, dtp.export_opts,
907
                    self.dest_ip, ie.listen_port, self.instance,
908
                    component, dtp.data.src_io, dtp.data.src_ioargs,
909
                    self.timeouts, self.src_cbs, private=dtp)
910
    ie.loop.Add(de)
911

    
912
    dtp.src_export = de
913

    
914
  def ReportConnected(self, ie, dtp):
915
    """Called when a connection has been established.
916

917
    """
918
    self.feedback_fn("%s is receiving data on %s" %
919
                     (dtp.data.name,
920
                      self.lu.cfg.GetNodeName(self.dest_node_uuid)))
921

    
922
  def ReportFinished(self, ie, dtp):
923
    """Called when a transfer has finished.
924

925
    """
926
    if ie.success:
927
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
928
    else:
929
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
930
                       (dtp.data.name, ie.final_message, ie.recent_output))
931

    
932
    dtp.RecordResult(ie.success)
933

    
934
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
935
    # give the daemon a moment to sort things out
936
    if dtp.src_export and not ie.success:
937
      dtp.src_export.Abort()
938

    
939

    
940
class DiskTransfer(object):
941
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
942
               finished_fn):
943
    """Initializes this class.
944

945
    @type name: string
946
    @param name: User-visible name for this transfer (e.g. "disk/0")
947
    @param src_io: Source I/O type
948
    @param src_ioargs: Source I/O arguments
949
    @param dest_io: Destination I/O type
950
    @param dest_ioargs: Destination I/O arguments
951
    @type finished_fn: callable
952
    @param finished_fn: Function called once transfer has finished
953

954
    """
955
    self.name = name
956

    
957
    self.src_io = src_io
958
    self.src_ioargs = src_ioargs
959

    
960
    self.dest_io = dest_io
961
    self.dest_ioargs = dest_ioargs
962

    
963
    self.finished_fn = finished_fn
964

    
965

    
966
class _DiskTransferPrivate(object):
967
  def __init__(self, data, success, export_opts):
968
    """Initializes this class.
969

970
    @type data: L{DiskTransfer}
971
    @type success: bool
972

973
    """
974
    self.data = data
975
    self.success = success
976
    self.export_opts = export_opts
977

    
978
    self.src_export = None
979
    self.dest_import = None
980

    
981
  def RecordResult(self, success):
982
    """Updates the status.
983

984
    One failed part will cause the whole transfer to fail.
985

986
    """
987
    self.success = self.success and success
988

    
989

    
990
def _GetInstDiskMagic(base, instance_name, index):
991
  """Computes the magic value for a disk export or import.
992

993
  @type base: string
994
  @param base: Random seed value (can be the same for all disks of a transfer)
995
  @type instance_name: string
996
  @param instance_name: Name of instance
997
  @type index: number
998
  @param index: Disk index
999

1000
  """
1001
  h = compat.sha1_hash()
1002
  h.update(str(constants.RIE_VERSION))
1003
  h.update(base)
1004
  h.update(instance_name)
1005
  h.update(str(index))
1006
  return h.hexdigest()
1007

    
1008

    
1009
def TransferInstanceData(lu, feedback_fn, src_node_uuid, dest_node_uuid,
1010
                         dest_ip, compress, instance, all_transfers):
1011
  """Transfers an instance's data from one node to another.
1012

1013
  @param lu: Logical unit instance
1014
  @param feedback_fn: Feedback function
1015
  @type src_node_uuid: string
1016
  @param src_node_uuid: Source node UUID
1017
  @type dest_node_uuid: string
1018
  @param dest_node_uuid: Destination node UUID
1019
  @type dest_ip: string
1020
  @param dest_ip: IP address of destination node
1021
  @type compress: string
1022
  @param compress: one of L{constants.IEC_ALL}
1023
  @type instance: L{objects.Instance}
1024
  @param instance: Instance object
1025
  @type all_transfers: list of L{DiskTransfer} instances
1026
  @param all_transfers: List of all disk transfers to be made
1027
  @rtype: list
1028
  @return: List with a boolean (True=successful, False=failed) for success for
1029
           each transfer
1030

1031
  """
1032
  src_node_name = lu.cfg.GetNodeName(src_node_uuid)
1033
  dest_node_name = lu.cfg.GetNodeName(dest_node_uuid)
1034

    
1035
  logging.debug("Source node %s, destination node %s, compression '%s'",
1036
                src_node_name, dest_node_name, compress)
1037

    
1038
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1039
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1040
                                  src_node_uuid, None, dest_node_uuid, dest_ip)
1041
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1042
                                 src_node_uuid, src_cbs, dest_node_uuid,
1043
                                 dest_ip)
1044

    
1045
  all_dtp = []
1046

    
1047
  base_magic = utils.GenerateSecret(6)
1048

    
1049
  ieloop = ImportExportLoop(lu)
1050
  try:
1051
    for idx, transfer in enumerate(all_transfers):
1052
      if transfer:
1053
        feedback_fn("Exporting %s from %s to %s" %
1054
                    (transfer.name, src_node_name, dest_node_name))
1055

    
1056
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1057
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1058
                                           compress=compress, magic=magic)
1059

    
1060
        dtp = _DiskTransferPrivate(transfer, True, opts)
1061

    
1062
        di = DiskImport(lu, dest_node_uuid, opts, instance, "disk%d" % idx,
1063
                        transfer.dest_io, transfer.dest_ioargs,
1064
                        timeouts, dest_cbs, private=dtp)
1065
        ieloop.Add(di)
1066

    
1067
        dtp.dest_import = di
1068
      else:
1069
        dtp = _DiskTransferPrivate(None, False, None)
1070

    
1071
      all_dtp.append(dtp)
1072

    
1073
    ieloop.Run()
1074
  finally:
1075
    ieloop.FinalizeAll()
1076

    
1077
  assert len(all_dtp) == len(all_transfers)
1078
  assert compat.all((dtp.src_export is None or
1079
                      dtp.src_export.success is not None) and
1080
                     (dtp.dest_import is None or
1081
                      dtp.dest_import.success is not None)
1082
                     for dtp in all_dtp), \
1083
         "Not all imports/exports are finalized"
1084

    
1085
  return [bool(dtp.success) for dtp in all_dtp]
1086

    
1087

    
1088
class _RemoteExportCb(ImportExportCbBase):
1089
  def __init__(self, feedback_fn, disk_count):
1090
    """Initializes this class.
1091

1092
    """
1093
    ImportExportCbBase.__init__(self)
1094
    self._feedback_fn = feedback_fn
1095
    self._dresults = [None] * disk_count
1096

    
1097
  @property
1098
  def disk_results(self):
1099
    """Returns per-disk results.
1100

1101
    """
1102
    return self._dresults
1103

    
1104
  def ReportConnected(self, ie, private):
1105
    """Called when a connection has been established.
1106

1107
    """
1108
    (idx, _) = private
1109

    
1110
    self._feedback_fn("Disk %s is now sending data" % idx)
1111

    
1112
  def ReportProgress(self, ie, private):
1113
    """Called when new progress information should be reported.
1114

1115
    """
1116
    (idx, _) = private
1117

    
1118
    progress = ie.progress
1119
    if not progress:
1120
      return
1121

    
1122
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1123

    
1124
  def ReportFinished(self, ie, private):
1125
    """Called when a transfer has finished.
1126

1127
    """
1128
    (idx, finished_fn) = private
1129

    
1130
    if ie.success:
1131
      self._feedback_fn("Disk %s finished sending data" % idx)
1132
    else:
1133
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1134
                        (idx, ie.final_message, ie.recent_output))
1135

    
1136
    self._dresults[idx] = bool(ie.success)
1137

    
1138
    if finished_fn:
1139
      finished_fn()
1140

    
1141

    
1142
class ExportInstanceHelper:
1143
  def __init__(self, lu, feedback_fn, instance):
1144
    """Initializes this class.
1145

1146
    @param lu: Logical unit instance
1147
    @param feedback_fn: Feedback function
1148
    @type instance: L{objects.Instance}
1149
    @param instance: Instance object
1150

1151
    """
1152
    self._lu = lu
1153
    self._feedback_fn = feedback_fn
1154
    self._instance = instance
1155

    
1156
    self._snap_disks = []
1157
    self._removed_snaps = [False] * len(instance.disks)
1158

    
1159
  def CreateSnapshots(self):
1160
    """Creates an LVM snapshot for every disk of the instance.
1161

1162
    """
1163
    assert not self._snap_disks
1164

    
1165
    instance = self._instance
1166
    src_node = instance.primary_node
1167
    src_node_name = self._lu.cfg.GetNodeName(src_node)
1168

    
1169
    for idx, disk in enumerate(instance.disks):
1170
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1171
                        (idx, src_node_name))
1172

    
1173
      # result.payload will be a snapshot of an lvm leaf of the one we
1174
      # passed
1175
      result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
1176
      new_dev = False
1177
      msg = result.fail_msg
1178
      if msg:
1179
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1180
                            idx, src_node_name, msg)
1181
      elif (not isinstance(result.payload, (tuple, list)) or
1182
            len(result.payload) != 2):
1183
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1184
                            " result '%s'", idx, src_node_name, result.payload)
1185
      else:
1186
        disk_id = tuple(result.payload)
1187
        disk_params = constants.DISK_LD_DEFAULTS[constants.DT_PLAIN].copy()
1188
        new_dev = objects.Disk(dev_type=constants.DT_PLAIN, size=disk.size,
1189
                               logical_id=disk_id, iv_name=disk.iv_name,
1190
                               params=disk_params)
1191

    
1192
      self._snap_disks.append(new_dev)
1193

    
1194
    assert len(self._snap_disks) == len(instance.disks)
1195
    assert len(self._removed_snaps) == len(instance.disks)
1196

    
1197
  def _RemoveSnapshot(self, disk_index):
1198
    """Removes an LVM snapshot.
1199

1200
    @type disk_index: number
1201
    @param disk_index: Index of the snapshot to be removed
1202

1203
    """
1204
    disk = self._snap_disks[disk_index]
1205
    if disk and not self._removed_snaps[disk_index]:
1206
      src_node = self._instance.primary_node
1207
      src_node_name = self._lu.cfg.GetNodeName(src_node)
1208

    
1209
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1210
                        (disk_index, src_node_name))
1211

    
1212
      result = self._lu.rpc.call_blockdev_remove(src_node,
1213
                                                 (disk, self._instance))
1214
      if result.fail_msg:
1215
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1216
                            " %s: %s", disk_index, src_node_name,
1217
                            result.fail_msg)
1218
      else:
1219
        self._removed_snaps[disk_index] = True
1220

    
1221
  def LocalExport(self, dest_node, compress):
1222
    """Intra-cluster instance export.
1223

1224
    @type dest_node: L{objects.Node}
1225
    @param dest_node: Destination node
1226
    @type compress: string
1227
    @param compress: one of L{constants.IEC_ALL}
1228

1229
    """
1230
    instance = self._instance
1231
    src_node_uuid = instance.primary_node
1232

    
1233
    assert len(self._snap_disks) == len(instance.disks)
1234

    
1235
    transfers = []
1236

    
1237
    for idx, dev in enumerate(self._snap_disks):
1238
      if not dev:
1239
        transfers.append(None)
1240
        continue
1241

    
1242
      path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
1243
                            dev.logical_id[1])
1244

    
1245
      finished_fn = compat.partial(self._TransferFinished, idx)
1246

    
1247
      # FIXME: pass debug option from opcode to backend
1248
      dt = DiskTransfer("snapshot/%s" % idx,
1249
                        constants.IEIO_SCRIPT, ((dev, instance), idx),
1250
                        constants.IEIO_FILE, (path, ),
1251
                        finished_fn)
1252
      transfers.append(dt)
1253

    
1254
    # Actually export data
1255
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1256
                                    src_node_uuid, dest_node.uuid,
1257
                                    dest_node.secondary_ip,
1258
                                    compress,
1259
                                    instance, transfers)
1260

    
1261
    assert len(dresults) == len(instance.disks)
1262

    
1263
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1264
    result = self._lu.rpc.call_finalize_export(dest_node.uuid, instance,
1265
                                               self._snap_disks)
1266
    msg = result.fail_msg
1267
    fin_resu = not msg
1268
    if msg:
1269
      self._lu.LogWarning("Could not finalize export for instance %s"
1270
                          " on node %s: %s", instance.name, dest_node.name, msg)
1271

    
1272
    return (fin_resu, dresults)
1273

    
1274
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, compress, timeouts):
1275
    """Inter-cluster instance export.
1276

1277
    @type disk_info: list
1278
    @param disk_info: Per-disk destination information
1279
    @type key_name: string
1280
    @param key_name: Name of X509 key to use
1281
    @type dest_ca_pem: string
1282
    @param dest_ca_pem: Destination X509 CA in PEM format
1283
    @type compress: string
1284
    @param compress: one of L{constants.IEC_ALL}
1285
    @type timeouts: L{ImportExportTimeouts}
1286
    @param timeouts: Timeouts for this import
1287

1288
    """
1289
    instance = self._instance
1290

    
1291
    assert len(disk_info) == len(instance.disks)
1292

    
1293
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1294

    
1295
    ieloop = ImportExportLoop(self._lu)
1296
    try:
1297
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1298
                                                           disk_info)):
1299
        # Decide whether to use IPv6
1300
        ipv6 = netutils.IP6Address.IsValid(host)
1301

    
1302
        opts = objects.ImportExportOptions(key_name=key_name,
1303
                                           ca_pem=dest_ca_pem,
1304
                                           magic=magic,
1305
                                           compress=compress,
1306
                                           ipv6=ipv6)
1307

    
1308
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1309
        finished_fn = compat.partial(self._TransferFinished, idx)
1310
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1311
                              opts, host, port, instance, "disk%d" % idx,
1312
                              constants.IEIO_SCRIPT, ((dev, instance), idx),
1313
                              timeouts, cbs, private=(idx, finished_fn)))
1314

    
1315
      ieloop.Run()
1316
    finally:
1317
      ieloop.FinalizeAll()
1318

    
1319
    return (True, cbs.disk_results)
1320

    
1321
  def _TransferFinished(self, idx):
1322
    """Called once a transfer has finished.
1323

1324
    @type idx: number
1325
    @param idx: Disk index
1326

1327
    """
1328
    logging.debug("Transfer %s finished", idx)
1329
    self._RemoveSnapshot(idx)
1330

    
1331
  def Cleanup(self):
1332
    """Remove all snapshots.
1333

1334
    """
1335
    assert len(self._removed_snaps) == len(self._instance.disks)
1336
    for idx in range(len(self._instance.disks)):
1337
      self._RemoveSnapshot(idx)
1338

    
1339

    
1340
class _RemoteImportCb(ImportExportCbBase):
1341
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1342
               external_address):
1343
    """Initializes this class.
1344

1345
    @type cds: string
1346
    @param cds: Cluster domain secret
1347
    @type x509_cert_pem: string
1348
    @param x509_cert_pem: CA used for signing import key
1349
    @type disk_count: number
1350
    @param disk_count: Number of disks
1351
    @type external_address: string
1352
    @param external_address: External address of destination node
1353

1354
    """
1355
    ImportExportCbBase.__init__(self)
1356
    self._feedback_fn = feedback_fn
1357
    self._cds = cds
1358
    self._x509_cert_pem = x509_cert_pem
1359
    self._disk_count = disk_count
1360
    self._external_address = external_address
1361

    
1362
    self._dresults = [None] * disk_count
1363
    self._daemon_port = [None] * disk_count
1364

    
1365
    self._salt = utils.GenerateSecret(8)
1366

    
1367
  @property
1368
  def disk_results(self):
1369
    """Returns per-disk results.
1370

1371
    """
1372
    return self._dresults
1373

    
1374
  def _CheckAllListening(self):
1375
    """Checks whether all daemons are listening.
1376

1377
    If all daemons are listening, the information is sent to the client.
1378

1379
    """
1380
    if not compat.all(dp is not None for dp in self._daemon_port):
1381
      return
1382

    
1383
    host = self._external_address
1384

    
1385
    disks = []
1386
    for idx, (port, magic) in enumerate(self._daemon_port):
1387
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1388
                                               idx, host, port, magic))
1389

    
1390
    assert len(disks) == self._disk_count
1391

    
1392
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1393
      "disks": disks,
1394
      "x509_ca": self._x509_cert_pem,
1395
      })
1396

    
1397
  def ReportListening(self, ie, private, _):
1398
    """Called when daemon started listening.
1399

1400
    """
1401
    (idx, ) = private
1402

    
1403
    self._feedback_fn("Disk %s is now listening" % idx)
1404

    
1405
    assert self._daemon_port[idx] is None
1406

    
1407
    self._daemon_port[idx] = (ie.listen_port, ie.magic)
1408

    
1409
    self._CheckAllListening()
1410

    
1411
  def ReportConnected(self, ie, private):
1412
    """Called when a connection has been established.
1413

1414
    """
1415
    (idx, ) = private
1416

    
1417
    self._feedback_fn("Disk %s is now receiving data" % idx)
1418

    
1419
  def ReportFinished(self, ie, private):
1420
    """Called when a transfer has finished.
1421

1422
    """
1423
    (idx, ) = private
1424

    
1425
    # Daemon is certainly no longer listening
1426
    self._daemon_port[idx] = None
1427

    
1428
    if ie.success:
1429
      self._feedback_fn("Disk %s finished receiving data" % idx)
1430
    else:
1431
      self._feedback_fn(("Disk %s failed to receive data: %s"
1432
                         " (recent output: %s)") %
1433
                        (idx, ie.final_message, ie.recent_output))
1434

    
1435
    self._dresults[idx] = bool(ie.success)
1436

    
1437

    
1438
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1439
                 cds, compress, timeouts):
1440
  """Imports an instance from another cluster.
1441

1442
  @param lu: Logical unit instance
1443
  @param feedback_fn: Feedback function
1444
  @type instance: L{objects.Instance}
1445
  @param instance: Instance object
1446
  @type pnode: L{objects.Node}
1447
  @param pnode: Primary node of instance as an object
1448
  @type source_x509_ca: OpenSSL.crypto.X509
1449
  @param source_x509_ca: Import source's X509 CA
1450
  @type cds: string
1451
  @param cds: Cluster domain secret
1452
  @type compress: string
1453
  @param compress: one of L{constants.IEC_ALL}
1454
  @type timeouts: L{ImportExportTimeouts}
1455
  @param timeouts: Timeouts for this import
1456

1457
  """
1458
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1459
                                                  source_x509_ca)
1460

    
1461
  magic_base = utils.GenerateSecret(6)
1462

    
1463
  # Decide whether to use IPv6
1464
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1465

    
1466
  # Create crypto key
1467
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1468
                                        constants.RIE_CERT_VALIDITY)
1469
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1470

    
1471
  (x509_key_name, x509_cert_pem) = result.payload
1472
  try:
1473
    # Load certificate
1474
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1475
                                                x509_cert_pem)
1476

    
1477
    # Sign certificate
1478
    signed_x509_cert_pem = \
1479
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1480

    
1481
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1482
                          len(instance.disks), pnode.primary_ip)
1483

    
1484
    ieloop = ImportExportLoop(lu)
1485
    try:
1486
      for idx, dev in enumerate(instance.disks):
1487
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1488

    
1489
        # Import daemon options
1490
        opts = objects.ImportExportOptions(key_name=x509_key_name,
1491
                                           ca_pem=source_ca_pem,
1492
                                           magic=magic,
1493
                                           compress=compress,
1494
                                           ipv6=ipv6)
1495

    
1496
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1497
                              "disk%d" % idx,
1498
                              constants.IEIO_SCRIPT, ((dev, instance), idx),
1499
                              timeouts, cbs, private=(idx, )))
1500

    
1501
      ieloop.Run()
1502
    finally:
1503
      ieloop.FinalizeAll()
1504
  finally:
1505
    # Remove crypto key and certificate
1506
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1507
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1508

    
1509
  return cbs.disk_results
1510

    
1511

    
1512
def _GetImportExportHandshakeMessage(version):
1513
  """Returns the handshake message for a RIE protocol version.
1514

1515
  @type version: number
1516

1517
  """
1518
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1519

    
1520

    
1521
def ComputeRemoteExportHandshake(cds):
1522
  """Computes the remote import/export handshake.
1523

1524
  @type cds: string
1525
  @param cds: Cluster domain secret
1526

1527
  """
1528
  salt = utils.GenerateSecret(8)
1529
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1530
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1531

    
1532

    
1533
def CheckRemoteExportHandshake(cds, handshake):
1534
  """Checks the handshake of a remote import/export.
1535

1536
  @type cds: string
1537
  @param cds: Cluster domain secret
1538
  @type handshake: sequence
1539
  @param handshake: Handshake sent by remote peer
1540

1541
  """
1542
  try:
1543
    (version, hmac_digest, hmac_salt) = handshake
1544
  except (TypeError, ValueError), err:
1545
    return "Invalid data: %s" % err
1546

    
1547
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1548
                              hmac_digest, salt=hmac_salt):
1549
    return "Hash didn't match, clusters don't share the same domain secret"
1550

    
1551
  if version != constants.RIE_VERSION:
1552
    return ("Clusters don't have the same remote import/export protocol"
1553
            " (local=%s, remote=%s)" %
1554
            (constants.RIE_VERSION, version))
1555

    
1556
  return None
1557

    
1558

    
1559
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1560
  """Returns the hashed text for import/export disk information.
1561

1562
  @type disk_index: number
1563
  @param disk_index: Index of disk (included in hash)
1564
  @type host: string
1565
  @param host: Hostname
1566
  @type port: number
1567
  @param port: Daemon port
1568
  @type magic: string
1569
  @param magic: Magic value
1570

1571
  """
1572
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1573

    
1574

    
1575
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1576
  """Verifies received disk information for an export.
1577

1578
  @type cds: string
1579
  @param cds: Cluster domain secret
1580
  @type disk_index: number
1581
  @param disk_index: Index of disk (included in hash)
1582
  @type disk_info: sequence
1583
  @param disk_info: Disk information sent by remote peer
1584

1585
  """
1586
  try:
1587
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1588
  except (TypeError, ValueError), err:
1589
    raise errors.GenericError("Invalid data: %s" % err)
1590

    
1591
  if not (host and port and magic):
1592
    raise errors.GenericError("Missing destination host, port or magic")
1593

    
1594
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1595

    
1596
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1597
    raise errors.GenericError("HMAC is wrong")
1598

    
1599
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1600
    destination = host
1601
  else:
1602
    destination = netutils.Hostname.GetNormalizedName(host)
1603

    
1604
  return (destination,
1605
          utils.ValidateServiceName(port),
1606
          magic)
1607

    
1608

    
1609
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1610
  """Computes the signed disk information for a remote import.
1611

1612
  @type cds: string
1613
  @param cds: Cluster domain secret
1614
  @type salt: string
1615
  @param salt: HMAC salt
1616
  @type disk_index: number
1617
  @param disk_index: Index of disk (included in hash)
1618
  @type host: string
1619
  @param host: Hostname
1620
  @type port: number
1621
  @param port: Daemon port
1622
  @type magic: string
1623
  @param magic: Magic value
1624

1625
  """
1626
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1627
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1628
  return (host, port, magic, hmac_digest, salt)
1629

    
1630

    
1631
def CalculateGroupIPolicy(cluster, group):
1632
  """Calculate instance policy for group.
1633

1634
  """
1635
  return cluster.SimpleFillIPolicy(group.ipolicy)
1636

    
1637

    
1638
def ComputeDiskSize(disk_template, disks):
1639
  """Compute disk size requirements according to disk template
1640

1641
  """
1642
  # Required free disk space as a function of disk and swap space
1643
  req_size_dict = {
1644
    constants.DT_DISKLESS: 0,
1645
    constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
1646
    # 128 MB are added for drbd metadata for each disk
1647
    constants.DT_DRBD8:
1648
      sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE for d in disks),
1649
    constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1650
    constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1651
    constants.DT_GLUSTER: sum(d[constants.IDISK_SIZE] for d in disks),
1652
    constants.DT_BLOCK: 0,
1653
    constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
1654
    constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks),
1655
  }
1656

    
1657
  if disk_template not in req_size_dict:
1658
    raise errors.ProgrammerError("Disk template '%s' size requirement"
1659
                                 " is unknown" % disk_template)
1660

    
1661
  return req_size_dict[disk_template]