Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ 896cc964

History | View | Annotate | Download (46.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35
from ganeti import netutils
36
from ganeti import pathutils
37

    
38

    
39
class _ImportExportError(Exception):
40
  """Local exception to report import/export errors.
41

42
  """
43

    
44

    
45
class ImportExportTimeouts(object):
46
  #: Time until daemon starts writing status file
47
  DEFAULT_READY_TIMEOUT = 10
48

    
49
  #: Length of time until errors cause hard failure
50
  DEFAULT_ERROR_TIMEOUT = 10
51

    
52
  #: Time after which daemon must be listening
53
  DEFAULT_LISTEN_TIMEOUT = 10
54

    
55
  #: Progress update interval
56
  DEFAULT_PROGRESS_INTERVAL = 60
57

    
58
  __slots__ = [
59
    "error",
60
    "ready",
61
    "listen",
62
    "connect",
63
    "progress",
64
    ]
65

    
66
  def __init__(self, connect,
67
               listen=DEFAULT_LISTEN_TIMEOUT,
68
               error=DEFAULT_ERROR_TIMEOUT,
69
               ready=DEFAULT_READY_TIMEOUT,
70
               progress=DEFAULT_PROGRESS_INTERVAL):
71
    """Initializes this class.
72

73
    @type connect: number
74
    @param connect: Timeout for establishing connection
75
    @type listen: number
76
    @param listen: Timeout for starting to listen for connections
77
    @type error: number
78
    @param error: Length of time until errors cause hard failure
79
    @type ready: number
80
    @param ready: Timeout for daemon to become ready
81
    @type progress: number
82
    @param progress: Progress update interval
83

84
    """
85
    self.error = error
86
    self.ready = ready
87
    self.listen = listen
88
    self.connect = connect
89
    self.progress = progress
90

    
91

    
92
class ImportExportCbBase(object):
93
  """Callbacks for disk import/export.
94

95
  """
96
  def ReportListening(self, ie, private, component):
97
    """Called when daemon started listening.
98

99
    @type ie: Subclass of L{_DiskImportExportBase}
100
    @param ie: Import/export object
101
    @param private: Private data passed to import/export object
102
    @param component: transfer component name
103

104
    """
105

    
106
  def ReportConnected(self, ie, private):
107
    """Called when a connection has been established.
108

109
    @type ie: Subclass of L{_DiskImportExportBase}
110
    @param ie: Import/export object
111
    @param private: Private data passed to import/export object
112

113
    """
114

    
115
  def ReportProgress(self, ie, private):
116
    """Called when new progress information should be reported.
117

118
    @type ie: Subclass of L{_DiskImportExportBase}
119
    @param ie: Import/export object
120
    @param private: Private data passed to import/export object
121

122
    """
123

    
124
  def ReportFinished(self, ie, private):
125
    """Called when a transfer has finished.
126

127
    @type ie: Subclass of L{_DiskImportExportBase}
128
    @param ie: Import/export object
129
    @param private: Private data passed to import/export object
130

131
    """
132

    
133

    
134
class _DiskImportExportBase(object):
135
  MODE_TEXT = None
136

    
137
  def __init__(self, lu, node_uuid, opts,
138
               instance, component, timeouts, cbs, private=None):
139
    """Initializes this class.
140

141
    @param lu: Logical unit instance
142
    @type node_uuid: string
143
    @param node_uuid: Node UUID for import
144
    @type opts: L{objects.ImportExportOptions}
145
    @param opts: Import/export daemon options
146
    @type instance: L{objects.Instance}
147
    @param instance: Instance object
148
    @type component: string
149
    @param component: which part of the instance is being imported
150
    @type timeouts: L{ImportExportTimeouts}
151
    @param timeouts: Timeouts for this import
152
    @type cbs: L{ImportExportCbBase}
153
    @param cbs: Callbacks
154
    @param private: Private data for callback functions
155

156
    """
157
    assert self.MODE_TEXT
158

    
159
    self._lu = lu
160
    self.node_uuid = node_uuid
161
    self.node_name = lu.cfg.GetNodeName(node_uuid)
162
    self._opts = opts.Copy()
163
    self._instance = instance
164
    self._component = component
165
    self._timeouts = timeouts
166
    self._cbs = cbs
167
    self._private = private
168

    
169
    # Set master daemon's timeout in options for import/export daemon
170
    assert self._opts.connect_timeout is None
171
    self._opts.connect_timeout = timeouts.connect
172

    
173
    # Parent loop
174
    self._loop = None
175

    
176
    # Timestamps
177
    self._ts_begin = None
178
    self._ts_connected = None
179
    self._ts_finished = None
180
    self._ts_cleanup = None
181
    self._ts_last_progress = None
182
    self._ts_last_error = None
183

    
184
    # Transfer status
185
    self.success = None
186
    self.final_message = None
187

    
188
    # Daemon status
189
    self._daemon_name = None
190
    self._daemon = None
191

    
192
  @property
193
  def recent_output(self):
194
    """Returns the most recent output from the daemon.
195

196
    """
197
    if self._daemon:
198
      return "\n".join(self._daemon.recent_output)
199

    
200
    return None
201

    
202
  @property
203
  def progress(self):
204
    """Returns transfer progress information.
205

206
    """
207
    if not self._daemon:
208
      return None
209

    
210
    return (self._daemon.progress_mbytes,
211
            self._daemon.progress_throughput,
212
            self._daemon.progress_percent,
213
            self._daemon.progress_eta)
214

    
215
  @property
216
  def magic(self):
217
    """Returns the magic value for this import/export.
218

219
    """
220
    return self._opts.magic
221

    
222
  @property
223
  def active(self):
224
    """Determines whether this transport is still active.
225

226
    """
227
    return self.success is None
228

    
229
  @property
230
  def loop(self):
231
    """Returns parent loop.
232

233
    @rtype: L{ImportExportLoop}
234

235
    """
236
    return self._loop
237

    
238
  def SetLoop(self, loop):
239
    """Sets the parent loop.
240

241
    @type loop: L{ImportExportLoop}
242

243
    """
244
    if self._loop:
245
      raise errors.ProgrammerError("Loop can only be set once")
246

    
247
    self._loop = loop
248

    
249
  def _StartDaemon(self):
250
    """Starts the import/export daemon.
251

252
    """
253
    raise NotImplementedError()
254

    
255
  def CheckDaemon(self):
256
    """Checks whether daemon has been started and if not, starts it.
257

258
    @rtype: string
259
    @return: Daemon name
260

261
    """
262
    assert self._ts_cleanup is None
263

    
264
    if self._daemon_name is None:
265
      assert self._ts_begin is None
266

    
267
      result = self._StartDaemon()
268
      if result.fail_msg:
269
        raise _ImportExportError("Failed to start %s on %s: %s" %
270
                                 (self.MODE_TEXT, self.node_name,
271
                                  result.fail_msg))
272

    
273
      daemon_name = result.payload
274

    
275
      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
276
                   self.node_name)
277

    
278
      self._ts_begin = time.time()
279
      self._daemon_name = daemon_name
280

    
281
    return self._daemon_name
282

    
283
  def GetDaemonName(self):
284
    """Returns the daemon name.
285

286
    """
287
    assert self._daemon_name, "Daemon has not been started"
288
    assert self._ts_cleanup is None
289
    return self._daemon_name
290

    
291
  def Abort(self):
292
    """Sends SIGTERM to import/export daemon (if still active).
293

294
    """
295
    if self._daemon_name:
296
      self._lu.LogWarning("Aborting %s '%s' on %s",
297
                          self.MODE_TEXT, self._daemon_name, self.node_uuid)
298
      result = self._lu.rpc.call_impexp_abort(self.node_uuid, self._daemon_name)
299
      if result.fail_msg:
300
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
301
                            self.MODE_TEXT, self._daemon_name,
302
                            self.node_uuid, result.fail_msg)
303
        return False
304

    
305
    return True
306

    
307
  def _SetDaemonData(self, data):
308
    """Internal function for updating status daemon data.
309

310
    @type data: L{objects.ImportExportStatus}
311
    @param data: Daemon status data
312

313
    """
314
    assert self._ts_begin is not None
315

    
316
    if not data:
317
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
318
        raise _ImportExportError("Didn't become ready after %s seconds" %
319
                                 self._timeouts.ready)
320

    
321
      return False
322

    
323
    self._daemon = data
324

    
325
    return True
326

    
327
  def SetDaemonData(self, success, data):
328
    """Updates daemon status data.
329

330
    @type success: bool
331
    @param success: Whether fetching data was successful or not
332
    @type data: L{objects.ImportExportStatus}
333
    @param data: Daemon status data
334

335
    """
336
    if not success:
337
      if self._ts_last_error is None:
338
        self._ts_last_error = time.time()
339

    
340
      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
341
        raise _ImportExportError("Too many errors while updating data")
342

    
343
      return False
344

    
345
    self._ts_last_error = None
346

    
347
    return self._SetDaemonData(data)
348

    
349
  def CheckListening(self):
350
    """Checks whether the daemon is listening.
351

352
    """
353
    raise NotImplementedError()
354

    
355
  def _GetConnectedCheckEpoch(self):
356
    """Returns timeout to calculate connect timeout.
357

358
    """
359
    raise NotImplementedError()
360

    
361
  def CheckConnected(self):
362
    """Checks whether the daemon is connected.
363

364
    @rtype: bool
365
    @return: Whether the daemon is connected
366

367
    """
368
    assert self._daemon, "Daemon status missing"
369

    
370
    if self._ts_connected is not None:
371
      return True
372

    
373
    if self._daemon.connected:
374
      self._ts_connected = time.time()
375

    
376
      # TODO: Log remote peer
377
      logging.debug("%s '%s' on %s is now connected",
378
                    self.MODE_TEXT, self._daemon_name, self.node_uuid)
379

    
380
      self._cbs.ReportConnected(self, self._private)
381

    
382
      return True
383

    
384
    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
385
                            self._timeouts.connect):
386
      raise _ImportExportError("Not connected after %s seconds" %
387
                               self._timeouts.connect)
388

    
389
    return False
390

    
391
  def _CheckProgress(self):
392
    """Checks whether a progress update should be reported.
393

394
    """
395
    if ((self._ts_last_progress is None or
396
        utils.TimeoutExpired(self._ts_last_progress,
397
                             self._timeouts.progress)) and
398
        self._daemon and
399
        self._daemon.progress_mbytes is not None and
400
        self._daemon.progress_throughput is not None):
401
      self._cbs.ReportProgress(self, self._private)
402
      self._ts_last_progress = time.time()
403

    
404
  def CheckFinished(self):
405
    """Checks whether the daemon exited.
406

407
    @rtype: bool
408
    @return: Whether the transfer is finished
409

410
    """
411
    assert self._daemon, "Daemon status missing"
412

    
413
    if self._ts_finished:
414
      return True
415

    
416
    if self._daemon.exit_status is None:
417
      # TODO: Adjust delay for ETA expiring soon
418
      self._CheckProgress()
419
      return False
420

    
421
    self._ts_finished = time.time()
422

    
423
    self._ReportFinished(self._daemon.exit_status == 0,
424
                         self._daemon.error_message)
425

    
426
    return True
427

    
428
  def _ReportFinished(self, success, message):
429
    """Transfer is finished or daemon exited.
430

431
    @type success: bool
432
    @param success: Whether the transfer was successful
433
    @type message: string
434
    @param message: Error message
435

436
    """
437
    assert self.success is None
438

    
439
    self.success = success
440
    self.final_message = message
441

    
442
    if success:
443
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
444
                   self._daemon_name, self.node_uuid)
445
    elif self._daemon_name:
446
      self._lu.LogWarning("%s '%s' on %s failed: %s",
447
                          self.MODE_TEXT, self._daemon_name,
448
                          self._lu.cfg.GetNodeName(self.node_uuid),
449
                          message)
450
    else:
451
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
452
                          self._lu.cfg.GetNodeName(self.node_uuid), message)
453

    
454
    self._cbs.ReportFinished(self, self._private)
455

    
456
  def _Finalize(self):
457
    """Makes the RPC call to finalize this import/export.
458

459
    """
460
    return self._lu.rpc.call_impexp_cleanup(self.node_uuid, self._daemon_name)
461

    
462
  def Finalize(self, error=None):
463
    """Finalizes this import/export.
464

465
    """
466
    if self._daemon_name:
467
      logging.info("Finalizing %s '%s' on %s",
468
                   self.MODE_TEXT, self._daemon_name, self.node_uuid)
469

    
470
      result = self._Finalize()
471
      if result.fail_msg:
472
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
473
                            self.MODE_TEXT, self._daemon_name,
474
                            self.node_uuid, result.fail_msg)
475
        return False
476

    
477
      # Daemon is no longer running
478
      self._daemon_name = None
479
      self._ts_cleanup = time.time()
480

    
481
    if error:
482
      self._ReportFinished(False, error)
483

    
484
    return True
485

    
486

    
487
class DiskImport(_DiskImportExportBase):
488
  MODE_TEXT = "import"
489

    
490
  def __init__(self, lu, node_uuid, opts, instance, component,
491
               dest, dest_args, timeouts, cbs, private=None):
492
    """Initializes this class.
493

494
    @param lu: Logical unit instance
495
    @type node_uuid: string
496
    @param node_uuid: Node name for import
497
    @type opts: L{objects.ImportExportOptions}
498
    @param opts: Import/export daemon options
499
    @type instance: L{objects.Instance}
500
    @param instance: Instance object
501
    @type component: string
502
    @param component: which part of the instance is being imported
503
    @param dest: I/O destination
504
    @param dest_args: I/O arguments
505
    @type timeouts: L{ImportExportTimeouts}
506
    @param timeouts: Timeouts for this import
507
    @type cbs: L{ImportExportCbBase}
508
    @param cbs: Callbacks
509
    @param private: Private data for callback functions
510

511
    """
512
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
513
                                   component, timeouts, cbs, private)
514
    self._dest = dest
515
    self._dest_args = dest_args
516

    
517
    # Timestamps
518
    self._ts_listening = None
519

    
520
  @property
521
  def listen_port(self):
522
    """Returns the port the daemon is listening on.
523

524
    """
525
    if self._daemon:
526
      return self._daemon.listen_port
527

    
528
    return None
529

    
530
  def _StartDaemon(self):
531
    """Starts the import daemon.
532

533
    """
534
    return self._lu.rpc.call_import_start(self.node_uuid, self._opts,
535
                                          self._instance, self._component,
536
                                          (self._dest, self._dest_args))
537

    
538
  def CheckListening(self):
539
    """Checks whether the daemon is listening.
540

541
    @rtype: bool
542
    @return: Whether the daemon is listening
543

544
    """
545
    assert self._daemon, "Daemon status missing"
546

    
547
    if self._ts_listening is not None:
548
      return True
549

    
550
    port = self._daemon.listen_port
551
    if port is not None:
552
      self._ts_listening = time.time()
553

    
554
      logging.debug("Import '%s' on %s is now listening on port %s",
555
                    self._daemon_name, self.node_uuid, port)
556

    
557
      self._cbs.ReportListening(self, self._private, self._component)
558

    
559
      return True
560

    
561
    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
562
      raise _ImportExportError("Not listening after %s seconds" %
563
                               self._timeouts.listen)
564

    
565
    return False
566

    
567
  def _GetConnectedCheckEpoch(self):
568
    """Returns the time since we started listening.
569

570
    """
571
    assert self._ts_listening is not None, \
572
           ("Checking whether an import is connected is only useful"
573
            " once it's been listening")
574

    
575
    return self._ts_listening
576

    
577

    
578
class DiskExport(_DiskImportExportBase):
579
  MODE_TEXT = "export"
580

    
581
  def __init__(self, lu, node_uuid, opts, dest_host, dest_port,
582
               instance, component, source, source_args,
583
               timeouts, cbs, private=None):
584
    """Initializes this class.
585

586
    @param lu: Logical unit instance
587
    @type node_uuid: string
588
    @param node_uuid: Node UUID for import
589
    @type opts: L{objects.ImportExportOptions}
590
    @param opts: Import/export daemon options
591
    @type dest_host: string
592
    @param dest_host: Destination host name or IP address
593
    @type dest_port: number
594
    @param dest_port: Destination port number
595
    @type instance: L{objects.Instance}
596
    @param instance: Instance object
597
    @type component: string
598
    @param component: which part of the instance is being imported
599
    @param source: I/O source
600
    @param source_args: I/O source
601
    @type timeouts: L{ImportExportTimeouts}
602
    @param timeouts: Timeouts for this import
603
    @type cbs: L{ImportExportCbBase}
604
    @param cbs: Callbacks
605
    @param private: Private data for callback functions
606

607
    """
608
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
609
                                   component, timeouts, cbs, private)
610
    self._dest_host = dest_host
611
    self._dest_port = dest_port
612
    self._source = source
613
    self._source_args = source_args
614

    
615
  def _StartDaemon(self):
616
    """Starts the export daemon.
617

618
    """
619
    return self._lu.rpc.call_export_start(self.node_uuid, self._opts,
620
                                          self._dest_host, self._dest_port,
621
                                          self._instance, self._component,
622
                                          (self._source, self._source_args))
623

    
624
  def CheckListening(self):
625
    """Checks whether the daemon is listening.
626

627
    """
628
    # Only an import can be listening
629
    return True
630

    
631
  def _GetConnectedCheckEpoch(self):
632
    """Returns the time since the daemon started.
633

634
    """
635
    assert self._ts_begin is not None
636

    
637
    return self._ts_begin
638

    
639

    
640
def FormatProgress(progress):
641
  """Formats progress information for user consumption
642

643
  """
644
  (mbytes, throughput, percent, eta) = progress
645

    
646
  parts = [
647
    utils.FormatUnit(mbytes, "h"),
648

    
649
    # Not using FormatUnit as it doesn't support kilobytes
650
    "%0.1f MiB/s" % throughput,
651
    ]
652

    
653
  if percent is not None:
654
    parts.append("%d%%" % percent)
655

    
656
  if eta is not None:
657
    parts.append("ETA %s" % utils.FormatSeconds(eta))
658

    
659
  return utils.CommaJoin(parts)
660

    
661

    
662
class ImportExportLoop:
663
  MIN_DELAY = 1.0
664
  MAX_DELAY = 20.0
665

    
666
  def __init__(self, lu):
667
    """Initializes this class.
668

669
    """
670
    self._lu = lu
671
    self._queue = []
672
    self._pending_add = []
673

    
674
  def Add(self, diskie):
675
    """Adds an import/export object to the loop.
676

677
    @type diskie: Subclass of L{_DiskImportExportBase}
678
    @param diskie: Import/export object
679

680
    """
681
    assert diskie not in self._pending_add
682
    assert diskie.loop is None
683

    
684
    diskie.SetLoop(self)
685

    
686
    # Adding new objects to a staging list is necessary, otherwise the main
687
    # loop gets confused if callbacks modify the queue while the main loop is
688
    # iterating over it.
689
    self._pending_add.append(diskie)
690

    
691
  @staticmethod
692
  def _CollectDaemonStatus(lu, daemons):
693
    """Collects the status for all import/export daemons.
694

695
    """
696
    daemon_status = {}
697

    
698
    for node_name, names in daemons.iteritems():
699
      result = lu.rpc.call_impexp_status(node_name, names)
700
      if result.fail_msg:
701
        lu.LogWarning("Failed to get daemon status on %s: %s",
702
                      node_name, result.fail_msg)
703
        continue
704

    
705
      assert len(names) == len(result.payload)
706

    
707
      daemon_status[node_name] = dict(zip(names, result.payload))
708

    
709
    return daemon_status
710

    
711
  @staticmethod
712
  def _GetActiveDaemonNames(queue):
713
    """Gets the names of all active daemons.
714

715
    """
716
    result = {}
717
    for diskie in queue:
718
      if not diskie.active:
719
        continue
720

    
721
      try:
722
        # Start daemon if necessary
723
        daemon_name = diskie.CheckDaemon()
724
      except _ImportExportError, err:
725
        logging.exception("%s failed", diskie.MODE_TEXT)
726
        diskie.Finalize(error=str(err))
727
        continue
728

    
729
      result.setdefault(diskie.node_name, []).append(daemon_name)
730

    
731
    assert len(queue) >= len(result)
732
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
733

    
734
    logging.debug("daemons=%r", result)
735

    
736
    return result
737

    
738
  def _AddPendingToQueue(self):
739
    """Adds all pending import/export objects to the internal queue.
740

741
    """
742
    assert compat.all(diskie not in self._queue and diskie.loop == self
743
                      for diskie in self._pending_add)
744

    
745
    self._queue.extend(self._pending_add)
746

    
747
    del self._pending_add[:]
748

    
749
  def Run(self):
750
    """Utility main loop.
751

752
    """
753
    while True:
754
      self._AddPendingToQueue()
755

    
756
      # Collect all active daemon names
757
      daemons = self._GetActiveDaemonNames(self._queue)
758
      if not daemons:
759
        break
760

    
761
      # Collection daemon status data
762
      data = self._CollectDaemonStatus(self._lu, daemons)
763

    
764
      # Use data
765
      delay = self.MAX_DELAY
766
      for diskie in self._queue:
767
        if not diskie.active:
768
          continue
769

    
770
        try:
771
          try:
772
            all_daemon_data = data[diskie.node_name]
773
          except KeyError:
774
            result = diskie.SetDaemonData(False, None)
775
          else:
776
            result = \
777
              diskie.SetDaemonData(True,
778
                                   all_daemon_data[diskie.GetDaemonName()])
779

    
780
          if not result:
781
            # Daemon not yet ready, retry soon
782
            delay = min(3.0, delay)
783
            continue
784

    
785
          if diskie.CheckFinished():
786
            # Transfer finished
787
            diskie.Finalize()
788
            continue
789

    
790
          # Normal case: check again in 5 seconds
791
          delay = min(5.0, delay)
792

    
793
          if not diskie.CheckListening():
794
            # Not yet listening, retry soon
795
            delay = min(1.0, delay)
796
            continue
797

    
798
          if not diskie.CheckConnected():
799
            # Not yet connected, retry soon
800
            delay = min(1.0, delay)
801
            continue
802

    
803
        except _ImportExportError, err:
804
          logging.exception("%s failed", diskie.MODE_TEXT)
805
          diskie.Finalize(error=str(err))
806

    
807
      if not compat.any(diskie.active for diskie in self._queue):
808
        break
809

    
810
      # Wait a bit
811
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
812
      logging.debug("Waiting for %ss", delay)
813
      time.sleep(delay)
814

    
815
  def FinalizeAll(self):
816
    """Finalizes all pending transfers.
817

818
    """
819
    success = True
820

    
821
    for diskie in self._queue:
822
      success = diskie.Finalize() and success
823

    
824
    return success
825

    
826

    
827
class _TransferInstCbBase(ImportExportCbBase):
828
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node_uuid,
829
               src_cbs, dest_node_uuid, dest_ip):
830
    """Initializes this class.
831

832
    """
833
    ImportExportCbBase.__init__(self)
834

    
835
    self.lu = lu
836
    self.feedback_fn = feedback_fn
837
    self.instance = instance
838
    self.timeouts = timeouts
839
    self.src_node_uuid = src_node_uuid
840
    self.src_cbs = src_cbs
841
    self.dest_node_uuid = dest_node_uuid
842
    self.dest_ip = dest_ip
843

    
844

    
845
class _TransferInstSourceCb(_TransferInstCbBase):
846
  def ReportConnected(self, ie, dtp):
847
    """Called when a connection has been established.
848

849
    """
850
    assert self.src_cbs is None
851
    assert dtp.src_export == ie
852
    assert dtp.dest_import
853

    
854
    self.feedback_fn("%s is sending data on %s" %
855
                     (dtp.data.name, ie.node_name))
856

    
857
  def ReportProgress(self, ie, dtp):
858
    """Called when new progress information should be reported.
859

860
    """
861
    progress = ie.progress
862
    if not progress:
863
      return
864

    
865
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
866

    
867
  def ReportFinished(self, ie, dtp):
868
    """Called when a transfer has finished.
869

870
    """
871
    assert self.src_cbs is None
872
    assert dtp.src_export == ie
873
    assert dtp.dest_import
874

    
875
    if ie.success:
876
      self.feedback_fn("%s finished sending data" % dtp.data.name)
877
    else:
878
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
879
                       (dtp.data.name, ie.final_message, ie.recent_output))
880

    
881
    dtp.RecordResult(ie.success)
882

    
883
    cb = dtp.data.finished_fn
884
    if cb:
885
      cb()
886

    
887
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
888
    # give the daemon a moment to sort things out
889
    if dtp.dest_import and not ie.success:
890
      dtp.dest_import.Abort()
891

    
892

    
893
class _TransferInstDestCb(_TransferInstCbBase):
894
  def ReportListening(self, ie, dtp, component):
895
    """Called when daemon started listening.
896

897
    """
898
    assert self.src_cbs
899
    assert dtp.src_export is None
900
    assert dtp.dest_import
901
    assert dtp.export_opts
902

    
903
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
904

    
905
    # Start export on source node
906
    de = DiskExport(self.lu, self.src_node_uuid, dtp.export_opts,
907
                    self.dest_ip, ie.listen_port, self.instance,
908
                    component, dtp.data.src_io, dtp.data.src_ioargs,
909
                    self.timeouts, self.src_cbs, private=dtp)
910
    ie.loop.Add(de)
911

    
912
    dtp.src_export = de
913

    
914
  def ReportConnected(self, ie, dtp):
915
    """Called when a connection has been established.
916

917
    """
918
    self.feedback_fn("%s is receiving data on %s" %
919
                     (dtp.data.name,
920
                      self.lu.cfg.GetNodeName(self.dest_node_uuid)))
921

    
922
  def ReportFinished(self, ie, dtp):
923
    """Called when a transfer has finished.
924

925
    """
926
    if ie.success:
927
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
928
    else:
929
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
930
                       (dtp.data.name, ie.final_message, ie.recent_output))
931

    
932
    dtp.RecordResult(ie.success)
933

    
934
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
935
    # give the daemon a moment to sort things out
936
    if dtp.src_export and not ie.success:
937
      dtp.src_export.Abort()
938

    
939

    
940
class DiskTransfer(object):
941
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
942
               finished_fn):
943
    """Initializes this class.
944

945
    @type name: string
946
    @param name: User-visible name for this transfer (e.g. "disk/0")
947
    @param src_io: Source I/O type
948
    @param src_ioargs: Source I/O arguments
949
    @param dest_io: Destination I/O type
950
    @param dest_ioargs: Destination I/O arguments
951
    @type finished_fn: callable
952
    @param finished_fn: Function called once transfer has finished
953

954
    """
955
    self.name = name
956

    
957
    self.src_io = src_io
958
    self.src_ioargs = src_ioargs
959

    
960
    self.dest_io = dest_io
961
    self.dest_ioargs = dest_ioargs
962

    
963
    self.finished_fn = finished_fn
964

    
965

    
966
class _DiskTransferPrivate(object):
967
  def __init__(self, data, success, export_opts):
968
    """Initializes this class.
969

970
    @type data: L{DiskTransfer}
971
    @type success: bool
972

973
    """
974
    self.data = data
975
    self.success = success
976
    self.export_opts = export_opts
977

    
978
    self.src_export = None
979
    self.dest_import = None
980

    
981
  def RecordResult(self, success):
982
    """Updates the status.
983

984
    One failed part will cause the whole transfer to fail.
985

986
    """
987
    self.success = self.success and success
988

    
989

    
990
def _GetInstDiskMagic(base, instance_name, index):
991
  """Computes the magic value for a disk export or import.
992

993
  @type base: string
994
  @param base: Random seed value (can be the same for all disks of a transfer)
995
  @type instance_name: string
996
  @param instance_name: Name of instance
997
  @type index: number
998
  @param index: Disk index
999

1000
  """
1001
  h = compat.sha1_hash()
1002
  h.update(str(constants.RIE_VERSION))
1003
  h.update(base)
1004
  h.update(instance_name)
1005
  h.update(str(index))
1006
  return h.hexdigest()
1007

    
1008

    
1009
def TransferInstanceData(lu, feedback_fn, src_node_uuid, dest_node_uuid,
1010
                         dest_ip, compress, instance, all_transfers):
1011
  """Transfers an instance's data from one node to another.
1012

1013
  @param lu: Logical unit instance
1014
  @param feedback_fn: Feedback function
1015
  @type src_node_uuid: string
1016
  @param src_node_uuid: Source node UUID
1017
  @type dest_node_uuid: string
1018
  @param dest_node_uuid: Destination node UUID
1019
  @type dest_ip: string
1020
  @param dest_ip: IP address of destination node
1021
  @type compress: string
1022
  @param compress: one of L{constants.IEC_ALL}
1023
  @type instance: L{objects.Instance}
1024
  @param instance: Instance object
1025
  @type all_transfers: list of L{DiskTransfer} instances
1026
  @param all_transfers: List of all disk transfers to be made
1027
  @rtype: list
1028
  @return: List with a boolean (True=successful, False=failed) for success for
1029
           each transfer
1030

1031
  """
1032
  src_node_name = lu.cfg.GetNodeName(src_node_uuid)
1033
  dest_node_name = lu.cfg.GetNodeName(dest_node_uuid)
1034

    
1035
  logging.debug("Source node %s, destination node %s, compression '%s'",
1036
                src_node_name, dest_node_name, compress)
1037

    
1038
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1039
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1040
                                  src_node_uuid, None, dest_node_uuid, dest_ip)
1041
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1042
                                 src_node_uuid, src_cbs, dest_node_uuid,
1043
                                 dest_ip)
1044

    
1045
  all_dtp = []
1046

    
1047
  base_magic = utils.GenerateSecret(6)
1048

    
1049
  ieloop = ImportExportLoop(lu)
1050
  try:
1051
    for idx, transfer in enumerate(all_transfers):
1052
      if transfer:
1053
        feedback_fn("Exporting %s from %s to %s" %
1054
                    (transfer.name, src_node_name, dest_node_name))
1055

    
1056
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1057
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1058
                                           compress=compress, magic=magic)
1059

    
1060
        dtp = _DiskTransferPrivate(transfer, True, opts)
1061

    
1062
        di = DiskImport(lu, dest_node_uuid, opts, instance, "disk%d" % idx,
1063
                        transfer.dest_io, transfer.dest_ioargs,
1064
                        timeouts, dest_cbs, private=dtp)
1065
        ieloop.Add(di)
1066

    
1067
        dtp.dest_import = di
1068
      else:
1069
        dtp = _DiskTransferPrivate(None, False, None)
1070

    
1071
      all_dtp.append(dtp)
1072

    
1073
    ieloop.Run()
1074
  finally:
1075
    ieloop.FinalizeAll()
1076

    
1077
  assert len(all_dtp) == len(all_transfers)
1078
  assert compat.all((dtp.src_export is None or
1079
                      dtp.src_export.success is not None) and
1080
                     (dtp.dest_import is None or
1081
                      dtp.dest_import.success is not None)
1082
                     for dtp in all_dtp), \
1083
         "Not all imports/exports are finalized"
1084

    
1085
  return [bool(dtp.success) for dtp in all_dtp]
1086

    
1087

    
1088
class _RemoteExportCb(ImportExportCbBase):
1089
  def __init__(self, feedback_fn, disk_count):
1090
    """Initializes this class.
1091

1092
    """
1093
    ImportExportCbBase.__init__(self)
1094
    self._feedback_fn = feedback_fn
1095
    self._dresults = [None] * disk_count
1096

    
1097
  @property
1098
  def disk_results(self):
1099
    """Returns per-disk results.
1100

1101
    """
1102
    return self._dresults
1103

    
1104
  def ReportConnected(self, ie, private):
1105
    """Called when a connection has been established.
1106

1107
    """
1108
    (idx, _) = private
1109

    
1110
    self._feedback_fn("Disk %s is now sending data" % idx)
1111

    
1112
  def ReportProgress(self, ie, private):
1113
    """Called when new progress information should be reported.
1114

1115
    """
1116
    (idx, _) = private
1117

    
1118
    progress = ie.progress
1119
    if not progress:
1120
      return
1121

    
1122
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1123

    
1124
  def ReportFinished(self, ie, private):
1125
    """Called when a transfer has finished.
1126

1127
    """
1128
    (idx, finished_fn) = private
1129

    
1130
    if ie.success:
1131
      self._feedback_fn("Disk %s finished sending data" % idx)
1132
    else:
1133
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1134
                        (idx, ie.final_message, ie.recent_output))
1135

    
1136
    self._dresults[idx] = bool(ie.success)
1137

    
1138
    if finished_fn:
1139
      finished_fn()
1140

    
1141

    
1142
class ExportInstanceHelper:
1143
  def __init__(self, lu, feedback_fn, instance):
1144
    """Initializes this class.
1145

1146
    @param lu: Logical unit instance
1147
    @param feedback_fn: Feedback function
1148
    @type instance: L{objects.Instance}
1149
    @param instance: Instance object
1150

1151
    """
1152
    self._lu = lu
1153
    self._feedback_fn = feedback_fn
1154
    self._instance = instance
1155

    
1156
    self._snap_disks = []
1157
    self._removed_snaps = [False] * len(instance.disks)
1158

    
1159
  def CreateSnapshots(self):
1160
    """Creates an LVM snapshot for every disk of the instance.
1161

1162
    """
1163
    assert not self._snap_disks
1164

    
1165
    instance = self._instance
1166
    src_node = instance.primary_node
1167
    src_node_name = self._lu.cfg.GetNodeName(src_node)
1168

    
1169
    for idx, disk in enumerate(instance.disks):
1170
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1171
                        (idx, src_node_name))
1172

    
1173
      # result.payload will be a snapshot of an lvm leaf of the one we
1174
      # passed
1175
      result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
1176
      new_dev = False
1177
      msg = result.fail_msg
1178
      if msg:
1179
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1180
                            idx, src_node_name, msg)
1181
      elif (not isinstance(result.payload, (tuple, list)) or
1182
            len(result.payload) != 2):
1183
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1184
                            " result '%s'", idx, src_node_name, result.payload)
1185
      else:
1186
        disk_id = tuple(result.payload)
1187
        disk_params = constants.DISK_LD_DEFAULTS[constants.DT_PLAIN].copy()
1188
        new_dev = objects.Disk(dev_type=constants.DT_PLAIN, size=disk.size,
1189
                               logical_id=disk_id, iv_name=disk.iv_name,
1190
                               params=disk_params)
1191

    
1192
      self._snap_disks.append(new_dev)
1193

    
1194
    assert len(self._snap_disks) == len(instance.disks)
1195
    assert len(self._removed_snaps) == len(instance.disks)
1196

    
1197
  def _RemoveSnapshot(self, disk_index):
1198
    """Removes an LVM snapshot.
1199

1200
    @type disk_index: number
1201
    @param disk_index: Index of the snapshot to be removed
1202

1203
    """
1204
    disk = self._snap_disks[disk_index]
1205
    if disk and not self._removed_snaps[disk_index]:
1206
      src_node = self._instance.primary_node
1207
      src_node_name = self._lu.cfg.GetNodeName(src_node)
1208

    
1209
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1210
                        (disk_index, src_node_name))
1211

    
1212
      result = self._lu.rpc.call_blockdev_remove(src_node,
1213
                                                 (disk, self._instance))
1214
      if result.fail_msg:
1215
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1216
                            " %s: %s", disk_index, src_node_name,
1217
                            result.fail_msg)
1218
      else:
1219
        self._removed_snaps[disk_index] = True
1220

    
1221
  def LocalExport(self, dest_node, compress):
1222
    """Intra-cluster instance export.
1223

1224
    @type dest_node: L{objects.Node}
1225
    @param dest_node: Destination node
1226
    @type compress: string
1227
    @param compress: one of L{constants.IEC_ALL}
1228

1229
    """
1230
    instance = self._instance
1231
    src_node_uuid = instance.primary_node
1232

    
1233
    assert len(self._snap_disks) == len(instance.disks)
1234

    
1235
    transfers = []
1236

    
1237
    for idx, dev in enumerate(self._snap_disks):
1238
      if not dev:
1239
        transfers.append(None)
1240
        continue
1241

    
1242
      path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
1243
                            dev.logical_id[1])
1244

    
1245
      finished_fn = compat.partial(self._TransferFinished, idx)
1246

    
1247
      # FIXME: pass debug option from opcode to backend
1248
      dt = DiskTransfer("snapshot/%s" % idx,
1249
                        constants.IEIO_SCRIPT, ((dev, instance), idx),
1250
                        constants.IEIO_FILE, (path, ),
1251
                        finished_fn)
1252
      transfers.append(dt)
1253

    
1254
    # Actually export data
1255
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1256
                                    src_node_uuid, dest_node.uuid,
1257
                                    dest_node.secondary_ip,
1258
                                    compress,
1259
                                    instance, transfers)
1260

    
1261
    assert len(dresults) == len(instance.disks)
1262

    
1263
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1264
    result = self._lu.rpc.call_finalize_export(dest_node.uuid, instance,
1265
                                               self._snap_disks)
1266
    msg = result.fail_msg
1267
    fin_resu = not msg
1268
    if msg:
1269
      self._lu.LogWarning("Could not finalize export for instance %s"
1270
                          " on node %s: %s", instance.name, dest_node.name, msg)
1271

    
1272
    return (fin_resu, dresults)
1273

    
1274
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1275
    """Inter-cluster instance export.
1276

1277
    @type disk_info: list
1278
    @param disk_info: Per-disk destination information
1279
    @type key_name: string
1280
    @param key_name: Name of X509 key to use
1281
    @type dest_ca_pem: string
1282
    @param dest_ca_pem: Destination X509 CA in PEM format
1283
    @type timeouts: L{ImportExportTimeouts}
1284
    @param timeouts: Timeouts for this import
1285

1286
    """
1287
    instance = self._instance
1288

    
1289
    assert len(disk_info) == len(instance.disks)
1290

    
1291
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1292

    
1293
    ieloop = ImportExportLoop(self._lu)
1294
    try:
1295
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1296
                                                           disk_info)):
1297
        # Decide whether to use IPv6
1298
        ipv6 = netutils.IP6Address.IsValid(host)
1299

    
1300
        opts = objects.ImportExportOptions(key_name=key_name,
1301
                                           ca_pem=dest_ca_pem,
1302
                                           magic=magic, ipv6=ipv6)
1303

    
1304
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1305
        finished_fn = compat.partial(self._TransferFinished, idx)
1306
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1307
                              opts, host, port, instance, "disk%d" % idx,
1308
                              constants.IEIO_SCRIPT, ((dev, instance), idx),
1309
                              timeouts, cbs, private=(idx, finished_fn)))
1310

    
1311
      ieloop.Run()
1312
    finally:
1313
      ieloop.FinalizeAll()
1314

    
1315
    return (True, cbs.disk_results)
1316

    
1317
  def _TransferFinished(self, idx):
1318
    """Called once a transfer has finished.
1319

1320
    @type idx: number
1321
    @param idx: Disk index
1322

1323
    """
1324
    logging.debug("Transfer %s finished", idx)
1325
    self._RemoveSnapshot(idx)
1326

    
1327
  def Cleanup(self):
1328
    """Remove all snapshots.
1329

1330
    """
1331
    assert len(self._removed_snaps) == len(self._instance.disks)
1332
    for idx in range(len(self._instance.disks)):
1333
      self._RemoveSnapshot(idx)
1334

    
1335

    
1336
class _RemoteImportCb(ImportExportCbBase):
1337
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1338
               external_address):
1339
    """Initializes this class.
1340

1341
    @type cds: string
1342
    @param cds: Cluster domain secret
1343
    @type x509_cert_pem: string
1344
    @param x509_cert_pem: CA used for signing import key
1345
    @type disk_count: number
1346
    @param disk_count: Number of disks
1347
    @type external_address: string
1348
    @param external_address: External address of destination node
1349

1350
    """
1351
    ImportExportCbBase.__init__(self)
1352
    self._feedback_fn = feedback_fn
1353
    self._cds = cds
1354
    self._x509_cert_pem = x509_cert_pem
1355
    self._disk_count = disk_count
1356
    self._external_address = external_address
1357

    
1358
    self._dresults = [None] * disk_count
1359
    self._daemon_port = [None] * disk_count
1360

    
1361
    self._salt = utils.GenerateSecret(8)
1362

    
1363
  @property
1364
  def disk_results(self):
1365
    """Returns per-disk results.
1366

1367
    """
1368
    return self._dresults
1369

    
1370
  def _CheckAllListening(self):
1371
    """Checks whether all daemons are listening.
1372

1373
    If all daemons are listening, the information is sent to the client.
1374

1375
    """
1376
    if not compat.all(dp is not None for dp in self._daemon_port):
1377
      return
1378

    
1379
    host = self._external_address
1380

    
1381
    disks = []
1382
    for idx, (port, magic) in enumerate(self._daemon_port):
1383
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1384
                                               idx, host, port, magic))
1385

    
1386
    assert len(disks) == self._disk_count
1387

    
1388
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1389
      "disks": disks,
1390
      "x509_ca": self._x509_cert_pem,
1391
      })
1392

    
1393
  def ReportListening(self, ie, private, _):
1394
    """Called when daemon started listening.
1395

1396
    """
1397
    (idx, ) = private
1398

    
1399
    self._feedback_fn("Disk %s is now listening" % idx)
1400

    
1401
    assert self._daemon_port[idx] is None
1402

    
1403
    self._daemon_port[idx] = (ie.listen_port, ie.magic)
1404

    
1405
    self._CheckAllListening()
1406

    
1407
  def ReportConnected(self, ie, private):
1408
    """Called when a connection has been established.
1409

1410
    """
1411
    (idx, ) = private
1412

    
1413
    self._feedback_fn("Disk %s is now receiving data" % idx)
1414

    
1415
  def ReportFinished(self, ie, private):
1416
    """Called when a transfer has finished.
1417

1418
    """
1419
    (idx, ) = private
1420

    
1421
    # Daemon is certainly no longer listening
1422
    self._daemon_port[idx] = None
1423

    
1424
    if ie.success:
1425
      self._feedback_fn("Disk %s finished receiving data" % idx)
1426
    else:
1427
      self._feedback_fn(("Disk %s failed to receive data: %s"
1428
                         " (recent output: %s)") %
1429
                        (idx, ie.final_message, ie.recent_output))
1430

    
1431
    self._dresults[idx] = bool(ie.success)
1432

    
1433

    
1434
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1435
                 cds, timeouts):
1436
  """Imports an instance from another cluster.
1437

1438
  @param lu: Logical unit instance
1439
  @param feedback_fn: Feedback function
1440
  @type instance: L{objects.Instance}
1441
  @param instance: Instance object
1442
  @type pnode: L{objects.Node}
1443
  @param pnode: Primary node of instance as an object
1444
  @type source_x509_ca: OpenSSL.crypto.X509
1445
  @param source_x509_ca: Import source's X509 CA
1446
  @type cds: string
1447
  @param cds: Cluster domain secret
1448
  @type timeouts: L{ImportExportTimeouts}
1449
  @param timeouts: Timeouts for this import
1450

1451
  """
1452
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1453
                                                  source_x509_ca)
1454

    
1455
  magic_base = utils.GenerateSecret(6)
1456

    
1457
  # Decide whether to use IPv6
1458
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1459

    
1460
  # Create crypto key
1461
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1462
                                        constants.RIE_CERT_VALIDITY)
1463
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1464

    
1465
  (x509_key_name, x509_cert_pem) = result.payload
1466
  try:
1467
    # Load certificate
1468
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1469
                                                x509_cert_pem)
1470

    
1471
    # Sign certificate
1472
    signed_x509_cert_pem = \
1473
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1474

    
1475
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1476
                          len(instance.disks), pnode.primary_ip)
1477

    
1478
    ieloop = ImportExportLoop(lu)
1479
    try:
1480
      for idx, dev in enumerate(instance.disks):
1481
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1482

    
1483
        # Import daemon options
1484
        opts = objects.ImportExportOptions(key_name=x509_key_name,
1485
                                           ca_pem=source_ca_pem,
1486
                                           magic=magic, ipv6=ipv6)
1487

    
1488
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1489
                              "disk%d" % idx,
1490
                              constants.IEIO_SCRIPT, ((dev, instance), idx),
1491
                              timeouts, cbs, private=(idx, )))
1492

    
1493
      ieloop.Run()
1494
    finally:
1495
      ieloop.FinalizeAll()
1496
  finally:
1497
    # Remove crypto key and certificate
1498
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1499
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1500

    
1501
  return cbs.disk_results
1502

    
1503

    
1504
def _GetImportExportHandshakeMessage(version):
1505
  """Returns the handshake message for a RIE protocol version.
1506

1507
  @type version: number
1508

1509
  """
1510
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1511

    
1512

    
1513
def ComputeRemoteExportHandshake(cds):
1514
  """Computes the remote import/export handshake.
1515

1516
  @type cds: string
1517
  @param cds: Cluster domain secret
1518

1519
  """
1520
  salt = utils.GenerateSecret(8)
1521
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1522
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1523

    
1524

    
1525
def CheckRemoteExportHandshake(cds, handshake):
1526
  """Checks the handshake of a remote import/export.
1527

1528
  @type cds: string
1529
  @param cds: Cluster domain secret
1530
  @type handshake: sequence
1531
  @param handshake: Handshake sent by remote peer
1532

1533
  """
1534
  try:
1535
    (version, hmac_digest, hmac_salt) = handshake
1536
  except (TypeError, ValueError), err:
1537
    return "Invalid data: %s" % err
1538

    
1539
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1540
                              hmac_digest, salt=hmac_salt):
1541
    return "Hash didn't match, clusters don't share the same domain secret"
1542

    
1543
  if version != constants.RIE_VERSION:
1544
    return ("Clusters don't have the same remote import/export protocol"
1545
            " (local=%s, remote=%s)" %
1546
            (constants.RIE_VERSION, version))
1547

    
1548
  return None
1549

    
1550

    
1551
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1552
  """Returns the hashed text for import/export disk information.
1553

1554
  @type disk_index: number
1555
  @param disk_index: Index of disk (included in hash)
1556
  @type host: string
1557
  @param host: Hostname
1558
  @type port: number
1559
  @param port: Daemon port
1560
  @type magic: string
1561
  @param magic: Magic value
1562

1563
  """
1564
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1565

    
1566

    
1567
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1568
  """Verifies received disk information for an export.
1569

1570
  @type cds: string
1571
  @param cds: Cluster domain secret
1572
  @type disk_index: number
1573
  @param disk_index: Index of disk (included in hash)
1574
  @type disk_info: sequence
1575
  @param disk_info: Disk information sent by remote peer
1576

1577
  """
1578
  try:
1579
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1580
  except (TypeError, ValueError), err:
1581
    raise errors.GenericError("Invalid data: %s" % err)
1582

    
1583
  if not (host and port and magic):
1584
    raise errors.GenericError("Missing destination host, port or magic")
1585

    
1586
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1587

    
1588
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1589
    raise errors.GenericError("HMAC is wrong")
1590

    
1591
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1592
    destination = host
1593
  else:
1594
    destination = netutils.Hostname.GetNormalizedName(host)
1595

    
1596
  return (destination,
1597
          utils.ValidateServiceName(port),
1598
          magic)
1599

    
1600

    
1601
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1602
  """Computes the signed disk information for a remote import.
1603

1604
  @type cds: string
1605
  @param cds: Cluster domain secret
1606
  @type salt: string
1607
  @param salt: HMAC salt
1608
  @type disk_index: number
1609
  @param disk_index: Index of disk (included in hash)
1610
  @type host: string
1611
  @param host: Hostname
1612
  @type port: number
1613
  @param port: Daemon port
1614
  @type magic: string
1615
  @param magic: Magic value
1616

1617
  """
1618
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1619
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1620
  return (host, port, magic, hmac_digest, salt)
1621

    
1622

    
1623
def CalculateGroupIPolicy(cluster, group):
1624
  """Calculate instance policy for group.
1625

1626
  """
1627
  return cluster.SimpleFillIPolicy(group.ipolicy)
1628

    
1629

    
1630
def ComputeDiskSize(disk_template, disks):
1631
  """Compute disk size requirements according to disk template
1632

1633
  """
1634
  # Required free disk space as a function of disk and swap space
1635
  req_size_dict = {
1636
    constants.DT_DISKLESS: 0,
1637
    constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
1638
    # 128 MB are added for drbd metadata for each disk
1639
    constants.DT_DRBD8:
1640
      sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE for d in disks),
1641
    constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1642
    constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1643
    constants.DT_BLOCK: 0,
1644
    constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
1645
    constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks),
1646
  }
1647

    
1648
  if disk_template not in req_size_dict:
1649
    raise errors.ProgrammerError("Disk template '%s' size requirement"
1650
                                 " is unknown" % disk_template)
1651

    
1652
  return req_size_dict[disk_template]