Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ f198cf91

History | View | Annotate | Download (46.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35
from ganeti import netutils
36
from ganeti import pathutils
37

    
38

    
39
class _ImportExportError(Exception):
  """Local exception to report import/export errors.

  Raised when a daemon fails to start, stops reporting status, or misses
  one of its timeouts; callers catch it and finalize the affected
  transfer with the error message.

  """
43

    
44

    
45
class ImportExportTimeouts(object):
  """Simple container for the timeouts used by import/export operations.

  """
  #: Time until daemon starts writing status file
  DEFAULT_READY_TIMEOUT = 10

  #: Length of time until errors cause hard failure
  DEFAULT_ERROR_TIMEOUT = 10

  #: Time after which daemon must be listening
  DEFAULT_LISTEN_TIMEOUT = 10

  #: Progress update interval
  DEFAULT_PROGRESS_INTERVAL = 60

  __slots__ = [
    "error",
    "ready",
    "listen",
    "connect",
    "progress",
    ]

  def __init__(self, connect,
               listen=DEFAULT_LISTEN_TIMEOUT,
               error=DEFAULT_ERROR_TIMEOUT,
               ready=DEFAULT_READY_TIMEOUT,
               progress=DEFAULT_PROGRESS_INTERVAL):
    """Initializes the timeout values.

    Only the connect timeout is mandatory; all others fall back to the
    class-level defaults.

    @type connect: number
    @param connect: Timeout for establishing connection
    @type listen: number
    @param listen: Timeout for starting to listen for connections
    @type error: number
    @param error: Length of time until errors cause hard failure
    @type ready: number
    @param ready: Timeout for daemon to become ready
    @type progress: number
    @param progress: Progress update interval

    """
    # Stored in the order they appear in __slots__
    self.connect = connect
    self.listen = listen
    self.error = error
    self.ready = ready
    self.progress = progress
90

    
91

    
92
class ImportExportCbBase(object):
  """Callbacks for disk import/export.

  All callbacks default to doing nothing; subclasses override only the
  events they are interested in.

  """
  def ReportListening(self, ie, private, component):
    """Called when daemon started listening.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object
    @param component: transfer component name

    """

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    @type ie: Subclass of L{_DiskImportExportBase}
    @param ie: Import/export object
    @param private: Private data passed to import/export object

    """
132

    
133

    
134
class _DiskImportExportBase(object):
  """Base class for a single disk import or export.

  Tracks the lifecycle of one remote import/export daemon: starting it,
  polling its status, reporting progress/connection events through the
  callbacks, and finalizing (cleaning up) the daemon on the node.

  """
  #: Human-readable mode ("import" or "export"); must be set by subclasses
  MODE_TEXT = None

  def __init__(self, lu, node_uuid, opts,
               instance, component, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_uuid: string
    @param node_uuid: Node UUID for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    assert self.MODE_TEXT

    self._lu = lu
    self.node_uuid = node_uuid
    self.node_name = lu.cfg.GetNodeName(node_uuid)
    # Copy options because connect_timeout is modified below
    self._opts = opts.Copy()
    self._instance = instance
    self._component = component
    self._timeouts = timeouts
    self._cbs = cbs
    self._private = private

    # Set master daemon's timeout in options for import/export daemon
    assert self._opts.connect_timeout is None
    self._opts.connect_timeout = timeouts.connect

    # Parent loop
    self._loop = None

    # Timestamps (all None until the corresponding event happens)
    self._ts_begin = None
    self._ts_connected = None
    self._ts_finished = None
    self._ts_cleanup = None
    self._ts_last_progress = None
    self._ts_last_error = None

    # Transfer status; success stays None while the transfer is active
    self.success = None
    self.final_message = None

    # Daemon status
    self._daemon_name = None
    self._daemon = None

  @property
  def recent_output(self):
    """Returns the most recent output from the daemon.

    @rtype: string or None
    @return: Daemon output joined with newlines, or None if no status
      data has been received yet

    """
    if self._daemon:
      return "\n".join(self._daemon.recent_output)

    return None

  @property
  def progress(self):
    """Returns transfer progress information.

    @rtype: tuple or None
    @return: (mbytes, throughput, percent, eta) or None if no status
      data has been received yet

    """
    if not self._daemon:
      return None

    return (self._daemon.progress_mbytes,
            self._daemon.progress_throughput,
            self._daemon.progress_percent,
            self._daemon.progress_eta)

  @property
  def magic(self):
    """Returns the magic value for this import/export.

    """
    return self._opts.magic

  @property
  def active(self):
    """Determines whether this transport is still active.

    """
    # success is only set (True/False) once the transfer has finished
    return self.success is None

  @property
  def loop(self):
    """Returns parent loop.

    @rtype: L{ImportExportLoop}

    """
    return self._loop

  def SetLoop(self, loop):
    """Sets the parent loop.

    @type loop: L{ImportExportLoop}
    @raise errors.ProgrammerError: If a loop was already set

    """
    if self._loop:
      raise errors.ProgrammerError("Loop can only be set once")

    self._loop = loop

  def _StartDaemon(self):
    """Starts the import/export daemon.

    Must be implemented by subclasses; returns an RPC result object.

    """
    raise NotImplementedError()

  def CheckDaemon(self):
    """Checks whether daemon has been started and if not, starts it.

    @rtype: string
    @return: Daemon name
    @raise _ImportExportError: If the daemon could not be started

    """
    assert self._ts_cleanup is None

    if self._daemon_name is None:
      assert self._ts_begin is None

      result = self._StartDaemon()
      if result.fail_msg:
        raise _ImportExportError("Failed to start %s on %s: %s" %
                                 (self.MODE_TEXT, self.node_name,
                                  result.fail_msg))

      daemon_name = result.payload

      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
                   self.node_name)

      self._ts_begin = time.time()
      self._daemon_name = daemon_name

    return self._daemon_name

  def GetDaemonName(self):
    """Returns the daemon name.

    """
    assert self._daemon_name, "Daemon has not been started"
    assert self._ts_cleanup is None
    return self._daemon_name

  def Abort(self):
    """Sends SIGTERM to import/export daemon (if still active).

    @rtype: bool
    @return: Whether the abort RPC succeeded (True if no daemon was
      running)

    """
    if self._daemon_name:
      self._lu.LogWarning("Aborting %s '%s' on %s",
                          self.MODE_TEXT, self._daemon_name, self.node_uuid)
      result = self._lu.rpc.call_impexp_abort(self.node_uuid, self._daemon_name)
      if result.fail_msg:
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_uuid, result.fail_msg)
        return False

    return True

  def _SetDaemonData(self, data):
    """Internal function for updating status daemon data.

    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether data was available
    @raise _ImportExportError: If no data arrived within the "ready"
      timeout

    """
    assert self._ts_begin is not None

    if not data:
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
        raise _ImportExportError("Didn't become ready after %s seconds" %
                                 self._timeouts.ready)

      return False

    self._daemon = data

    return True

  def SetDaemonData(self, success, data):
    """Updates daemon status data.

    @type success: bool
    @param success: Whether fetching data was successful or not
    @type data: L{objects.ImportExportStatus}
    @param data: Daemon status data
    @rtype: bool
    @return: Whether data is available for further checks
    @raise _ImportExportError: If fetching kept failing beyond the error
      timeout

    """
    if not success:
      if self._ts_last_error is None:
        self._ts_last_error = time.time()

      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
        raise _ImportExportError("Too many errors while updating data")

      return False

    # A successful update resets the error window
    self._ts_last_error = None

    return self._SetDaemonData(data)

  def CheckListening(self):
    """Checks whether the daemon is listening.

    Must be implemented by subclasses.

    """
    raise NotImplementedError()

  def _GetConnectedCheckEpoch(self):
    """Returns timeout to calculate connect timeout.

    Must be implemented by subclasses; returns the timestamp from which
    the connect timeout is measured.

    """
    raise NotImplementedError()

  def CheckConnected(self):
    """Checks whether the daemon is connected.

    @rtype: bool
    @return: Whether the daemon is connected
    @raise _ImportExportError: If no connection was made within the
      connect timeout

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_connected is not None:
      return True

    if self._daemon.connected:
      self._ts_connected = time.time()

      # TODO: Log remote peer
      logging.debug("%s '%s' on %s is now connected",
                    self.MODE_TEXT, self._daemon_name, self.node_uuid)

      self._cbs.ReportConnected(self, self._private)

      return True

    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
                            self._timeouts.connect):
      raise _ImportExportError("Not connected after %s seconds" %
                               self._timeouts.connect)

    return False

  def _CheckProgress(self):
    """Checks whether a progress update should be reported.

    Rate-limited by the progress interval; only reports once the daemon
    has provided both a byte count and a throughput value.

    """
    if ((self._ts_last_progress is None or
        utils.TimeoutExpired(self._ts_last_progress,
                             self._timeouts.progress)) and
        self._daemon and
        self._daemon.progress_mbytes is not None and
        self._daemon.progress_throughput is not None):
      self._cbs.ReportProgress(self, self._private)
      self._ts_last_progress = time.time()

  def CheckFinished(self):
    """Checks whether the daemon exited.

    @rtype: bool
    @return: Whether the transfer is finished

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_finished:
      return True

    if self._daemon.exit_status is None:
      # Daemon still running; maybe report progress
      # TODO: Adjust delay for ETA expiring soon
      self._CheckProgress()
      return False

    self._ts_finished = time.time()

    # Exit status 0 means success, anything else is a failure
    self._ReportFinished(self._daemon.exit_status == 0,
                         self._daemon.error_message)

    return True

  def _ReportFinished(self, success, message):
    """Transfer is finished or daemon exited.

    @type success: bool
    @param success: Whether the transfer was successful
    @type message: string
    @param message: Error message

    """
    assert self.success is None

    self.success = success
    self.final_message = message

    if success:
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
                   self._daemon_name, self.node_uuid)
    elif self._daemon_name:
      self._lu.LogWarning("%s '%s' on %s failed: %s",
                          self.MODE_TEXT, self._daemon_name,
                          self._lu.cfg.GetNodeName(self.node_uuid),
                          message)
    else:
      # Daemon was never started, e.g. start itself failed
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
                          self._lu.cfg.GetNodeName(self.node_uuid), message)

    self._cbs.ReportFinished(self, self._private)

  def _Finalize(self):
    """Makes the RPC call to finalize this import/export.

    """
    return self._lu.rpc.call_impexp_cleanup(self.node_uuid, self._daemon_name)

  def Finalize(self, error=None):
    """Finalizes this import/export.

    @type error: string
    @param error: Optional error message to record as the transfer
      result
    @rtype: bool
    @return: Whether cleanup succeeded (True if no daemon was running)

    """
    if self._daemon_name:
      logging.info("Finalizing %s '%s' on %s",
                   self.MODE_TEXT, self._daemon_name, self.node_uuid)

      result = self._Finalize()
      if result.fail_msg:
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
                            self.MODE_TEXT, self._daemon_name,
                            self.node_uuid, result.fail_msg)
        return False

      # Daemon is no longer running
      self._daemon_name = None
      self._ts_cleanup = time.time()

    if error:
      self._ReportFinished(False, error)

    return True
485

    
486

    
487
class DiskImport(_DiskImportExportBase):
  """Disk import: runs a receiving daemon on the destination node.

  """
  MODE_TEXT = "import"

  def __init__(self, lu, node_uuid, opts, instance, component,
               dest, dest_args, timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_uuid: string
    @param node_uuid: Node UUID for import
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being imported
    @param dest: I/O destination
    @param dest_args: I/O arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest = dest
    self._dest_args = dest_args

    # Timestamps; set once the daemon reports a listening port
    self._ts_listening = None

  @property
  def listen_port(self):
    """Returns the port the daemon is listening on.

    @return: Port number, or None if no status data is available yet

    """
    if self._daemon:
      return self._daemon.listen_port

    return None

  def _StartDaemon(self):
    """Starts the import daemon.

    """
    return self._lu.rpc.call_import_start(self.node_uuid, self._opts,
                                          self._instance, self._component,
                                          (self._dest, self._dest_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    @rtype: bool
    @return: Whether the daemon is listening
    @raise _ImportExportError: If the daemon did not start listening
      within the listen timeout

    """
    assert self._daemon, "Daemon status missing"

    if self._ts_listening is not None:
      return True

    port = self._daemon.listen_port
    if port is not None:
      self._ts_listening = time.time()

      logging.debug("Import '%s' on %s is now listening on port %s",
                    self._daemon_name, self.node_uuid, port)

      self._cbs.ReportListening(self, self._private, self._component)

      return True

    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
      raise _ImportExportError("Not listening after %s seconds" %
                               self._timeouts.listen)

    return False

  def _GetConnectedCheckEpoch(self):
    """Returns the time since we started listening.

    """
    assert self._ts_listening is not None, \
           ("Checking whether an import is connected is only useful"
            " once it's been listening")

    return self._ts_listening
576

    
577

    
578
class DiskExport(_DiskImportExportBase):
  """Disk export: runs a sending daemon on the source node.

  """
  MODE_TEXT = "export"

  def __init__(self, lu, node_uuid, opts, dest_host, dest_port,
               instance, component, source, source_args,
               timeouts, cbs, private=None):
    """Initializes this class.

    @param lu: Logical unit instance
    @type node_uuid: string
    @param node_uuid: Node UUID for export
    @type opts: L{objects.ImportExportOptions}
    @param opts: Import/export daemon options
    @type dest_host: string
    @param dest_host: Destination host name or IP address
    @type dest_port: number
    @param dest_port: Destination port number
    @type instance: L{objects.Instance}
    @param instance: Instance object
    @type component: string
    @param component: which part of the instance is being exported
    @param source: I/O source
    @param source_args: I/O source arguments
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this export
    @type cbs: L{ImportExportCbBase}
    @param cbs: Callbacks
    @param private: Private data for callback functions

    """
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
                                   component, timeouts, cbs, private)
    self._dest_host = dest_host
    self._dest_port = dest_port
    self._source = source
    self._source_args = source_args

  def _StartDaemon(self):
    """Starts the export daemon.

    """
    return self._lu.rpc.call_export_start(self.node_uuid, self._opts,
                                          self._dest_host, self._dest_port,
                                          self._instance, self._component,
                                          (self._source, self._source_args))

  def CheckListening(self):
    """Checks whether the daemon is listening.

    """
    # Only an import can be listening
    return True

  def _GetConnectedCheckEpoch(self):
    """Returns the time since the daemon started.

    """
    assert self._ts_begin is not None

    return self._ts_begin
638

    
639

    
640
def FormatProgress(progress):
  """Formats progress information for user consumption

  @type progress: tuple
  @param progress: (mbytes, throughput, percent, eta) tuple as returned
    by L{_DiskImportExportBase.progress}; percent and eta may be None
  @rtype: string

  """
  (mbytes, throughput, percent, eta) = progress

  # Not using FormatUnit for the throughput as it doesn't support kilobytes
  parts = [utils.FormatUnit(mbytes, "h"), "%0.1f MiB/s" % throughput]

  if percent is not None:
    parts.append("%d%%" % percent)

  if eta is not None:
    parts.append("ETA %s" % utils.FormatSeconds(eta))

  return utils.CommaJoin(parts)
660

    
661

    
662
class ImportExportLoop:
  """Main loop driving a set of concurrent disk imports/exports.

  Polls all registered import/export objects, collecting daemon status
  via RPC, until every transfer has finished or failed.

  """
  #: Bounds for the per-iteration sleep delay (seconds)
  MIN_DELAY = 1.0
  MAX_DELAY = 20.0

  def __init__(self, lu):
    """Initializes this class.

    @param lu: Logical unit instance

    """
    self._lu = lu
    self._queue = []
    self._pending_add = []

  def Add(self, diskie):
    """Adds an import/export object to the loop.

    @type diskie: Subclass of L{_DiskImportExportBase}
    @param diskie: Import/export object

    """
    assert diskie not in self._pending_add
    assert diskie.loop is None

    diskie.SetLoop(self)

    # Adding new objects to a staging list is necessary, otherwise the main
    # loop gets confused if callbacks modify the queue while the main loop is
    # iterating over it.
    self._pending_add.append(diskie)

  @staticmethod
  def _CollectDaemonStatus(lu, daemons):
    """Collects the status for all import/export daemons.

    @param daemons: Dictionary mapping node name to list of daemon names
    @rtype: dict
    @return: Dictionary mapping node name to a dict of daemon name to
      status; nodes whose status RPC failed are left out

    """
    daemon_status = {}

    for node_name, names in daemons.iteritems():
      result = lu.rpc.call_impexp_status(node_name, names)
      if result.fail_msg:
        lu.LogWarning("Failed to get daemon status on %s: %s",
                      node_name, result.fail_msg)
        continue

      assert len(names) == len(result.payload)

      daemon_status[node_name] = dict(zip(names, result.payload))

    return daemon_status

  @staticmethod
  def _GetActiveDaemonNames(queue):
    """Gets the names of all active daemons.

    Starts daemons as necessary; transfers whose daemon fails to start
    are finalized with the error.

    @rtype: dict
    @return: Dictionary mapping node name to list of daemon names

    """
    result = {}
    for diskie in queue:
      if not diskie.active:
        continue

      try:
        # Start daemon if necessary
        daemon_name = diskie.CheckDaemon()
      except _ImportExportError, err:
        logging.exception("%s failed", diskie.MODE_TEXT)
        diskie.Finalize(error=str(err))
        continue

      result.setdefault(diskie.node_name, []).append(daemon_name)

    assert len(queue) >= len(result)
    assert len(queue) >= sum([len(names) for names in result.itervalues()])

    logging.debug("daemons=%r", result)

    return result

  def _AddPendingToQueue(self):
    """Adds all pending import/export objects to the internal queue.

    """
    assert compat.all(diskie not in self._queue and diskie.loop == self
                      for diskie in self._pending_add)

    self._queue.extend(self._pending_add)

    del self._pending_add[:]

  def Run(self):
    """Utility main loop.

    Repeatedly polls daemon status and advances each active transfer
    through its listening/connected/finished checks, sleeping between
    iterations; returns once no transfer is active.

    """
    while True:
      self._AddPendingToQueue()

      # Collect all active daemon names
      daemons = self._GetActiveDaemonNames(self._queue)
      if not daemons:
        break

      # Collect daemon status data
      data = self._CollectDaemonStatus(self._lu, daemons)

      # Use data
      delay = self.MAX_DELAY
      for diskie in self._queue:
        if not diskie.active:
          continue

        try:
          try:
            all_daemon_data = data[diskie.node_name]
          except KeyError:
            # Status RPC for this node failed
            result = diskie.SetDaemonData(False, None)
          else:
            result = \
              diskie.SetDaemonData(True,
                                   all_daemon_data[diskie.GetDaemonName()])

          if not result:
            # Daemon not yet ready, retry soon
            delay = min(3.0, delay)
            continue

          if diskie.CheckFinished():
            # Transfer finished
            diskie.Finalize()
            continue

          # Normal case: check again in 5 seconds
          delay = min(5.0, delay)

          if not diskie.CheckListening():
            # Not yet listening, retry soon
            delay = min(1.0, delay)
            continue

          if not diskie.CheckConnected():
            # Not yet connected, retry soon
            delay = min(1.0, delay)
            continue

        except _ImportExportError, err:
          logging.exception("%s failed", diskie.MODE_TEXT)
          diskie.Finalize(error=str(err))

      if not compat.any(diskie.active for diskie in self._queue):
        break

      # Wait a bit
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
      logging.debug("Waiting for %ss", delay)
      time.sleep(delay)

  def FinalizeAll(self):
    """Finalizes all pending transfers.

    @rtype: bool
    @return: Whether all transfers were finalized successfully

    """
    success = True

    for diskie in self._queue:
      success = diskie.Finalize() and success

    return success
825

    
826

    
827
class _TransferInstCbBase(ImportExportCbBase):
  """Shared state for instance-transfer callbacks.

  Records all constructor arguments as public attributes for use by the
  source and destination callback subclasses.

  """
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node_uuid,
               src_cbs, dest_node_uuid, dest_ip):
    """Initializes this class.

    """
    ImportExportCbBase.__init__(self)

    # General context
    self.lu = lu
    self.feedback_fn = feedback_fn
    self.instance = instance
    self.timeouts = timeouts

    # Source side
    self.src_node_uuid = src_node_uuid
    self.src_cbs = src_cbs

    # Destination side
    self.dest_node_uuid = dest_node_uuid
    self.dest_ip = dest_ip
843

    
844

    
845
class _TransferInstSourceCb(_TransferInstCbBase):
  """Callbacks for the sending (export) side of an instance transfer.

  """
  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    @param ie: The L{DiskExport} object
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Per-transfer private data

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    self.feedback_fn("%s is sending data on %s" %
                     (dtp.data.name, ie.node_name))

  def ReportProgress(self, ie, dtp):
    """Called when new progress information should be reported.

    """
    progress = ie.progress
    if not progress:
      return

    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    Records the result, invokes the transfer's finished callback and
    aborts the peer import on failure.

    """
    assert self.src_cbs is None
    assert dtp.src_export == ie
    assert dtp.dest_import

    if ie.success:
      self.feedback_fn("%s finished sending data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    cb = dtp.data.finished_fn
    if cb:
      cb()

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.dest_import and not ie.success:
      dtp.dest_import.Abort()
891

    
892

    
893
class _TransferInstDestCb(_TransferInstCbBase):
  """Callbacks for the receiving (import) side of an instance transfer.

  """
  def ReportListening(self, ie, dtp, component):
    """Called when daemon started listening.

    Starts the matching export on the source node once the import is
    ready to accept a connection.

    @param ie: The L{DiskImport} object
    @type dtp: L{_DiskTransferPrivate}
    @param dtp: Per-transfer private data
    @param component: transfer component name

    """
    assert self.src_cbs
    assert dtp.src_export is None
    assert dtp.dest_import
    assert dtp.export_opts

    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)

    # Start export on source node
    de = DiskExport(self.lu, self.src_node_uuid, dtp.export_opts,
                    self.dest_ip, ie.listen_port, self.instance,
                    component, dtp.data.src_io, dtp.data.src_ioargs,
                    self.timeouts, self.src_cbs, private=dtp)
    ie.loop.Add(de)

    dtp.src_export = de

  def ReportConnected(self, ie, dtp):
    """Called when a connection has been established.

    """
    self.feedback_fn("%s is receiving data on %s" %
                     (dtp.data.name,
                      self.lu.cfg.GetNodeName(self.dest_node_uuid)))

  def ReportFinished(self, ie, dtp):
    """Called when a transfer has finished.

    Records the result and aborts the peer export on failure.

    """
    if ie.success:
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
    else:
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                       (dtp.data.name, ie.final_message, ie.recent_output))

    dtp.RecordResult(ie.success)

    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
    # give the daemon a moment to sort things out
    if dtp.src_export and not ie.success:
      dtp.src_export.Abort()
938

    
939

    
940
class DiskTransfer(object):
  """Describes one disk transfer between a source and a destination.

  """
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
               finished_fn):
    """Initializes this class.

    @type name: string
    @param name: User-visible name for this transfer (e.g. "disk/0")
    @param src_io: Source I/O type
    @param src_ioargs: Source I/O arguments
    @param dest_io: Destination I/O type
    @param dest_ioargs: Destination I/O arguments
    @type finished_fn: callable
    @param finished_fn: Function called once transfer has finished

    """
    # User-visible identifier
    self.name = name

    # Source I/O specification
    self.src_io = src_io
    self.src_ioargs = src_ioargs

    # Destination I/O specification
    self.dest_io = dest_io
    self.dest_ioargs = dest_ioargs

    # Completion callback
    self.finished_fn = finished_fn
964

    
965

    
966
class _DiskTransferPrivate(object):
967
  def __init__(self, data, success, export_opts):
968
    """Initializes this class.
969

970
    @type data: L{DiskTransfer}
971
    @type success: bool
972

973
    """
974
    self.data = data
975
    self.success = success
976
    self.export_opts = export_opts
977

    
978
    self.src_export = None
979
    self.dest_import = None
980

    
981
  def RecordResult(self, success):
982
    """Updates the status.
983

984
    One failed part will cause the whole transfer to fail.
985

986
    """
987
    self.success = self.success and success
988

    
989

    
990
def _GetInstDiskMagic(base, instance_name, index):
  """Computes the magic value for a disk export or import.

  @type base: string
  @param base: Random seed value (can be the same for all disks of a transfer)
  @type instance_name: string
  @param instance_name: Name of instance
  @type index: number
  @param index: Disk index

  """
  # The magic ties the protocol version, transfer seed, instance name and
  # disk index together into one opaque value
  h = compat.sha1_hash()
  for part in (str(constants.RIE_VERSION), base, instance_name, str(index)):
    h.update(part)
  return h.hexdigest()
def TransferInstanceData(lu, feedback_fn, src_node_uuid, dest_node_uuid,
                         dest_ip, compress, instance, all_transfers):
  """Transfers an instance's data from one node to another.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type src_node_uuid: string
  @param src_node_uuid: Source node UUID
  @type dest_node_uuid: string
  @param dest_node_uuid: Destination node UUID
  @type dest_ip: string
  @param dest_ip: IP address of destination node
  @type compress: string
  @param compress: one of L{constants.IEC_ALL}
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type all_transfers: list of L{DiskTransfer} instances
  @param all_transfers: List of all disk transfers to be made
  @rtype: list
  @return: List with a boolean (True=successful, False=failed) for success for
           each transfer

  """
  src_node_name = lu.cfg.GetNodeName(src_node_uuid)
  dest_node_name = lu.cfg.GetNodeName(dest_node_uuid)

  logging.debug("Source node %s, destination node %s, compression '%s'",
                src_node_name, dest_node_name, compress)

  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
  # NOTE(review): only the import side is added to the loop below; the
  # source-side export is presumably started by dest_cbs once the import
  # daemon is listening — confirm against the callback implementations
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                  src_node_uuid, None, dest_node_uuid, dest_ip)
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
                                 src_node_uuid, src_cbs, dest_node_uuid,
                                 dest_ip)

  all_dtp = []

  # Shared seed for all per-disk magic values
  base_magic = utils.GenerateSecret(6)

  ieloop = ImportExportLoop(lu)
  try:
    for idx, transfer in enumerate(all_transfers):
      if transfer:
        feedback_fn("Exporting %s from %s to %s" %
                    (transfer.name, src_node_name, dest_node_name))

        # Each disk gets its own magic value derived from the shared seed
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
                                           compress=compress, magic=magic)

        dtp = _DiskTransferPrivate(transfer, True, opts)

        di = DiskImport(lu, dest_node_uuid, opts, instance, "disk%d" % idx,
                        transfer.dest_io, transfer.dest_ioargs,
                        timeouts, dest_cbs, private=dtp)
        ieloop.Add(di)

        dtp.dest_import = di
      else:
        # Placeholder for a skipped transfer; counts as failed in the result
        dtp = _DiskTransferPrivate(None, False, None)

      all_dtp.append(dtp)

    ieloop.Run()
  finally:
    # Always stop daemons and collect final status, even on errors
    ieloop.FinalizeAll()

  assert len(all_dtp) == len(all_transfers)
  assert compat.all((dtp.src_export is None or
                      dtp.src_export.success is not None) and
                     (dtp.dest_import is None or
                      dtp.dest_import.success is not None)
                     for dtp in all_dtp), \
         "Not all imports/exports are finalized"

  return [bool(dtp.success) for dtp in all_dtp]
class _RemoteExportCb(ImportExportCbBase):
  """Import/export callbacks for exporting disks to a remote cluster.

  """
  def __init__(self, feedback_fn, disk_count):
    """Initializes this class.

    @param feedback_fn: Feedback function
    @type disk_count: number
    @param disk_count: Number of disks being exported

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    # One result slot per disk, filled in by ReportFinished
    self._dresults = [None] * disk_count

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, _) = private
    self._feedback_fn("Disk %s is now sending data" % idx)

  def ReportProgress(self, ie, private):
    """Called when new progress information should be reported.

    """
    (idx, _) = private
    progress = ie.progress
    if progress:
      self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, finished_fn) = private

    if ie.success:
      self._feedback_fn("Disk %s finished sending data" % idx)
    else:
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
                        (idx, ie.final_message, ie.recent_output))

    # Record the per-disk outcome as a plain boolean
    self._dresults[idx] = bool(ie.success)

    if finished_fn:
      finished_fn()
class ExportInstanceHelper:
  """Helper for exporting an instance by snapshotting its disks.

  Typical usage: L{CreateSnapshots}, then L{LocalExport} or L{RemoteExport},
  and finally L{Cleanup} to remove any remaining snapshots.

  """
  def __init__(self, lu, feedback_fn, instance):
    """Initializes this class.

    @param lu: Logical unit instance
    @param feedback_fn: Feedback function
    @type instance: L{objects.Instance}
    @param instance: Instance object

    """
    self._lu = lu
    self._feedback_fn = feedback_fn
    self._instance = instance

    # Snapshot disk objects (False where snapshotting failed), filled in by
    # CreateSnapshots; _removed_snaps tracks which snapshots were cleaned up
    self._snap_disks = []
    self._removed_snaps = [False] * len(instance.disks)

  def CreateSnapshots(self):
    """Creates an LVM snapshot for every disk of the instance.

    """
    assert not self._snap_disks

    instance = self._instance
    src_node = instance.primary_node
    src_node_name = self._lu.cfg.GetNodeName(src_node)

    for idx, disk in enumerate(instance.disks):
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
                        (idx, src_node_name))

      # result.payload will be a snapshot of an lvm leaf of the one we
      # passed
      result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
      new_dev = False
      msg = result.fail_msg
      if msg:
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
                            idx, src_node_name, msg)
      elif (not isinstance(result.payload, (tuple, list)) or
            len(result.payload) != 2):
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
                            " result '%s'", idx, src_node_name, result.payload)
      else:
        disk_id = tuple(result.payload)
        disk_params = constants.DISK_LD_DEFAULTS[constants.DT_PLAIN].copy()
        new_dev = objects.Disk(dev_type=constants.DT_PLAIN, size=disk.size,
                               logical_id=disk_id, iv_name=disk.iv_name,
                               params=disk_params)

      # new_dev stays False when snapshotting failed; LocalExport and
      # _RemoveSnapshot skip those entries
      self._snap_disks.append(new_dev)

    assert len(self._snap_disks) == len(instance.disks)
    assert len(self._removed_snaps) == len(instance.disks)

  def _RemoveSnapshot(self, disk_index):
    """Removes an LVM snapshot.

    @type disk_index: number
    @param disk_index: Index of the snapshot to be removed

    """
    disk = self._snap_disks[disk_index]
    # Only remove once, and only if the snapshot was actually created
    if disk and not self._removed_snaps[disk_index]:
      src_node = self._instance.primary_node
      src_node_name = self._lu.cfg.GetNodeName(src_node)

      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
                        (disk_index, src_node_name))

      result = self._lu.rpc.call_blockdev_remove(src_node,
                                                 (disk, self._instance))
      if result.fail_msg:
        # Best-effort: log and leave the flag unset so a retry is possible
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
                            " %s: %s", disk_index, src_node_name,
                            result.fail_msg)
      else:
        self._removed_snaps[disk_index] = True

  def LocalExport(self, dest_node):
    """Intra-cluster instance export.

    @type dest_node: L{objects.Node}
    @param dest_node: Destination node
    @rtype: tuple; (bool, list)
    @return: Whether finalization succeeded and the per-disk transfer results

    """
    instance = self._instance
    src_node_uuid = instance.primary_node

    assert len(self._snap_disks) == len(instance.disks)

    transfers = []

    for idx, dev in enumerate(self._snap_disks):
      if not dev:
        # Snapshot failed earlier; mark the transfer as skipped
        transfers.append(None)
        continue

      path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
                            dev.logical_id[1])

      finished_fn = compat.partial(self._TransferFinished, idx)

      # FIXME: pass debug option from opcode to backend
      dt = DiskTransfer("snapshot/%s" % idx,
                        constants.IEIO_SCRIPT, ((dev, instance), idx),
                        constants.IEIO_FILE, (path, ),
                        finished_fn)
      transfers.append(dt)

    # Actually export data
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
                                    src_node_uuid, dest_node.uuid,
                                    dest_node.secondary_ip,
                                    constants.IEC_NONE,
                                    instance, transfers)

    assert len(dresults) == len(instance.disks)

    self._feedback_fn("Finalizing export on %s" % dest_node.name)
    result = self._lu.rpc.call_finalize_export(dest_node.uuid, instance,
                                               self._snap_disks)
    msg = result.fail_msg
    fin_resu = not msg
    if msg:
      self._lu.LogWarning("Could not finalize export for instance %s"
                          " on node %s: %s", instance.name, dest_node.name, msg)

    return (fin_resu, dresults)

  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
    """Inter-cluster instance export.

    @type disk_info: list
    @param disk_info: Per-disk destination information
    @type key_name: string
    @param key_name: Name of X509 key to use
    @type dest_ca_pem: string
    @param dest_ca_pem: Destination X509 CA in PEM format
    @type timeouts: L{ImportExportTimeouts}
    @param timeouts: Timeouts for this import

    """
    instance = self._instance

    assert len(disk_info) == len(instance.disks)

    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))

    ieloop = ImportExportLoop(self._lu)
    try:
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
                                                           disk_info)):
        # Decide whether to use IPv6
        ipv6 = netutils.IP6Address.IsValid(host)

        opts = objects.ImportExportOptions(key_name=key_name,
                                           ca_pem=dest_ca_pem,
                                           magic=magic, ipv6=ipv6)

        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
        finished_fn = compat.partial(self._TransferFinished, idx)
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
                              opts, host, port, instance, "disk%d" % idx,
                              constants.IEIO_SCRIPT, ((dev, instance), idx),
                              timeouts, cbs, private=(idx, finished_fn)))

      ieloop.Run()
    finally:
      # Always stop daemons and collect final status, even on errors
      ieloop.FinalizeAll()

    return (True, cbs.disk_results)

  def _TransferFinished(self, idx):
    """Called once a transfer has finished.

    @type idx: number
    @param idx: Disk index

    """
    logging.debug("Transfer %s finished", idx)
    # The snapshot is no longer needed once its data has been transferred
    self._RemoveSnapshot(idx)

  def Cleanup(self):
    """Remove all snapshots.

    """
    assert len(self._removed_snaps) == len(self._instance.disks)
    for idx in range(len(self._instance.disks)):
      self._RemoveSnapshot(idx)
class _RemoteImportCb(ImportExportCbBase):
  """Import/export callbacks for importing disks from a remote cluster.

  Once all per-disk import daemons are listening, their connection details
  are signed and published to the client via the feedback function.

  """
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
               external_address):
    """Initializes this class.

    @type cds: string
    @param cds: Cluster domain secret
    @type x509_cert_pem: string
    @param x509_cert_pem: CA used for signing import key
    @type disk_count: number
    @param disk_count: Number of disks
    @type external_address: string
    @param external_address: External address of destination node

    """
    ImportExportCbBase.__init__(self)
    self._feedback_fn = feedback_fn
    self._cds = cds
    self._x509_cert_pem = x509_cert_pem
    self._disk_count = disk_count
    self._external_address = external_address

    # Per-disk result and (port, magic) of each listening daemon
    self._dresults = [None] * disk_count
    self._daemon_port = [None] * disk_count

    # Salt used when signing the disk information sent to the source
    self._salt = utils.GenerateSecret(8)

  @property
  def disk_results(self):
    """Returns per-disk results.

    """
    return self._dresults

  def _CheckAllListening(self):
    """Checks whether all daemons are listening.

    If all daemons are listening, the information is sent to the client.

    """
    # Wait until every disk's daemon has reported its port
    if not compat.all(dp is not None for dp in self._daemon_port):
      return

    host = self._external_address

    disks = []
    for idx, (port, magic) in enumerate(self._daemon_port):
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
                                               idx, host, port, magic))

    assert len(disks) == self._disk_count

    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
      "disks": disks,
      "x509_ca": self._x509_cert_pem,
      })

  def ReportListening(self, ie, private, _):
    """Called when daemon started listening.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now listening" % idx)

    assert self._daemon_port[idx] is None

    self._daemon_port[idx] = (ie.listen_port, ie.magic)

    self._CheckAllListening()

  def ReportConnected(self, ie, private):
    """Called when a connection has been established.

    """
    (idx, ) = private

    self._feedback_fn("Disk %s is now receiving data" % idx)

  def ReportFinished(self, ie, private):
    """Called when a transfer has finished.

    """
    (idx, ) = private

    # Daemon is certainly no longer listening
    self._daemon_port[idx] = None

    if ie.success:
      self._feedback_fn("Disk %s finished receiving data" % idx)
    else:
      self._feedback_fn(("Disk %s failed to receive data: %s"
                         " (recent output: %s)") %
                        (idx, ie.final_message, ie.recent_output))

    self._dresults[idx] = bool(ie.success)
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
                 cds, timeouts):
  """Imports an instance from another cluster.

  @param lu: Logical unit instance
  @param feedback_fn: Feedback function
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type pnode: L{objects.Node}
  @param pnode: Primary node of instance as an object
  @type source_x509_ca: OpenSSL.crypto.X509
  @param source_x509_ca: Import source's X509 CA
  @type cds: string
  @param cds: Cluster domain secret
  @type timeouts: L{ImportExportTimeouts}
  @param timeouts: Timeouts for this import
  @rtype: list
  @return: Per-disk results as collected by L{_RemoteImportCb.disk_results}

  """
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                  source_x509_ca)

  # Shared seed for all per-disk magic values
  magic_base = utils.GenerateSecret(6)

  # Decide whether to use IPv6
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)

  # Create crypto key
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
                                        constants.RIE_CERT_VALIDITY)
  result.Raise("Can't create X509 key and certificate on %s" % result.node)

  (x509_key_name, x509_cert_pem) = result.payload
  try:
    # Load certificate
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                x509_cert_pem)

    # Sign certificate with the cluster domain secret so the source cluster
    # can verify it
    signed_x509_cert_pem = \
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))

    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
                          len(instance.disks), pnode.primary_ip)

    ieloop = ImportExportLoop(lu)
    try:
      for idx, dev in enumerate(instance.disks):
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)

        # Import daemon options
        opts = objects.ImportExportOptions(key_name=x509_key_name,
                                           ca_pem=source_ca_pem,
                                           magic=magic, ipv6=ipv6)

        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
                              "disk%d" % idx,
                              constants.IEIO_SCRIPT, ((dev, instance), idx),
                              timeouts, cbs, private=(idx, )))

      ieloop.Run()
    finally:
      # Always stop daemons and collect final status, even on errors
      ieloop.FinalizeAll()
  finally:
    # Remove crypto key and certificate
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)

  return cbs.disk_results
def _GetImportExportHandshakeMessage(version):
  """Returns the handshake message for a RIE protocol version.

  @type version: number
  @param version: Remote import/export protocol version

  """
  # Message format is "<version>:<handshake constant>"
  parts = (version, constants.RIE_HANDSHAKE)
  return "%s:%s" % parts
def ComputeRemoteExportHandshake(cds):
  """Computes the remote import/export handshake.

  @type cds: string
  @param cds: Cluster domain secret

  """
  # The version-specific message is signed with the cluster domain secret,
  # allowing the peer to prove it shares the same secret
  hmac_salt = utils.GenerateSecret(8)
  hmac_digest = utils.Sha1Hmac(
    cds, _GetImportExportHandshakeMessage(constants.RIE_VERSION),
    salt=hmac_salt)
  return (constants.RIE_VERSION, hmac_digest, hmac_salt)
def CheckRemoteExportHandshake(cds, handshake):
  """Checks the handshake of a remote import/export.

  @type cds: string
  @param cds: Cluster domain secret
  @type handshake: sequence
  @param handshake: Handshake sent by remote peer
  @rtype: string or None
  @return: Error message if the handshake is invalid, C{None} if it is valid

  """
  try:
    (version, hmac_digest, hmac_salt) = handshake
  # Use the "as" form (PEP 3110); the old "except E, err" comma syntax is
  # invalid in Python 3
  except (TypeError, ValueError) as err:
    return "Invalid data: %s" % err

  # Verify the HMAC before trusting any field, including the version
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
                              hmac_digest, salt=hmac_salt):
    return "Hash didn't match, clusters don't share the same domain secret"

  if version != constants.RIE_VERSION:
    return ("Clusters don't have the same remote import/export protocol"
            " (local=%s, remote=%s)" %
            (constants.RIE_VERSION, version))

  return None
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
  """Returns the hashed text for import/export disk information.

  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  # Colon-separated fields, in a fixed order shared by both clusters
  return ":".join(str(field) for field in (disk_index, host, port, magic))
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
  """Verifies received disk information for an export.

  @type cds: string
  @param cds: Cluster domain secret
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type disk_info: sequence
  @param disk_info: Disk information sent by remote peer
  @rtype: tuple
  @return: Destination host, validated port and magic value
  @raise errors.GenericError: If the received data is malformed or the HMAC
    doesn't verify

  """
  try:
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
  # Use the "as" form (PEP 3110); the old "except E, err" comma syntax is
  # invalid in Python 3
  except (TypeError, ValueError) as err:
    raise errors.GenericError("Invalid data: %s" % err)

  if not (host and port and magic):
    raise errors.GenericError("Missing destination host, port or magic")

  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)

  # Verify the HMAC before trusting any of the received fields
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
    raise errors.GenericError("HMAC is wrong")

  # IP addresses are used verbatim; host names are normalized
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
    destination = host
  else:
    destination = netutils.Hostname.GetNormalizedName(host)

  return (destination,
          utils.ValidateServiceName(port),
          magic)
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
  """Computes the signed disk information for a remote import.

  @type cds: string
  @param cds: Cluster domain secret
  @type salt: string
  @param salt: HMAC salt
  @type disk_index: number
  @param disk_index: Index of disk (included in hash)
  @type host: string
  @param host: Hostname
  @type port: number
  @param port: Daemon port
  @type magic: string
  @param magic: Magic value

  """
  # Sign the disk parameters so the peer can verify their integrity via
  # CheckRemoteExportDiskInfo
  hmac_digest = utils.Sha1Hmac(
    cds, _GetRieDiskInfoMessage(disk_index, host, port, magic), salt=salt)
  return (host, port, magic, hmac_digest, salt)
def CalculateGroupIPolicy(cluster, group):
  """Calculate instance policy for group.

  @param cluster: Cluster object providing the policy defaults
  @param group: Node group whose partial instance policy is filled in

  """
  # The group's partial policy is completed with the cluster-wide defaults
  group_policy = group.ipolicy
  return cluster.SimpleFillIPolicy(group_policy)
def ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements according to disk template

  @param disk_template: Disk template to compute the requirement for
  @param disks: List of disk parameter dicts (using L{constants.IDISK_SIZE})
  @raise errors.ProgrammerError: If the disk template is unknown

  """
  # Plain sum of all requested disk sizes
  payload_size = sum(d[constants.IDISK_SIZE] for d in disks)
  # 128 MB are added for drbd metadata for each disk
  drbd_size = sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE
                  for d in disks)

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: 0,
    constants.DT_PLAIN: payload_size,
    constants.DT_DRBD8: drbd_size,
    constants.DT_FILE: payload_size,
    constants.DT_SHARED_FILE: payload_size,
    constants.DT_BLOCK: 0,
    constants.DT_RBD: payload_size,
    constants.DT_EXT: payload_size,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]