Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / instance.py @ 0c3d9c7c

History | View | Annotate | Download (46.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Instance-related functions and classes for masterd.
23

24
"""
25

    
26
import logging
27
import time
28
import OpenSSL
29

    
30
from ganeti import constants
31
from ganeti import errors
32
from ganeti import compat
33
from ganeti import utils
34
from ganeti import objects
35
from ganeti import netutils
36
from ganeti import pathutils
37

    
38

    
39
class _ImportExportError(Exception):
40
  """Local exception to report import/export errors.
41

42
  """
43

    
44

    
45
class ImportExportTimeouts(object):
46
  #: Time until daemon starts writing status file
47
  DEFAULT_READY_TIMEOUT = 10
48

    
49
  #: Length of time until errors cause hard failure
50
  DEFAULT_ERROR_TIMEOUT = 10
51

    
52
  #: Time after which daemon must be listening
53
  DEFAULT_LISTEN_TIMEOUT = 10
54

    
55
  #: Progress update interval
56
  DEFAULT_PROGRESS_INTERVAL = 60
57

    
58
  __slots__ = [
59
    "error",
60
    "ready",
61
    "listen",
62
    "connect",
63
    "progress",
64
    ]
65

    
66
  def __init__(self, connect,
67
               listen=DEFAULT_LISTEN_TIMEOUT,
68
               error=DEFAULT_ERROR_TIMEOUT,
69
               ready=DEFAULT_READY_TIMEOUT,
70
               progress=DEFAULT_PROGRESS_INTERVAL):
71
    """Initializes this class.
72

73
    @type connect: number
74
    @param connect: Timeout for establishing connection
75
    @type listen: number
76
    @param listen: Timeout for starting to listen for connections
77
    @type error: number
78
    @param error: Length of time until errors cause hard failure
79
    @type ready: number
80
    @param ready: Timeout for daemon to become ready
81
    @type progress: number
82
    @param progress: Progress update interval
83

84
    """
85
    self.error = error
86
    self.ready = ready
87
    self.listen = listen
88
    self.connect = connect
89
    self.progress = progress
90

    
91

    
92
class ImportExportCbBase(object):
93
  """Callbacks for disk import/export.
94

95
  """
96
  def ReportListening(self, ie, private, component):
97
    """Called when daemon started listening.
98

99
    @type ie: Subclass of L{_DiskImportExportBase}
100
    @param ie: Import/export object
101
    @param private: Private data passed to import/export object
102
    @param component: transfer component name
103

104
    """
105

    
106
  def ReportConnected(self, ie, private):
107
    """Called when a connection has been established.
108

109
    @type ie: Subclass of L{_DiskImportExportBase}
110
    @param ie: Import/export object
111
    @param private: Private data passed to import/export object
112

113
    """
114

    
115
  def ReportProgress(self, ie, private):
116
    """Called when new progress information should be reported.
117

118
    @type ie: Subclass of L{_DiskImportExportBase}
119
    @param ie: Import/export object
120
    @param private: Private data passed to import/export object
121

122
    """
123

    
124
  def ReportFinished(self, ie, private):
125
    """Called when a transfer has finished.
126

127
    @type ie: Subclass of L{_DiskImportExportBase}
128
    @param ie: Import/export object
129
    @param private: Private data passed to import/export object
130

131
    """
132

    
133

    
134
class _DiskImportExportBase(object):
135
  MODE_TEXT = None
136

    
137
  def __init__(self, lu, node_uuid, opts,
138
               instance, component, timeouts, cbs, private=None):
139
    """Initializes this class.
140

141
    @param lu: Logical unit instance
142
    @type node_uuid: string
143
    @param node_uuid: Node UUID for import
144
    @type opts: L{objects.ImportExportOptions}
145
    @param opts: Import/export daemon options
146
    @type instance: L{objects.Instance}
147
    @param instance: Instance object
148
    @type component: string
149
    @param component: which part of the instance is being imported
150
    @type timeouts: L{ImportExportTimeouts}
151
    @param timeouts: Timeouts for this import
152
    @type cbs: L{ImportExportCbBase}
153
    @param cbs: Callbacks
154
    @param private: Private data for callback functions
155

156
    """
157
    assert self.MODE_TEXT
158

    
159
    self._lu = lu
160
    self.node_uuid = node_uuid
161
    self.node_name = lu.cfg.GetNodeName(node_uuid)
162
    self._opts = opts.Copy()
163
    self._instance = instance
164
    self._component = component
165
    self._timeouts = timeouts
166
    self._cbs = cbs
167
    self._private = private
168

    
169
    # Set master daemon's timeout in options for import/export daemon
170
    assert self._opts.connect_timeout is None
171
    self._opts.connect_timeout = timeouts.connect
172

    
173
    # Parent loop
174
    self._loop = None
175

    
176
    # Timestamps
177
    self._ts_begin = None
178
    self._ts_connected = None
179
    self._ts_finished = None
180
    self._ts_cleanup = None
181
    self._ts_last_progress = None
182
    self._ts_last_error = None
183

    
184
    # Transfer status
185
    self.success = None
186
    self.final_message = None
187

    
188
    # Daemon status
189
    self._daemon_name = None
190
    self._daemon = None
191

    
192
  @property
193
  def recent_output(self):
194
    """Returns the most recent output from the daemon.
195

196
    """
197
    if self._daemon:
198
      return "\n".join(self._daemon.recent_output)
199

    
200
    return None
201

    
202
  @property
203
  def progress(self):
204
    """Returns transfer progress information.
205

206
    """
207
    if not self._daemon:
208
      return None
209

    
210
    return (self._daemon.progress_mbytes,
211
            self._daemon.progress_throughput,
212
            self._daemon.progress_percent,
213
            self._daemon.progress_eta)
214

    
215
  @property
216
  def magic(self):
217
    """Returns the magic value for this import/export.
218

219
    """
220
    return self._opts.magic
221

    
222
  @property
223
  def active(self):
224
    """Determines whether this transport is still active.
225

226
    """
227
    return self.success is None
228

    
229
  @property
230
  def loop(self):
231
    """Returns parent loop.
232

233
    @rtype: L{ImportExportLoop}
234

235
    """
236
    return self._loop
237

    
238
  def SetLoop(self, loop):
239
    """Sets the parent loop.
240

241
    @type loop: L{ImportExportLoop}
242

243
    """
244
    if self._loop:
245
      raise errors.ProgrammerError("Loop can only be set once")
246

    
247
    self._loop = loop
248

    
249
  def _StartDaemon(self):
250
    """Starts the import/export daemon.
251

252
    """
253
    raise NotImplementedError()
254

    
255
  def CheckDaemon(self):
256
    """Checks whether daemon has been started and if not, starts it.
257

258
    @rtype: string
259
    @return: Daemon name
260

261
    """
262
    assert self._ts_cleanup is None
263

    
264
    if self._daemon_name is None:
265
      assert self._ts_begin is None
266

    
267
      result = self._StartDaemon()
268
      if result.fail_msg:
269
        raise _ImportExportError("Failed to start %s on %s: %s" %
270
                                 (self.MODE_TEXT, self.node_name,
271
                                  result.fail_msg))
272

    
273
      daemon_name = result.payload
274

    
275
      logging.info("Started %s '%s' on %s", self.MODE_TEXT, daemon_name,
276
                   self.node_name)
277

    
278
      self._ts_begin = time.time()
279
      self._daemon_name = daemon_name
280

    
281
    return self._daemon_name
282

    
283
  def GetDaemonName(self):
284
    """Returns the daemon name.
285

286
    """
287
    assert self._daemon_name, "Daemon has not been started"
288
    assert self._ts_cleanup is None
289
    return self._daemon_name
290

    
291
  def Abort(self):
292
    """Sends SIGTERM to import/export daemon (if still active).
293

294
    """
295
    if self._daemon_name:
296
      self._lu.LogWarning("Aborting %s '%s' on %s",
297
                          self.MODE_TEXT, self._daemon_name, self.node_uuid)
298
      result = self._lu.rpc.call_impexp_abort(self.node_uuid, self._daemon_name)
299
      if result.fail_msg:
300
        self._lu.LogWarning("Failed to abort %s '%s' on %s: %s",
301
                            self.MODE_TEXT, self._daemon_name,
302
                            self.node_uuid, result.fail_msg)
303
        return False
304

    
305
    return True
306

    
307
  def _SetDaemonData(self, data):
308
    """Internal function for updating status daemon data.
309

310
    @type data: L{objects.ImportExportStatus}
311
    @param data: Daemon status data
312

313
    """
314
    assert self._ts_begin is not None
315

    
316
    if not data:
317
      if utils.TimeoutExpired(self._ts_begin, self._timeouts.ready):
318
        raise _ImportExportError("Didn't become ready after %s seconds" %
319
                                 self._timeouts.ready)
320

    
321
      return False
322

    
323
    self._daemon = data
324

    
325
    return True
326

    
327
  def SetDaemonData(self, success, data):
328
    """Updates daemon status data.
329

330
    @type success: bool
331
    @param success: Whether fetching data was successful or not
332
    @type data: L{objects.ImportExportStatus}
333
    @param data: Daemon status data
334

335
    """
336
    if not success:
337
      if self._ts_last_error is None:
338
        self._ts_last_error = time.time()
339

    
340
      elif utils.TimeoutExpired(self._ts_last_error, self._timeouts.error):
341
        raise _ImportExportError("Too many errors while updating data")
342

    
343
      return False
344

    
345
    self._ts_last_error = None
346

    
347
    return self._SetDaemonData(data)
348

    
349
  def CheckListening(self):
350
    """Checks whether the daemon is listening.
351

352
    """
353
    raise NotImplementedError()
354

    
355
  def _GetConnectedCheckEpoch(self):
356
    """Returns timeout to calculate connect timeout.
357

358
    """
359
    raise NotImplementedError()
360

    
361
  def CheckConnected(self):
362
    """Checks whether the daemon is connected.
363

364
    @rtype: bool
365
    @return: Whether the daemon is connected
366

367
    """
368
    assert self._daemon, "Daemon status missing"
369

    
370
    if self._ts_connected is not None:
371
      return True
372

    
373
    if self._daemon.connected:
374
      self._ts_connected = time.time()
375

    
376
      # TODO: Log remote peer
377
      logging.debug("%s '%s' on %s is now connected",
378
                    self.MODE_TEXT, self._daemon_name, self.node_uuid)
379

    
380
      self._cbs.ReportConnected(self, self._private)
381

    
382
      return True
383

    
384
    if utils.TimeoutExpired(self._GetConnectedCheckEpoch(),
385
                            self._timeouts.connect):
386
      raise _ImportExportError("Not connected after %s seconds" %
387
                               self._timeouts.connect)
388

    
389
    return False
390

    
391
  def _CheckProgress(self):
392
    """Checks whether a progress update should be reported.
393

394
    """
395
    if ((self._ts_last_progress is None or
396
        utils.TimeoutExpired(self._ts_last_progress,
397
                             self._timeouts.progress)) and
398
        self._daemon and
399
        self._daemon.progress_mbytes is not None and
400
        self._daemon.progress_throughput is not None):
401
      self._cbs.ReportProgress(self, self._private)
402
      self._ts_last_progress = time.time()
403

    
404
  def CheckFinished(self):
405
    """Checks whether the daemon exited.
406

407
    @rtype: bool
408
    @return: Whether the transfer is finished
409

410
    """
411
    assert self._daemon, "Daemon status missing"
412

    
413
    if self._ts_finished:
414
      return True
415

    
416
    if self._daemon.exit_status is None:
417
      # TODO: Adjust delay for ETA expiring soon
418
      self._CheckProgress()
419
      return False
420

    
421
    self._ts_finished = time.time()
422

    
423
    self._ReportFinished(self._daemon.exit_status == 0,
424
                         self._daemon.error_message)
425

    
426
    return True
427

    
428
  def _ReportFinished(self, success, message):
429
    """Transfer is finished or daemon exited.
430

431
    @type success: bool
432
    @param success: Whether the transfer was successful
433
    @type message: string
434
    @param message: Error message
435

436
    """
437
    assert self.success is None
438

    
439
    self.success = success
440
    self.final_message = message
441

    
442
    if success:
443
      logging.info("%s '%s' on %s succeeded", self.MODE_TEXT,
444
                   self._daemon_name, self.node_uuid)
445
    elif self._daemon_name:
446
      self._lu.LogWarning("%s '%s' on %s failed: %s",
447
                          self.MODE_TEXT, self._daemon_name,
448
                          self._lu.cfg.GetNodeName(self.node_uuid),
449
                          message)
450
    else:
451
      self._lu.LogWarning("%s on %s failed: %s", self.MODE_TEXT,
452
                          self._lu.cfg.GetNodeName(self.node_uuid), message)
453

    
454
    self._cbs.ReportFinished(self, self._private)
455

    
456
  def _Finalize(self):
457
    """Makes the RPC call to finalize this import/export.
458

459
    """
460
    return self._lu.rpc.call_impexp_cleanup(self.node_uuid, self._daemon_name)
461

    
462
  def Finalize(self, error=None):
463
    """Finalizes this import/export.
464

465
    """
466
    if self._daemon_name:
467
      logging.info("Finalizing %s '%s' on %s",
468
                   self.MODE_TEXT, self._daemon_name, self.node_uuid)
469

    
470
      result = self._Finalize()
471
      if result.fail_msg:
472
        self._lu.LogWarning("Failed to finalize %s '%s' on %s: %s",
473
                            self.MODE_TEXT, self._daemon_name,
474
                            self.node_uuid, result.fail_msg)
475
        return False
476

    
477
      # Daemon is no longer running
478
      self._daemon_name = None
479
      self._ts_cleanup = time.time()
480

    
481
    if error:
482
      self._ReportFinished(False, error)
483

    
484
    return True
485

    
486

    
487
class DiskImport(_DiskImportExportBase):
488
  MODE_TEXT = "import"
489

    
490
  def __init__(self, lu, node_uuid, opts, instance, component,
491
               dest, dest_args, timeouts, cbs, private=None):
492
    """Initializes this class.
493

494
    @param lu: Logical unit instance
495
    @type node_uuid: string
496
    @param node_uuid: Node name for import
497
    @type opts: L{objects.ImportExportOptions}
498
    @param opts: Import/export daemon options
499
    @type instance: L{objects.Instance}
500
    @param instance: Instance object
501
    @type component: string
502
    @param component: which part of the instance is being imported
503
    @param dest: I/O destination
504
    @param dest_args: I/O arguments
505
    @type timeouts: L{ImportExportTimeouts}
506
    @param timeouts: Timeouts for this import
507
    @type cbs: L{ImportExportCbBase}
508
    @param cbs: Callbacks
509
    @param private: Private data for callback functions
510

511
    """
512
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
513
                                   component, timeouts, cbs, private)
514
    self._dest = dest
515
    self._dest_args = dest_args
516

    
517
    # Timestamps
518
    self._ts_listening = None
519

    
520
  @property
521
  def listen_port(self):
522
    """Returns the port the daemon is listening on.
523

524
    """
525
    if self._daemon:
526
      return self._daemon.listen_port
527

    
528
    return None
529

    
530
  def _StartDaemon(self):
531
    """Starts the import daemon.
532

533
    """
534
    return self._lu.rpc.call_import_start(self.node_uuid, self._opts,
535
                                          self._instance, self._component,
536
                                          (self._dest, self._dest_args))
537

    
538
  def CheckListening(self):
539
    """Checks whether the daemon is listening.
540

541
    @rtype: bool
542
    @return: Whether the daemon is listening
543

544
    """
545
    assert self._daemon, "Daemon status missing"
546

    
547
    if self._ts_listening is not None:
548
      return True
549

    
550
    port = self._daemon.listen_port
551
    if port is not None:
552
      self._ts_listening = time.time()
553

    
554
      logging.debug("Import '%s' on %s is now listening on port %s",
555
                    self._daemon_name, self.node_uuid, port)
556

    
557
      self._cbs.ReportListening(self, self._private, self._component)
558

    
559
      return True
560

    
561
    if utils.TimeoutExpired(self._ts_begin, self._timeouts.listen):
562
      raise _ImportExportError("Not listening after %s seconds" %
563
                               self._timeouts.listen)
564

    
565
    return False
566

    
567
  def _GetConnectedCheckEpoch(self):
568
    """Returns the time since we started listening.
569

570
    """
571
    assert self._ts_listening is not None, \
572
           ("Checking whether an import is connected is only useful"
573
            " once it's been listening")
574

    
575
    return self._ts_listening
576

    
577

    
578
class DiskExport(_DiskImportExportBase):
579
  MODE_TEXT = "export"
580

    
581
  def __init__(self, lu, node_uuid, opts, dest_host, dest_port,
582
               instance, component, source, source_args,
583
               timeouts, cbs, private=None):
584
    """Initializes this class.
585

586
    @param lu: Logical unit instance
587
    @type node_uuid: string
588
    @param node_uuid: Node UUID for import
589
    @type opts: L{objects.ImportExportOptions}
590
    @param opts: Import/export daemon options
591
    @type dest_host: string
592
    @param dest_host: Destination host name or IP address
593
    @type dest_port: number
594
    @param dest_port: Destination port number
595
    @type instance: L{objects.Instance}
596
    @param instance: Instance object
597
    @type component: string
598
    @param component: which part of the instance is being imported
599
    @param source: I/O source
600
    @param source_args: I/O source
601
    @type timeouts: L{ImportExportTimeouts}
602
    @param timeouts: Timeouts for this import
603
    @type cbs: L{ImportExportCbBase}
604
    @param cbs: Callbacks
605
    @param private: Private data for callback functions
606

607
    """
608
    _DiskImportExportBase.__init__(self, lu, node_uuid, opts, instance,
609
                                   component, timeouts, cbs, private)
610
    self._dest_host = dest_host
611
    self._dest_port = dest_port
612
    self._source = source
613
    self._source_args = source_args
614

    
615
  def _StartDaemon(self):
616
    """Starts the export daemon.
617

618
    """
619
    return self._lu.rpc.call_export_start(self.node_uuid, self._opts,
620
                                          self._dest_host, self._dest_port,
621
                                          self._instance, self._component,
622
                                          (self._source, self._source_args))
623

    
624
  def CheckListening(self):
625
    """Checks whether the daemon is listening.
626

627
    """
628
    # Only an import can be listening
629
    return True
630

    
631
  def _GetConnectedCheckEpoch(self):
632
    """Returns the time since the daemon started.
633

634
    """
635
    assert self._ts_begin is not None
636

    
637
    return self._ts_begin
638

    
639

    
640
def FormatProgress(progress):
641
  """Formats progress information for user consumption
642

643
  """
644
  (mbytes, throughput, percent, eta) = progress
645

    
646
  parts = [
647
    utils.FormatUnit(mbytes, "h"),
648

    
649
    # Not using FormatUnit as it doesn't support kilobytes
650
    "%0.1f MiB/s" % throughput,
651
    ]
652

    
653
  if percent is not None:
654
    parts.append("%d%%" % percent)
655

    
656
  if eta is not None:
657
    parts.append("ETA %s" % utils.FormatSeconds(eta))
658

    
659
  return utils.CommaJoin(parts)
660

    
661

    
662
class ImportExportLoop:
663
  MIN_DELAY = 1.0
664
  MAX_DELAY = 20.0
665

    
666
  def __init__(self, lu):
667
    """Initializes this class.
668

669
    """
670
    self._lu = lu
671
    self._queue = []
672
    self._pending_add = []
673

    
674
  def Add(self, diskie):
675
    """Adds an import/export object to the loop.
676

677
    @type diskie: Subclass of L{_DiskImportExportBase}
678
    @param diskie: Import/export object
679

680
    """
681
    assert diskie not in self._pending_add
682
    assert diskie.loop is None
683

    
684
    diskie.SetLoop(self)
685

    
686
    # Adding new objects to a staging list is necessary, otherwise the main
687
    # loop gets confused if callbacks modify the queue while the main loop is
688
    # iterating over it.
689
    self._pending_add.append(diskie)
690

    
691
  @staticmethod
692
  def _CollectDaemonStatus(lu, daemons):
693
    """Collects the status for all import/export daemons.
694

695
    """
696
    daemon_status = {}
697

    
698
    for node_name, names in daemons.iteritems():
699
      result = lu.rpc.call_impexp_status(node_name, names)
700
      if result.fail_msg:
701
        lu.LogWarning("Failed to get daemon status on %s: %s",
702
                      node_name, result.fail_msg)
703
        continue
704

    
705
      assert len(names) == len(result.payload)
706

    
707
      daemon_status[node_name] = dict(zip(names, result.payload))
708

    
709
    return daemon_status
710

    
711
  @staticmethod
712
  def _GetActiveDaemonNames(queue):
713
    """Gets the names of all active daemons.
714

715
    """
716
    result = {}
717
    for diskie in queue:
718
      if not diskie.active:
719
        continue
720

    
721
      try:
722
        # Start daemon if necessary
723
        daemon_name = diskie.CheckDaemon()
724
      except _ImportExportError, err:
725
        logging.exception("%s failed", diskie.MODE_TEXT)
726
        diskie.Finalize(error=str(err))
727
        continue
728

    
729
      result.setdefault(diskie.node_name, []).append(daemon_name)
730

    
731
    assert len(queue) >= len(result)
732
    assert len(queue) >= sum([len(names) for names in result.itervalues()])
733

    
734
    logging.debug("daemons=%r", result)
735

    
736
    return result
737

    
738
  def _AddPendingToQueue(self):
739
    """Adds all pending import/export objects to the internal queue.
740

741
    """
742
    assert compat.all(diskie not in self._queue and diskie.loop == self
743
                      for diskie in self._pending_add)
744

    
745
    self._queue.extend(self._pending_add)
746

    
747
    del self._pending_add[:]
748

    
749
  def Run(self):
750
    """Utility main loop.
751

752
    """
753
    while True:
754
      self._AddPendingToQueue()
755

    
756
      # Collect all active daemon names
757
      daemons = self._GetActiveDaemonNames(self._queue)
758
      if not daemons:
759
        break
760

    
761
      # Collection daemon status data
762
      data = self._CollectDaemonStatus(self._lu, daemons)
763

    
764
      # Use data
765
      delay = self.MAX_DELAY
766
      for diskie in self._queue:
767
        if not diskie.active:
768
          continue
769

    
770
        try:
771
          try:
772
            all_daemon_data = data[diskie.node_name]
773
          except KeyError:
774
            result = diskie.SetDaemonData(False, None)
775
          else:
776
            result = \
777
              diskie.SetDaemonData(True,
778
                                   all_daemon_data[diskie.GetDaemonName()])
779

    
780
          if not result:
781
            # Daemon not yet ready, retry soon
782
            delay = min(3.0, delay)
783
            continue
784

    
785
          if diskie.CheckFinished():
786
            # Transfer finished
787
            diskie.Finalize()
788
            continue
789

    
790
          # Normal case: check again in 5 seconds
791
          delay = min(5.0, delay)
792

    
793
          if not diskie.CheckListening():
794
            # Not yet listening, retry soon
795
            delay = min(1.0, delay)
796
            continue
797

    
798
          if not diskie.CheckConnected():
799
            # Not yet connected, retry soon
800
            delay = min(1.0, delay)
801
            continue
802

    
803
        except _ImportExportError, err:
804
          logging.exception("%s failed", diskie.MODE_TEXT)
805
          diskie.Finalize(error=str(err))
806

    
807
      if not compat.any(diskie.active for diskie in self._queue):
808
        break
809

    
810
      # Wait a bit
811
      delay = min(self.MAX_DELAY, max(self.MIN_DELAY, delay))
812
      logging.debug("Waiting for %ss", delay)
813
      time.sleep(delay)
814

    
815
  def FinalizeAll(self):
816
    """Finalizes all pending transfers.
817

818
    """
819
    success = True
820

    
821
    for diskie in self._queue:
822
      success = diskie.Finalize() and success
823

    
824
    return success
825

    
826

    
827
class _TransferInstCbBase(ImportExportCbBase):
828
  def __init__(self, lu, feedback_fn, instance, timeouts, src_node_uuid,
829
               src_cbs, dest_node_uuid, dest_ip):
830
    """Initializes this class.
831

832
    """
833
    ImportExportCbBase.__init__(self)
834

    
835
    self.lu = lu
836
    self.feedback_fn = feedback_fn
837
    self.instance = instance
838
    self.timeouts = timeouts
839
    self.src_node_uuid = src_node_uuid
840
    self.src_cbs = src_cbs
841
    self.dest_node_uuid = dest_node_uuid
842
    self.dest_ip = dest_ip
843

    
844

    
845
class _TransferInstSourceCb(_TransferInstCbBase):
846
  def ReportConnected(self, ie, dtp):
847
    """Called when a connection has been established.
848

849
    """
850
    assert self.src_cbs is None
851
    assert dtp.src_export == ie
852
    assert dtp.dest_import
853

    
854
    self.feedback_fn("%s is sending data on %s" %
855
                     (dtp.data.name, ie.node_name))
856

    
857
  def ReportProgress(self, ie, dtp):
858
    """Called when new progress information should be reported.
859

860
    """
861
    progress = ie.progress
862
    if not progress:
863
      return
864

    
865
    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
866

    
867
  def ReportFinished(self, ie, dtp):
868
    """Called when a transfer has finished.
869

870
    """
871
    assert self.src_cbs is None
872
    assert dtp.src_export == ie
873
    assert dtp.dest_import
874

    
875
    if ie.success:
876
      self.feedback_fn("%s finished sending data" % dtp.data.name)
877
    else:
878
      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
879
                       (dtp.data.name, ie.final_message, ie.recent_output))
880

    
881
    dtp.RecordResult(ie.success)
882

    
883
    cb = dtp.data.finished_fn
884
    if cb:
885
      cb()
886

    
887
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
888
    # give the daemon a moment to sort things out
889
    if dtp.dest_import and not ie.success:
890
      dtp.dest_import.Abort()
891

    
892

    
893
class _TransferInstDestCb(_TransferInstCbBase):
894
  def ReportListening(self, ie, dtp, component):
895
    """Called when daemon started listening.
896

897
    """
898
    assert self.src_cbs
899
    assert dtp.src_export is None
900
    assert dtp.dest_import
901
    assert dtp.export_opts
902

    
903
    self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
904

    
905
    # Start export on source node
906
    de = DiskExport(self.lu, self.src_node_uuid, dtp.export_opts,
907
                    self.dest_ip, ie.listen_port, self.instance,
908
                    component, dtp.data.src_io, dtp.data.src_ioargs,
909
                    self.timeouts, self.src_cbs, private=dtp)
910
    ie.loop.Add(de)
911

    
912
    dtp.src_export = de
913

    
914
  def ReportConnected(self, ie, dtp):
915
    """Called when a connection has been established.
916

917
    """
918
    self.feedback_fn("%s is receiving data on %s" %
919
                     (dtp.data.name,
920
                      self.lu.cfg.GetNodeName(self.dest_node_uuid)))
921

    
922
  def ReportFinished(self, ie, dtp):
923
    """Called when a transfer has finished.
924

925
    """
926
    if ie.success:
927
      self.feedback_fn("%s finished receiving data" % dtp.data.name)
928
    else:
929
      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
930
                       (dtp.data.name, ie.final_message, ie.recent_output))
931

    
932
    dtp.RecordResult(ie.success)
933

    
934
    # TODO: Check whether sending SIGTERM right away is okay, maybe we should
935
    # give the daemon a moment to sort things out
936
    if dtp.src_export and not ie.success:
937
      dtp.src_export.Abort()
938

    
939

    
940
class DiskTransfer(object):
941
  def __init__(self, name, src_io, src_ioargs, dest_io, dest_ioargs,
942
               finished_fn):
943
    """Initializes this class.
944

945
    @type name: string
946
    @param name: User-visible name for this transfer (e.g. "disk/0")
947
    @param src_io: Source I/O type
948
    @param src_ioargs: Source I/O arguments
949
    @param dest_io: Destination I/O type
950
    @param dest_ioargs: Destination I/O arguments
951
    @type finished_fn: callable
952
    @param finished_fn: Function called once transfer has finished
953

954
    """
955
    self.name = name
956

    
957
    self.src_io = src_io
958
    self.src_ioargs = src_ioargs
959

    
960
    self.dest_io = dest_io
961
    self.dest_ioargs = dest_ioargs
962

    
963
    self.finished_fn = finished_fn
964

    
965

    
966
class _DiskTransferPrivate(object):
967
  def __init__(self, data, success, export_opts):
968
    """Initializes this class.
969

970
    @type data: L{DiskTransfer}
971
    @type success: bool
972

973
    """
974
    self.data = data
975
    self.success = success
976
    self.export_opts = export_opts
977

    
978
    self.src_export = None
979
    self.dest_import = None
980

    
981
  def RecordResult(self, success):
982
    """Updates the status.
983

984
    One failed part will cause the whole transfer to fail.
985

986
    """
987
    self.success = self.success and success
988

    
989

    
990
def _GetInstDiskMagic(base, instance_name, index):
991
  """Computes the magic value for a disk export or import.
992

993
  @type base: string
994
  @param base: Random seed value (can be the same for all disks of a transfer)
995
  @type instance_name: string
996
  @param instance_name: Name of instance
997
  @type index: number
998
  @param index: Disk index
999

1000
  """
1001
  h = compat.sha1_hash()
1002
  h.update(str(constants.RIE_VERSION))
1003
  h.update(base)
1004
  h.update(instance_name)
1005
  h.update(str(index))
1006
  return h.hexdigest()
1007

    
1008

    
1009
def TransferInstanceData(lu, feedback_fn, src_node_uuid, dest_node_uuid,
1010
                         dest_ip, instance, all_transfers):
1011
  """Transfers an instance's data from one node to another.
1012

1013
  @param lu: Logical unit instance
1014
  @param feedback_fn: Feedback function
1015
  @type src_node_uuid: string
1016
  @param src_node_uuid: Source node UUID
1017
  @type dest_node_uuid: string
1018
  @param dest_node_uuid: Destination node UUID
1019
  @type dest_ip: string
1020
  @param dest_ip: IP address of destination node
1021
  @type instance: L{objects.Instance}
1022
  @param instance: Instance object
1023
  @type all_transfers: list of L{DiskTransfer} instances
1024
  @param all_transfers: List of all disk transfers to be made
1025
  @rtype: list
1026
  @return: List with a boolean (True=successful, False=failed) for success for
1027
           each transfer
1028

1029
  """
1030
  # Disable compression for all moves as these are all within the same cluster
1031
  compress = constants.IEC_NONE
1032

    
1033
  src_node_name = lu.cfg.GetNodeName(src_node_uuid)
1034
  dest_node_name = lu.cfg.GetNodeName(dest_node_uuid)
1035

    
1036
  logging.debug("Source node %s, destination node %s, compression '%s'",
1037
                src_node_name, dest_node_name, compress)
1038

    
1039
  timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
1040
  src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
1041
                                  src_node_uuid, None, dest_node_uuid, dest_ip)
1042
  dest_cbs = _TransferInstDestCb(lu, feedback_fn, instance, timeouts,
1043
                                 src_node_uuid, src_cbs, dest_node_uuid,
1044
                                 dest_ip)
1045

    
1046
  all_dtp = []
1047

    
1048
  base_magic = utils.GenerateSecret(6)
1049

    
1050
  ieloop = ImportExportLoop(lu)
1051
  try:
1052
    for idx, transfer in enumerate(all_transfers):
1053
      if transfer:
1054
        feedback_fn("Exporting %s from %s to %s" %
1055
                    (transfer.name, src_node_name, dest_node_name))
1056

    
1057
        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
1058
        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
1059
                                           compress=compress, magic=magic)
1060

    
1061
        dtp = _DiskTransferPrivate(transfer, True, opts)
1062

    
1063
        di = DiskImport(lu, dest_node_uuid, opts, instance, "disk%d" % idx,
1064
                        transfer.dest_io, transfer.dest_ioargs,
1065
                        timeouts, dest_cbs, private=dtp)
1066
        ieloop.Add(di)
1067

    
1068
        dtp.dest_import = di
1069
      else:
1070
        dtp = _DiskTransferPrivate(None, False, None)
1071

    
1072
      all_dtp.append(dtp)
1073

    
1074
    ieloop.Run()
1075
  finally:
1076
    ieloop.FinalizeAll()
1077

    
1078
  assert len(all_dtp) == len(all_transfers)
1079
  assert compat.all((dtp.src_export is None or
1080
                      dtp.src_export.success is not None) and
1081
                     (dtp.dest_import is None or
1082
                      dtp.dest_import.success is not None)
1083
                     for dtp in all_dtp), \
1084
         "Not all imports/exports are finalized"
1085

    
1086
  return [bool(dtp.success) for dtp in all_dtp]
1087

    
1088

    
1089
class _RemoteExportCb(ImportExportCbBase):
1090
  def __init__(self, feedback_fn, disk_count):
1091
    """Initializes this class.
1092

1093
    """
1094
    ImportExportCbBase.__init__(self)
1095
    self._feedback_fn = feedback_fn
1096
    self._dresults = [None] * disk_count
1097

    
1098
  @property
1099
  def disk_results(self):
1100
    """Returns per-disk results.
1101

1102
    """
1103
    return self._dresults
1104

    
1105
  def ReportConnected(self, ie, private):
1106
    """Called when a connection has been established.
1107

1108
    """
1109
    (idx, _) = private
1110

    
1111
    self._feedback_fn("Disk %s is now sending data" % idx)
1112

    
1113
  def ReportProgress(self, ie, private):
1114
    """Called when new progress information should be reported.
1115

1116
    """
1117
    (idx, _) = private
1118

    
1119
    progress = ie.progress
1120
    if not progress:
1121
      return
1122

    
1123
    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
1124

    
1125
  def ReportFinished(self, ie, private):
1126
    """Called when a transfer has finished.
1127

1128
    """
1129
    (idx, finished_fn) = private
1130

    
1131
    if ie.success:
1132
      self._feedback_fn("Disk %s finished sending data" % idx)
1133
    else:
1134
      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
1135
                        (idx, ie.final_message, ie.recent_output))
1136

    
1137
    self._dresults[idx] = bool(ie.success)
1138

    
1139
    if finished_fn:
1140
      finished_fn()
1141

    
1142

    
1143
class ExportInstanceHelper:
1144
  def __init__(self, lu, feedback_fn, instance):
1145
    """Initializes this class.
1146

1147
    @param lu: Logical unit instance
1148
    @param feedback_fn: Feedback function
1149
    @type instance: L{objects.Instance}
1150
    @param instance: Instance object
1151

1152
    """
1153
    self._lu = lu
1154
    self._feedback_fn = feedback_fn
1155
    self._instance = instance
1156

    
1157
    self._snap_disks = []
1158
    self._removed_snaps = [False] * len(instance.disks)
1159

    
1160
  def CreateSnapshots(self):
1161
    """Creates an LVM snapshot for every disk of the instance.
1162

1163
    """
1164
    assert not self._snap_disks
1165

    
1166
    instance = self._instance
1167
    src_node = instance.primary_node
1168

    
1169
    for idx, disk in enumerate(instance.disks):
1170
      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
1171
                        (idx, src_node))
1172

    
1173
      # result.payload will be a snapshot of an lvm leaf of the one we
1174
      # passed
1175
      result = self._lu.rpc.call_blockdev_snapshot(src_node, (disk, instance))
1176
      new_dev = False
1177
      msg = result.fail_msg
1178
      if msg:
1179
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
1180
                            idx, src_node, msg)
1181
      elif (not isinstance(result.payload, (tuple, list)) or
1182
            len(result.payload) != 2):
1183
        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
1184
                            " result '%s'", idx, src_node, result.payload)
1185
      else:
1186
        disk_id = tuple(result.payload)
1187
        disk_params = constants.DISK_LD_DEFAULTS[constants.DT_PLAIN].copy()
1188
        new_dev = objects.Disk(dev_type=constants.DT_PLAIN, size=disk.size,
1189
                               logical_id=disk_id, physical_id=disk_id,
1190
                               iv_name=disk.iv_name,
1191
                               params=disk_params)
1192

    
1193
      self._snap_disks.append(new_dev)
1194

    
1195
    assert len(self._snap_disks) == len(instance.disks)
1196
    assert len(self._removed_snaps) == len(instance.disks)
1197

    
1198
  def _RemoveSnapshot(self, disk_index):
1199
    """Removes an LVM snapshot.
1200

1201
    @type disk_index: number
1202
    @param disk_index: Index of the snapshot to be removed
1203

1204
    """
1205
    disk = self._snap_disks[disk_index]
1206
    if disk and not self._removed_snaps[disk_index]:
1207
      src_node = self._instance.primary_node
1208

    
1209
      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
1210
                        (disk_index, src_node))
1211

    
1212
      result = self._lu.rpc.call_blockdev_remove(src_node,
1213
                                                 (disk, self._instance))
1214
      if result.fail_msg:
1215
        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
1216
                            " %s: %s", disk_index, src_node, result.fail_msg)
1217
      else:
1218
        self._removed_snaps[disk_index] = True
1219

    
1220
  def LocalExport(self, dest_node):
1221
    """Intra-cluster instance export.
1222

1223
    @type dest_node: L{objects.Node}
1224
    @param dest_node: Destination node
1225

1226
    """
1227
    instance = self._instance
1228
    src_node_uuid = instance.primary_node
1229

    
1230
    assert len(self._snap_disks) == len(instance.disks)
1231

    
1232
    transfers = []
1233

    
1234
    for idx, dev in enumerate(self._snap_disks):
1235
      if not dev:
1236
        transfers.append(None)
1237
        continue
1238

    
1239
      path = utils.PathJoin(pathutils.EXPORT_DIR, "%s.new" % instance.name,
1240
                            dev.physical_id[1])
1241

    
1242
      finished_fn = compat.partial(self._TransferFinished, idx)
1243

    
1244
      # FIXME: pass debug option from opcode to backend
1245
      dt = DiskTransfer("snapshot/%s" % idx,
1246
                        constants.IEIO_SCRIPT, ((dev, instance), idx),
1247
                        constants.IEIO_FILE, (path, ),
1248
                        finished_fn)
1249
      transfers.append(dt)
1250

    
1251
    # Actually export data
1252
    dresults = TransferInstanceData(self._lu, self._feedback_fn,
1253
                                    src_node_uuid, dest_node.uuid,
1254
                                    dest_node.secondary_ip,
1255
                                    instance, transfers)
1256

    
1257
    assert len(dresults) == len(instance.disks)
1258

    
1259
    self._feedback_fn("Finalizing export on %s" % dest_node.name)
1260
    result = self._lu.rpc.call_finalize_export(dest_node.uuid, instance,
1261
                                               self._snap_disks)
1262
    msg = result.fail_msg
1263
    fin_resu = not msg
1264
    if msg:
1265
      self._lu.LogWarning("Could not finalize export for instance %s"
1266
                          " on node %s: %s", instance.name, dest_node.name, msg)
1267

    
1268
    return (fin_resu, dresults)
1269

    
1270
  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
1271
    """Inter-cluster instance export.
1272

1273
    @type disk_info: list
1274
    @param disk_info: Per-disk destination information
1275
    @type key_name: string
1276
    @param key_name: Name of X509 key to use
1277
    @type dest_ca_pem: string
1278
    @param dest_ca_pem: Destination X509 CA in PEM format
1279
    @type timeouts: L{ImportExportTimeouts}
1280
    @param timeouts: Timeouts for this import
1281

1282
    """
1283
    instance = self._instance
1284

    
1285
    assert len(disk_info) == len(instance.disks)
1286

    
1287
    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
1288

    
1289
    ieloop = ImportExportLoop(self._lu)
1290
    try:
1291
      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
1292
                                                           disk_info)):
1293
        # Decide whether to use IPv6
1294
        ipv6 = netutils.IP6Address.IsValid(host)
1295

    
1296
        opts = objects.ImportExportOptions(key_name=key_name,
1297
                                           ca_pem=dest_ca_pem,
1298
                                           magic=magic, ipv6=ipv6)
1299

    
1300
        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
1301
        finished_fn = compat.partial(self._TransferFinished, idx)
1302
        ieloop.Add(DiskExport(self._lu, instance.primary_node,
1303
                              opts, host, port, instance, "disk%d" % idx,
1304
                              constants.IEIO_SCRIPT, ((dev, instance), idx),
1305
                              timeouts, cbs, private=(idx, finished_fn)))
1306

    
1307
      ieloop.Run()
1308
    finally:
1309
      ieloop.FinalizeAll()
1310

    
1311
    return (True, cbs.disk_results)
1312

    
1313
  def _TransferFinished(self, idx):
1314
    """Called once a transfer has finished.
1315

1316
    @type idx: number
1317
    @param idx: Disk index
1318

1319
    """
1320
    logging.debug("Transfer %s finished", idx)
1321
    self._RemoveSnapshot(idx)
1322

    
1323
  def Cleanup(self):
1324
    """Remove all snapshots.
1325

1326
    """
1327
    assert len(self._removed_snaps) == len(self._instance.disks)
1328
    for idx in range(len(self._instance.disks)):
1329
      self._RemoveSnapshot(idx)
1330

    
1331

    
1332
class _RemoteImportCb(ImportExportCbBase):
1333
  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
1334
               external_address):
1335
    """Initializes this class.
1336

1337
    @type cds: string
1338
    @param cds: Cluster domain secret
1339
    @type x509_cert_pem: string
1340
    @param x509_cert_pem: CA used for signing import key
1341
    @type disk_count: number
1342
    @param disk_count: Number of disks
1343
    @type external_address: string
1344
    @param external_address: External address of destination node
1345

1346
    """
1347
    ImportExportCbBase.__init__(self)
1348
    self._feedback_fn = feedback_fn
1349
    self._cds = cds
1350
    self._x509_cert_pem = x509_cert_pem
1351
    self._disk_count = disk_count
1352
    self._external_address = external_address
1353

    
1354
    self._dresults = [None] * disk_count
1355
    self._daemon_port = [None] * disk_count
1356

    
1357
    self._salt = utils.GenerateSecret(8)
1358

    
1359
  @property
1360
  def disk_results(self):
1361
    """Returns per-disk results.
1362

1363
    """
1364
    return self._dresults
1365

    
1366
  def _CheckAllListening(self):
1367
    """Checks whether all daemons are listening.
1368

1369
    If all daemons are listening, the information is sent to the client.
1370

1371
    """
1372
    if not compat.all(dp is not None for dp in self._daemon_port):
1373
      return
1374

    
1375
    host = self._external_address
1376

    
1377
    disks = []
1378
    for idx, (port, magic) in enumerate(self._daemon_port):
1379
      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
1380
                                               idx, host, port, magic))
1381

    
1382
    assert len(disks) == self._disk_count
1383

    
1384
    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
1385
      "disks": disks,
1386
      "x509_ca": self._x509_cert_pem,
1387
      })
1388

    
1389
  def ReportListening(self, ie, private, _):
1390
    """Called when daemon started listening.
1391

1392
    """
1393
    (idx, ) = private
1394

    
1395
    self._feedback_fn("Disk %s is now listening" % idx)
1396

    
1397
    assert self._daemon_port[idx] is None
1398

    
1399
    self._daemon_port[idx] = (ie.listen_port, ie.magic)
1400

    
1401
    self._CheckAllListening()
1402

    
1403
  def ReportConnected(self, ie, private):
1404
    """Called when a connection has been established.
1405

1406
    """
1407
    (idx, ) = private
1408

    
1409
    self._feedback_fn("Disk %s is now receiving data" % idx)
1410

    
1411
  def ReportFinished(self, ie, private):
1412
    """Called when a transfer has finished.
1413

1414
    """
1415
    (idx, ) = private
1416

    
1417
    # Daemon is certainly no longer listening
1418
    self._daemon_port[idx] = None
1419

    
1420
    if ie.success:
1421
      self._feedback_fn("Disk %s finished receiving data" % idx)
1422
    else:
1423
      self._feedback_fn(("Disk %s failed to receive data: %s"
1424
                         " (recent output: %s)") %
1425
                        (idx, ie.final_message, ie.recent_output))
1426

    
1427
    self._dresults[idx] = bool(ie.success)
1428

    
1429

    
1430
def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
1431
                 cds, timeouts):
1432
  """Imports an instance from another cluster.
1433

1434
  @param lu: Logical unit instance
1435
  @param feedback_fn: Feedback function
1436
  @type instance: L{objects.Instance}
1437
  @param instance: Instance object
1438
  @type pnode: L{objects.Node}
1439
  @param pnode: Primary node of instance as an object
1440
  @type source_x509_ca: OpenSSL.crypto.X509
1441
  @param source_x509_ca: Import source's X509 CA
1442
  @type cds: string
1443
  @param cds: Cluster domain secret
1444
  @type timeouts: L{ImportExportTimeouts}
1445
  @param timeouts: Timeouts for this import
1446

1447
  """
1448
  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1449
                                                  source_x509_ca)
1450

    
1451
  magic_base = utils.GenerateSecret(6)
1452

    
1453
  # Decide whether to use IPv6
1454
  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
1455

    
1456
  # Create crypto key
1457
  result = lu.rpc.call_x509_cert_create(instance.primary_node,
1458
                                        constants.RIE_CERT_VALIDITY)
1459
  result.Raise("Can't create X509 key and certificate on %s" % result.node)
1460

    
1461
  (x509_key_name, x509_cert_pem) = result.payload
1462
  try:
1463
    # Load certificate
1464
    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1465
                                                x509_cert_pem)
1466

    
1467
    # Sign certificate
1468
    signed_x509_cert_pem = \
1469
      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
1470

    
1471
    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
1472
                          len(instance.disks), pnode.primary_ip)
1473

    
1474
    ieloop = ImportExportLoop(lu)
1475
    try:
1476
      for idx, dev in enumerate(instance.disks):
1477
        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
1478

    
1479
        # Import daemon options
1480
        opts = objects.ImportExportOptions(key_name=x509_key_name,
1481
                                           ca_pem=source_ca_pem,
1482
                                           magic=magic, ipv6=ipv6)
1483

    
1484
        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
1485
                              "disk%d" % idx,
1486
                              constants.IEIO_SCRIPT, ((dev, instance), idx),
1487
                              timeouts, cbs, private=(idx, )))
1488

    
1489
      ieloop.Run()
1490
    finally:
1491
      ieloop.FinalizeAll()
1492
  finally:
1493
    # Remove crypto key and certificate
1494
    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
1495
    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
1496

    
1497
  return cbs.disk_results
1498

    
1499

    
1500
def _GetImportExportHandshakeMessage(version):
1501
  """Returns the handshake message for a RIE protocol version.
1502

1503
  @type version: number
1504

1505
  """
1506
  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
1507

    
1508

    
1509
def ComputeRemoteExportHandshake(cds):
1510
  """Computes the remote import/export handshake.
1511

1512
  @type cds: string
1513
  @param cds: Cluster domain secret
1514

1515
  """
1516
  salt = utils.GenerateSecret(8)
1517
  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
1518
  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
1519

    
1520

    
1521
def CheckRemoteExportHandshake(cds, handshake):
1522
  """Checks the handshake of a remote import/export.
1523

1524
  @type cds: string
1525
  @param cds: Cluster domain secret
1526
  @type handshake: sequence
1527
  @param handshake: Handshake sent by remote peer
1528

1529
  """
1530
  try:
1531
    (version, hmac_digest, hmac_salt) = handshake
1532
  except (TypeError, ValueError), err:
1533
    return "Invalid data: %s" % err
1534

    
1535
  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
1536
                              hmac_digest, salt=hmac_salt):
1537
    return "Hash didn't match, clusters don't share the same domain secret"
1538

    
1539
  if version != constants.RIE_VERSION:
1540
    return ("Clusters don't have the same remote import/export protocol"
1541
            " (local=%s, remote=%s)" %
1542
            (constants.RIE_VERSION, version))
1543

    
1544
  return None
1545

    
1546

    
1547
def _GetRieDiskInfoMessage(disk_index, host, port, magic):
1548
  """Returns the hashed text for import/export disk information.
1549

1550
  @type disk_index: number
1551
  @param disk_index: Index of disk (included in hash)
1552
  @type host: string
1553
  @param host: Hostname
1554
  @type port: number
1555
  @param port: Daemon port
1556
  @type magic: string
1557
  @param magic: Magic value
1558

1559
  """
1560
  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
1561

    
1562

    
1563
def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
1564
  """Verifies received disk information for an export.
1565

1566
  @type cds: string
1567
  @param cds: Cluster domain secret
1568
  @type disk_index: number
1569
  @param disk_index: Index of disk (included in hash)
1570
  @type disk_info: sequence
1571
  @param disk_info: Disk information sent by remote peer
1572

1573
  """
1574
  try:
1575
    (host, port, magic, hmac_digest, hmac_salt) = disk_info
1576
  except (TypeError, ValueError), err:
1577
    raise errors.GenericError("Invalid data: %s" % err)
1578

    
1579
  if not (host and port and magic):
1580
    raise errors.GenericError("Missing destination host, port or magic")
1581

    
1582
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1583

    
1584
  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
1585
    raise errors.GenericError("HMAC is wrong")
1586

    
1587
  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
1588
    destination = host
1589
  else:
1590
    destination = netutils.Hostname.GetNormalizedName(host)
1591

    
1592
  return (destination,
1593
          utils.ValidateServiceName(port),
1594
          magic)
1595

    
1596

    
1597
def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
1598
  """Computes the signed disk information for a remote import.
1599

1600
  @type cds: string
1601
  @param cds: Cluster domain secret
1602
  @type salt: string
1603
  @param salt: HMAC salt
1604
  @type disk_index: number
1605
  @param disk_index: Index of disk (included in hash)
1606
  @type host: string
1607
  @param host: Hostname
1608
  @type port: number
1609
  @param port: Daemon port
1610
  @type magic: string
1611
  @param magic: Magic value
1612

1613
  """
1614
  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
1615
  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
1616
  return (host, port, magic, hmac_digest, salt)
1617

    
1618

    
1619
def CalculateGroupIPolicy(cluster, group):
1620
  """Calculate instance policy for group.
1621

1622
  """
1623
  return cluster.SimpleFillIPolicy(group.ipolicy)
1624

    
1625

    
1626
def ComputeDiskSize(disk_template, disks):
1627
  """Compute disk size requirements according to disk template
1628

1629
  """
1630
  # Required free disk space as a function of disk and swap space
1631
  req_size_dict = {
1632
    constants.DT_DISKLESS: 0,
1633
    constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks),
1634
    # 128 MB are added for drbd metadata for each disk
1635
    constants.DT_DRBD8:
1636
      sum(d[constants.IDISK_SIZE] + constants.DRBD_META_SIZE for d in disks),
1637
    constants.DT_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1638
    constants.DT_SHARED_FILE: sum(d[constants.IDISK_SIZE] for d in disks),
1639
    constants.DT_BLOCK: 0,
1640
    constants.DT_RBD: sum(d[constants.IDISK_SIZE] for d in disks),
1641
    constants.DT_EXT: sum(d[constants.IDISK_SIZE] for d in disks),
1642
  }
1643

    
1644
  if disk_template not in req_size_dict:
1645
    raise errors.ProgrammerError("Disk template '%s' size requirement"
1646
                                 " is unknown" % disk_template)
1647

    
1648
  return req_size_dict[disk_template]