Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 8e8cf324

History | View | Annotate | Download (145.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable=E1103,C0302
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37
# C0302: This module has become too big and should be split up
38

    
39

    
40
import os
41
import os.path
42
import shutil
43
import time
44
import stat
45
import errno
46
import re
47
import random
48
import logging
49
import tempfile
50
import zlib
51
import base64
52
import signal
53

    
54
from ganeti import errors
55
from ganeti import utils
56
from ganeti import ssh
57
from ganeti import hypervisor
58
from ganeti import constants
59
from ganeti.storage import bdev
60
from ganeti.storage import drbd
61
from ganeti.storage import filestorage
62
from ganeti import objects
63
from ganeti import ssconf
64
from ganeti import serializer
65
from ganeti import netutils
66
from ganeti import runtime
67
from ganeti import compat
68
from ganeti import pathutils
69
from ganeti import vcluster
70
from ganeti import ht
71
from ganeti.storage.base import BlockDev
72
from ganeti.storage.drbd import DRBD8
73
from ganeti import hooksmaster
74

    
75

    
76
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
77
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
78
  pathutils.DATA_DIR,
79
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
80
  pathutils.QUEUE_DIR,
81
  pathutils.CRYPTO_KEYS_DIR,
82
  ])
83
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
84
_X509_KEY_FILE = "key"
85
_X509_CERT_FILE = "cert"
86
_IES_STATUS_FILE = "status"
87
_IES_PID_FILE = "pid"
88
_IES_CA_FILE = "ca"
89

    
90
#: Valid LVS output line regex
91
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
92

    
93
# Actions for the master setup script
94
_MASTER_START = "start"
95
_MASTER_STOP = "stop"
96

    
97
#: Maximum file permissions for restricted command directory and executables
98
_RCMD_MAX_MODE = (stat.S_IRWXU |
99
                  stat.S_IRGRP | stat.S_IXGRP |
100
                  stat.S_IROTH | stat.S_IXOTH)
101

    
102
#: Delay before returning an error for restricted commands
103
_RCMD_INVALID_DELAY = 10
104

    
105
#: How long to wait to acquire lock for restricted commands (shorter than
106
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
107
#: command requests arrive
108
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
109

    
110

    
111
class RPCFail(Exception):
  """Exception signalling an RPC-level failure.

  The single argument carried by the exception is the human-readable
  error message reported back to the caller.

  """
117

    
118

    
119
def _GetInstReasonFilename(instance_name):
  """Compute the path of the file holding an instance's state-change reason.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  reason_dir = pathutils.INSTANCE_REASON_DIR
  return utils.PathJoin(reason_dir, instance_name)
129

    
130

    
131
def _StoreInstReasonTrail(instance_name, trail):
  """Write an instance's reason trail, serialized as JSON, to its file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time
  (see L{_GetInstReasonFilename}).

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: None

  """
  target = _GetInstReasonFilename(instance_name)
  utils.WriteFile(target, data=serializer.DumpJson(trail))
145

    
146

    
147
def _Fail(msg, *args, **kwargs):
  """Log an error and raise an L{RPCFail} exception.

  The exception is handled specially in the ganeti daemon and turned
  into a 'failed' return type, so this is a useful shortcut for logging
  an error and returning it to the master daemon.

  Keyword arguments: C{log} (default true) controls whether the message
  is logged at all; C{exc}, when true, logs the current exception's
  traceback along with the message.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  if kwargs.get("log", True):
    if kwargs.get("exc"):
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
168

    
169

    
170
def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a freshly created SimpleStore instance

  """
  store = ssconf.SimpleStore()
  return store
178

    
179

    
180
def _GetSshRunner(cluster_name):
  """Simple wrapper returning an SshRunner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
191

    
192

    
193
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client, a pair of (encoding, content)
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  encoding, content = data
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  raise AssertionError("Unknown data encoding")
211

    
212

    
213
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return

  # Normalize the exclusion list so comparisons against joined paths work
  if exclude:
    excluded = frozenset(os.path.normpath(item) for item in exclude)
  else:
    excluded = frozenset()

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in excluded:
      continue
    # Only plain files are removed; directories and symlinks are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
241

    
242

    
243
def _BuildUploadFileList():
  """Compute the set of files accepted by L{UploadFile}.

  This is abstracted so that it's built only once at module import time,
  the result being cached in L{_ALLOWED_UPLOAD_FILES}.

  """
  base_files = [
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ]
  allowed_files = set(base_files)

  # Each hypervisor may declare additional ancillary files of its own
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)
270

    
271

    
272
#: Frozen set of all files accepted by L{UploadFile}, computed once at
#: module import time
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
273

    
274

    
275
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: tuple
  @return: True, None

  """
  # The queue lock file must survive the purge, otherwise the job queue
  # could not be locked again afterwards
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
284

    
285

    
286
def GetMasterNodeName():
  """Returns the master node name.

  @rtype: string
  @return: name of the master node
  @raise RPCFail: in case of errors

  """
  try:
    return _GetConfig().GetMasterNode()
  except errors.ConfigurationError, err:
    # exc=True makes _Fail log the full traceback as well
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
298

    
299

    
300
def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      # Defer building the hook environment until the hooks actually run,
      # forwarding the wrapped function's own arguments to the builder
      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      # A pre-hook failure raises here, skipping both fn and the post-hooks
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
334

    
335

    
336
def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    _RunLocalHooks decorator)

  """
  # pylint: disable=W0613
  ip_version = netutils.IPAddress.GetVersionFromAddressFamily(
    master_params.ip_family)
  return {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ip_version),
  }
357

    
358

    
359
def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  # The script runs in a clean environment holding only the master IP
  # variables (reset_env=True)
  env = _BuildMasterIpEnv(master_params)
  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)
386

    
387

    
388
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Bring up the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START, use_external_mip_script)
403

    
404

    
405
def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None
  @raise RPCFail: if the master daemons could not be started

  """
  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    # _Fail logs the message itself; the previous explicit logging.error
    # call made every failure appear twice in the logs
    _Fail("Can't start Ganeti master: %s", result.output)
431

    
432

    
433
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Tear down the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP, use_external_mip_script)
448

    
449

    
450
def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if not result.failed:
    return
  logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                " and error %s",
                result.cmd, result.exit_code, result.output)
466

    
467

    
468
def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  label = "%s:0" % master_netdev

  # First add the address with the new netmask...
  add_cmd = [constants.IP_COMMAND_PATH, "address", "add",
             "%s/%s" % (master_ip, netmask),
             "dev", master_netdev, "label", label]
  if utils.RunCmd(add_cmd).failed:
    _Fail("Could not set the new netmask on the master IP address")

  # ...then remove the old one, so the IP never goes away entirely
  del_cmd = [constants.IP_COMMAND_PATH, "address", "del",
             "%s/%s" % (master_ip, old_netmask),
             "dev", master_netdev, "label", label]
  if utils.RunCmd(del_cmd).failed:
    _Fail("Could not bring down the master IP address with the old netmask")
497

    
498

    
499
def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry
  @raise RPCFail: if the mode/parameter combination is invalid

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      # Bug fix: the original called RPCFail(...) here, which only
      # constructs the exception without raising it, silently ignoring
      # the invalid call; _Fail logs and raises
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")
519

    
520

    
521
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  # Remove cluster data and crypto material first, then the job queue
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    # Best-effort removal of this node's ssh keys; failures are only logged
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  # Best-effort removal of the cluster secrets; a deliberately broad
  # except, since failing here must not abort leaving the cluster
  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
565

    
566

    
567
def _CheckStorageParams(params, num_params):
  """Performs sanity checks for storage parameters.

  @type params: list
  @param params: list of storage parameters
  @type num_params: int
  @param num_params: expected number of parameters
  @raise errors.ProgrammerError: if the parameters are missing, not a
    list, or of the wrong number

  """
  if params is None:
    raise errors.ProgrammerError("No storage parameters for storage"
                                 " reporting is provided.")
  if not isinstance(params, list):
    raise errors.ProgrammerError("The storage parameters are not of type"
                                 " list: '%s'" % params)
  if len(params) != num_params:
    # the original adjacent literals rendered "...number ofstorage
    # parameters..." (missing space); fixed here
    raise errors.ProgrammerError("Did not receive the expected number of"
                                 " storage parameters: expected %s,"
                                 " received '%s'" % (num_params, len(params)))
586

    
587

    
588
def _CheckLvmStorageParams(params):
  """Performs sanity check for the 'exclusive storage' flag.

  @see: C{_CheckStorageParams}

  """
  _CheckStorageParams(params, 1)
  (excl_stor,) = params
  if not isinstance(excl_stor, bool):
    raise errors.ProgrammerError("Exclusive storage parameter is not"
                                 " boolean: '%s'." % excl_stor)
  return excl_stor
600

    
601

    
602
def _GetLvmVgSpaceInfo(name, params):
  """Wrapper around C{_GetVgInfo} which checks the storage parameters.

  @type name: string
  @param name: name of the volume group
  @type params: list
  @param params: storage parameter list, expected to contain only the
    exclusive-storage flag

  """
  return _GetVgInfo(name, _CheckLvmStorageParams(params))
614

    
615

    
616
def _GetVgInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVGInfo):
  """Retrieves information about a LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = info_fn([name], excl_stor)
  vg_free = vg_size = None
  if vginfo:
    first_vg = vginfo[0]
    vg_free = int(round(first_vg[0], 0))
    vg_size = int(round(first_vg[1], 0))

  return {
    "type": constants.ST_LVM_VG,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }
636

    
637

    
638
def _GetLvmPvSpaceInfo(name, params):
  """Wrapper around C{_GetVgSpindlesInfo} with sanity checks.

  @see: C{_GetLvmVgSpaceInfo}

  """
  return _GetVgSpindlesInfo(name, _CheckLvmStorageParams(params))
646

    
647

    
648
def _GetVgSpindlesInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVgSpindlesInfo):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary with keys "type", "name", "storage_free" and
      "storage_size" for the VG name, free spindles and total spindles;
      both counts are zero when exclusive storage is disabled

  """
  if excl_stor:
    (free_spindles, total_spindles) = info_fn(name)
  else:
    # Spindle accounting is only meaningful with exclusive storage
    free_spindles = total_spindles = 0
  return {
    "type": constants.ST_LVM_PV,
    "name": name,
    "storage_free": free_spindles,
    "storage_size": total_spindles,
    }
672

    
673

    
674
def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total number of ram in MiB
    - hv_version: the hypervisor version, if available

  @type hvparams: dict of string
  @param hvparams: the hypervisor's hvparams

  """
  hv = get_hv_fn(name)
  return hv.GetNodeInfo(hvparams=hvparams)
691

    
692

    
693
def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information for all hypervisors.

  See C{_GetHvInfo} for information on the output.

  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams

  """
  if hv_specs is None:
    return None

  return [_GetHvInfo(hvname, hvparams, get_hv_fn)
          for (hvname, hvparams) in hv_specs]
709

    
710

    
711
def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns the list of results.

  @type names: list or None
  @param names: the names to map C{fn} over; C{None} short-circuits
  @rtype: None or list
  @return: C{None} if C{names} is C{None}, otherwise the list of
      C{fn(name)} results (the original docstring wrongly said "dict")

  """
  if names is None:
    return None
  # An explicit list (instead of map()) keeps the eager, list-returning
  # behaviour independent of the Python version
  return [fn(name) for name in names]
721

    
722

    
723
def GetNodeInfo(storage_units, hv_specs):
  """Gives back a hash with different information about the node.

  @type storage_units: list of tuples (string, string, list)
  @param storage_units: List of tuples (storage unit, identifier, parameters) to
    ask for disk space information. In case of lvm-vg, the identifier is
    the VG name. The parameters can contain additional, storage-type-specific
    parameters, for example exclusive storage for lvm storage.
  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  # The boot ID changes on every reboot, letting callers detect restarts
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  # NOTE: the tuple-unpacking lambda below is Python-2-only syntax
  storage_info = _GetNamedNodeInfo(
    storage_units,
    (lambda (storage_type, storage_key, storage_params):
        _ApplyStorageInfoFunction(storage_type, storage_key, storage_params)))
  hv_info = _GetHvInfoAll(hv_specs)
  return (bootid, storage_info, hv_info)
745

    
746

    
747
def _GetFileStorageSpaceInfo(path, params):
  """Wrapper around filestorage.GetFileStorageSpaceInfo.

  The purpose of this wrapper is to call filestorage.GetFileStorageSpaceInfo
  and ignore the *args parameter to not leak it into the filestorage
  module's code.

  @see: C{filestorage.GetFileStorageSpaceInfo} for description of the
    parameters.

  """
  # File storage expects no extra storage parameters
  _CheckStorageParams(params, 0)
  return filestorage.GetFileStorageSpaceInfo(path)
760

    
761

    
762
# FIXME: implement storage reporting for all missing storage types.
#: Maps each storage type to its space-reporting function, or C{None} when
#: reporting is not implemented for that type
#: (see L{_ApplyStorageInfoFunction})
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: _GetFileStorageSpaceInfo,
  constants.ST_LVM_PV: _GetLvmPvSpaceInfo,
  constants.ST_LVM_VG: _GetLvmVgSpaceInfo,
  constants.ST_SHARED_FILE: None,
  constants.ST_RADOS: None,
}
773

    
774

    
775
def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the result of the storage space function registered for the
    storage type in _STORAGE_TYPE_INFO_FN
  @raises NotImplementedError: for storage types who don't support space
    reporting yet

  """
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
  if fn is None:
    raise NotImplementedError
  return fn(storage_key, *args)
799

    
800

    
801
def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  # A PV is "shared" when more than one LV lives on it
  return [(pvi.name, pvi.lv_list)
          for pvi in pvi_list
          if len(pvi.lv_list) > 1]
816

    
817

    
818
def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  # Hypervisor checks only make sense on vm-capable nodes
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      except errors.HypervisorError, err:
        # report the failure as this hypervisor's result instead of
        # aborting the whole verification
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val
847

    
848

    
849
def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  # Hvparams checks only make sense on vm-capable nodes
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        get_hv_fn(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        # only failed validations are recorded in the result
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))
875

    
876

    
877
def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    except RPCFail, err:
      # the error message itself becomes the check's reported value
      val = str(err)
    result[constants.NV_INSTANCELIST] = val
899

    
900

    
901
def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Verifies the node info.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if not (vm_capable and constants.NV_HVINFO in what):
    return
  hv_name = what[constants.NV_HVINFO]
  hv = hypervisor.GetHypervisor(hv_name)
  result[constants.NV_HVINFO] = hv.GetNodeInfo(hvparams=all_hvparams[hv_name])


def _VerifyClientCertificate(cert_file=pathutils.NODED_CLIENT_CERT_FILE):
  """Verify the existence and validity of the client SSL certificate.

  @type cert_file: string
  @param cert_file: path of the client certificate to check
  @rtype: tuple
  @return: on failure, a (error code, message) pair; on success,
    C{(None, digest)} where digest is the certificate's digest, to be
    compared to the one stored in the cluster configuration

  """
  create_cert_cmd = "gnt-cluster renew-crypto --new-node-certificates"
  if not os.path.exists(cert_file):
    # FIX: the two adjacent string literals previously concatenated to
    # "...to createclient certificates..."; a separating space was missing
    return (constants.CV_ERROR,
            "The client certificate does not exist. Run '%s' to create"
            " client certificates for all nodes." % create_cert_cmd)

  (errcode, msg) = utils.VerifyCertificate(cert_file)
  if errcode is not None:
    return (errcode, msg)
  else:
    # if everything is fine, we return the digest to be compared to the config
    return (None, utils.GetCertificateDigest(cert_filename=cert_file))


def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @type node_groups: a dict of strings
  @param node_groups: node _names_ mapped to their group uuids (it's enough to
      have only those nodes that are in `what["nodelist"]`)
  @type groups_cfg: a dict of dict of strings
  @param groups_cfg: a dictionary mapping group uuids to their configuration
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  # a node listed under NV_VMNODES is explicitly marked not vm capable
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    # paths are localized before hashing and re-virtualized in the
    # returned mapping, so results are comparable across nodes
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_CLIENT_CERT in what:
    result[constants.NV_CLIENT_CERT] = _VerifyClientCertificate()

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      params = groups_cfg.get(node_groups.get(node))
      ssh_port = params["ndparams"].get(constants.ND_SSH_PORT)
      logging.debug("Ssh port %s (None = default) for node %s",
                    str(ssh_port), node)
      success, message = _GetSshRunner(cluster_name). \
                            VerifyNodeHostname(node, ssh_port)
      if not success:
        # only failures are reported back
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    # locate our own primary/secondary IPs in the supplied list
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      # on the master itself, ping from localhost
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    # report only the scripts that are NOT executable
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    # one entry per path: None on success, an error string otherwise
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stating out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    # report only the bridges that do NOT exist
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_ACCEPTED_STORAGE_PATHS) == my_name:
    result[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        filestorage.ComputeWrongFileStoragePaths()

  if what.get(constants.NV_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_FILE_STORAGE_PATH] = pathresult

  if what.get(constants.NV_SHARED_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_SHARED_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_SHARED_FILE_STORAGE_PATH] = pathresult

  return result


def GetCryptoTokens(token_requests):
  """Perform actions on the node's cryptographic tokens.

  Token types can be 'ssl' or 'ssh'. So far only some actions are implemented
  for 'ssl'. Action 'get' returns the digest of the public client ssl
  certificate. Action 'create' creates a new client certificate and private key
  and also returns the digest of the certificate. The third parameter of a
  token request are optional parameters for the actions, so far only the
  filename is supported.

  @type token_requests: list of tuples of (string, string, dict), where the
    first string is in constants.CRYPTO_TYPES, the second in
    constants.CRYPTO_ACTIONS. The third parameter is a dictionary of string
    to string.
  @param token_requests: list of requests of cryptographic tokens and actions
    to perform on them. The actions come with a dictionary of options.
  @rtype: list of tuples (string, string)
  @return: list of tuples of the token type and the public crypto token

  """
  _VALID_CERT_FILES = [pathutils.NODED_CERT_FILE,
                       pathutils.NODED_CLIENT_CERT_FILE,
                       pathutils.NODED_CLIENT_CERT_FILE_TMP]
  _DEFAULT_CERT_FILE = pathutils.NODED_CLIENT_CERT_FILE
  tokens = []
  for (token_type, action, options) in token_requests:
    if token_type not in constants.CRYPTO_TYPES:
      raise errors.ProgrammerError("Token type '%s' not supported." %
                                   token_type)
    if action not in constants.CRYPTO_ACTIONS:
      raise errors.ProgrammerError("Action '%s' is not supported." %
                                   action)
    if token_type == constants.CRYPTO_TYPE_SSL_DIGEST:
      if action == constants.CRYPTO_ACTION_CREATE:

        # extract file name from options
        cert_filename = None
        if options:
          cert_filename = options.get(constants.CRYPTO_OPTION_CERT_FILE)
        if not cert_filename:
          cert_filename = _DEFAULT_CERT_FILE
        # For security reason, we don't allow arbitrary filenames
        if cert_filename not in _VALID_CERT_FILES:
          raise errors.ProgrammerError(
            "The certificate file name path '%s' is not allowed." %
            cert_filename)

        # extract serial number from options
        serial_no = None
        if options:
          try:
            serial_no = int(options[constants.CRYPTO_OPTION_SERIAL_NO])
          except ValueError:
            # FIX: error message previously read "intenger"
            raise errors.ProgrammerError(
              "The given serial number is not an integer: %s." %
              options.get(constants.CRYPTO_OPTION_SERIAL_NO))
          except KeyError:
            raise errors.ProgrammerError("No serial number was provided.")

        if not serial_no:
          raise errors.ProgrammerError(
            "Cannot create an SSL certificate without a serial no.")

        utils.GenerateNewSslCert(
          True, cert_filename, serial_no,
          "Create new client SSL certificate in %s." % cert_filename)
        tokens.append((token_type,
                       utils.GetCertificateDigest(
                         cert_filename=cert_filename)))
      elif action == constants.CRYPTO_ACTION_GET:
        tokens.append((token_type,
                       utils.GetCertificateDigest()))
  return tokens


def GetBlockDevSizes(devices):
1251
  """Return the size of the given block devices
1252

1253
  @type devices: list
1254
  @param devices: list of block device nodes to query
1255
  @rtype: dict
1256
  @return:
1257
    dictionary of all block devices under /dev (key). The value is their
1258
    size in MiB.
1259

1260
    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
1261

1262
  """
1263
  DEV_PREFIX = "/dev/"
1264
  blockdevs = {}
1265

    
1266
  for devpath in devices:
1267
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
1268
      continue
1269

    
1270
    try:
1271
      st = os.stat(devpath)
1272
    except EnvironmentError, err:
1273
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
1274
      continue
1275

    
1276
    if stat.S_ISBLK(st.st_mode):
1277
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
1278
      if result.failed:
1279
        # We don't want to fail, just do not list this device as available
1280
        logging.warning("Cannot get size for block device %s", devpath)
1281
        continue
1282

    
1283
      size = int(result.stdout) / (1024 * 1024)
1284
      blockdevs[devpath] = size
1285
  return blockdevs
1286

    
1287

    
1288
def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    # the characters of the lv_attr column encode LV properties, see
    # lvs(8): position 4 is the state ("-" = inactive), position 5 the
    # device open status ("o" = open), position 0 the volume type
    # ("v" = virtual)
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs


def ListVolumeGroups():
  """List the volume groups and their size.

  Thin RPC-facing wrapper around L{utils.ListVolumeGroups}.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def _expand_line(fields):
    # produce one record per physical device backing the LV; the device
    # field is a comma-separated list of "dev(extent)" entries
    fields = [field.strip() for field in fields]
    return [{"name": fields[0], "size": fields[1],
             "dev": device.split("(")[0], "vg": fields[3]}
            for device in fields[2].split(",")]

  volumes = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      volumes.extend(_expand_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return volumes


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  Fails the RPC if any of the given bridges is missing.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceListForHypervisor(hname, hvparams=None,
1408
                                 get_hv_fn=hypervisor.GetHypervisor):
1409
  """Provides a list of instances of the given hypervisor.
1410

1411
  @type hname: string
1412
  @param hname: name of the hypervisor
1413
  @type hvparams: dict of strings
1414
  @param hvparams: hypervisor parameters for the given hypervisor
1415
  @type get_hv_fn: function
1416
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1417
    name; optional parameter to increase testability
1418

1419
  @rtype: list
1420
  @return: a list of all running instances on the current node
1421
    - instance1.example.com
1422
    - instance2.example.com
1423

1424
  """
1425
  results = []
1426
  try:
1427
    hv = get_hv_fn(hname)
1428
    names = hv.ListInstances(hvparams=hvparams)
1429
    results.extend(names)
1430
  except errors.HypervisorError, err:
1431
    _Fail("Error enumerating instances (hypervisor %s): %s",
1432
          hname, err, exc=True)
1433
  return results
1434

    
1435

    
1436
def GetInstanceList(hypervisor_list, all_hvparams=None,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor types to respective
    cluster-wide hypervisor parameters
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  instances = []
  for hv_name in hypervisor_list:
    # delegate the actual query to the per-hypervisor helper
    instances.extend(
      GetInstanceListForHypervisor(hv_name,
                                   hvparams=all_hvparams[hv_name],
                                   get_hv_fn=get_hv_fn))
  return instances


def GetInstanceInfo(instance, hname, hvparams=None):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance
  @type hvparams: dict of strings
  @param hvparams: the instance's hvparams

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: state of instance (HvInstanceState)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance,
                                                          hvparams=hvparams)
  if iinfo is not None:
    # positions 2..5 of the hypervisor info tuple are memory, vcpus,
    # state and cpu time respectively
    for key, value in zip(("memory", "vcpus", "state", "time"), iinfo[2:6]):
      output[key] = value

  return output


def GetInstanceMigratable(instance):
  """Computes whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  # a stopped instance cannot be migrated
  if iname not in hyper.ListInstances(instance.hvparams):
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks_info)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      # NOTE(review): missing disk symlinks are only logged here, not
      # treated as a migration blocker — confirm this is intentional
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list, all_hvparams):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: mapping of hypervisor names to hvparams

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}
  for hname in hypervisor_list:
    hvparams = all_hvparams[hname]
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo(hvparams)
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        # an instance reported by several hypervisors keeps the data
        # from the last hypervisor queried
        output[name] = value

  return output


def GetInstanceConsoleInfo(instance_param_dict,
                           get_hv_fn=hypervisor.GetHypervisor):
  """Gather data about the console access of a set of instances of this node.

  This function assumes that the caller already knows which instances are on
  this node, by calling a function such as L{GetAllInstancesInfo} or
  L{GetInstanceList}.

  For every instance, a large amount of configuration data needs to be
  provided to the hypervisor interface in order to receive the console
  information. Whether this could or should be cut down can be discussed.
  The information is provided in a dictionary indexed by instance name,
  allowing any number of instance queries to be done.

  @type instance_param_dict: dict of string to tuple of dictionaries, where the
    dictionaries represent: L{objects.Instance}, L{objects.Node},
    L{objects.NodeGroup}, HvParams, BeParams
  @param instance_param_dict: mapping of instance name to parameters necessary
    for console information retrieval

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - instance: instance name
      - kind: console kind
      - message: used with kind == CONS_MESSAGE, indicates console to be
                 unavailable, supplies error message
      - host: host to connect to
      - port: port to use
      - user: user for login
      - command: the command, broken into parts as an array
      - display: unknown, potentially unused?

  """
  output = {}
  for inst_name, params in instance_param_dict.items():
    # deserialize the wire-format dicts into config objects
    instance = objects.Instance.FromDict(params["instance"])
    pnode = objects.Node.FromDict(params["node"])
    group = objects.NodeGroup.FromDict(params["group"])

    hv = get_hv_fn(instance.hypervisor)
    console = hv.GetInstanceConsole(instance, pnode, group,
                                    params["hvParams"], params["beParams"])
    output[inst_name] = console.ToDict()

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    component_part = "-%s" % component
  else:
    component_part = ""
  # timestamp suffix keeps successive runs from clobbering each other
  basename = ("%s-%s-%s%s-%s.log" %
              (kind, os_name, instance, component_part,
               utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, basename)


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if not result.failed:
    return

  logging.error("os create command '%s' returned error: %s, logfile: %s,"
                " output: %s", result.cmd, result.fail_reason, logfile,
                result.output)
  # include the tail of the log file in the error to ease debugging
  lines = [utils.SafeEncode(val)
           for val in utils.TailFile(logfile, lines=20)]
  _Fail("OS create script failed (%s), last lines in the"
        " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    # This is the rename script, not the create script; the log message
    # previously (and wrongly) said "os create command".
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1707

    
1708

    
1709
def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns symlink path for block device.

  The link lives in C{pathutils.DISK_LINKS_DIR} unless C{_dir} overrides
  it (used by unit tests), and is named C{<instance><sep><idx>}.

  """
  link_dir = _dir
  if link_dir is None:
    link_dir = pathutils.DISK_LINKS_DIR

  base = "%s%s%s" % (instance_name, constants.DISK_SEPARATOR, idx)
  return utils.PathJoin(link_dir, base)
1719

    
1720

    
1721
def _SymlinkBlockDev(instance_name, device_path, idx):
1722
  """Set up symlinks to a instance's block device.
1723

1724
  This is an auxiliary function run when an instance is start (on the primary
1725
  node) or when an instance is migrated (on the target node).
1726

1727

1728
  @param instance_name: the name of the target instance
1729
  @param device_path: path of the physical block device, on the node
1730
  @param idx: the disk index
1731
  @return: absolute path to the disk's symlink
1732

1733
  """
1734
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1735
  try:
1736
    os.symlink(device_path, link_name)
1737
  except OSError, err:
1738
    if err.errno == errno.EEXIST:
1739
      if (not os.path.islink(link_name) or
1740
          os.readlink(link_name) != device_path):
1741
        os.remove(link_name)
1742
        os.symlink(device_path, link_name)
1743
    else:
1744
      raise
1745

    
1746
  return link_name
1747

    
1748

    
1749
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Removal failures are logged but not propagated (best effort).

  """
  for idx in range(len(disks)):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      logging.exception("Can't remove symlink '%s'", link_name)
1760

    
1761

    
1762
def _CalculateDeviceURI(instance, disk, device):
  """Get the URI for the device.

  @type instance: L{objects.Instance}
  @param instance: the instance which disk belongs to
  @type disk: L{objects.Disk}
  @param disk: the target disk object
  @type device: L{bdev.BlockDev}
  @param device: the corresponding BlockDevice
  @rtype: string
  @return: the device uri if any else None

  """
  mode = disk.params.get(constants.LDP_ACCESS, constants.DISK_KERNELSPACE)
  if mode != constants.DISK_USERSPACE:
    # Kernel-space access: no URI to report
    return None
  # This can raise errors.BlockDeviceError
  return device.GetUserspaceAccessUri(instance.hypervisor)
1782

    
1783

    
1784
def _GatherAndLinkBlockDevs(instance):
1785
  """Set up an instance's block device(s).
1786

1787
  This is run on the primary node at instance startup. The block
1788
  devices must be already assembled.
1789

1790
  @type instance: L{objects.Instance}
1791
  @param instance: the instance whose disks we should assemble
1792
  @rtype: list
1793
  @return: list of (disk_object, link_name, drive_uri)
1794

1795
  """
1796
  block_devices = []
1797
  for idx, disk in enumerate(instance.disks_info):
1798
    device = _RecursiveFindBD(disk)
1799
    if device is None:
1800
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1801
                                    str(disk))
1802
    device.Open()
1803
    try:
1804
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1805
    except OSError, e:
1806
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1807
                                    e.strerror)
1808
    uri = _CalculateDeviceURI(instance, disk, device)
1809

    
1810
    block_devices.append((disk, link_name, uri))
1811

    
1812
  return block_devices
1813

    
1814

    
1815
def StartInstance(instance, startup_paused, reason, store_reason=True):
1816
  """Start an instance.
1817

1818
  @type instance: L{objects.Instance}
1819
  @param instance: the instance object
1820
  @type startup_paused: bool
1821
  @param instance: pause instance at startup?
1822
  @type reason: list of reasons
1823
  @param reason: the reason trail for this startup
1824
  @type store_reason: boolean
1825
  @param store_reason: whether to store the shutdown reason trail on file
1826
  @rtype: None
1827

1828
  """
1829
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
1830
                                                   instance.hvparams)
1831

    
1832
  if instance.name in running_instances:
1833
    logging.info("Instance %s already running, not starting", instance.name)
1834
    return
1835

    
1836
  try:
1837
    block_devices = _GatherAndLinkBlockDevs(instance)
1838
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1839
    hyper.StartInstance(instance, block_devices, startup_paused)
1840
    if store_reason:
1841
      _StoreInstReasonTrail(instance.name, reason)
1842
  except errors.BlockDeviceError, err:
1843
    _Fail("Block device error: %s", err, exc=True)
1844
  except errors.HypervisorError, err:
1845
    _RemoveBlockDevLinks(instance.name, instance.disks_info)
1846
    _Fail("Hypervisor error: %s", err, exc=True)
1847

    
1848

    
1849
def InstanceShutdown(instance, timeout, reason, store_reason=True):
1850
  """Shut an instance down.
1851

1852
  @note: this functions uses polling with a hardcoded timeout.
1853

1854
  @type instance: L{objects.Instance}
1855
  @param instance: the instance object
1856
  @type timeout: integer
1857
  @param timeout: maximum timeout for soft shutdown
1858
  @type reason: list of reasons
1859
  @param reason: the reason trail for this shutdown
1860
  @type store_reason: boolean
1861
  @param store_reason: whether to store the shutdown reason trail on file
1862
  @rtype: None
1863

1864
  """
1865
  hv_name = instance.hypervisor
1866
  hyper = hypervisor.GetHypervisor(hv_name)
1867
  iname = instance.name
1868

    
1869
  if instance.name not in hyper.ListInstances(instance.hvparams):
1870
    logging.info("Instance %s not running, doing nothing", iname)
1871
    return
1872

    
1873
  class _TryShutdown:
1874
    def __init__(self):
1875
      self.tried_once = False
1876

    
1877
    def __call__(self):
1878
      if iname not in hyper.ListInstances(instance.hvparams):
1879
        return
1880

    
1881
      try:
1882
        hyper.StopInstance(instance, retry=self.tried_once)
1883
        if store_reason:
1884
          _StoreInstReasonTrail(instance.name, reason)
1885
      except errors.HypervisorError, err:
1886
        if iname not in hyper.ListInstances(instance.hvparams):
1887
          # if the instance is no longer existing, consider this a
1888
          # success and go to cleanup
1889
          return
1890

    
1891
        _Fail("Failed to stop instance %s: %s", iname, err)
1892

    
1893
      self.tried_once = True
1894

    
1895
      raise utils.RetryAgain()
1896

    
1897
  try:
1898
    utils.Retry(_TryShutdown(), 5, timeout)
1899
  except utils.RetryTimeout:
1900
    # the shutdown did not succeed
1901
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1902

    
1903
    try:
1904
      hyper.StopInstance(instance, force=True)
1905
    except errors.HypervisorError, err:
1906
      if iname in hyper.ListInstances(instance.hvparams):
1907
        # only raise an error if the instance still exists, otherwise
1908
        # the error could simply be "instance ... unknown"!
1909
        _Fail("Failed to force stop instance %s: %s", iname, err)
1910

    
1911
    time.sleep(1)
1912

    
1913
    if iname in hyper.ListInstances(instance.hvparams):
1914
      _Fail("Could not shutdown instance %s even by destroy", iname)
1915

    
1916
  try:
1917
    hyper.CleanupInstance(instance.name)
1918
  except errors.HypervisorError, err:
1919
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1920

    
1921
  _RemoveBlockDevLinks(iname, instance.disks_info)
1922

    
1923

    
1924
def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
1925
  """Reboot an instance.
1926

1927
  @type instance: L{objects.Instance}
1928
  @param instance: the instance object to reboot
1929
  @type reboot_type: str
1930
  @param reboot_type: the type of reboot, one the following
1931
    constants:
1932
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1933
        instance OS, do not recreate the VM
1934
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1935
        restart the VM (at the hypervisor level)
1936
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1937
        not accepted here, since that mode is handled differently, in
1938
        cmdlib, and translates into full stop and start of the
1939
        instance (instead of a call_instance_reboot RPC)
1940
  @type shutdown_timeout: integer
1941
  @param shutdown_timeout: maximum timeout for soft shutdown
1942
  @type reason: list of reasons
1943
  @param reason: the reason trail for this reboot
1944
  @rtype: None
1945

1946
  """
1947
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
1948
                                                   instance.hvparams)
1949

    
1950
  if instance.name not in running_instances:
1951
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1952

    
1953
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1954
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1955
    try:
1956
      hyper.RebootInstance(instance)
1957
    except errors.HypervisorError, err:
1958
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1959
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1960
    try:
1961
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
1962
      result = StartInstance(instance, False, reason, store_reason=False)
1963
      _StoreInstReasonTrail(instance.name, reason)
1964
      return result
1965
    except errors.HypervisorError, err:
1966
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1967
  else:
1968
    _Fail("Invalid reboot_type received: %s", reboot_type)
1969

    
1970

    
1971
def InstanceBalloonMemory(instance, memory):
1972
  """Resize an instance's memory.
1973

1974
  @type instance: L{objects.Instance}
1975
  @param instance: the instance object
1976
  @type memory: int
1977
  @param memory: new memory amount in MB
1978
  @rtype: None
1979

1980
  """
1981
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1982
  running = hyper.ListInstances(instance.hvparams)
1983
  if instance.name not in running:
1984
    logging.info("Instance %s is not running, cannot balloon", instance.name)
1985
    return
1986
  try:
1987
    hyper.BalloonInstanceMemory(instance, memory)
1988
  except errors.HypervisorError, err:
1989
    _Fail("Failed to balloon instance memory: %s", err, exc=True)
1990

    
1991

    
1992
def MigrationInfo(instance):
1993
  """Gather information about an instance to be migrated.
1994

1995
  @type instance: L{objects.Instance}
1996
  @param instance: the instance definition
1997

1998
  """
1999
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2000
  try:
2001
    info = hyper.MigrationInfo(instance)
2002
  except errors.HypervisorError, err:
2003
    _Fail("Failed to fetch migration information: %s", err, exc=True)
2004
  return info
2005

    
2006

    
2007
def AcceptInstance(instance, info, target):
2008
  """Prepare the node to accept an instance.
2009

2010
  @type instance: L{objects.Instance}
2011
  @param instance: the instance definition
2012
  @type info: string/data (opaque)
2013
  @param info: migration information, from the source node
2014
  @type target: string
2015
  @param target: target host (usually ip), on this node
2016

2017
  """
2018
  # TODO: why is this required only for DTS_EXT_MIRROR?
2019
  if instance.disk_template in constants.DTS_EXT_MIRROR:
2020
    # Create the symlinks, as the disks are not active
2021
    # in any way
2022
    try:
2023
      _GatherAndLinkBlockDevs(instance)
2024
    except errors.BlockDeviceError, err:
2025
      _Fail("Block device error: %s", err, exc=True)
2026

    
2027
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2028
  try:
2029
    hyper.AcceptInstance(instance, info, target)
2030
  except errors.HypervisorError, err:
2031
    if instance.disk_template in constants.DTS_EXT_MIRROR:
2032
      _RemoveBlockDevLinks(instance.name, instance.disks_info)
2033
    _Fail("Failed to accept instance: %s", err, exc=True)
2034

    
2035

    
2036
def FinalizeMigrationDst(instance, info, success):
2037
  """Finalize any preparation to accept an instance.
2038

2039
  @type instance: L{objects.Instance}
2040
  @param instance: the instance definition
2041
  @type info: string/data (opaque)
2042
  @param info: migration information, from the source node
2043
  @type success: boolean
2044
  @param success: whether the migration was a success or a failure
2045

2046
  """
2047
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2048
  try:
2049
    hyper.FinalizeMigrationDst(instance, info, success)
2050
  except errors.HypervisorError, err:
2051
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
2052

    
2053

    
2054
def MigrateInstance(cluster_name, instance, target, live):
2055
  """Migrates an instance to another node.
2056

2057
  @type cluster_name: string
2058
  @param cluster_name: name of the cluster
2059
  @type instance: L{objects.Instance}
2060
  @param instance: the instance definition
2061
  @type target: string
2062
  @param target: the target node name
2063
  @type live: boolean
2064
  @param live: whether the migration should be done live or not (the
2065
      interpretation of this parameter is left to the hypervisor)
2066
  @raise RPCFail: if migration fails for some reason
2067

2068
  """
2069
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2070

    
2071
  try:
2072
    hyper.MigrateInstance(cluster_name, instance, target, live)
2073
  except errors.HypervisorError, err:
2074
    _Fail("Failed to migrate instance: %s", err, exc=True)
2075

    
2076

    
2077
def FinalizeMigrationSource(instance, success, live):
2078
  """Finalize the instance migration on the source node.
2079

2080
  @type instance: L{objects.Instance}
2081
  @param instance: the instance definition of the migrated instance
2082
  @type success: bool
2083
  @param success: whether the migration succeeded or not
2084
  @type live: bool
2085
  @param live: whether the user requested a live migration or not
2086
  @raise RPCFail: If the execution fails for some reason
2087

2088
  """
2089
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2090

    
2091
  try:
2092
    hyper.FinalizeMigrationSource(instance, success, live)
2093
  except Exception, err:  # pylint: disable=W0703
2094
    _Fail("Failed to finalize the migration on the source node: %s", err,
2095
          exc=True)
2096

    
2097

    
2098
def GetMigrationStatus(instance):
2099
  """Get the migration status
2100

2101
  @type instance: L{objects.Instance}
2102
  @param instance: the instance that is being migrated
2103
  @rtype: L{objects.MigrationStatus}
2104
  @return: the status of the current migration (one of
2105
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
2106
           progress info that can be retrieved from the hypervisor
2107
  @raise RPCFail: If the migration status cannot be retrieved
2108

2109
  """
2110
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2111
  try:
2112
    return hyper.GetMigrationStatus(instance)
2113
  except Exception, err:  # pylint: disable=W0703
2114
    _Fail("Failed to get migration status: %s", err, exc=True)
2115

    
2116

    
2117
def HotplugDevice(instance, action, dev_type, device, extra, seq):
2118
  """Hotplug a device
2119

2120
  Hotplug is currently supported only for KVM Hypervisor.
2121
  @type instance: L{objects.Instance}
2122
  @param instance: the instance to which we hotplug a device
2123
  @type action: string
2124
  @param action: the hotplug action to perform
2125
  @type dev_type: string
2126
  @param dev_type: the device type to hotplug
2127
  @type device: either L{objects.NIC} or L{objects.Disk}
2128
  @param device: the device object to hotplug
2129
  @type extra: string
2130
  @param extra: extra info used by hotplug code (e.g. disk link)
2131
  @type seq: int
2132
  @param seq: the index of the device from master perspective
2133
  @raise RPCFail: in case instance does not have KVM hypervisor
2134

2135
  """
2136
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2137
  try:
2138
    hyper.VerifyHotplugSupport(instance, action, dev_type)
2139
  except errors.HotplugError, err:
2140
    _Fail("Hotplug is not supported: %s", err)
2141

    
2142
  if action == constants.HOTPLUG_ACTION_ADD:
2143
    fn = hyper.HotAddDevice
2144
  elif action == constants.HOTPLUG_ACTION_REMOVE:
2145
    fn = hyper.HotDelDevice
2146
  elif action == constants.HOTPLUG_ACTION_MODIFY:
2147
    fn = hyper.HotModDevice
2148
  else:
2149
    assert action in constants.HOTPLUG_ALL_ACTIONS
2150

    
2151
  return fn(instance, dev_type, device, extra, seq)
2152

    
2153

    
2154
def HotplugSupported(instance):
2155
  """Checks if hotplug is generally supported.
2156

2157
  """
2158
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2159
  try:
2160
    hyper.HotplugSupported(instance)
2161
  except errors.HotplugError, err:
2162
    _Fail("Hotplug is not supported: %s", err)
2163

    
2164

    
2165
def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
2166
  """Creates a block device for an instance.
2167

2168
  @type disk: L{objects.Disk}
2169
  @param disk: the object describing the disk we should create
2170
  @type size: int
2171
  @param size: the size of the physical underlying device, in MiB
2172
  @type owner: str
2173
  @param owner: the name of the instance for which disk is created,
2174
      used for device cache data
2175
  @type on_primary: boolean
2176
  @param on_primary:  indicates if it is the primary node or not
2177
  @type info: string
2178
  @param info: string that will be sent to the physical device
2179
      creation, used for example to set (LVM) tags on LVs
2180
  @type excl_stor: boolean
2181
  @param excl_stor: Whether exclusive_storage is active
2182

2183
  @return: the new unique_id of the device (this can sometime be
2184
      computed only after creation), or None. On secondary nodes,
2185
      it's not required to return anything.
2186

2187
  """
2188
  # TODO: remove the obsolete "size" argument
2189
  # pylint: disable=W0613
2190
  clist = []
2191
  if disk.children:
2192
    for child in disk.children:
2193
      try:
2194
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
2195
      except errors.BlockDeviceError, err:
2196
        _Fail("Can't assemble device %s: %s", child, err)
2197
      if on_primary or disk.AssembleOnSecondary():
2198
        # we need the children open in case the device itself has to
2199
        # be assembled
2200
        try:
2201
          # pylint: disable=E1103
2202
          crdev.Open()
2203
        except errors.BlockDeviceError, err:
2204
          _Fail("Can't make child '%s' read-write: %s", child, err)
2205
      clist.append(crdev)
2206

    
2207
  try:
2208
    device = bdev.Create(disk, clist, excl_stor)
2209
  except errors.BlockDeviceError, err:
2210
    _Fail("Can't create block device: %s", err)
2211

    
2212
  if on_primary or disk.AssembleOnSecondary():
2213
    try:
2214
      device.Assemble()
2215
    except errors.BlockDeviceError, err:
2216
      _Fail("Can't assemble device after creation, unusual event: %s", err)
2217
    if on_primary or disk.OpenOnSecondary():
2218
      try:
2219
        device.Open(force=True)
2220
      except errors.BlockDeviceError, err:
2221
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
2222
    DevCacheManager.UpdateCache(device.dev_path, owner,
2223
                                on_primary, disk.iv_name)
2224

    
2225
  device.SetInfo(info)
2226

    
2227
  return device.unique_id
2228

    
2229

    
2230
def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  result = utils.RunCmd([constants.DD_CMD, "if=/dev/zero",
                         "seek=%d" % offset, "bs=%s" % block_size,
                         "oflag=direct", "of=%s" % path, "count=%d" % size])
  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
2251

    
2252

    
2253
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    device = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    device = None

  if not device:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Do cross verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > device.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > device.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(device.dev_path, offset, size)
2283

    
2284

    
2285
def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disks object we want to pause/resume
  @type pause: bool
  @param pause: Wheater to pause or resume

  """
  results = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      results.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    changed = rdev.PauseResumeSync(pause)
    if changed:
      results.append((changed, None))
    else:
      verb = "Pause" if pause else "Resume"
      results.append((changed,
                      "%s for device %s failed" % (verb, disk.iv_name)))

  return results
2318

    
2319

    
2320
def BlockdevRemove(disk):
2321
  """Remove a block device.
2322

2323
  @note: This is intended to be called recursively.
2324

2325
  @type disk: L{objects.Disk}
2326
  @param disk: the disk object we should remove
2327
  @rtype: boolean
2328
  @return: the success of the operation
2329

2330
  """
2331
  msgs = []
2332
  try:
2333
    rdev = _RecursiveFindBD(disk)
2334
  except errors.BlockDeviceError, err:
2335
    # probably can't attach
2336
    logging.info("Can't attach to device %s in remove", disk)
2337
    rdev = None
2338
  if rdev is not None:
2339
    r_path = rdev.dev_path
2340

    
2341
    def _TryRemove():
2342
      try:
2343
        rdev.Remove()
2344
        return []
2345
      except errors.BlockDeviceError, err:
2346
        return [str(err)]
2347

    
2348
    msgs.extend(utils.SimpleRetry([], _TryRemove,
2349
                                  constants.DISK_REMOVE_RETRY_INTERVAL,
2350
                                  constants.DISK_REMOVE_RETRY_TIMEOUT))
2351

    
2352
    if not msgs:
2353
      DevCacheManager.RemoveCache(r_path)
2354

    
2355
  if disk.children:
2356
    for child in disk.children:
2357
      try:
2358
        BlockdevRemove(child)
2359
      except RPCFail, err:
2360
        msgs.append(str(err))
2361

    
2362
  if msgs:
2363
    _Fail("; ".join(msgs))
2364

    
2365

    
2366
def _RecursiveAssembleBD(disk, owner, as_primary):
2367
  """Activate a block device for an instance.
2368

2369
  This is run on the primary and secondary nodes for an instance.
2370

2371
  @note: this function is called recursively.
2372

2373
  @type disk: L{objects.Disk}
2374
  @param disk: the disk we try to assemble
2375
  @type owner: str
2376
  @param owner: the name of the instance which owns the disk
2377
  @type as_primary: boolean
2378
  @param as_primary: if we should make the block device
2379
      read/write
2380

2381
  @return: the assembled device or None (in case no device
2382
      was assembled)
2383
  @raise errors.BlockDeviceError: in case there is an error
2384
      during the activation of the children or the device
2385
      itself
2386

2387
  """
2388
  children = []
2389
  if disk.children:
2390
    mcn = disk.ChildrenNeeded()
2391
    if mcn == -1:
2392
      mcn = 0 # max number of Nones allowed
2393
    else:
2394
      mcn = len(disk.children) - mcn # max number of Nones
2395
    for chld_disk in disk.children:
2396
      try:
2397
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
2398
      except errors.BlockDeviceError, err:
2399
        if children.count(None) >= mcn:
2400
          raise
2401
        cdev = None
2402
        logging.error("Error in child activation (but continuing): %s",
2403
                      str(err))
2404
      children.append(cdev)
2405

    
2406
  if as_primary or disk.AssembleOnSecondary():
2407
    r_dev = bdev.Assemble(disk, children)
2408
    result = r_dev
2409
    if as_primary or disk.OpenOnSecondary():
2410
      r_dev.Open()
2411
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
2412
                                as_primary, disk.iv_name)
2413

    
2414
  else:
2415
    result = True
2416
  return result
2417

    
2418

    
2419
def BlockdevAssemble(disk, owner, as_primary, idx):
2420
  """Activate a block device for an instance.
2421

2422
  This is a wrapper over _RecursiveAssembleBD.
2423

2424
  @rtype: str or boolean
2425
  @return: a tuple with the C{/dev/...} path and the created symlink
2426
      for primary nodes, and (C{True}, C{True}) for secondary nodes
2427

2428
  """
2429
  try:
2430
    result = _RecursiveAssembleBD(disk, owner, as_primary)
2431
    if isinstance(result, BlockDev):
2432
      # pylint: disable=E1103
2433
      dev_path = result.dev_path
2434
      link_name = None
2435
      if as_primary:
2436
        link_name = _SymlinkBlockDev(owner, dev_path, idx)
2437
    elif result:
2438
      return result, result
2439
    else:
2440
      _Fail("Unexpected result from _RecursiveAssembleBD")
2441
  except errors.BlockDeviceError, err:
2442
    _Fail("Error while assembling disk: %s", err, exc=True)
2443
  except OSError, err:
2444
    _Fail("Error while symlinking disk: %s", err, exc=True)
2445

    
2446
  return dev_path, link_name
2447

    
2448

    
2449
def BlockdevShutdown(disk):
2450
  """Shut down a block device.
2451

2452
  First, if the device is assembled (Attach() is successful), then
2453
  the device is shutdown. Then the children of the device are
2454
  shutdown.
2455

2456
  This function is called recursively. Note that we don't cache the
2457
  children or such, as oppossed to assemble, shutdown of different
2458
  devices doesn't require that the upper device was active.
2459

2460
  @type disk: L{objects.Disk}
2461
  @param disk: the description of the disk we should
2462
      shutdown
2463
  @rtype: None
2464

2465
  """
2466
  msgs = []
2467
  r_dev = _RecursiveFindBD(disk)
2468
  if r_dev is not None:
2469
    r_path = r_dev.dev_path
2470
    try:
2471
      r_dev.Shutdown()
2472
      DevCacheManager.RemoveCache(r_path)
2473
    except errors.BlockDeviceError, err:
2474
      msgs.append(str(err))
2475

    
2476
  if disk.children:
2477
    for child in disk.children:
2478
      try:
2479
        BlockdevShutdown(child)
2480
      except RPCFail, err:
2481
        msgs.append(str(err))
2482

    
2483
  if msgs:
2484
    _Fail("; ".join(msgs))
2485

    
2486

    
2487
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
2504

    
2505

    
2506
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is not None:
      # Device with a fixed path; sanity-check it before use
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
      continue
    bd = _RecursiveFindBD(disk)
    if bd is None:
      _Fail("Can't find device %s while removing children", disk)
    devs.append(bd.dev_path)
  parent_bdev.RemoveChildren(devs)
2533

    
2534

    
2535
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise RPCFail: if any of the disks cannot be found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    # unlike the Multi variant below, a single missing disk is fatal here
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats
2555

    
2556

    
2557
def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        # a missing device is reported per-disk instead of aborting the
        # whole query, unlike BlockdevGetmirrorstatus
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  # every input disk must have produced exactly one result entry
  assert len(disks) == len(result)

  return result
2586

    
2587

    
2588
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # resolve all children first (depth-first), then look up this device
  if disk.children:
    child_devs = [_RecursiveFindBD(child) for child in disk.children]
  else:
    child_devs = []

  return bdev.FindDevice(disk, child_devs)
2606

    
2607

    
2608
def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open
  @return: the opened device instance (as returned by L{_RecursiveFindBD})
  @raise RPCFail: if the block device is not set up

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk
2622

    
2623

    
2624
def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           sync status information
  @raise RPCFail: if the device lookup itself fails

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  # not finding the device is a valid (None) answer, not an error
  if rbd is None:
    return None

  return rbd.GetSyncStatus()
2645

    
2646

    
2647
def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  dimensions = []
  for disk in disks:
    # any lookup problem (error or simply not found) yields None for
    # this entry; the caller matches entries to disks by position
    try:
      found = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      dimensions.append(None)
    else:
      if found is None:
        dimensions.append(None)
      else:
        dimensions.append(found.GetActualDimensions())
  return dimensions
2672

    
2673

    
2674
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None
  @raise RPCFail: if the file name is not absolute or is not in the
      whitelist of allowed upload targets

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  # only a fixed whitelist of configuration files may be overwritten;
  # this is the security boundary for master-initiated uploads
  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  # the payload arrives in the wire encoding handled by _Decompress
  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  # translate symbolic user/group names into numeric ids
  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)
2718

    
2719

    
2720
def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  cmd_result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  # on success, hand the program's standard output back to the caller
  if not cmd_result.failed:
    return cmd_result.stdout

  _Fail("'%s' failed with reason '%s'; output: %s", cmd_result.cmd,
        cmd_result.fail_reason, cmd_result.output)
2739

    
2740

    
2741
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  # the API file must be a regular file (not a directory, device, ...)
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  # each line of the file is expected to hold one integer version
  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
2779

    
2780

    
2781
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of support OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        # NOTE(review): 'break' aborts scanning ALL remaining search
        # directories on the first unreadable one; 'continue' may be the
        # intended behaviour -- confirm before changing
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          # on failure, os_inst holds the error message string
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result
2827

    
2828

    
2829
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type name: str
  @param name: the OS name to look up on disk
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  # at least one API version must be supported by both sides
  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    # the verify script is only expected from API v20 onwards
    del os_files[constants.OS_SCRIPT_VERIFY]

  # note: .items() snapshots the dict (Python 2), so the in-loop
  # mutation (value replacement and deletion) below is safe
  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      # optional files are simply dropped when missing
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      # OS scripts must carry the owner-execute bit
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    # each line is split into (name, help-text) at the first whitespace
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj
2927

    
2928

    
2929
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # reduce the full name to the plain OS name for the on-disk lookup
  status, payload = _TryOSFromDisk(objects.OS.GetName(name), base_dir)

  if status:
    return payload

  # on failure, payload is the diagnostic message
  _Fail(payload)
2954

    
2955

    
2956
def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # advertise the highest API version supported by both this node and
  # the OS definition
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      # no explicit variant in the name: default to the first supported one
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result
2999

    
3000

    
3001
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  Builds on L{OSCoreEnv} and adds per-instance data: identity
  attributes, disk and NIC details, and hypervisor/backend parameters.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks_info)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks_info):
    # opening the device also gives us its real path for DISK_n_PATH
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    result["DISK_%d_UUID" % idx] = disk.uuid
    if disk.name:
      result["DISK_%d_NAME" % idx] = disk.name
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.DTS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type in constants.DTS_FILEBASED:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.logical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    result["NIC_%d_UUID" % idx] = nic.uuid
    if nic.name:
      result["NIC_%d_NAME" % idx] = nic.name
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
3070

    
3071

    
3072
def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        # NOTE(review): as in DiagnoseOS, 'break' stops scanning all
        # remaining search directories on the first unreadable one --
        # confirm this is intended
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          # on failure, es_inst holds the error message string
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result
3115

    
3116

    
3117
def BlockdevGrow(disk, amount, dryrun, backingstore, excl_stor):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active
  @rtype: None
  @raise RPCFail: if the device cannot be found or growing it fails

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)
3148

    
3149

    
3150
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)
  @raise RPCFail: if the disk type is unsupported, the device is not
      found, or a DRBD disk has no backing storage

  """
  if disk.dev_type == constants.DT_DRBD8:
    # for DRBD, snapshot the single backing (child) device instead
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.DT_PLAIN:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
3178

    
3179

    
3180
def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk on which to set the information
  @type info: string
  @param info: new 'info' metadata
  @rtype: None
  @raise RPCFail: if the device cannot be found or setting the
      information fails

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)
3204

    
3205

    
3206
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  # the config is first written under a ".new" directory and then moved
  # over the final one, so a partially written export never shows up
  # under the instance's canonical export path
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    # entries may be false-y for disks that were not snapshotted; they
    # are skipped but keep their index in the generated key names
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.logical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    # globally-defined hypervisor parameters are not exported
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  config.add_section(constants.INISECT_OSP_PRIVATE)
  for name, value in instance.osparams_private.items():
    config.set(constants.INISECT_OSP_PRIVATE, name, str(value.Get()))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # replace any previous export for this instance with the new one
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
3296

    
3297

    
3298
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: the export configuration, in the serialized (C{Dumps()}) form
      of a L{objects.SerializableConfigParser}

  @raise RPCFail: if the config file lacks the required sections

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  # both the export and the instance sections must be present for the
  # file to be considered a valid export description
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
3319

    
3320

    
3321
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  # without an exports directory there is nothing to list
  if not os.path.isdir(pathutils.EXPORT_DIR):
    _Fail("No exports directory")

  return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
3332

    
3333

    
3334
def RemoveExport(export):
3335
  """Remove an existing export from the node.
3336

3337
  @type export: str
3338
  @param export: the name of the export to remove
3339
  @rtype: None
3340

3341
  """
3342
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)
3343

    
3344
  try:
3345
    shutil.rmtree(target)
3346
  except EnvironmentError, err:
3347
    _Fail("Error while removing the export: %s", err, exc=True)
3348

    
3349

    
3350
def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form  (disk, new_unique_id); disk is
      an L{objects.Disk} object describing the current disk, and new
      unique_id is the name we rename it to
  @rtype: None
  @raises RPCFail: if any of the renames failed; all accumulated error
      messages are reported together

  """
  # Accumulate all failures so every device is attempted before reporting
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      # The device path may change as a consequence of the rename, in
      # which case the stale cache entry must be dropped
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid

  """
  # NOTE(review): presumably CheckFileStoragePath raises on an invalid
  # path (this function has no failure return) - confirm in filestorage
  filestorage.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: None
  @raises RPCFail: if the path exists but is not a directory, or the
      directory could not be created

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      # Create intermediate directories too, group-readable only
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: None
  @raises RPCFail: if the path is not a directory or removal failed

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified Storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: None
  @raises RPCFail: if the rename failed, the source is not a directory,
      or both locations exist

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    # Target exists; this is only an error if the source also still exists
    # (if the source is gone, the rename may have already happened)
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  inside_queue = utils.IsBelowDir(pathutils.QUEUE_DIR, file_name)
  if not inside_queue:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents (possibly compressed; see
      L{_Decompress})
  @rtype: None
  @raises RPCFail: if the file is outside the queue directory

  """
  # Translate the cluster-level virtual path to a node-local path
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None
  @raises RPCFail: if either path is outside the queue directory

  """
  # Translate cluster-level virtual paths to node-local paths
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  # Both source and destination must stay inside the queue directory
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  # Create the destination directory if needed (e.g. the archive
  # subdirectory), owned by masterd and the daemons group
  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)


def BlockdevClose(instance_name, disks):
3550
  """Closes the given block devices.
3551

3552
  This means they will be switched to secondary mode (in case of
3553
  DRBD).
3554

3555
  @param instance_name: if the argument is not empty, the symlinks
3556
      of this instance will be removed
3557
  @type disks: list of L{objects.Disk}
3558
  @param disks: the list of disks to be closed
3559
  @rtype: tuple (success, message)
3560
  @return: a tuple of success and message, where success
3561
      indicates the succes of the operation, and message
3562
      which will contain the error details in case we
3563
      failed
3564

3565
  """
3566
  bdevs = []
3567
  for cf in disks:
3568
    rd = _RecursiveFindBD(cf)
3569
    if rd is None:
3570
      _Fail("Can't find device %s", cf)
3571
    bdevs.append(rd)
3572

    
3573
  msg = []
3574
  for rd in bdevs:
3575
    try:
3576
      rd.Close()
3577
    except errors.BlockDeviceError, err:
3578
      msg.append(str(err))
3579
  if msg:
3580
    _Fail("Can't make devices secondary: %s", ",".join(msg))
3581
  else:
3582
    if instance_name:
3583
      _RemoveBlockDevLinks(instance_name, disks)
3584

    
3585

    
3586
def ValidateHVParams(hvname, hvparams):
3587
  """Validates the given hypervisor parameters.
3588

3589
  @type hvname: string
3590
  @param hvname: the hypervisor name
3591
  @type hvparams: dict
3592
  @param hvparams: the hypervisor parameters to be validated
3593
  @rtype: None
3594

3595
  """
3596
  try:
3597
    hv_type = hypervisor.GetHypervisor(hvname)
3598
    hv_type.ValidateParameters(hvparams)
3599
  except errors.HypervisorError, err:
3600
    _Fail(str(err), log=False)
3601

    
3602

    
3603
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  # supported_parameters holds (name, description) pairs; compare names only
  known = frozenset(entry[0] for entry in os_obj.supported_parameters)
  unknown = frozenset(parameters) - known
  if unknown:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(unknown)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters, some of which may be
                   private.
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false
  @raises RPCFail: on unknown checks, missing required OS, or a failing
      validation script

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    # on failure, tbv carries the error message instead of the OS object
    if required:
      _Fail(tbv)
    else:
      return False

  # OSes with an API version below 20 have no validation support; they
  # are accepted without running any checks
  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  # Run the OS-provided verify script with the requested checks as
  # arguments and a clean environment
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    # log=False: the failure was already logged just above
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  @rtype: None
  @raises RPCFail: if this node is the master (per ssconf), the master
      daemon is still running, or backing up the configuration failed

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  # "check" succeeds (result.failed is False) when the daemon is running
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    # The file vanishing between the isfile() check and the backup is
    # acceptable; any other error aborts the demotion
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  @rtype: tuple; (string, string, string)
  @return: certificate directory, key file path and certificate file path

  """
  cert_dir = utils.PathJoin(cryptodir, name)
  return (cert_dir,
          utils.PathJoin(cert_dir, _X509_KEY_FILE),
          utils.PathJoin(cert_dir, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds (capped at _MAX_SSL_CERT_VALIDITY)
  @type cryptodir: string
  @param cryptodir: directory in which the certificate is stored
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY), 1)

  # The unique directory name doubles as the certificate name that is
  # returned to the caller
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    # Key material is readable by the owner only
    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    # Don't leave a partially written certificate directory behind
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes a X509 certificate.

  @type name: string
  @param name: Certificate name
  @type cryptodir: string
  @param cryptodir: directory holding the certificates
  @rtype: None
  @raises RPCFail: if the certificate directory cannot be removed

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  # After removing key and certificate the directory should be empty;
  # a failing rmdir means unexpected content was left behind
  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments
  @return: tuple of (env, prefix, suffix, exp_size); env is an optional
      environment dict for the transfer command, prefix/suffix are
      optional shell fragments placed around it, and exp_size is the
      expected transfer size in MiB, C{constants.IE_CUSTOM_SIZE}, or
      C{None} if unknown

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    # Resolve symlinks so the containment check below can't be bypassed
    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        # Size is advisory only; log and continue without it
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we use nocreat to fail if the device is not already there or we pass a
      # wrong path; we use notrunc to no attempt truncate on an LV device
      suffix = utils.BuildShellCmd("| dd of=%s conv=nocreat,notrunc bs=%s",
                                   real_disk.dev_path,
                                   str(1024 * 1024)) # 1 MB

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      real_disk = _OpenRealBD(disk)
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  @rtype: string
  @return: path of the newly created status directory

  """
  # Timestamp in the name keeps concurrent operations distinguishable
  dir_prefix = "%s-%s-" % (prefix, utils.TimestampForFilename())
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR, prefix=dir_prefix)


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/output mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments
  @rtype: string
  @return: the import/export name, i.e. the basename of the status
      directory created for this daemon

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  # key_name and ca_pem must be given together or not at all (in which
  # case the cluster certificate is used for both)
  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  # Fail before creating any state if the TLS material is missing
  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    # Don't leave a half-initialized status directory behind
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
3993
  """Returns import/export daemon status.
3994

3995
  @type names: sequence
3996
  @param names: List of names
3997
  @rtype: List of dicts
3998
  @return: Returns a list of the state of each named import/export or None if a
3999
           status couldn't be read
4000

4001
  """
4002
  result = []
4003

    
4004
  for name in names:
4005
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
4006
                                 _IES_STATUS_FILE)
4007

    
4008
    try:
4009
      data = utils.ReadFile(status_file)
4010
    except EnvironmentError, err:
4011
      if err.errno != errno.ENOENT:
4012
        raise
4013
      data = None
4014

    
4015
    if not data:
4016
      result.append(None)
4017
      continue
4018

    
4019
    result.append(serializer.LoadJson(data))
4020

    
4021
  return result
4022

    
4023

    
4024
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  @type name: string
  @param name: the import/export name (status directory basename)
  @rtype: None

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    # The process may have exited since the PID file was read; that is fine
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  @type name: string
  @param name: the import/export name (status directory basename)
  @rtype: None

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  # Remove the status directory whether or not a daemon was running
  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(disks):
  """Finds attached L{BlockDev}s for the given disks.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we need to find

  @return: list of L{BlockDev} objects, in the same order as C{disks}
  @raises RPCFail: if any disk was not found or was not attached

  """
  found = []

  for disk in disks:
    # Fail the whole call as soon as one disk cannot be resolved
    blockdev = _RecursiveFindBD(disk)
    if blockdev is None:
      _Fail("Can't find device %s", disk)
    found.append(blockdev)

  return found


def DrbdDisconnectNet(disks):
4081
  """Disconnects the network on a list of drbd devices.
4082

4083