Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 22114677

History | View | Annotate | Download (146 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable=E1103,C0302
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37
# C0302: This module has become too big and should be split up
38

    
39

    
40
import os
41
import os.path
42
import shutil
43
import time
44
import stat
45
import errno
46
import re
47
import random
48
import logging
49
import tempfile
50
import zlib
51
import base64
52
import signal
53

    
54
from ganeti import errors
55
from ganeti import utils
56
from ganeti import ssh
57
from ganeti import hypervisor
58
from ganeti import constants
59
from ganeti.storage import bdev
60
from ganeti.storage import drbd
61
from ganeti.storage import filestorage
62
from ganeti import objects
63
from ganeti import ssconf
64
from ganeti import serializer
65
from ganeti import netutils
66
from ganeti import runtime
67
from ganeti import compat
68
from ganeti import pathutils
69
from ganeti import vcluster
70
from ganeti import ht
71
from ganeti.storage.base import BlockDev
72
from ganeti.storage.drbd import DRBD8
73
from ganeti import hooksmaster
74

    
75

    
76
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
77
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
78
  pathutils.DATA_DIR,
79
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
80
  pathutils.QUEUE_DIR,
81
  pathutils.CRYPTO_KEYS_DIR,
82
  ])
83
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
84
_X509_KEY_FILE = "key"
85
_X509_CERT_FILE = "cert"
86
_IES_STATUS_FILE = "status"
87
_IES_PID_FILE = "pid"
88
_IES_CA_FILE = "ca"
89

    
90
#: Valid LVS output line regex
91
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
92

    
93
# Actions for the master setup script
94
_MASTER_START = "start"
95
_MASTER_STOP = "stop"
96

    
97
#: Maximum file permissions for restricted command directory and executables
98
_RCMD_MAX_MODE = (stat.S_IRWXU |
99
                  stat.S_IRGRP | stat.S_IXGRP |
100
                  stat.S_IROTH | stat.S_IXOTH)
101

    
102
#: Delay before returning an error for restricted commands
103
_RCMD_INVALID_DELAY = 10
104

    
105
#: How long to wait to acquire lock for restricted commands (shorter than
106
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
107
#: command requests arrive
108
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
109

    
110

    
111
class RPCFail(Exception):
112
  """Class denoting RPC failure.
113

114
  Its argument is the error message.
115

116
  """
117

    
118

    
119
def _GetInstReasonFilename(instance_name):
120
  """Path of the file containing the reason of the instance status change.
121

122
  @type instance_name: string
123
  @param instance_name: The name of the instance
124
  @rtype: string
125
  @return: The path of the file
126

127
  """
128
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)
129

    
130

    
131
def _StoreInstReasonTrail(instance_name, trail):
132
  """Serialize a reason trail related to an instance change of state to file.
133

134
  The exact location of the file depends on the name of the instance and on
135
  the configuration of the Ganeti cluster defined at deploy time.
136

137
  @type instance_name: string
138
  @param instance_name: The name of the instance
139
  @rtype: None
140

141
  """
142
  json = serializer.DumpJson(trail)
143
  filename = _GetInstReasonFilename(instance_name)
144
  utils.WriteFile(filename, data=json)
145

    
146

    
147
def _Fail(msg, *args, **kwargs):
148
  """Log an error and the raise an RPCFail exception.
149

150
  This exception is then handled specially in the ganeti daemon and
151
  turned into a 'failed' return type. As such, this function is a
152
  useful shortcut for logging the error and returning it to the master
153
  daemon.
154

155
  @type msg: string
156
  @param msg: the text of the exception
157
  @raise RPCFail
158

159
  """
160
  if args:
161
    msg = msg % args
162
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
163
    if "exc" in kwargs and kwargs["exc"]:
164
      logging.exception(msg)
165
    else:
166
      logging.error(msg)
167
  raise RPCFail(msg)
168

    
169

    
170
def _GetConfig():
171
  """Simple wrapper to return a SimpleStore.
172

173
  @rtype: L{ssconf.SimpleStore}
174
  @return: a SimpleStore instance
175

176
  """
177
  return ssconf.SimpleStore()
178

    
179

    
180
def _GetSshRunner(cluster_name):
181
  """Simple wrapper to return an SshRunner.
182

183
  @type cluster_name: str
184
  @param cluster_name: the cluster name, which is needed
185
      by the SshRunner constructor
186
  @rtype: L{ssh.SshRunner}
187
  @return: an SshRunner instance
188

189
  """
190
  return ssh.SshRunner(cluster_name)
191

    
192

    
193
def _Decompress(data):
194
  """Unpacks data compressed by the RPC client.
195

196
  @type data: list or tuple
197
  @param data: Data sent by RPC client
198
  @rtype: str
199
  @return: Decompressed data
200

201
  """
202
  assert isinstance(data, (list, tuple))
203
  assert len(data) == 2
204
  (encoding, content) = data
205
  if encoding == constants.RPC_ENCODING_NONE:
206
    return content
207
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
208
    return zlib.decompress(base64.b64decode(content))
209
  else:
210
    raise AssertionError("Unknown data encoding")
211

    
212

    
213
def _CleanDirectory(path, exclude=None):
214
  """Removes all regular files in a directory.
215

216
  @type path: str
217
  @param path: the directory to clean
218
  @type exclude: list
219
  @param exclude: list of files to be excluded, defaults
220
      to the empty list
221

222
  """
223
  if path not in _ALLOWED_CLEAN_DIRS:
224
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
225
          path)
226

    
227
  if not os.path.isdir(path):
228
    return
229
  if exclude is None:
230
    exclude = []
231
  else:
232
    # Normalize excluded paths
233
    exclude = [os.path.normpath(i) for i in exclude]
234

    
235
  for rel_name in utils.ListVisibleFiles(path):
236
    full_name = utils.PathJoin(path, rel_name)
237
    if full_name in exclude:
238
      continue
239
    if os.path.isfile(full_name) and not os.path.islink(full_name):
240
      utils.RemoveFile(full_name)
241

    
242

    
243
def _BuildUploadFileList():
244
  """Build the list of allowed upload files.
245

246
  This is abstracted so that it's built only once at module import time.
247

248
  """
249
  allowed_files = set([
250
    pathutils.CLUSTER_CONF_FILE,
251
    pathutils.ETC_HOSTS,
252
    pathutils.SSH_KNOWN_HOSTS_FILE,
253
    pathutils.VNC_PASSWORD_FILE,
254
    pathutils.RAPI_CERT_FILE,
255
    pathutils.SPICE_CERT_FILE,
256
    pathutils.SPICE_CACERT_FILE,
257
    pathutils.RAPI_USERS_FILE,
258
    pathutils.CONFD_HMAC_KEY,
259
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
260
    ])
261

    
262
  for hv_name in constants.HYPER_TYPES:
263
    hv_class = hypervisor.GetHypervisorClass(hv_name)
264
    allowed_files.update(hv_class.GetAncillaryFiles()[0])
265

    
266
  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
267
    "Allowed file storage paths should never be uploaded via RPC"
268

    
269
  return frozenset(allowed_files)
270

    
271

    
272
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
273

    
274

    
275
def JobQueuePurge():
276
  """Removes job queue files and archived jobs.
277

278
  @rtype: tuple
279
  @return: True, None
280

281
  """
282
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
283
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
284

    
285

    
286
def GetMasterNodeName():
287
  """Returns the master node name.
288

289
  @rtype: string
290
  @return: name of the master node
291
  @raise RPCFail: in case of errors
292

293
  """
294
  try:
295
    return _GetConfig().GetMasterNode()
296
  except errors.ConfigurationError, err:
297
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
298

    
299

    
300
def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
301
  """Decorator that runs hooks before and after the decorated function.
302

303
  @type hook_opcode: string
304
  @param hook_opcode: opcode of the hook
305
  @type hooks_path: string
306
  @param hooks_path: path of the hooks
307
  @type env_builder_fn: function
308
  @param env_builder_fn: function that returns a dictionary containing the
309
    environment variables for the hooks. Will get all the parameters of the
310
    decorated function.
311
  @raise RPCFail: in case of pre-hook failure
312

313
  """
314
  def decorator(fn):
315
    def wrapper(*args, **kwargs):
316
      _, myself = ssconf.GetMasterAndMyself()
317
      nodes = ([myself], [myself])  # these hooks run locally
318

    
319
      env_fn = compat.partial(env_builder_fn, *args, **kwargs)
320

    
321
      cfg = _GetConfig()
322
      hr = HooksRunner()
323
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
324
                                   hr.RunLocalHooks, None, env_fn,
325
                                   logging.warning, cfg.GetClusterName(),
326
                                   cfg.GetMasterNode())
327
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
328
      result = fn(*args, **kwargs)
329
      hm.RunPhase(constants.HOOKS_PHASE_POST)
330

    
331
      return result
332
    return wrapper
333
  return decorator
334

    
335

    
336
def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
337
  """Builds environment variables for master IP hooks.
338

339
  @type master_params: L{objects.MasterNetworkParameters}
340
  @param master_params: network parameters of the master
341
  @type use_external_mip_script: boolean
342
  @param use_external_mip_script: whether to use an external master IP
343
    address setup script (unused, but necessary per the implementation of the
344
    _RunLocalHooks decorator)
345

346
  """
347
  # pylint: disable=W0613
348
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
349
  env = {
350
    "MASTER_NETDEV": master_params.netdev,
351
    "MASTER_IP": master_params.ip,
352
    "MASTER_NETMASK": str(master_params.netmask),
353
    "CLUSTER_IP_VERSION": str(ver),
354
  }
355

    
356
  return env
357

    
358

    
359
def _RunMasterSetupScript(master_params, action, use_external_mip_script):
360
  """Execute the master IP address setup script.
361

362
  @type master_params: L{objects.MasterNetworkParameters}
363
  @param master_params: network parameters of the master
364
  @type action: string
365
  @param action: action to pass to the script. Must be one of
366
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
367
  @type use_external_mip_script: boolean
368
  @param use_external_mip_script: whether to use an external master IP
369
    address setup script
370
  @raise backend.RPCFail: if there are errors during the execution of the
371
    script
372

373
  """
374
  env = _BuildMasterIpEnv(master_params)
375

    
376
  if use_external_mip_script:
377
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
378
  else:
379
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
380

    
381
  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
382

    
383
  if result.failed:
384
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
385
          (action, result.exit_code, result.output), log=True)
386

    
387

    
388
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
389
               _BuildMasterIpEnv)
390
def ActivateMasterIp(master_params, use_external_mip_script):
391
  """Activate the IP address of the master daemon.
392

393
  @type master_params: L{objects.MasterNetworkParameters}
394
  @param master_params: network parameters of the master
395
  @type use_external_mip_script: boolean
396
  @param use_external_mip_script: whether to use an external master IP
397
    address setup script
398
  @raise RPCFail: in case of errors during the IP startup
399

400
  """
401
  _RunMasterSetupScript(master_params, _MASTER_START,
402
                        use_external_mip_script)
403

    
404

    
405
def StartMasterDaemons(no_voting):
406
  """Activate local node as master node.
407

408
  The function will start the master daemons (ganeti-masterd and ganeti-rapi).
409

410
  @type no_voting: boolean
411
  @param no_voting: whether to start ganeti-masterd without a node vote
412
      but still non-interactively
413
  @rtype: None
414

415
  """
416

    
417
  if no_voting:
418
    masterd_args = "--no-voting --yes-do-it"
419
  else:
420
    masterd_args = ""
421

    
422
  env = {
423
    "EXTRA_MASTERD_ARGS": masterd_args,
424
    }
425

    
426
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
427
  if result.failed:
428
    msg = "Can't start Ganeti master: %s" % result.output
429
    logging.error(msg)
430
    _Fail(msg)
431

    
432

    
433
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
434
               _BuildMasterIpEnv)
435
def DeactivateMasterIp(master_params, use_external_mip_script):
436
  """Deactivate the master IP on this node.
437

438
  @type master_params: L{objects.MasterNetworkParameters}
439
  @param master_params: network parameters of the master
440
  @type use_external_mip_script: boolean
441
  @param use_external_mip_script: whether to use an external master IP
442
    address setup script
443
  @raise RPCFail: in case of errors during the IP turndown
444

445
  """
446
  _RunMasterSetupScript(master_params, _MASTER_STOP,
447
                        use_external_mip_script)
448

    
449

    
450
def StopMasterDaemons():
451
  """Stop the master daemons on this node.
452

453
  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
454

455
  @rtype: None
456

457
  """
458
  # TODO: log and report back to the caller the error failures; we
459
  # need to decide in which case we fail the RPC for this
460

    
461
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
462
  if result.failed:
463
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
464
                  " and error %s",
465
                  result.cmd, result.exit_code, result.output)
466

    
467

    
468
def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
469
  """Change the netmask of the master IP.
470

471
  @param old_netmask: the old value of the netmask
472
  @param netmask: the new value of the netmask
473
  @param master_ip: the master IP
474
  @param master_netdev: the master network device
475

476
  """
477
  if old_netmask == netmask:
478
    return
479

    
480
  if not netutils.IPAddress.Own(master_ip):
481
    _Fail("The master IP address is not up, not attempting to change its"
482
          " netmask")
483

    
484
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
485
                         "%s/%s" % (master_ip, netmask),
486
                         "dev", master_netdev, "label",
487
                         "%s:0" % master_netdev])
488
  if result.failed:
489
    _Fail("Could not set the new netmask on the master IP address")
490

    
491
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
492
                         "%s/%s" % (master_ip, old_netmask),
493
                         "dev", master_netdev, "label",
494
                         "%s:0" % master_netdev])
495
  if result.failed:
496
    _Fail("Could not bring down the master IP address with the old netmask")
497

    
498

    
499
def EtcHostsModify(mode, host, ip):
500
  """Modify a host entry in /etc/hosts.
501

502
  @param mode: The mode to operate. Either add or remove entry
503
  @param host: The host to operate on
504
  @param ip: The ip associated with the entry
505

506
  """
507
  if mode == constants.ETC_HOSTS_ADD:
508
    if not ip:
509
      RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
510
              " present")
511
    utils.AddHostToEtcHosts(host, ip)
512
  elif mode == constants.ETC_HOSTS_REMOVE:
513
    if ip:
514
      RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
515
              " parameter is present")
516
    utils.RemoveHostFromEtcHosts(host)
517
  else:
518
    RPCFail("Mode not supported")
519

    
520

    
521
def LeaveCluster(modify_ssh_setup):
522
  """Cleans up and remove the current node.
523

524
  This function cleans up and prepares the current node to be removed
525
  from the cluster.
526

527
  If processing is successful, then it raises an
528
  L{errors.QuitGanetiException} which is used as a special case to
529
  shutdown the node daemon.
530

531
  @param modify_ssh_setup: boolean
532

533
  """
534
  _CleanDirectory(pathutils.DATA_DIR)
535
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
536
  JobQueuePurge()
537

    
538
  if modify_ssh_setup:
539
    try:
540
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
541

    
542
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
543

    
544
      utils.RemoveFile(priv_key)
545
      utils.RemoveFile(pub_key)
546
    except errors.OpExecError:
547
      logging.exception("Error while processing ssh files")
548

    
549
  try:
550
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
551
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
552
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
553
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
554
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
555
  except: # pylint: disable=W0702
556
    logging.exception("Error while removing cluster secrets")
557

    
558
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
559
  if result.failed:
560
    logging.error("Command %s failed with exitcode %s and error %s",
561
                  result.cmd, result.exit_code, result.output)
562

    
563
  # Raise a custom exception (handled in ganeti-noded)
564
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
565

    
566

    
567
def _CheckStorageParams(params, num_params):
568
  """Performs sanity checks for storage parameters.
569

570
  @type params: list
571
  @param params: list of storage parameters
572
  @type num_params: int
573
  @param num_params: expected number of parameters
574

575
  """
576
  if params is None:
577
    raise errors.ProgrammerError("No storage parameters for storage"
578
                                 " reporting is provided.")
579
  if not isinstance(params, list):
580
    raise errors.ProgrammerError("The storage parameters are not of type"
581
                                 " list: '%s'" % params)
582
  if not len(params) == num_params:
583
    raise errors.ProgrammerError("Did not receive the expected number of"
584
                                 "storage parameters: expected %s,"
585
                                 " received '%s'" % (num_params, len(params)))
586

    
587

    
588
def _CheckLvmStorageParams(params):
589
  """Performs sanity check for the 'exclusive storage' flag.
590

591
  @see: C{_CheckStorageParams}
592

593
  """
594
  _CheckStorageParams(params, 1)
595
  excl_stor = params[0]
596
  if not isinstance(params[0], bool):
597
    raise errors.ProgrammerError("Exclusive storage parameter is not"
598
                                 " boolean: '%s'." % excl_stor)
599
  return excl_stor
600

    
601

    
602
def _GetLvmVgSpaceInfo(name, params):
603
  """Wrapper around C{_GetVgInfo} which checks the storage parameters.
604

605
  @type name: string
606
  @param name: name of the volume group
607
  @type params: list
608
  @param params: list of storage parameters, which in this case should be
609
    containing only one for exclusive storage
610

611
  """
612
  excl_stor = _CheckLvmStorageParams(params)
613
  return _GetVgInfo(name, excl_stor)
614

    
615

    
616
def _GetVgInfo(
617
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVGInfo):
618
  """Retrieves information about a LVM volume group.
619

620
  """
621
  # TODO: GetVGInfo supports returning information for multiple VGs at once
622
  vginfo = info_fn([name], excl_stor)
623
  if vginfo:
624
    vg_free = int(round(vginfo[0][0], 0))
625
    vg_size = int(round(vginfo[0][1], 0))
626
  else:
627
    vg_free = None
628
    vg_size = None
629

    
630
  return {
631
    "type": constants.ST_LVM_VG,
632
    "name": name,
633
    "storage_free": vg_free,
634
    "storage_size": vg_size,
635
    }
636

    
637

    
638
def _GetLvmPvSpaceInfo(name, params):
639
  """Wrapper around C{_GetVgSpindlesInfo} with sanity checks.
640

641
  @see: C{_GetLvmVgSpaceInfo}
642

643
  """
644
  excl_stor = _CheckLvmStorageParams(params)
645
  return _GetVgSpindlesInfo(name, excl_stor)
646

    
647

    
648
def _GetVgSpindlesInfo(
649
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVgSpindlesInfo):
650
  """Retrieves information about spindles in an LVM volume group.
651

652
  @type name: string
653
  @param name: VG name
654
  @type excl_stor: bool
655
  @param excl_stor: exclusive storage
656
  @rtype: dict
657
  @return: dictionary whose keys are "name", "vg_free", "vg_size" for VG name,
658
      free spindles, total spindles respectively
659

660
  """
661
  if excl_stor:
662
    (vg_free, vg_size) = info_fn(name)
663
  else:
664
    vg_free = 0
665
    vg_size = 0
666
  return {
667
    "type": constants.ST_LVM_PV,
668
    "name": name,
669
    "storage_free": vg_free,
670
    "storage_size": vg_size,
671
    }
672

    
673

    
674
def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
675
  """Retrieves node information from a hypervisor.
676

677
  The information returned depends on the hypervisor. Common items:
678

679
    - vg_size is the size of the configured volume group in MiB
680
    - vg_free is the free size of the volume group in MiB
681
    - memory_dom0 is the memory allocated for domain0 in MiB
682
    - memory_free is the currently available (free) ram in MiB
683
    - memory_total is the total number of ram in MiB
684
    - hv_version: the hypervisor version, if available
685

686
  @type hvparams: dict of string
687
  @param hvparams: the hypervisor's hvparams
688

689
  """
690
  return get_hv_fn(name).GetNodeInfo(hvparams=hvparams)
691

    
692

    
693
def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
694
  """Retrieves node information for all hypervisors.
695

696
  See C{_GetHvInfo} for information on the output.
697

698
  @type hv_specs: list of pairs (string, dict of strings)
699
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
700

701
  """
702
  if hv_specs is None:
703
    return None
704

    
705
  result = []
706
  for hvname, hvparams in hv_specs:
707
    result.append(_GetHvInfo(hvname, hvparams, get_hv_fn))
708
  return result
709

    
710

    
711
def _GetNamedNodeInfo(names, fn):
712
  """Calls C{fn} for all names in C{names} and returns a dictionary.
713

714
  @rtype: None or dict
715

716
  """
717
  if names is None:
718
    return None
719
  else:
720
    return map(fn, names)
721

    
722

    
723
def GetNodeInfo(storage_units, hv_specs):
724
  """Gives back a hash with different information about the node.
725

726
  @type storage_units: list of tuples (string, string, list)
727
  @param storage_units: List of tuples (storage unit, identifier, parameters) to
728
    ask for disk space information. In case of lvm-vg, the identifier is
729
    the VG name. The parameters can contain additional, storage-type-specific
730
    parameters, for example exclusive storage for lvm storage.
731
  @type hv_specs: list of pairs (string, dict of strings)
732
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
733
  @rtype: tuple; (string, None/dict, None/dict)
734
  @return: Tuple containing boot ID, volume group information and hypervisor
735
    information
736

737
  """
738
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
739
  storage_info = _GetNamedNodeInfo(
740
    storage_units,
741
    (lambda (storage_type, storage_key, storage_params):
742
        _ApplyStorageInfoFunction(storage_type, storage_key, storage_params)))
743
  hv_info = _GetHvInfoAll(hv_specs)
744
  return (bootid, storage_info, hv_info)
745

    
746

    
747
def _GetFileStorageSpaceInfo(path, params):
748
  """Wrapper around filestorage.GetSpaceInfo.
749

750
  The purpose of this wrapper is to call filestorage.GetFileStorageSpaceInfo
751
  and ignore the *args parameter to not leak it into the filestorage
752
  module's code.
753

754
  @see: C{filestorage.GetFileStorageSpaceInfo} for description of the
755
    parameters.
756

757
  """
758
  _CheckStorageParams(params, 0)
759
  return filestorage.GetFileStorageSpaceInfo(path)
760

    
761

    
762
# FIXME: implement storage reporting for all missing storage types.
763
_STORAGE_TYPE_INFO_FN = {
764
  constants.ST_BLOCK: None,
765
  constants.ST_DISKLESS: None,
766
  constants.ST_EXT: None,
767
  constants.ST_FILE: _GetFileStorageSpaceInfo,
768
  constants.ST_LVM_PV: _GetLvmPvSpaceInfo,
769
  constants.ST_LVM_VG: _GetLvmVgSpaceInfo,
770
  constants.ST_SHARED_FILE: None,
771
  constants.ST_RADOS: None,
772
}
773

    
774

    
775
def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
776
  """Looks up and applies the correct function to calculate free and total
777
  storage for the given storage type.
778

779
  @type storage_type: string
780
  @param storage_type: the storage type for which the storage shall be reported.
781
  @type storage_key: string
782
  @param storage_key: identifier of a storage unit, e.g. the volume group name
783
    of an LVM storage unit
784
  @type args: any
785
  @param args: various parameters that can be used for storage reporting. These
786
    parameters and their semantics vary from storage type to storage type and
787
    are just propagated in this function.
788
  @return: the results of the application of the storage space function (see
789
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
790
    storage type
791
  @raises NotImplementedError: for storage types who don't support space
792
    reporting yet
793
  """
794
  fn = _STORAGE_TYPE_INFO_FN[storage_type]
795
  if fn is not None:
796
    return fn(storage_key, *args)
797
  else:
798
    raise NotImplementedError
799

    
800

    
801
def _CheckExclusivePvs(pvi_list):
802
  """Check that PVs are not shared among LVs
803

804
  @type pvi_list: list of L{objects.LvmPvInfo} objects
805
  @param pvi_list: information about the PVs
806

807
  @rtype: list of tuples (string, list of strings)
808
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])
809

810
  """
811
  res = []
812
  for pvi in pvi_list:
813
    if len(pvi.lv_list) > 1:
814
      res.append((pvi.name, pvi.lv_list))
815
  return res
816

    
817

    
818
def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
819
                       get_hv_fn=hypervisor.GetHypervisor):
820
  """Verifies the hypervisor. Appends the results to the 'results' list.
821

822
  @type what: C{dict}
823
  @param what: a dictionary of things to check
824
  @type vm_capable: boolean
825
  @param vm_capable: whether or not this node is vm capable
826
  @type result: dict
827
  @param result: dictionary of verification results; results of the
828
    verifications in this function will be added here
829
  @type all_hvparams: dict of dict of string
830
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
831
  @type get_hv_fn: function
832
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability
833

834
  """
835
  if not vm_capable:
836
    return
837

    
838
  if constants.NV_HYPERVISOR in what:
839
    result[constants.NV_HYPERVISOR] = {}
840
    for hv_name in what[constants.NV_HYPERVISOR]:
841
      hvparams = all_hvparams[hv_name]
842
      try:
843
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
844
      except errors.HypervisorError, err:
845
        val = "Error while checking hypervisor: %s" % str(err)
846
      result[constants.NV_HYPERVISOR][hv_name] = val
847

    
848

    
849
def _VerifyHvparams(what, vm_capable, result,
850
                    get_hv_fn=hypervisor.GetHypervisor):
851
  """Verifies the hvparams. Appends the results to the 'results' list.
852

853
  @type what: C{dict}
854
  @param what: a dictionary of things to check
855
  @type vm_capable: boolean
856
  @param vm_capable: whether or not this node is vm capable
857
  @type result: dict
858
  @param result: dictionary of verification results; results of the
859
    verifications in this function will be added here
860
  @type get_hv_fn: function
861
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability
862

863
  """
864
  if not vm_capable:
865
    return
866

    
867
  if constants.NV_HVPARAMS in what:
868
    result[constants.NV_HVPARAMS] = []
869
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
870
      try:
871
        logging.info("Validating hv %s, %s", hv_name, hvparms)
872
        get_hv_fn(hv_name).ValidateParameters(hvparms)
873
      except errors.HypervisorError, err:
874
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))
875

    
876

    
877
def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
878
  """Verifies the instance list.
879

880
  @type what: C{dict}
881
  @param what: a dictionary of things to check
882
  @type vm_capable: boolean
883
  @param vm_capable: whether or not this node is vm capable
884
  @type result: dict
885
  @param result: dictionary of verification results; results of the
886
    verifications in this function will be added here
887
  @type all_hvparams: dict of dict of string
888
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
889

890
  """
891
  if constants.NV_INSTANCELIST in what and vm_capable:
892
    # GetInstanceList can fail
893
    try:
894
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
895
                            all_hvparams=all_hvparams)
896
    except RPCFail, err:
897
      val = str(err)
898
    result[constants.NV_INSTANCELIST] = val
899

    
900

    
901
def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
902
  """Verifies the node info.
903

904
  @type what: C{dict}
905
  @param what: a dictionary of things to check
906
  @type vm_capable: boolean
907
  @param vm_capable: whether or not this node is vm capable
908
  @type result: dict
909
  @param result: dictionary of verification results; results of the
910
    verifications in this function will be added here
911
  @type all_hvparams: dict of dict of string
912
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
913

914
  """
915
  if constants.NV_HVINFO in what and vm_capable:
916
    hvname = what[constants.NV_HVINFO]
917
    hyper = hypervisor.GetHypervisor(hvname)
918
    hvparams = all_hvparams[hvname]
919
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)
920

    
921

    
922
def _VerifyClientCertificate(cert_file=pathutils.NODED_CLIENT_CERT_FILE):
923
  """Verify the existance and validity of the client SSL certificate.
924

925
  """
926
  create_cert_cmd = "gnt-cluster renew-crypto --new-node-certificates"
927
  if not os.path.exists(cert_file):
928
    return (constants.CV_ERROR,
929
            "The client certificate does not exist. Run '%s' to create"
930
            " client certificates for all nodes." % create_cert_cmd)
931

    
932
  (errcode, msg) = utils.VerifyCertificate(cert_file)
933
  if errcode is not None:
934
    return (errcode, msg)
935
  else:
936
    # if everything is fine, we return the digest to be compared to the config
937
    return (None, utils.GetCertificateDigest(cert_filename=cert_file))
938

    
939

    
940
def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
941
  """Verify the status of the local node.
942

943
  Based on the input L{what} parameter, various checks are done on the
944
  local node.
945

946
  If the I{filelist} key is present, this list of
947
  files is checksummed and the file/checksum pairs are returned.
948

949
  If the I{nodelist} key is present, we check that we have
950
  connectivity via ssh with the target nodes (and check the hostname
951
  report).
952

953
  If the I{node-net-test} key is present, we check that we have
954
  connectivity to the given nodes via both primary IP and, if
955
  applicable, secondary IPs.
956

957
  @type what: C{dict}
958
  @param what: a dictionary of things to check:
959
      - filelist: list of files for which to compute checksums
960
      - nodelist: list of nodes we should check ssh communication with
961
      - node-net-test: list of nodes we should check node daemon port
962
        connectivity with
963
      - hypervisor: list with hypervisors to run the verify for
964
  @type cluster_name: string
965
  @param cluster_name: the cluster's name
966
  @type all_hvparams: dict of dict of strings
967
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
968
  @type node_groups: a dict of strings
969
  @param node_groups: node _names_ mapped to their group uuids (it's enough to
970
      have only those nodes that are in `what["nodelist"]`)
971
  @type groups_cfg: a dict of dict of strings
972
  @param groups_cfg: a dictionary mapping group uuids to their configuration
973
  @rtype: dict
974
  @return: a dictionary with the same keys as the input dict, and
975
      values representing the result of the checks
976

977
  """
978
  result = {}
979
  my_name = netutils.Hostname.GetSysName()
980
  port = netutils.GetDaemonPort(constants.NODED)
981
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
982

    
983
  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
984
  _VerifyHvparams(what, vm_capable, result)
985

    
986
  if constants.NV_FILELIST in what:
987
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
988
                                              what[constants.NV_FILELIST]))
989
    result[constants.NV_FILELIST] = \
990
      dict((vcluster.MakeVirtualPath(key), value)
991
           for (key, value) in fingerprints.items())
992

    
993
  if constants.NV_CLIENT_CERT in what:
994
    result[constants.NV_CLIENT_CERT] = _VerifyClientCertificate()
995

    
996
  if constants.NV_NODELIST in what:
997
    (nodes, bynode) = what[constants.NV_NODELIST]
998

    
999
    # Add nodes from other groups (different for each node)
1000
    try:
1001
      nodes.extend(bynode[my_name])
1002
    except KeyError:
1003
      pass
1004

    
1005
    # Use a random order
1006
    random.shuffle(nodes)
1007

    
1008
    # Try to contact all nodes
1009
    val = {}
1010
    for node in nodes:
1011
      params = groups_cfg.get(node_groups.get(node))
1012
      ssh_port = params["ndparams"].get(constants.ND_SSH_PORT)
1013
      logging.debug("Ssh port %s (None = default) for node %s",
1014
                    str(ssh_port), node)
1015
      success, message = _GetSshRunner(cluster_name). \
1016
                            VerifyNodeHostname(node, ssh_port)
1017
      if not success:
1018
        val[node] = message
1019

    
1020
    result[constants.NV_NODELIST] = val
1021

    
1022
  if constants.NV_NODENETTEST in what:
1023
    result[constants.NV_NODENETTEST] = tmp = {}
1024
    my_pip = my_sip = None
1025
    for name, pip, sip in what[constants.NV_NODENETTEST]:
1026
      if name == my_name:
1027
        my_pip = pip
1028
        my_sip = sip
1029
        break
1030
    if not my_pip:
1031
      tmp[my_name] = ("Can't find my own primary/secondary IP"
1032
                      " in the node list")
1033
    else:
1034
      for name, pip, sip in what[constants.NV_NODENETTEST]:
1035
        fail = []
1036
        if not netutils.TcpPing(pip, port, source=my_pip):
1037
          fail.append("primary")
1038
        if sip != pip:
1039
          if not netutils.TcpPing(sip, port, source=my_sip):
1040
            fail.append("secondary")
1041
        if fail:
1042
          tmp[name] = ("failure using the %s interface(s)" %
1043
                       " and ".join(fail))
1044

    
1045
  if constants.NV_MASTERIP in what:
1046
    # FIXME: add checks on incoming data structures (here and in the
1047
    # rest of the function)
1048
    master_name, master_ip = what[constants.NV_MASTERIP]
1049
    if master_name == my_name:
1050
      source = constants.IP4_ADDRESS_LOCALHOST
1051
    else:
1052
      source = None
1053
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
1054
                                                     source=source)
1055

    
1056
  if constants.NV_USERSCRIPTS in what:
1057
    result[constants.NV_USERSCRIPTS] = \
1058
      [script for script in what[constants.NV_USERSCRIPTS]
1059
       if not utils.IsExecutable(script)]
1060

    
1061
  if constants.NV_OOB_PATHS in what:
1062
    result[constants.NV_OOB_PATHS] = tmp = []
1063
    for path in what[constants.NV_OOB_PATHS]:
1064
      try:
1065
        st = os.stat(path)
1066
      except OSError, err:
1067
        tmp.append("error stating out of band helper: %s" % err)
1068
      else:
1069
        if stat.S_ISREG(st.st_mode):
1070
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
1071
            tmp.append(None)
1072
          else:
1073
            tmp.append("out of band helper %s is not executable" % path)
1074
        else:
1075
          tmp.append("out of band helper %s is not a file" % path)
1076

    
1077
  if constants.NV_LVLIST in what and vm_capable:
1078
    try:
1079
      val = GetVolumeList(utils.ListVolumeGroups().keys())
1080
    except RPCFail, err:
1081
      val = str(err)
1082
    result[constants.NV_LVLIST] = val
1083

    
1084
  _VerifyInstanceList(what, vm_capable, result, all_hvparams)
1085

    
1086
  if constants.NV_VGLIST in what and vm_capable:
1087
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
1088

    
1089
  if constants.NV_PVLIST in what and vm_capable:
1090
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
1091
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
1092
                                       filter_allocatable=False,
1093
                                       include_lvs=check_exclusive_pvs)
1094
    if check_exclusive_pvs:
1095
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
1096
      for pvi in val:
1097
        # Avoid sending useless data on the wire
1098
        pvi.lv_list = []
1099
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)
1100

    
1101
  if constants.NV_VERSION in what:
1102
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
1103
                                    constants.RELEASE_VERSION)
1104

    
1105
  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)
1106

    
1107
  if constants.NV_DRBDVERSION in what and vm_capable:
1108
    try:
1109
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
1110
    except errors.BlockDeviceError, err:
1111
      logging.warning("Can't get DRBD version", exc_info=True)
1112
      drbd_version = str(err)
1113
    result[constants.NV_DRBDVERSION] = drbd_version
1114

    
1115
  if constants.NV_DRBDLIST in what and vm_capable:
1116
    try:
1117
      used_minors = drbd.DRBD8.GetUsedDevs()
1118
    except errors.BlockDeviceError, err:
1119
      logging.warning("Can't get used minors list", exc_info=True)
1120
      used_minors = str(err)
1121
    result[constants.NV_DRBDLIST] = used_minors
1122

    
1123
  if constants.NV_DRBDHELPER in what and vm_capable:
1124
    status = True
1125
    try:
1126
      payload = drbd.DRBD8.GetUsermodeHelper()
1127
    except errors.BlockDeviceError, err:
1128
      logging.error("Can't get DRBD usermode helper: %s", str(err))
1129
      status = False
1130
      payload = str(err)
1131
    result[constants.NV_DRBDHELPER] = (status, payload)
1132

    
1133
  if constants.NV_NODESETUP in what:
1134
    result[constants.NV_NODESETUP] = tmpr = []
1135
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
1136
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
1137
                  " under /sys, missing required directories /sys/block"
1138
                  " and /sys/class/net")
1139
    if (not os.path.isdir("/proc/sys") or
1140
        not os.path.isfile("/proc/sysrq-trigger")):
1141
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
1142
                  " under /proc, missing required directory /proc/sys and"
1143
                  " the file /proc/sysrq-trigger")
1144

    
1145
  if constants.NV_TIME in what:
1146
    result[constants.NV_TIME] = utils.SplitTime(time.time())
1147

    
1148
  if constants.NV_OSLIST in what and vm_capable:
1149
    result[constants.NV_OSLIST] = DiagnoseOS()
1150

    
1151
  if constants.NV_BRIDGES in what and vm_capable:
1152
    result[constants.NV_BRIDGES] = [bridge
1153
                                    for bridge in what[constants.NV_BRIDGES]
1154
                                    if not utils.BridgeExists(bridge)]
1155

    
1156
  if what.get(constants.NV_ACCEPTED_STORAGE_PATHS) == my_name:
1157
    result[constants.NV_ACCEPTED_STORAGE_PATHS] = \
1158
        filestorage.ComputeWrongFileStoragePaths()
1159

    
1160
  if what.get(constants.NV_FILE_STORAGE_PATH):
1161
    pathresult = filestorage.CheckFileStoragePath(
1162
        what[constants.NV_FILE_STORAGE_PATH])
1163
    if pathresult:
1164
      result[constants.NV_FILE_STORAGE_PATH] = pathresult
1165

    
1166
  if what.get(constants.NV_SHARED_FILE_STORAGE_PATH):
1167
    pathresult = filestorage.CheckFileStoragePath(
1168
        what[constants.NV_SHARED_FILE_STORAGE_PATH])
1169
    if pathresult:
1170
      result[constants.NV_SHARED_FILE_STORAGE_PATH] = pathresult
1171

    
1172
  return result
1173

    
1174

    
1175
def GetCryptoTokens(token_requests):
1176
  """Perform actions on the node's cryptographic tokens.
1177

1178
  Token types can be 'ssl' or 'ssh'. So far only some actions are implemented
1179
  for 'ssl'. Action 'get' returns the digest of the public client ssl
1180
  certificate. Action 'create' creates a new client certificate and private key
1181
  and also returns the digest of the certificate. The third parameter of a
1182
  token request are optional parameters for the actions, so far only the
1183
  filename is supported.
1184

1185
  @type token_requests: list of tuples of (string, string, dict), where the
1186
    first string is in constants.CRYPTO_TYPES, the second in
1187
    constants.CRYPTO_ACTIONS. The third parameter is a dictionary of string
1188
    to string.
1189
  @param token_requests: list of requests of cryptographic tokens and actions
1190
    to perform on them. The actions come with a dictionary of options.
1191
  @rtype: list of tuples (string, string)
1192
  @return: list of tuples of the token type and the public crypto token
1193

1194
  """
1195
  getents = runtime.GetEnts()
1196
  _VALID_CERT_FILES = [pathutils.NODED_CERT_FILE,
1197
                       pathutils.NODED_CLIENT_CERT_FILE,
1198
                       pathutils.NODED_CLIENT_CERT_FILE_TMP]
1199
  _DEFAULT_CERT_FILE = pathutils.NODED_CLIENT_CERT_FILE
1200
  tokens = []
1201
  for (token_type, action, options) in token_requests:
1202
    if token_type not in constants.CRYPTO_TYPES:
1203
      raise errors.ProgrammerError("Token type '%s' not supported." %
1204
                                   token_type)
1205
    if action not in constants.CRYPTO_ACTIONS:
1206
      raise errors.ProgrammerError("Action '%s' is not supported." %
1207
                                   action)
1208
    if token_type == constants.CRYPTO_TYPE_SSL_DIGEST:
1209
      if action == constants.CRYPTO_ACTION_CREATE:
1210

    
1211
        # extract file name from options
1212
        cert_filename = None
1213
        if options:
1214
          cert_filename = options.get(constants.CRYPTO_OPTION_CERT_FILE)
1215
        if not cert_filename:
1216
          cert_filename = _DEFAULT_CERT_FILE
1217
        # For security reason, we don't allow arbitrary filenames
1218
        if not cert_filename in _VALID_CERT_FILES:
1219
          raise errors.ProgrammerError(
1220
            "The certificate file name path '%s' is not allowed." %
1221
            cert_filename)
1222

    
1223
        # extract serial number from options
1224
        serial_no = None
1225
        if options:
1226
          try:
1227
            serial_no = int(options[constants.CRYPTO_OPTION_SERIAL_NO])
1228
          except ValueError:
1229
            raise errors.ProgrammerError(
1230
              "The given serial number is not an intenger: %s." %
1231
              options.get(constants.CRYPTO_OPTION_SERIAL_NO))
1232
          except KeyError:
1233
            raise errors.ProgrammerError("No serial number was provided.")
1234

    
1235
        if not serial_no:
1236
          raise errors.ProgrammerError(
1237
            "Cannot create an SSL certificate without a serial no.")
1238

    
1239
        utils.GenerateNewSslCert(
1240
          True, cert_filename, serial_no,
1241
          "Create new client SSL certificate in %s." % cert_filename,
1242
          uid=getents.masterd_uid, gid=getents.masterd_gid)
1243
        tokens.append((token_type,
1244
                       utils.GetCertificateDigest(
1245
                         cert_filename=cert_filename)))
1246
      elif action == constants.CRYPTO_ACTION_GET:
1247
        tokens.append((token_type,
1248
                       utils.GetCertificateDigest()))
1249
  return tokens
1250

    
1251

    
1252
def GetBlockDevSizes(devices):
1253
  """Return the size of the given block devices
1254

1255
  @type devices: list
1256
  @param devices: list of block device nodes to query
1257
  @rtype: dict
1258
  @return:
1259
    dictionary of all block devices under /dev (key). The value is their
1260
    size in MiB.
1261

1262
    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
1263

1264
  """
1265
  DEV_PREFIX = "/dev/"
1266
  blockdevs = {}
1267

    
1268
  for devpath in devices:
1269
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
1270
      continue
1271

    
1272
    try:
1273
      st = os.stat(devpath)
1274
    except EnvironmentError, err:
1275
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
1276
      continue
1277

    
1278
    if stat.S_ISBLK(st.st_mode):
1279
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
1280
      if result.failed:
1281
        # We don't want to fail, just do not list this device as available
1282
        logging.warning("Cannot get size for block device %s", devpath)
1283
        continue
1284

    
1285
      size = int(result.stdout) / (1024 * 1024)
1286
      blockdevs[devpath] = size
1287
  return blockdevs
1288

    
1289

    
1290
def GetVolumeList(vg_names):
1291
  """Compute list of logical volumes and their size.
1292

1293
  @type vg_names: list
1294
  @param vg_names: the volume groups whose LVs we should list, or
1295
      empty for all volume groups
1296
  @rtype: dict
1297
  @return:
1298
      dictionary of all partions (key) with value being a tuple of
1299
      their size (in MiB), inactive and online status::
1300

1301
        {'xenvg/test1': ('20.06', True, True)}
1302

1303
      in case of errors, a string is returned with the error
1304
      details.
1305

1306
  """
1307
  lvs = {}
1308
  sep = "|"
1309
  if not vg_names:
1310
    vg_names = []
1311
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
1312
                         "--separator=%s" % sep,
1313
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
1314
  if result.failed:
1315
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
1316

    
1317
  for line in result.stdout.splitlines():
1318
    line = line.strip()
1319
    match = _LVSLINE_REGEX.match(line)
1320
    if not match:
1321
      logging.error("Invalid line returned from lvs output: '%s'", line)
1322
      continue
1323
    vg_name, name, size, attr = match.groups()
1324
    inactive = attr[4] == "-"
1325
    online = attr[5] == "o"
1326
    virtual = attr[0] == "v"
1327
    if virtual:
1328
      # we don't want to report such volumes as existing, since they
1329
      # don't really hold data
1330
      continue
1331
    lvs[vg_name + "/" + name] = (size, inactive, online)
1332

    
1333
  return lvs
1334

    
1335

    
1336
def ListVolumeGroups():
1337
  """List the volume groups and their size.
1338

1339
  @rtype: dict
1340
  @return: dictionary with keys volume name and values the
1341
      size of the volume
1342

1343
  """
1344
  return utils.ListVolumeGroups()
1345

    
1346

    
1347
def NodeVolumes():
1348
  """List all volumes on this node.
1349

1350
  @rtype: list
1351
  @return:
1352
    A list of dictionaries, each having four keys:
1353
      - name: the logical volume name,
1354
      - size: the size of the logical volume
1355
      - dev: the physical device on which the LV lives
1356
      - vg: the volume group to which it belongs
1357

1358
    In case of errors, we return an empty list and log the
1359
    error.
1360

1361
    Note that since a logical volume can live on multiple physical
1362
    volumes, the resulting list might include a logical volume
1363
    multiple times.
1364

1365
  """
1366
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
1367
                         "--separator=|",
1368
                         "--options=lv_name,lv_size,devices,vg_name"])
1369
  if result.failed:
1370
    _Fail("Failed to list logical volumes, lvs output: %s",
1371
          result.output)
1372

    
1373
  def parse_dev(dev):
1374
    return dev.split("(")[0]
1375

    
1376
  def handle_dev(dev):
1377
    return [parse_dev(x) for x in dev.split(",")]
1378

    
1379
  def map_line(line):
1380
    line = [v.strip() for v in line]
1381
    return [{"name": line[0], "size": line[1],
1382
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
1383

    
1384
  all_devs = []
1385
  for line in result.stdout.splitlines():
1386
    if line.count("|") >= 3:
1387
      all_devs.extend(map_line(line.split("|")))
1388
    else:
1389
      logging.warning("Strange line in the output from lvs: '%s'", line)
1390
  return all_devs
1391

    
1392

    
1393
def BridgesExist(bridges_list):
1394
  """Check if a list of bridges exist on the current node.
1395

1396
  @rtype: boolean
1397
  @return: C{True} if all of them exist, C{False} otherwise
1398

1399
  """
1400
  missing = []
1401
  for bridge in bridges_list:
1402
    if not utils.BridgeExists(bridge):
1403
      missing.append(bridge)
1404

    
1405
  if missing:
1406
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
1407

    
1408

    
1409
def GetInstanceListForHypervisor(hname, hvparams=None,
1410
                                 get_hv_fn=hypervisor.GetHypervisor):
1411
  """Provides a list of instances of the given hypervisor.
1412

1413
  @type hname: string
1414
  @param hname: name of the hypervisor
1415
  @type hvparams: dict of strings
1416
  @param hvparams: hypervisor parameters for the given hypervisor
1417
  @type get_hv_fn: function
1418
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1419
    name; optional parameter to increase testability
1420

1421
  @rtype: list
1422
  @return: a list of all running instances on the current node
1423
    - instance1.example.com
1424
    - instance2.example.com
1425

1426
  """
1427
  results = []
1428
  try:
1429
    hv = get_hv_fn(hname)
1430
    names = hv.ListInstances(hvparams=hvparams)
1431
    results.extend(names)
1432
  except errors.HypervisorError, err:
1433
    _Fail("Error enumerating instances (hypervisor %s): %s",
1434
          hname, err, exc=True)
1435
  return results
1436

    
1437

    
1438
def GetInstanceList(hypervisor_list, all_hvparams=None,
1439
                    get_hv_fn=hypervisor.GetHypervisor):
1440
  """Provides a list of instances.
1441

1442
  @type hypervisor_list: list
1443
  @param hypervisor_list: the list of hypervisors to query information
1444
  @type all_hvparams: dict of dict of strings
1445
  @param all_hvparams: a dictionary mapping hypervisor types to respective
1446
    cluster-wide hypervisor parameters
1447
  @type get_hv_fn: function
1448
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
1449
    name; optional parameter to increase testability
1450

1451
  @rtype: list
1452
  @return: a list of all running instances on the current node
1453
    - instance1.example.com
1454
    - instance2.example.com
1455

1456
  """
1457
  results = []
1458
  for hname in hypervisor_list:
1459
    hvparams = all_hvparams[hname]
1460
    results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
1461
                                                get_hv_fn=get_hv_fn))
1462
  return results
1463

    
1464

    
1465
def GetInstanceInfo(instance, hname, hvparams=None):
1466
  """Gives back the information about an instance as a dictionary.
1467

1468
  @type instance: string
1469
  @param instance: the instance name
1470
  @type hname: string
1471
  @param hname: the hypervisor type of the instance
1472
  @type hvparams: dict of strings
1473
  @param hvparams: the instance's hvparams
1474

1475
  @rtype: dict
1476
  @return: dictionary with the following keys:
1477
      - memory: memory size of instance (int)
1478
      - state: state of instance (HvInstanceState)
1479
      - time: cpu time of instance (float)
1480
      - vcpus: the number of vcpus (int)
1481

1482
  """
1483
  output = {}
1484

    
1485
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance,
1486
                                                          hvparams=hvparams)
1487
  if iinfo is not None:
1488
    output["memory"] = iinfo[2]
1489
    output["vcpus"] = iinfo[3]
1490
    output["state"] = iinfo[4]
1491
    output["time"] = iinfo[5]
1492

    
1493
  return output
1494

    
1495

    
1496
def GetInstanceMigratable(instance):
1497
  """Computes whether an instance can be migrated.
1498

1499
  @type instance: L{objects.Instance}
1500
  @param instance: object representing the instance to be checked.
1501

1502
  @rtype: tuple
1503
  @return: tuple of (result, description) where:
1504
      - result: whether the instance can be migrated or not
1505
      - description: a description of the issue, if relevant
1506

1507
  """
1508
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1509
  iname = instance.name
1510
  if iname not in hyper.ListInstances(instance.hvparams):
1511
    _Fail("Instance %s is not running", iname)
1512

    
1513
  for idx in range(len(instance.disks)):
1514
    link_name = _GetBlockDevSymlinkPath(iname, idx)
1515
    if not os.path.islink(link_name):
1516
      logging.warning("Instance %s is missing symlink %s for disk %d",
1517
                      iname, link_name, idx)
1518

    
1519

    
1520
def GetAllInstancesInfo(hypervisor_list, all_hvparams):
1521
  """Gather data about all instances.
1522

1523
  This is the equivalent of L{GetInstanceInfo}, except that it
1524
  computes data for all instances at once, thus being faster if one
1525
  needs data about more than one instance.
1526

1527
  @type hypervisor_list: list
1528
  @param hypervisor_list: list of hypervisors to query for instance data
1529
  @type all_hvparams: dict of dict of strings
1530
  @param all_hvparams: mapping of hypervisor names to hvparams
1531

1532
  @rtype: dict
1533
  @return: dictionary of instance: data, with data having the following keys:
1534
      - memory: memory size of instance (int)
1535
      - state: xen state of instance (string)
1536
      - time: cpu time of instance (float)
1537
      - vcpus: the number of vcpus
1538

1539
  """
1540
  output = {}
1541
  for hname in hypervisor_list:
1542
    hvparams = all_hvparams[hname]
1543
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo(hvparams)
1544
    if iinfo:
1545
      for name, _, memory, vcpus, state, times in iinfo:
1546
        value = {
1547
          "memory": memory,
1548
          "vcpus": vcpus,
1549
          "state": state,
1550
          "time": times,
1551
          }
1552
        if name in output:
1553
          # we only check static parameters, like memory and vcpus,
1554
          # and not state and time which can change between the
1555
          # invocations of the different hypervisors
1556
          for key in "memory", "vcpus":
1557
            if value[key] != output[name][key]:
1558
              _Fail("Instance %s is running twice"
1559
                    " with different parameters", name)
1560
        output[name] = value
1561

    
1562
  return output
1563

    
1564

    
1565
def GetInstanceConsoleInfo(instance_param_dict,
1566
                           get_hv_fn=hypervisor.GetHypervisor):
1567
  """Gather data about the console access of a set of instances of this node.
1568

1569
  This function assumes that the caller already knows which instances are on
1570
  this node, by calling a function such as L{GetAllInstancesInfo} or
1571
  L{GetInstanceList}.
1572

1573
  For every instance, a large amount of configuration data needs to be
1574
  provided to the hypervisor interface in order to receive the console
1575
  information. Whether this could or should be cut down can be discussed.
1576
  The information is provided in a dictionary indexed by instance name,
1577
  allowing any number of instance queries to be done.
1578

1579
  @type instance_param_dict: dict of string to tuple of dictionaries, where the
1580
    dictionaries represent: L{objects.Instance}, L{objects.Node},
1581
    L{objects.NodeGroup}, HvParams, BeParams
1582
  @param instance_param_dict: mapping of instance name to parameters necessary
1583
    for console information retrieval
1584

1585
  @rtype: dict
1586
  @return: dictionary of instance: data, with data having the following keys:
1587
      - instance: instance name
1588
      - kind: console kind
1589
      - message: used with kind == CONS_MESSAGE, indicates console to be
1590
                 unavailable, supplies error message
1591
      - host: host to connect to
1592
      - port: port to use
1593
      - user: user for login
1594
      - command: the command, broken into parts as an array
1595
      - display: unknown, potentially unused?
1596

1597
  """
1598

    
1599
  output = {}
1600
  for inst_name in instance_param_dict:
1601
    instance = instance_param_dict[inst_name]["instance"]
1602
    pnode = instance_param_dict[inst_name]["node"]
1603
    group = instance_param_dict[inst_name]["group"]
1604
    hvparams = instance_param_dict[inst_name]["hvParams"]
1605
    beparams = instance_param_dict[inst_name]["beParams"]
1606

    
1607
    instance = objects.Instance.FromDict(instance)
1608
    pnode = objects.Node.FromDict(pnode)
1609
    group = objects.NodeGroup.FromDict(group)
1610

    
1611
    h = get_hv_fn(instance.hypervisor)
1612
    output[inst_name] = h.GetInstanceConsole(instance, pnode, group,
1613
                                             hvparams, beparams).ToDict()
1614

    
1615
  return output
1616

    
1617

    
1618
def _InstanceLogName(kind, os_name, instance, component):
1619
  """Compute the OS log filename for a given instance and operation.
1620

1621
  The instance name and os name are passed in as strings since not all
1622
  operations have these as part of an instance object.
1623

1624
  @type kind: string
1625
  @param kind: the operation type (e.g. add, import, etc.)
1626
  @type os_name: string
1627
  @param os_name: the os name
1628
  @type instance: string
1629
  @param instance: the name of the instance being imported/added/etc.
1630
  @type component: string or None
1631
  @param component: the name of the component of the instance being
1632
      transferred
1633

1634
  """
1635
  # TODO: Use tempfile.mkstemp to create unique filename
1636
  if component:
1637
    assert "/" not in component
1638
    c_msg = "-%s" % component
1639
  else:
1640
    c_msg = ""
1641
  base = ("%s-%s-%s%s-%s.log" %
1642
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1643
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1644

    
1645

    
1646
def InstanceOsAdd(instance, reinstall, debug):
1647
  """Add an OS to an instance.
1648

1649
  @type instance: L{objects.Instance}
1650
  @param instance: Instance whose OS is to be installed
1651
  @type reinstall: boolean
1652
  @param reinstall: whether this is an instance reinstall
1653
  @type debug: integer
1654
  @param debug: debug level, passed to the OS scripts
1655
  @rtype: None
1656

1657
  """
1658
  inst_os = OSFromDisk(instance.os)
1659

    
1660
  create_env = OSEnvironment(instance, inst_os, debug)
1661
  if reinstall:
1662
    create_env["INSTANCE_REINSTALL"] = "1"
1663

    
1664
  logfile = _InstanceLogName("add", instance.os, instance.name, None)
1665

    
1666
  result = utils.RunCmd([inst_os.create_script], env=create_env,
1667
                        cwd=inst_os.path, output=logfile, reset_env=True)
1668
  if result.failed:
1669
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
1670
                  " output: %s", result.cmd, result.fail_reason, logfile,
1671
                  result.output)
1672
    lines = [utils.SafeEncode(val)
1673
             for val in utils.TailFile(logfile, lines=20)]
1674
    _Fail("OS create script failed (%s), last lines in the"
1675
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1676

    
1677

    
1678
def RunRenameInstance(instance, old_name, debug):
1679
  """Run the OS rename script for an instance.
1680

1681
  @type instance: L{objects.Instance}
1682
  @param instance: Instance whose OS is to be installed
1683
  @type old_name: string
1684
  @param old_name: previous instance name
1685
  @type debug: integer
1686
  @param debug: debug level, passed to the OS scripts
1687
  @rtype: boolean
1688
  @return: the success of the operation
1689

1690
  """
1691
  inst_os = OSFromDisk(instance.os)
1692

    
1693
  rename_env = OSEnvironment(instance, inst_os, debug)
1694
  rename_env["OLD_INSTANCE_NAME"] = old_name
1695

    
1696
  logfile = _InstanceLogName("rename", instance.os,
1697
                             "%s-%s" % (old_name, instance.name), None)
1698

    
1699
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1700
                        cwd=inst_os.path, output=logfile, reset_env=True)
1701

    
1702
  if result.failed:
1703
    logging.error("os create command '%s' returned error: %s output: %s",
1704
                  result.cmd, result.fail_reason, result.output)
1705
    lines = [utils.SafeEncode(val)
1706
             for val in utils.TailFile(logfile, lines=20)]
1707
    _Fail("OS rename script failed (%s), last lines in the"
1708
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1709

    
1710

    
1711
def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
1712
  """Returns symlink path for block device.
1713

1714
  """
1715
  if _dir is None:
1716
    _dir = pathutils.DISK_LINKS_DIR
1717

    
1718
  return utils.PathJoin(_dir,
1719
                        ("%s%s%s" %
1720
                         (instance_name, constants.DISK_SEPARATOR, idx)))
1721

    
1722

    
1723
def _SymlinkBlockDev(instance_name, device_path, idx):
1724
  """Set up symlinks to a instance's block device.
1725

1726
  This is an auxiliary function run when an instance is start (on the primary
1727
  node) or when an instance is migrated (on the target node).
1728

1729

1730
  @param instance_name: the name of the target instance
1731
  @param device_path: path of the physical block device, on the node
1732
  @param idx: the disk index
1733
  @return: absolute path to the disk's symlink
1734

1735
  """
1736
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1737
  try:
1738
    os.symlink(device_path, link_name)
1739
  except OSError, err:
1740
    if err.errno == errno.EEXIST:
1741
      if (not os.path.islink(link_name) or
1742
          os.readlink(link_name) != device_path):
1743
        os.remove(link_name)
1744
        os.symlink(device_path, link_name)
1745
    else:
1746
      raise
1747

    
1748
  return link_name
1749

    
1750

    
1751
def _RemoveBlockDevLinks(instance_name, disks):
1752
  """Remove the block device symlinks belonging to the given instance.
1753

1754
  """
1755
  for idx, _ in enumerate(disks):
1756
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1757
    if os.path.islink(link_name):
1758
      try:
1759
        os.remove(link_name)
1760
      except OSError:
1761
        logging.exception("Can't remove symlink '%s'", link_name)
1762

    
1763

    
1764
def _CalculateDeviceURI(instance, disk, device):
1765
  """Get the URI for the device.
1766

1767
  @type instance: L{objects.Instance}
1768
  @param instance: the instance which disk belongs to
1769
  @type disk: L{objects.Disk}
1770
  @param disk: the target disk object
1771
  @type device: L{bdev.BlockDev}
1772
  @param device: the corresponding BlockDevice
1773
  @rtype: string
1774
  @return: the device uri if any else None
1775

1776
  """
1777
  access_mode = disk.params.get(constants.LDP_ACCESS,
1778
                                constants.DISK_KERNELSPACE)
1779
  if access_mode == constants.DISK_USERSPACE:
1780
    # This can raise errors.BlockDeviceError
1781
    return device.GetUserspaceAccessUri(instance.hypervisor)
1782
  else:
1783
    return None
1784

    
1785

    
1786
def _GatherAndLinkBlockDevs(instance):
1787
  """Set up an instance's block device(s).
1788

1789
  This is run on the primary node at instance startup. The block
1790
  devices must be already assembled.
1791

1792
  @type instance: L{objects.Instance}
1793
  @param instance: the instance whose disks we should assemble
1794
  @rtype: list
1795
  @return: list of (disk_object, link_name, drive_uri)
1796

1797
  """
1798
  block_devices = []
1799
  for idx, disk in enumerate(instance.disks):
1800
    device = _RecursiveFindBD(disk)
1801
    if device is None:
1802
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1803
                                    str(disk))
1804
    device.Open()
1805
    try:
1806
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1807
    except OSError, e:
1808
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1809
                                    e.strerror)
1810
    uri = _CalculateDeviceURI(instance, disk, device)
1811

    
1812
    block_devices.append((disk, link_name, uri))
1813

    
1814
  return block_devices
1815

    
1816

    
1817
def StartInstance(instance, startup_paused, reason, store_reason=True):
1818
  """Start an instance.
1819

1820
  @type instance: L{objects.Instance}
1821
  @param instance: the instance object
1822
  @type startup_paused: bool
1823
  @param instance: pause instance at startup?
1824
  @type reason: list of reasons
1825
  @param reason: the reason trail for this startup
1826
  @type store_reason: boolean
1827
  @param store_reason: whether to store the shutdown reason trail on file
1828
  @rtype: None
1829

1830
  """
1831
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
1832
                                                   instance.hvparams)
1833

    
1834
  if instance.name in running_instances:
1835
    logging.info("Instance %s already running, not starting", instance.name)
1836
    return
1837

    
1838
  try:
1839
    block_devices = _GatherAndLinkBlockDevs(instance)
1840
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1841
    hyper.StartInstance(instance, block_devices, startup_paused)
1842
    if store_reason:
1843
      _StoreInstReasonTrail(instance.name, reason)
1844
  except errors.BlockDeviceError, err:
1845
    _Fail("Block device error: %s", err, exc=True)
1846
  except errors.HypervisorError, err:
1847
    _RemoveBlockDevLinks(instance.name, instance.disks)
1848
    _Fail("Hypervisor error: %s", err, exc=True)
1849

    
1850

    
1851
def InstanceShutdown(instance, timeout, reason, store_reason=True):
1852
  """Shut an instance down.
1853

1854
  @note: this functions uses polling with a hardcoded timeout.
1855

1856
  @type instance: L{objects.Instance}
1857
  @param instance: the instance object
1858
  @type timeout: integer
1859
  @param timeout: maximum timeout for soft shutdown
1860
  @type reason: list of reasons
1861
  @param reason: the reason trail for this shutdown
1862
  @type store_reason: boolean
1863
  @param store_reason: whether to store the shutdown reason trail on file
1864
  @rtype: None
1865

1866
  """
1867
  hv_name = instance.hypervisor
1868
  hyper = hypervisor.GetHypervisor(hv_name)
1869
  iname = instance.name
1870

    
1871
  if instance.name not in hyper.ListInstances(instance.hvparams):
1872
    logging.info("Instance %s not running, doing nothing", iname)
1873
    return
1874

    
1875
  class _TryShutdown:
1876
    def __init__(self):
1877
      self.tried_once = False
1878

    
1879
    def __call__(self):
1880
      if iname not in hyper.ListInstances(instance.hvparams):
1881
        return
1882

    
1883
      try:
1884
        hyper.StopInstance(instance, retry=self.tried_once)
1885
        if store_reason:
1886
          _StoreInstReasonTrail(instance.name, reason)
1887
      except errors.HypervisorError, err:
1888
        if iname not in hyper.ListInstances(instance.hvparams):
1889
          # if the instance is no longer existing, consider this a
1890
          # success and go to cleanup
1891
          return
1892

    
1893
        _Fail("Failed to stop instance %s: %s", iname, err)
1894

    
1895
      self.tried_once = True
1896

    
1897
      raise utils.RetryAgain()
1898

    
1899
  try:
1900
    utils.Retry(_TryShutdown(), 5, timeout)
1901
  except utils.RetryTimeout:
1902
    # the shutdown did not succeed
1903
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1904

    
1905
    try:
1906
      hyper.StopInstance(instance, force=True)
1907
    except errors.HypervisorError, err:
1908
      if iname in hyper.ListInstances(instance.hvparams):
1909
        # only raise an error if the instance still exists, otherwise
1910
        # the error could simply be "instance ... unknown"!
1911
        _Fail("Failed to force stop instance %s: %s", iname, err)
1912

    
1913
    time.sleep(1)
1914

    
1915
    if iname in hyper.ListInstances(instance.hvparams):
1916
      _Fail("Could not shutdown instance %s even by destroy", iname)
1917

    
1918
  try:
1919
    hyper.CleanupInstance(instance.name)
1920
  except errors.HypervisorError, err:
1921
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1922

    
1923
  _RemoveBlockDevLinks(iname, instance.disks)
1924

    
1925

    
1926
def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
1927
  """Reboot an instance.
1928

1929
  @type instance: L{objects.Instance}
1930
  @param instance: the instance object to reboot
1931
  @type reboot_type: str
1932
  @param reboot_type: the type of reboot, one the following
1933
    constants:
1934
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1935
        instance OS, do not recreate the VM
1936
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1937
        restart the VM (at the hypervisor level)
1938
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1939
        not accepted here, since that mode is handled differently, in
1940
        cmdlib, and translates into full stop and start of the
1941
        instance (instead of a call_instance_reboot RPC)
1942
  @type shutdown_timeout: integer
1943
  @param shutdown_timeout: maximum timeout for soft shutdown
1944
  @type reason: list of reasons
1945
  @param reason: the reason trail for this reboot
1946
  @rtype: None
1947

1948
  """
1949
  running_instances = GetInstanceListForHypervisor(instance.hypervisor,
1950
                                                   instance.hvparams)
1951

    
1952
  if instance.name not in running_instances:
1953
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1954

    
1955
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1956
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1957
    try:
1958
      hyper.RebootInstance(instance)
1959
    except errors.HypervisorError, err:
1960
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1961
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1962
    try:
1963
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
1964
      result = StartInstance(instance, False, reason, store_reason=False)
1965
      _StoreInstReasonTrail(instance.name, reason)
1966
      return result
1967
    except errors.HypervisorError, err:
1968
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1969
  else:
1970
    _Fail("Invalid reboot_type received: %s", reboot_type)
1971

    
1972

    
1973
def InstanceBalloonMemory(instance, memory):
1974
  """Resize an instance's memory.
1975

1976
  @type instance: L{objects.Instance}
1977
  @param instance: the instance object
1978
  @type memory: int
1979
  @param memory: new memory amount in MB
1980
  @rtype: None
1981

1982
  """
1983
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1984
  running = hyper.ListInstances(instance.hvparams)
1985
  if instance.name not in running:
1986
    logging.info("Instance %s is not running, cannot balloon", instance.name)
1987
    return
1988
  try:
1989
    hyper.BalloonInstanceMemory(instance, memory)
1990
  except errors.HypervisorError, err:
1991
    _Fail("Failed to balloon instance memory: %s", err, exc=True)
1992

    
1993

    
1994
def MigrationInfo(instance):
1995
  """Gather information about an instance to be migrated.
1996

1997
  @type instance: L{objects.Instance}
1998
  @param instance: the instance definition
1999

2000
  """
2001
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2002
  try:
2003
    info = hyper.MigrationInfo(instance)
2004
  except errors.HypervisorError, err:
2005
    _Fail("Failed to fetch migration information: %s", err, exc=True)
2006
  return info
2007

    
2008

    
2009
def AcceptInstance(instance, info, target):
2010
  """Prepare the node to accept an instance.
2011

2012
  @type instance: L{objects.Instance}
2013
  @param instance: the instance definition
2014
  @type info: string/data (opaque)
2015
  @param info: migration information, from the source node
2016
  @type target: string
2017
  @param target: target host (usually ip), on this node
2018

2019
  """
2020
  # TODO: why is this required only for DTS_EXT_MIRROR?
2021
  if instance.disk_template in constants.DTS_EXT_MIRROR:
2022
    # Create the symlinks, as the disks are not active
2023
    # in any way
2024
    try:
2025
      _GatherAndLinkBlockDevs(instance)
2026
    except errors.BlockDeviceError, err:
2027
      _Fail("Block device error: %s", err, exc=True)
2028

    
2029
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2030
  try:
2031
    hyper.AcceptInstance(instance, info, target)
2032
  except errors.HypervisorError, err:
2033
    if instance.disk_template in constants.DTS_EXT_MIRROR:
2034
      _RemoveBlockDevLinks(instance.name, instance.disks)
2035
    _Fail("Failed to accept instance: %s", err, exc=True)
2036

    
2037

    
2038
def FinalizeMigrationDst(instance, info, success):
2039
  """Finalize any preparation to accept an instance.
2040

2041
  @type instance: L{objects.Instance}
2042
  @param instance: the instance definition
2043
  @type info: string/data (opaque)
2044
  @param info: migration information, from the source node
2045
  @type success: boolean
2046
  @param success: whether the migration was a success or a failure
2047

2048
  """
2049
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2050
  try:
2051
    hyper.FinalizeMigrationDst(instance, info, success)
2052
  except errors.HypervisorError, err:
2053
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
2054

    
2055

    
2056
def MigrateInstance(cluster_name, instance, target, live):
2057
  """Migrates an instance to another node.
2058

2059
  @type cluster_name: string
2060
  @param cluster_name: name of the cluster
2061
  @type instance: L{objects.Instance}
2062
  @param instance: the instance definition
2063
  @type target: string
2064
  @param target: the target node name
2065
  @type live: boolean
2066
  @param live: whether the migration should be done live or not (the
2067
      interpretation of this parameter is left to the hypervisor)
2068
  @raise RPCFail: if migration fails for some reason
2069

2070
  """
2071
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2072

    
2073
  try:
2074
    hyper.MigrateInstance(cluster_name, instance, target, live)
2075
  except errors.HypervisorError, err:
2076
    _Fail("Failed to migrate instance: %s", err, exc=True)
2077

    
2078

    
2079
def FinalizeMigrationSource(instance, success, live):
2080
  """Finalize the instance migration on the source node.
2081

2082
  @type instance: L{objects.Instance}
2083
  @param instance: the instance definition of the migrated instance
2084
  @type success: bool
2085
  @param success: whether the migration succeeded or not
2086
  @type live: bool
2087
  @param live: whether the user requested a live migration or not
2088
  @raise RPCFail: If the execution fails for some reason
2089

2090
  """
2091
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2092

    
2093
  try:
2094
    hyper.FinalizeMigrationSource(instance, success, live)
2095
  except Exception, err:  # pylint: disable=W0703
2096
    _Fail("Failed to finalize the migration on the source node: %s", err,
2097
          exc=True)
2098

    
2099

    
2100
def GetMigrationStatus(instance):
2101
  """Get the migration status
2102

2103
  @type instance: L{objects.Instance}
2104
  @param instance: the instance that is being migrated
2105
  @rtype: L{objects.MigrationStatus}
2106
  @return: the status of the current migration (one of
2107
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
2108
           progress info that can be retrieved from the hypervisor
2109
  @raise RPCFail: If the migration status cannot be retrieved
2110

2111
  """
2112
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2113
  try:
2114
    return hyper.GetMigrationStatus(instance)
2115
  except Exception, err:  # pylint: disable=W0703
2116
    _Fail("Failed to get migration status: %s", err, exc=True)
2117

    
2118

    
2119
def HotplugDevice(instance, action, dev_type, device, extra, seq):
2120
  """Hotplug a device
2121

2122
  Hotplug is currently supported only for KVM Hypervisor.
2123
  @type instance: L{objects.Instance}
2124
  @param instance: the instance to which we hotplug a device
2125
  @type action: string
2126
  @param action: the hotplug action to perform
2127
  @type dev_type: string
2128
  @param dev_type: the device type to hotplug
2129
  @type device: either L{objects.NIC} or L{objects.Disk}
2130
  @param device: the device object to hotplug
2131
  @type extra: string
2132
  @param extra: extra info used by hotplug code (e.g. disk link)
2133
  @type seq: int
2134
  @param seq: the index of the device from master perspective
2135
  @raise RPCFail: in case instance does not have KVM hypervisor
2136

2137
  """
2138
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2139
  try:
2140
    hyper.VerifyHotplugSupport(instance, action, dev_type)
2141
  except errors.HotplugError, err:
2142
    _Fail("Hotplug is not supported: %s", err)
2143

    
2144
  if action == constants.HOTPLUG_ACTION_ADD:
2145
    fn = hyper.HotAddDevice
2146
  elif action == constants.HOTPLUG_ACTION_REMOVE:
2147
    fn = hyper.HotDelDevice
2148
  elif action == constants.HOTPLUG_ACTION_MODIFY:
2149
    fn = hyper.HotModDevice
2150
  else:
2151
    assert action in constants.HOTPLUG_ALL_ACTIONS
2152

    
2153
  return fn(instance, dev_type, device, extra, seq)
2154

    
2155

    
2156
def HotplugSupported(instance):
2157
  """Checks if hotplug is generally supported.
2158

2159
  """
2160
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
2161
  try:
2162
    hyper.HotplugSupported(instance)
2163
  except errors.HotplugError, err:
2164
    _Fail("Hotplug is not supported: %s", err)
2165

    
2166

    
2167
def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
2168
  """Creates a block device for an instance.
2169

2170
  @type disk: L{objects.Disk}
2171
  @param disk: the object describing the disk we should create
2172
  @type size: int
2173
  @param size: the size of the physical underlying device, in MiB
2174
  @type owner: str
2175
  @param owner: the name of the instance for which disk is created,
2176
      used for device cache data
2177
  @type on_primary: boolean
2178
  @param on_primary:  indicates if it is the primary node or not
2179
  @type info: string
2180
  @param info: string that will be sent to the physical device
2181
      creation, used for example to set (LVM) tags on LVs
2182
  @type excl_stor: boolean
2183
  @param excl_stor: Whether exclusive_storage is active
2184

2185
  @return: the new unique_id of the device (this can sometime be
2186
      computed only after creation), or None. On secondary nodes,
2187
      it's not required to return anything.
2188

2189
  """
2190
  # TODO: remove the obsolete "size" argument
2191
  # pylint: disable=W0613
2192
  clist = []
2193
  if disk.children:
2194
    for child in disk.children:
2195
      try:
2196
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
2197
      except errors.BlockDeviceError, err:
2198
        _Fail("Can't assemble device %s: %s", child, err)
2199
      if on_primary or disk.AssembleOnSecondary():
2200
        # we need the children open in case the device itself has to
2201
        # be assembled
2202
        try:
2203
          # pylint: disable=E1103
2204
          crdev.Open()
2205
        except errors.BlockDeviceError, err:
2206
          _Fail("Can't make child '%s' read-write: %s", child, err)
2207
      clist.append(crdev)
2208

    
2209
  try:
2210
    device = bdev.Create(disk, clist, excl_stor)
2211
  except errors.BlockDeviceError, err:
2212
    _Fail("Can't create block device: %s", err)
2213

    
2214
  if on_primary or disk.AssembleOnSecondary():
2215
    try:
2216
      device.Assemble()
2217
    except errors.BlockDeviceError, err:
2218
      _Fail("Can't assemble device after creation, unusual event: %s", err)
2219
    if on_primary or disk.OpenOnSecondary():
2220
      try:
2221
        device.Open(force=True)
2222
      except errors.BlockDeviceError, err:
2223
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
2224
    DevCacheManager.UpdateCache(device.dev_path, owner,
2225
                                on_primary, disk.iv_name)
2226

    
2227
  device.SetInfo(info)
2228

    
2229
  return device.unique_id
2230

    
2231

    
2232
def _WipeDevice(path, offset, size):
2233
  """This function actually wipes the device.
2234

2235
  @param path: The path to the device to wipe
2236
  @param offset: The offset in MiB in the file
2237
  @param size: The size in MiB to write
2238

2239
  """
2240
  # Internal sizes are always in Mebibytes; if the following "dd" command
2241
  # should use a different block size the offset and size given to this
2242
  # function must be adjusted accordingly before being passed to "dd".
2243
  block_size = 1024 * 1024
2244

    
2245
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
2246
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
2247
         "count=%d" % size]
2248
  result = utils.RunCmd(cmd)
2249

    
2250
  if result.failed:
2251
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
2252
          result.fail_reason, result.output)
2253

    
2254

    
2255
def BlockdevWipe(disk, offset, size):
2256
  """Wipes a block device.
2257

2258
  @type disk: L{objects.Disk}
2259
  @param disk: the disk object we want to wipe
2260
  @type offset: int
2261
  @param offset: The offset in MiB in the file
2262
  @type size: int
2263
  @param size: The size in MiB to write
2264

2265
  """
2266
  try:
2267
    rdev = _RecursiveFindBD(disk)
2268
  except errors.BlockDeviceError:
2269
    rdev = None
2270

    
2271
  if not rdev:
2272
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
2273

    
2274
  # Do cross verify some of the parameters
2275
  if offset < 0:
2276
    _Fail("Negative offset")
2277
  if size < 0:
2278
    _Fail("Negative size")
2279
  if offset > rdev.size:
2280
    _Fail("Offset is bigger than device size")
2281
  if (offset + size) > rdev.size:
2282
    _Fail("The provided offset and size to wipe is bigger than device size")
2283

    
2284
  _WipeDevice(rdev.dev_path, offset, size)
2285

    
2286

    
2287
def BlockdevPauseResumeSync(disks, pause):
2288
  """Pause or resume the sync of the block device.
2289

2290
  @type disks: list of L{objects.Disk}
2291
  @param disks: the disks object we want to pause/resume
2292
  @type pause: bool
2293
  @param pause: Wheater to pause or resume
2294

2295
  """
2296
  success = []
2297
  for disk in disks:
2298
    try:
2299
      rdev = _RecursiveFindBD(disk)
2300
    except errors.BlockDeviceError:
2301
      rdev = None
2302

    
2303
    if not rdev:
2304
      success.append((False, ("Cannot change sync for device %s:"
2305
                              " device not found" % disk.iv_name)))
2306
      continue
2307

    
2308
    result = rdev.PauseResumeSync(pause)
2309

    
2310
    if result:
2311
      success.append((result, None))
2312
    else:
2313
      if pause:
2314
        msg = "Pause"
2315
      else:
2316
        msg = "Resume"
2317
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
2318

    
2319
  return success
2320

    
2321

    
2322
def BlockdevRemove(disk):
2323
  """Remove a block device.
2324

2325
  @note: This is intended to be called recursively.
2326

2327
  @type disk: L{objects.Disk}
2328
  @param disk: the disk object we should remove
2329
  @rtype: boolean
2330
  @return: the success of the operation
2331

2332
  """
2333
  msgs = []
2334
  try:
2335
    rdev = _RecursiveFindBD(disk)
2336
  except errors.BlockDeviceError, err:
2337
    # probably can't attach
2338
    logging.info("Can't attach to device %s in remove", disk)
2339
    rdev = None
2340
  if rdev is not None:
2341
    r_path = rdev.dev_path
2342

    
2343
    def _TryRemove():
2344
      try:
2345
        rdev.Remove()
2346
        return []
2347
      except errors.BlockDeviceError, err:
2348
        return [str(err)]
2349

    
2350
    msgs.extend(utils.SimpleRetry([], _TryRemove,
2351
                                  constants.DISK_REMOVE_RETRY_INTERVAL,
2352
                                  constants.DISK_REMOVE_RETRY_TIMEOUT))
2353

    
2354
    if not msgs:
2355
      DevCacheManager.RemoveCache(r_path)
2356

    
2357
  if disk.children:
2358
    for child in disk.children:
2359
      try:
2360
        BlockdevRemove(child)
2361
      except RPCFail, err:
2362
        msgs.append(str(err))
2363

    
2364
  if msgs:
2365
    _Fail("; ".join(msgs))
2366

    
2367

    
2368
def _RecursiveAssembleBD(disk, owner, as_primary):
2369
  """Activate a block device for an instance.
2370

2371
  This is run on the primary and secondary nodes for an instance.
2372

2373
  @note: this function is called recursively.
2374

2375
  @type disk: L{objects.Disk}
2376
  @param disk: the disk we try to assemble
2377
  @type owner: str
2378
  @param owner: the name of the instance which owns the disk
2379
  @type as_primary: boolean
2380
  @param as_primary: if we should make the block device
2381
      read/write
2382

2383
  @return: the assembled device or None (in case no device
2384
      was assembled)
2385
  @raise errors.BlockDeviceError: in case there is an error
2386
      during the activation of the children or the device
2387
      itself
2388

2389
  """
2390
  children = []
2391
  if disk.children:
2392
    mcn = disk.ChildrenNeeded()
2393
    if mcn == -1:
2394
      mcn = 0 # max number of Nones allowed
2395
    else:
2396
      mcn = len(disk.children) - mcn # max number of Nones
2397
    for chld_disk in disk.children:
2398
      try:
2399
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
2400
      except errors.BlockDeviceError, err:
2401
        if children.count(None) >= mcn:
2402
          raise
2403
        cdev = None
2404
        logging.error("Error in child activation (but continuing): %s",
2405
                      str(err))
2406
      children.append(cdev)
2407

    
2408
  if as_primary or disk.AssembleOnSecondary():
2409
    r_dev = bdev.Assemble(disk, children)
2410
    result = r_dev
2411
    if as_primary or disk.OpenOnSecondary():
2412
      r_dev.Open()
2413
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
2414
                                as_primary, disk.iv_name)
2415

    
2416
  else:
2417
    result = True
2418
  return result
2419

    
2420

    
2421
def BlockdevAssemble(disk, owner, as_primary, idx):
2422
  """Activate a block device for an instance.
2423

2424
  This is a wrapper over _RecursiveAssembleBD.
2425

2426
  @rtype: str or boolean
2427
  @return: a tuple with the C{/dev/...} path and the created symlink
2428
      for primary nodes, and (C{True}, C{True}) for secondary nodes
2429

2430
  """
2431
  try:
2432
    result = _RecursiveAssembleBD(disk, owner, as_primary)
2433
    if isinstance(result, BlockDev):
2434
      # pylint: disable=E1103
2435
      dev_path = result.dev_path
2436
      link_name = None
2437
      if as_primary:
2438
        link_name = _SymlinkBlockDev(owner, dev_path, idx)
2439
    elif result:
2440
      return result, result
2441
    else:
2442
      _Fail("Unexpected result from _RecursiveAssembleBD")
2443
  except errors.BlockDeviceError, err:
2444
    _Fail("Error while assembling disk: %s", err, exc=True)
2445
  except OSError, err:
2446
    _Fail("Error while symlinking disk: %s", err, exc=True)
2447

    
2448
  return dev_path, link_name
2449

    
2450

    
2451
def BlockdevShutdown(disk):
2452
  """Shut down a block device.
2453

2454
  First, if the device is assembled (Attach() is successful), then
2455
  the device is shutdown. Then the children of the device are
2456
  shutdown.
2457

2458
  This function is called recursively. Note that we don't cache the
2459
  children or such, as oppossed to assemble, shutdown of different
2460
  devices doesn't require that the upper device was active.
2461

2462
  @type disk: L{objects.Disk}
2463
  @param disk: the description of the disk we should
2464
      shutdown
2465
  @rtype: None
2466

2467
  """
2468
  msgs = []
2469
  r_dev = _RecursiveFindBD(disk)
2470
  if r_dev is not None:
2471
    r_path = r_dev.dev_path
2472
    try:
2473
      r_dev.Shutdown()
2474
      DevCacheManager.RemoveCache(r_path)
2475
    except errors.BlockDeviceError, err:
2476
      msgs.append(str(err))
2477

    
2478
  if disk.children:
2479
    for child in disk.children:
2480
      try:
2481
        BlockdevShutdown(child)
2482
      except RPCFail, err:
2483
        msgs.append(str(err))
2484

    
2485
  if msgs:
2486
    _Fail("; ".join(msgs))
2487

    
2488

    
2489
def BlockdevAddchildren(parent_cdev, new_cdevs):
2490
  """Extend a mirrored block device.
2491

2492
  @type parent_cdev: L{objects.Disk}
2493
  @param parent_cdev: the disk to which we should add children
2494
  @type new_cdevs: list of L{objects.Disk}
2495
  @param new_cdevs: the list of children which we should add
2496
  @rtype: None
2497

2498
  """
2499
  parent_bdev = _RecursiveFindBD(parent_cdev)
2500
  if parent_bdev is None:
2501
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
2502
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
2503
  if new_bdevs.count(None) > 0:
2504
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
2505
  parent_bdev.AddChildren(new_bdevs)
2506

    
2507

    
2508
def BlockdevRemovechildren(parent_cdev, new_cdevs):
2509
  """Shrink a mirrored block device.
2510

2511
  @type parent_cdev: L{objects.Disk}
2512
  @param parent_cdev: the disk from which we should remove children
2513
  @type new_cdevs: list of L{objects.Disk}
2514
  @param new_cdevs: the list of children which we should remove
2515
  @rtype: None
2516

2517
  """
2518
  parent_bdev = _RecursiveFindBD(parent_cdev)
2519
  if parent_bdev is None:
2520
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
2521
  devs = []
2522
  for disk in new_cdevs:
2523
    rpath = disk.StaticDevPath()
2524
    if rpath is None:
2525
      bd = _RecursiveFindBD(disk)
2526
      if bd is None:
2527
        _Fail("Can't find device %s while removing children", disk)
2528
      else:
2529
        devs.append(bd.dev_path)
2530
    else:
2531
      if not utils.IsNormAbsPath(rpath):
2532
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
2533
      devs.append(rpath)
2534
  parent_bdev.RemoveChildren(devs)
2535

    
2536

    
2537
def BlockdevGetmirrorstatus(disks):
2538
  """Get the mirroring status of a list of devices.
2539

2540
  @type disks: list of L{objects.Disk}
2541
  @param disks: the list of disks which we should query
2542
  @rtype: disk
2543
  @return: List of L{objects.BlockDevStatus}, one for each disk
2544
  @raise errors.BlockDeviceError: if any of the disks cannot be
2545
      found
2546

2547
  """
2548
  stats = []
2549
  for dsk in disks:
2550
    rbd = _RecursiveFindBD(dsk)
2551
    if rbd is None:
2552
      _Fail("Can't find device %s", dsk)
2553

    
2554
    stats.append(rbd.CombinedSyncStatus())
2555

    
2556
  return stats
2557

    
2558

    
2559
def BlockdevGetmirrorstatusMulti(disks):
2560
  """Get the mirroring status of a list of devices.
2561

2562
  @type disks: list of L{objects.Disk}
2563
  @param disks: the list of disks which we should query
2564
  @rtype: disk
2565
  @return: List of tuples, (bool, status), one for each disk; bool denotes
2566
    success/failure, status is L{objects.BlockDevStatus} on success, string
2567
    otherwise
2568

2569
  """
2570
  result = []
2571
  for disk in disks:
2572
    try:
2573
      rbd = _RecursiveFindBD(disk)
2574
      if rbd is None:
2575
        result.append((False, "Can't find device %s" % disk))
2576
        continue
2577

    
2578
      status = rbd.CombinedSyncStatus()
2579
    except errors.BlockDeviceError, err:
2580
      logging.exception("Error while getting disk status")
2581
      result.append((False, str(err)))
2582
    else:
2583
      result.append((True, status))
2584

    
2585
  assert len(disks) == len(result)
2586

    
2587
  return result
2588

    
2589

    
2590
def _RecursiveFindBD(disk):
2591
  """Check if a device is activated.
2592

2593
  If so, return information about the real device.
2594

2595
  @type disk: L{objects.Disk}
2596
  @param disk: the disk object we need to find
2597

2598
  @return: None if the device can't be found,
2599
      otherwise the device instance
2600

2601
  """
2602
  children = []
2603
  if disk.children:
2604
    for chdisk in disk.children:
2605
      children.append(_RecursiveFindBD(chdisk))
2606

    
2607
  return bdev.FindDevice(disk, children)
2608

    
2609

    
2610
def _OpenRealBD(disk):
2611
  """Opens the underlying block device of a disk.
2612

2613
  @type disk: L{objects.Disk}
2614
  @param disk: the disk object we want to open
2615

2616
  """
2617
  real_disk = _RecursiveFindBD(disk)
2618
  if real_disk is None:
2619
    _Fail("Block device '%s' is not set up", disk)
2620

    
2621
  real_disk.Open()
2622

    
2623
  return real_disk
2624

    
2625

    
2626
def BlockdevFind(disk):
2627
  """Check if a device is activated.
2628

2629
  If it is, return information about the real device.
2630

2631
  @type disk: L{objects.Disk}
2632
  @param disk: the disk to find
2633
  @rtype: None or objects.BlockDevStatus
2634
  @return: None if the disk cannot be found, otherwise a the current
2635
           information
2636

2637
  """
2638
  try:
2639
    rbd = _RecursiveFindBD(disk)
2640
  except errors.BlockDeviceError, err:
2641
    _Fail("Failed to find device: %s", err, exc=True)
2642

    
2643
  if rbd is None:
2644
    return None
2645

    
2646
  return rbd.GetSyncStatus()
2647

    
2648

    
2649
def BlockdevGetdimensions(disks):
2650
  """Computes the size of the given disks.
2651

2652
  If a disk is not found, returns None instead.
2653

2654
  @type disks: list of L{objects.Disk}
2655
  @param disks: the list of disk to compute the size for
2656
  @rtype: list
2657
  @return: list with elements None if the disk cannot be found,
2658
      otherwise the pair (size, spindles), where spindles is None if the
2659
      device doesn't support that
2660

2661
  """
2662
  result = []
2663
  for cf in disks:
2664
    try:
2665
      rbd = _RecursiveFindBD(cf)
2666
    except errors.BlockDeviceError:
2667
      result.append(None)
2668
      continue
2669
    if rbd is None:
2670
      result.append(None)
2671
    else:
2672
      result.append(rbd.GetActualDimensions())
2673
  return result
2674

    
2675

    
2676
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2677
  """Write a file to the filesystem.
2678

2679
  This allows the master to overwrite(!) a file. It will only perform
2680
  the operation if the file belongs to a list of configuration files.
2681

2682
  @type file_name: str
2683
  @param file_name: the target file name
2684
  @type data: str
2685
  @param data: the new contents of the file
2686
  @type mode: int
2687
  @param mode: the mode to give the file (can be None)
2688
  @type uid: string
2689
  @param uid: the owner of the file
2690
  @type gid: string
2691
  @param gid: the group of the file
2692
  @type atime: float
2693
  @param atime: the atime to set on the file (can be None)
2694
  @type mtime: float
2695
  @param mtime: the mtime to set on the file (can be None)
2696
  @rtype: None
2697

2698
  """
2699
  file_name = vcluster.LocalizeVirtualPath(file_name)
2700

    
2701
  if not os.path.isabs(file_name):
2702
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2703

    
2704
  if file_name not in _ALLOWED_UPLOAD_FILES:
2705
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2706
          file_name)
2707

    
2708
  raw_data = _Decompress(data)
2709

    
2710
  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2711
    _Fail("Invalid username/groupname type")
2712

    
2713
  getents = runtime.GetEnts()
2714
  uid = getents.LookupUser(uid)
2715
  gid = getents.LookupGroup(gid)
2716

    
2717
  utils.SafeWriteFile(file_name, None,
2718
                      data=raw_data, mode=mode, uid=uid, gid=gid,
2719
                      atime=atime, mtime=mtime)
2720

    
2721

    
2722
def RunOob(oob_program, command, node, timeout):
2723
  """Executes oob_program with given command on given node.
2724

2725
  @param oob_program: The path to the executable oob_program
2726
  @param command: The command to invoke on oob_program
2727
  @param node: The node given as an argument to the program
2728
  @param timeout: Timeout after which we kill the oob program
2729

2730
  @return: stdout
2731
  @raise RPCFail: If execution fails for some reason
2732

2733
  """
2734
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2735

    
2736
  if result.failed:
2737
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2738
          result.fail_reason, result.output)
2739

    
2740
  return result.stdout
2741

    
2742

    
2743
def _OSOndiskAPIVersion(os_dir):
2744
  """Compute and return the API version of a given OS.
2745

2746
  This function will try to read the API version of the OS residing in
2747
  the 'os_dir' directory.
2748

2749
  @type os_dir: str
2750
  @param os_dir: the directory in which we should look for the OS
2751
  @rtype: tuple
2752
  @return: tuple (status, data) with status denoting the validity and
2753
      data holding either the vaid versions or an error message
2754

2755
  """
2756
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2757

    
2758
  try:
2759
    st = os.stat(api_file)
2760
  except EnvironmentError, err:
2761
    return False, ("Required file '%s' not found under path %s: %s" %
2762
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2763

    
2764
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2765
    return False, ("File '%s' in %s is not a regular file" %
2766
                   (constants.OS_API_FILE, os_dir))
2767

    
2768
  try:
2769
    api_versions = utils.ReadFile(api_file).splitlines()
2770
  except EnvironmentError, err:
2771
    return False, ("Error while reading the API version file at %s: %s" %
2772
                   (api_file, utils.ErrnoOrStr(err)))
2773

    
2774
  try:
2775
    api_versions = [int(version.strip()) for version in api_versions]
2776
  except (TypeError, ValueError), err:
2777
    return False, ("API version(s) can't be converted to integer: %s" %
2778
                   str(err))
2779

    
2780
  return True, api_versions
2781

    
2782

    
2783
def DiagnoseOS(top_dirs=None):
2784
  """Compute the validity for all OSes.
2785

2786
  @type top_dirs: list
2787
  @param top_dirs: the list of directories in which to
2788
      search (if not given defaults to
2789
      L{pathutils.OS_SEARCH_PATH})
2790
  @rtype: list of L{objects.OS}
2791
  @return: a list of tuples (name, path, status, diagnose, variants,
2792
      parameters, api_version) for all (potential) OSes under all
2793
      search paths, where:
2794
          - name is the (potential) OS name
2795
          - path is the full path to the OS
2796
          - status True/False is the validity of the OS
2797
          - diagnose is the error message for an invalid OS, otherwise empty
2798
          - variants is a list of supported OS variants, if any
2799
          - parameters is a list of (name, help) parameters, if any
2800
          - api_version is a list of support OS API versions
2801

2802
  """
2803
  if top_dirs is None:
2804
    top_dirs = pathutils.OS_SEARCH_PATH
2805

    
2806
  result = []
2807
  for dir_name in top_dirs:
2808
    if os.path.isdir(dir_name):
2809
      try:
2810
        f_names = utils.ListVisibleFiles(dir_name)
2811
      except EnvironmentError, err:
2812
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2813
        break
2814
      for name in f_names:
2815
        os_path = utils.PathJoin(dir_name, name)
2816
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2817
        if status:
2818
          diagnose = ""
2819
          variants = os_inst.supported_variants
2820
          parameters = os_inst.supported_parameters
2821
          api_versions = os_inst.api_versions
2822
        else:
2823
          diagnose = os_inst
2824
          variants = parameters = api_versions = []
2825
        result.append((name, os_path, status, diagnose, variants,
2826
                       parameters, api_versions))
2827

    
2828
  return result
2829

    
2830

    
2831
def _TryOSFromDisk(name, base_dir=None):
2832
  """Create an OS instance from disk.
2833

2834
  This function will return an OS instance if the given name is a
2835
  valid OS name.
2836

2837
  @type base_dir: string
2838
  @keyword base_dir: Base directory containing OS installations.
2839
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
2840
  @rtype: tuple
2841
  @return: success and either the OS instance if we find a valid one,
2842
      or error message
2843

2844
  """
2845
  if base_dir is None:
2846
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
2847
  else:
2848
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2849

    
2850
  if os_dir is None:
2851
    return False, "Directory for OS %s not found in search path" % name
2852

    
2853
  status, api_versions = _OSOndiskAPIVersion(os_dir)
2854
  if not status:
2855
    # push the error up
2856
    return status, api_versions
2857

    
2858
  if not constants.OS_API_VERSIONS.intersection(api_versions):
2859
    return False, ("API version mismatch for path '%s': found %s, want %s." %
2860
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
2861

    
2862
  # OS Files dictionary, we will populate it with the absolute path
2863
  # names; if the value is True, then it is a required file, otherwise
2864
  # an optional one
2865
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2866

    
2867
  if max(api_versions) >= constants.OS_API_V15:
2868
    os_files[constants.OS_VARIANTS_FILE] = False
2869

    
2870
  if max(api_versions) >= constants.OS_API_V20:
2871
    os_files[constants.OS_PARAMETERS_FILE] = True
2872
  else:
2873
    del os_files[constants.OS_SCRIPT_VERIFY]
2874

    
2875
  for (filename, required) in os_files.items():
2876
    os_files[filename] = utils.PathJoin(os_dir, filename)
2877

    
2878
    try:
2879
      st = os.stat(os_files[filename])
2880
    except EnvironmentError, err:
2881
      if err.errno == errno.ENOENT and not required:
2882
        del os_files[filename]
2883
        continue
2884
      return False, ("File '%s' under path '%s' is missing (%s)" %
2885
                     (filename, os_dir, utils.ErrnoOrStr(err)))
2886

    
2887
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2888
      return False, ("File '%s' under path '%s' is not a regular file" %
2889
                     (filename, os_dir))
2890

    
2891
    if filename in constants.OS_SCRIPTS:
2892
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2893
        return False, ("File '%s' under path '%s' is not executable" %
2894
                       (filename, os_dir))
2895

    
2896
  variants = []
2897
  if constants.OS_VARIANTS_FILE in os_files:
2898
    variants_file = os_files[constants.OS_VARIANTS_FILE]
2899
    try:
2900
      variants = \
2901
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
2902
    except EnvironmentError, err:
2903
      # we accept missing files, but not other errors
2904
      if err.errno != errno.ENOENT:
2905
        return False, ("Error while reading the OS variants file at %s: %s" %
2906
                       (variants_file, utils.ErrnoOrStr(err)))
2907

    
2908
  parameters = []
2909
  if constants.OS_PARAMETERS_FILE in os_files:
2910
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2911
    try:
2912
      parameters = utils.ReadFile(parameters_file).splitlines()
2913
    except EnvironmentError, err:
2914
      return False, ("Error while reading the OS parameters file at %s: %s" %
2915
                     (parameters_file, utils.ErrnoOrStr(err)))
2916
    parameters = [v.split(None, 1) for v in parameters]
2917

    
2918
  os_obj = objects.OS(name=name, path=os_dir,
2919
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
2920
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
2921
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
2922
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
2923
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2924
                                                 None),
2925
                      supported_variants=variants,
2926
                      supported_parameters=parameters,
2927
                      api_versions=api_versions)
2928
  return True, os_obj
2929

    
2930

    
2931
def OSFromDisk(name, base_dir=None):
2932
  """Create an OS instance from disk.
2933

2934
  This function will return an OS instance if the given name is a
2935
  valid OS name. Otherwise, it will raise an appropriate
2936
  L{RPCFail} exception, detailing why this is not a valid OS.
2937

2938
  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2939
  an exception but returns true/false status data.
2940

2941
  @type base_dir: string
2942
  @keyword base_dir: Base directory containing OS installations.
2943
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
2944
  @rtype: L{objects.OS}
2945
  @return: the OS instance if we find a valid one
2946
  @raise RPCFail: if we don't find a valid OS
2947

2948
  """
2949
  name_only = objects.OS.GetName(name)
2950
  status, payload = _TryOSFromDisk(name_only, base_dir)
2951

    
2952
  if not status:
2953
    _Fail(payload)
2954

    
2955
  return payload
2956

    
2957

    
2958
def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2959
  """Calculate the basic environment for an os script.
2960

2961
  @type os_name: str
2962
  @param os_name: full operating system name (including variant)
2963
  @type inst_os: L{objects.OS}
2964
  @param inst_os: operating system for which the environment is being built
2965
  @type os_params: dict
2966
  @param os_params: the OS parameters
2967
  @type debug: integer
2968
  @param debug: debug level (0 or 1, for OS Api 10)
2969
  @rtype: dict
2970
  @return: dict of environment variables
2971
  @raise errors.BlockDeviceError: if the block device
2972
      cannot be found
2973

2974
  """
2975
  result = {}
2976
  api_version = \
2977
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2978
  result["OS_API_VERSION"] = "%d" % api_version
2979
  result["OS_NAME"] = inst_os.name
2980
  result["DEBUG_LEVEL"] = "%d" % debug
2981

    
2982
  # OS variants
2983
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2984
    variant = objects.OS.GetVariant(os_name)
2985
    if not variant:
2986
      variant = inst_os.supported_variants[0]
2987
  else:
2988
    variant = ""
2989
  result["OS_VARIANT"] = variant
2990

    
2991
  # OS params
2992
  for pname, pvalue in os_params.items():
2993
    result["OSP_%s" % pname.upper()] = pvalue
2994

    
2995
  # Set a default path otherwise programs called by OS scripts (or
2996
  # even hooks called from OS scripts) might break, and we don't want
2997
  # to have each script require setting a PATH variable
2998
  result["PATH"] = constants.HOOKS_PATH
2999

    
3000
  return result
3001

    
3002

    
3003
def OSEnvironment(instance, inst_os, debug=0):
3004
  """Calculate the environment for an os script.
3005

3006
  @type instance: L{objects.Instance}
3007
  @param instance: target instance for the os script run
3008
  @type inst_os: L{objects.OS}
3009
  @param inst_os: operating system for which the environment is being built
3010
  @type debug: integer
3011
  @param debug: debug level (0 or 1, for OS Api 10)
3012
  @rtype: dict
3013
  @return: dict of environment variables
3014
  @raise errors.BlockDeviceError: if the block device
3015
      cannot be found
3016

3017
  """
3018
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
3019

    
3020
  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
3021
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
3022

    
3023
  result["HYPERVISOR"] = instance.hypervisor
3024
  result["DISK_COUNT"] = "%d" % len(instance.disks)
3025
  result["NIC_COUNT"] = "%d" % len(instance.nics)
3026
  result["INSTANCE_SECONDARY_NODES"] = \
3027
      ("%s" % " ".join(instance.secondary_nodes))
3028

    
3029
  # Disks
3030
  for idx, disk in enumerate(instance.disks):
3031
    real_disk = _OpenRealBD(disk)
3032
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
3033
    result["DISK_%d_ACCESS" % idx] = disk.mode
3034
    result["DISK_%d_UUID" % idx] = disk.uuid
3035
    if disk.name:
3036
      result["DISK_%d_NAME" % idx] = disk.name
3037
    if constants.HV_DISK_TYPE in instance.hvparams:
3038
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
3039
        instance.hvparams[constants.HV_DISK_TYPE]
3040
    if disk.dev_type in constants.DTS_BLOCK:
3041
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
3042
    elif disk.dev_type in constants.DTS_FILEBASED:
3043
      result["DISK_%d_BACKEND_TYPE" % idx] = \
3044
        "file:%s" % disk.logical_id[0]
3045

    
3046
  # NICs
3047
  for idx, nic in enumerate(instance.nics):
3048
    result["NIC_%d_MAC" % idx] = nic.mac
3049
    result["NIC_%d_UUID" % idx] = nic.uuid
3050
    if nic.name:
3051
      result["NIC_%d_NAME" % idx] = nic.name
3052
    if nic.ip:
3053
      result["NIC_%d_IP" % idx] = nic.ip
3054
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
3055
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3056
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
3057
    if nic.nicparams[constants.NIC_LINK]:
3058
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
3059
    if nic.netinfo:
3060
      nobj = objects.Network.FromDict(nic.netinfo)
3061
      result.update(nobj.HooksDict("NIC_%d_" % idx))
3062
    if constants.HV_NIC_TYPE in instance.hvparams:
3063
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
3064
        instance.hvparams[constants.HV_NIC_TYPE]
3065

    
3066
  # HV/BE params
3067
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
3068
    for key, value in source.items():
3069
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
3070

    
3071
  return result
3072

    
3073

    
3074
def DiagnoseExtStorage(top_dirs=None):
3075
  """Compute the validity for all ExtStorage Providers.
3076

3077
  @type top_dirs: list
3078
  @param top_dirs: the list of directories in which to
3079
      search (if not given defaults to
3080
      L{pathutils.ES_SEARCH_PATH})
3081
  @rtype: list of L{objects.ExtStorage}
3082
  @return: a list of tuples (name, path, status, diagnose, parameters)
3083
      for all (potential) ExtStorage Providers under all
3084
      search paths, where:
3085
          - name is the (potential) ExtStorage Provider
3086
          - path is the full path to the ExtStorage Provider
3087
          - status True/False is the validity of the ExtStorage Provider
3088
          - diagnose is the error message for an invalid ExtStorage Provider,
3089
            otherwise empty
3090
          - parameters is a list of (name, help) parameters, if any
3091

3092
  """
3093
  if top_dirs is None:
3094
    top_dirs = pathutils.ES_SEARCH_PATH
3095

    
3096
  result = []
3097
  for dir_name in top_dirs:
3098
    if os.path.isdir(dir_name):
3099
      try:
3100
        f_names = utils.ListVisibleFiles(dir_name)
3101
      except EnvironmentError, err:
3102
        logging.exception("Can't list the ExtStorage directory %s: %s",
3103
                          dir_name, err)
3104
        break
3105
      for name in f_names:
3106
        es_path = utils.PathJoin(dir_name, name)
3107
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
3108
        if status:
3109
          diagnose = ""
3110
          parameters = es_inst.supported_parameters
3111
        else:
3112
          diagnose = es_inst
3113
          parameters = []
3114
        result.append((name, es_path, status, diagnose, parameters))
3115

    
3116
  return result
3117

    
3118

    
3119
def BlockdevGrow(disk, amount, dryrun, backingstore, excl_stor):
3120
  """Grow a stack of block devices.
3121

3122
  This function is called recursively, with the childrens being the
3123
  first ones to resize.
3124

3125
  @type disk: L{objects.Disk}
3126
  @param disk: the disk to be grown
3127
  @type amount: integer
3128
  @param amount: the amount (in mebibytes) to grow with
3129
  @type dryrun: boolean
3130
  @param dryrun: whether to execute the operation in simulation mode
3131
      only, without actually increasing the size
3132
  @param backingstore: whether to execute the operation on backing storage
3133
      only, or on "logical" storage only; e.g. DRBD is logical storage,
3134
      whereas LVM, file, RBD are backing storage
3135
  @rtype: (status, result)
3136
  @type excl_stor: boolean
3137
  @param excl_stor: Whether exclusive_storage is active
3138
  @return: a tuple with the status of the operation (True/False), and
3139
      the errors message if status is False
3140

3141
  """
3142
  r_dev = _RecursiveFindBD(disk)
3143
  if r_dev is None:
3144
    _Fail("Cannot find block device %s", disk)
3145

    
3146
  try:
3147
    r_dev.Grow(amount, dryrun, backingstore, excl_stor)
3148
  except errors.BlockDeviceError, err:
3149
    _Fail("Failed to grow block device: %s", err, exc=True)
3150

    
3151

    
3152
def BlockdevSnapshot(disk):
3153
  """Create a snapshot copy of a block device.
3154

3155
  This function is called recursively, and the snapshot is actually created
3156
  just for the leaf lvm backend device.
3157

3158
  @type disk: L{objects.Disk}
3159
  @param disk: the disk to be snapshotted
3160
  @rtype: string
3161
  @return: snapshot disk ID as (vg, lv)
3162

3163
  """
3164
  if disk.dev_type == constants.DT_DRBD8:
3165
    if not disk.children:
3166
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
3167
            disk.unique_id)
3168
    return BlockdevSnapshot(disk.children[0])
3169
  elif disk.dev_type == constants.DT_PLAIN:
3170
    r_dev = _RecursiveFindBD(disk)
3171
    if r_dev is not None:
3172
      # FIXME: choose a saner value for the snapshot size
3173
      # let's stay on the safe side and ask for the full size, for now
3174
      return r_dev.Snapshot(disk.size)
3175
    else:
3176
      _Fail("Cannot find block device %s", disk)
3177
  else:
3178
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
3179
          disk.unique_id, disk.dev_type)
3180

    
3181

    
3182
def BlockdevSetInfo(disk, info):
3183
  """Sets 'metadata' information on block devices.
3184

3185
  This function sets 'info' metadata on block devices. Initial
3186
  information is set at device creation; this function should be used
3187
  for example after renames.
3188

3189
  @type disk: L{objects.Disk}
3190
  @param disk: the disk to be grown
3191
  @type info: string
3192
  @param info: new 'info' metadata
3193
  @rtype: (status, result)
3194
  @return: a tuple with the status of the operation (True/False), and
3195
      the errors message if status is False
3196

3197
  """
3198
  r_dev = _RecursiveFindBD(disk)
3199
  if r_dev is None:
3200
    _Fail("Cannot find block device %s", disk)
3201

    
3202
  try:
3203
    r_dev.SetInfo(info)
3204
  except errors.BlockDeviceError, err:
3205
    _Fail("Failed to set information on block device: %s", err, exc=True)
3206

    
3207

    
3208
def FinalizeExport(instance, snap_disks):
3209
  """Write out the export configuration information.
3210

3211
  @type instance: L{objects.Instance}
3212
  @param instance: the instance which we export, used for
3213
      saving configuration
3214
  @type snap_disks: list of L{objects.Disk}
3215
  @param snap_disks: list of snapshot block devices, which
3216
      will be used to get the actual name of the dump file
3217

3218
  @rtype: None
3219

3220
  """
3221
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
3222
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)
3223

    
3224
  config = objects.SerializableConfigParser()
3225

    
3226
  config.add_section(constants.INISECT_EXP)
3227
  config.set(constants.INISECT_EXP, "version", "0")
3228
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
3229
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
3230
  config.set(constants.INISECT_EXP, "os", instance.os)
3231
  config.set(constants.INISECT_EXP, "compression", "none")
3232

    
3233
  config.add_section(constants.INISECT_INS)
3234
  config.set(constants.INISECT_INS, "name", instance.name)
3235
  config.set(constants.INISECT_INS, "maxmem", "%d" %
3236
             instance.beparams[constants.BE_MAXMEM])
3237
  config.set(constants.INISECT_INS, "minmem", "%d" %
3238
             instance.beparams[constants.BE_MINMEM])
3239
  # "memory" is deprecated, but useful for exporting to old ganeti versions
3240
  config.set(constants.INISECT_INS, "memory", "%d" %
3241
             instance.beparams[constants.BE_MAXMEM])
3242
  config.set(constants.INISECT_INS, "vcpus", "%d" %
3243
             instance.beparams[constants.BE_VCPUS])
3244
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
3245
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
3246
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
3247

    
3248
  nic_total = 0
3249
  for nic_count, nic in enumerate(instance.nics):
3250
    nic_total += 1
3251
    config.set(constants.INISECT_INS, "nic%d_mac" %
3252
               nic_count, "%s" % nic.mac)
3253
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
3254
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
3255
               "%s" % nic.network)
3256
    config.set(constants.INISECT_INS, "nic%d_name" % nic_count,
3257
               "%s" % nic.name)
3258
    for param in constants.NICS_PARAMETER_TYPES:
3259
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
3260
                 "%s" % nic.nicparams.get(param, None))
3261
  # TODO: redundant: on load can read nics until it doesn't exist
3262
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
3263

    
3264
  disk_total = 0
3265
  for disk_count, disk in enumerate(snap_disks):
3266
    if disk:
3267
      disk_total += 1
3268
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
3269
                 ("%s" % disk.iv_name))
3270
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
3271
                 ("%s" % disk.logical_id[1]))
3272
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
3273
                 ("%d" % disk.size))
3274
      config.set(constants.INISECT_INS, "disk%d_name" % disk_count,
3275
                 "%s" % disk.name)
3276

    
3277
  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
3278

    
3279
  # New-style hypervisor/backend parameters
3280

    
3281
  config.add_section(constants.INISECT_HYP)
3282
  for name, value in instance.hvparams.items():
3283
    if name not in constants.HVC_GLOBALS:
3284
      config.set(constants.INISECT_HYP, name, str(value))
3285

    
3286
  config.add_section(constants.INISECT_BEP)
3287
  for name, value in instance.beparams.items():
3288
    config.set(constants.INISECT_BEP, name, str(value))
3289

    
3290
  config.add_section(constants.INISECT_OSP)
3291
  for name, value in instance.osparams.items():
3292
    config.set(constants.INISECT_OSP, name, str(value))
3293

    
3294
  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
3295
                  data=config.Dumps())
3296
  shutil.rmtree(finaldestdir, ignore_errors=True)
3297
  shutil.move(destdir, finaldestdir)
3298

    
3299

    
3300
def ExportInfo(dest):
3301
  """Get export configuration information.
3302

3303
  @type dest: str
3304
  @param dest: directory containing the export
3305

3306
  @rtype: L{objects.SerializableConfigParser}
3307
  @return: a serializable config file containing the
3308
      export info
3309

3310
  """
3311
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
3312

    
3313
  config = objects.SerializableConfigParser()
3314
  config.read(cff)
3315

    
3316
  if (not config.has_section(constants.INISECT_EXP) or
3317
      not config.has_section(constants.INISECT_INS)):
3318
    _Fail("Export info file doesn't have the required fields")
3319

    
3320
  return config.Dumps()
3321

    
3322

    
3323
def ListExports():
3324
  """Return a list of exports currently available on this machine.
3325

3326
  @rtype: list
3327
  @return: list of the exports
3328

3329
  """
3330
  if os.path.isdir(pathutils.EXPORT_DIR):
3331
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
3332
  else:
3333
    _Fail("No exports directory")
3334

    
3335

    
3336
def RemoveExport(export):
3337
  """Remove an existing export from the node.
3338

3339
  @type export: str
3340
  @param export: the name of the export to remove
3341
  @rtype: None
3342

3343
  """
3344
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)
3345

    
3346
  try:
3347
    shutil.rmtree(target)
3348
  except EnvironmentError, err:
3349
    _Fail("Error while removing the export: %s", err, exc=True)
3350

    
3351

    
3352
def BlockdevRename(devlist):
3353
  """Rename a list of block devices.
3354

3355
  @type devlist: list of tuples
3356
  @param devlist: list of tuples of the form  (disk, new_unique_id); disk is
3357
      an L{objects.Disk} object describing the current disk, and new
3358
      unique_id is the name we rename it to
3359
  @rtype: boolean
3360
  @return: True if all renames succeeded, False otherwise
3361

3362
  """
3363
  msgs = []
3364
  result = True
3365
  for disk, unique_id in devlist:
3366
    dev = _RecursiveFindBD(disk)
3367
    if dev is None:
3368
      msgs.append("Can't find device %s in rename" % str(disk))
3369
      result = False
3370
      continue
3371
    try:
3372
      old_rpath = dev.dev_path
3373
      dev.Rename(unique_id)
3374
      new_rpath = dev.dev_path
3375
      if old_rpath != new_rpath:
3376
        DevCacheManager.RemoveCache(old_rpath)
3377
        # FIXME: we should add the new cache information here, like:
3378
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
3379
        # but we don't have the owner here - maybe parse from existing
3380
        # cache? for now, we only lose lvm data when we rename, which
3381
        # is less critical than DRBD or MD
3382
    except errors.BlockDeviceError, err:
3383
      msgs.append("Can't rename device '%s' to '%s': %s" %
3384
                  (dev, unique_id, err))
3385
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
3386
      result = False
3387
  if not result:
3388
    _Fail("; ".join(msgs))
3389

    
3390

    
3391
def _TransformFileStorageDir(fs_dir):
3392
  """Checks whether given file_storage_dir is valid.
3393

3394
  Checks wheter the given fs_dir is within the cluster-wide default
3395
  file_storage_dir or the shared_file_storage_dir, which are stored in
3396
  SimpleStore. Only paths under those directories are allowed.
3397

3398
  @type fs_dir: str
3399
  @param fs_dir: the path to check
3400

3401
  @return: the normalized path if valid, None otherwise
3402

3403
  """
3404
  filestorage.CheckFileStoragePath(fs_dir)
3405

    
3406
  return os.path.normpath(fs_dir)
3407

    
3408

    
3409
def CreateFileStorageDir(file_storage_dir):
3410
  """Create file storage directory.
3411

3412
  @type file_storage_dir: str
3413
  @param file_storage_dir: directory to create
3414

3415
  @rtype: tuple
3416
  @return: tuple with first element a boolean indicating wheter dir
3417
      creation was successful or not
3418

3419
  """
3420
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
3421
  if os.path.exists(file_storage_dir):
3422
    if not os.path.isdir(file_storage_dir):
3423
      _Fail("Specified storage dir '%s' is not a directory",
3424
            file_storage_dir)
3425
  else:
3426
    try:
3427
      os.makedirs(file_storage_dir, 0750)
3428
    except OSError, err:
3429
      _Fail("Cannot create file storage directory '%s': %s",
3430
            file_storage_dir, err, exc=True)
3431

    
3432

    
3433
def RemoveFileStorageDir(file_storage_dir):
3434
  """Remove file storage directory.
3435

3436
  Remove it only if it's empty. If not log an error and return.
3437

3438
  @type file_storage_dir: str
3439
  @param file_storage_dir: the directory we should cleanup
3440
  @rtype: tuple (success,)
3441
  @return: tuple of one element, C{success}, denoting
3442
      whether the operation was successful
3443

3444
  """
3445
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
3446
  if os.path.exists(file_storage_dir):
3447
    if not os.path.isdir(file_storage_dir):
3448
      _Fail("Specified Storage directory '%s' is not a directory",
3449
            file_storage_dir)
3450
    # deletes dir only if empty, otherwise we want to fail the rpc call
3451
    try:
3452
      os.rmdir(file_storage_dir)
3453
    except OSError, err:
3454
      _Fail("Cannot remove file storage directory '%s': %s",
3455
            file_storage_dir, err)
3456

    
3457

    
3458
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
3459
  """Rename the file storage directory.
3460

3461
  @type old_file_storage_dir: str
3462
  @param old_file_storage_dir: the current path
3463
  @type new_file_storage_dir: str
3464
  @param new_file_storage_dir: the name we should rename to
3465
  @rtype: tuple (success,)
3466
  @return: tuple of one element, C{success}, denoting
3467
      whether the operation was successful
3468

3469
  """
3470
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
3471
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
3472
  if not os.path.exists(new_file_storage_dir):
3473
    if os.path.isdir(old_file_storage_dir):
3474
      try:
3475
        os.rename(old_file_storage_dir, new_file_storage_dir)
3476
      except OSError, err:
3477
        _Fail("Cannot rename '%s' to '%s': %s",
3478
              old_file_storage_dir, new_file_storage_dir, err)
3479
    else:
3480
      _Fail("Specified storage dir '%s' is not a directory",
3481
            old_file_storage_dir)
3482
  else:
3483
    if os.path.exists(old_file_storage_dir):
3484
      _Fail("Cannot rename '%s' to '%s': both locations exist",
3485
            old_file_storage_dir, new_file_storage_dir)
3486

    
3487

    
3488
def _EnsureJobQueueFile(file_name):
3489
  """Checks whether the given filename is in the queue directory.
3490

3491
  @type file_name: str
3492
  @param file_name: the file name we should check
3493
  @rtype: None
3494
  @raises RPCFail: if the file is not valid
3495

3496
  """
3497
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
3498
    _Fail("Passed job queue file '%s' does not belong to"
3499
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
3500

    
3501

    
3502
def JobQueueUpdate(file_name, content):
3503
  """Updates a file in the queue directory.
3504

3505
  This is just a wrapper over L{utils.io.WriteFile}, with proper
3506
  checking.
3507

3508
  @type file_name: str
3509
  @param file_name: the job file name
3510
  @type content: str
3511
  @param content: the new job contents
3512
  @rtype: boolean
3513
  @return: the success of the operation
3514

3515
  """
3516
  file_name = vcluster.LocalizeVirtualPath(file_name)
3517

    
3518
  _EnsureJobQueueFile(file_name)
3519
  getents = runtime.GetEnts()
3520

    
3521
  # Write and replace the file atomically
3522
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
3523
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)
3524

    
3525

    
3526
def JobQueueRename(old, new):
3527
  """Renames a job queue file.
3528

3529
  This is just a wrapper over os.rename with proper checking.
3530

3531
  @type old: str
3532
  @param old: the old (actual) file name
3533
  @type new: str
3534
  @param new: the desired file name
3535
  @rtype: tuple
3536
  @return: the success of the operation and payload
3537

3538
  """
3539
  old = vcluster.LocalizeVirtualPath(old)
3540
  new = vcluster.LocalizeVirtualPath(new)
3541

    
3542
  _EnsureJobQueueFile(old)
3543
  _EnsureJobQueueFile(new)
3544

    
3545
  getents = runtime.GetEnts()
3546

    
3547
  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
3548
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)
3549

    
3550

    
3551
def BlockdevClose(instance_name, disks):
3552
  """Closes the given block devices.
3553

3554
  This means they will be switched to secondary mode (in case of
3555
  DRBD).
3556

3557
  @param instance_name: if the argument is not empty, the symlinks
3558
      of this instance will be removed
3559
  @type disks: list of L{objects.Disk}
3560
  @param disks: the list of disks to be closed
3561
  @rtype: tuple (success, message)
3562
  @return: a tuple of success and message, where success
3563
      indicates the succes of the operation, and message
3564
      which will contain the error details in case we
3565
      failed
3566

3567
  """
3568
  bdevs = []
3569
  for cf in disks:
3570
    rd = _RecursiveFindBD(cf)
3571
    if rd is None:
3572
      _Fail("Can't find device %s", cf)
3573
    bdevs.append(rd)
3574

    
3575
  msg = []
3576
  for rd in bdevs:
3577
    try:
3578
      rd.Close()
3579
    except errors.BlockDeviceError, err:
3580
      msg.append(str(err))
3581
  if msg:
3582
    _Fail("Can't make devices secondary: %s", ",".join(msg))
3583
  else:
3584
    if instance_name:
3585
      _RemoveBlockDevLinks(instance_name, disks)
3586

    
3587

    
3588
def ValidateHVParams(hvname, hvparams):
3589
  """Validates the given hypervisor parameters.
3590

3591
  @type hvname: string
3592
  @param hvname: the hypervisor name
3593
  @type hvparams: dict
3594
  @param hvparams: the hypervisor parameters to be validated
3595
  @rtype: None
3596

3597
  """
3598
  try:
3599
    hv_type = hypervisor.GetHypervisor(hvname)
3600
    hv_type.ValidateParameters(hvparams)
3601
  except errors.HypervisorError, err:
3602
    _Fail(str(err), log=False)
3603

    
3604

    
3605
def _CheckOSPList(os_obj, parameters):
3606
  """Check whether a list of parameters is supported by the OS.
3607

3608
  @type os_obj: L{objects.OS}
3609
  @param os_obj: OS object to check
3610
  @type parameters: list
3611
  @param parameters: the list of parameters to check
3612

3613
  """
3614
  supported = [v[0] for v in os_obj.supported_parameters]
3615
  delta = frozenset(parameters).difference(supported)
3616
  if delta:
3617
    _Fail("The following parameters are not supported"
3618
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
3619

    
3620

    
3621
def ValidateOS(required, osname, checks, osparams):
3622
  """Validate the given OS' parameters.
3623

3624
  @type required: boolean
3625
  @param required: whether absence of the OS should translate into
3626
      failure or not
3627
  @type osname: string
3628
  @param osname: the OS to be validated
3629
  @type checks: list
3630
  @param checks: list of the checks to run (currently only 'parameters')
3631
  @type osparams: dict
3632
  @param osparams: dictionary with OS parameters
3633
  @rtype: boolean
3634
  @return: True if the validation passed, or False if the OS was not
3635
      found and L{required} was false
3636

3637
  """
3638
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
3639
    _Fail("Unknown checks required for OS %s: %s", osname,
3640
          set(checks).difference(constants.OS_VALIDATE_CALLS))
3641

    
3642
  name_only = objects.OS.GetName(osname)
3643
  status, tbv = _TryOSFromDisk(name_only, None)
3644

    
3645
  if not status:
3646
    if required:
3647
      _Fail(tbv)
3648
    else:
3649
      return False
3650

    
3651
  if max(tbv.api_versions) < constants.OS_API_V20:
3652
    return True
3653

    
3654
  if constants.OS_VALIDATE_PARAMETERS in checks:
3655
    _CheckOSPList(tbv, osparams.keys())
3656

    
3657
  validate_env = OSCoreEnv(osname, tbv, osparams)
3658
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
3659
                        cwd=tbv.path, reset_env=True)
3660
  if result.failed:
3661
    logging.error("os validate command '%s' returned error: %s output: %s",
3662
                  result.cmd, result.fail_reason, result.output)
3663
    _Fail("OS validation script failed (%s), output: %s",
3664
          result.fail_reason, result.output, log=False)
3665

    
3666
  return True
3667

    
3668

    
3669
def DemoteFromMC():
3670
  """Demotes the current node from master candidate role.
3671

3672
  """
3673
  # try to ensure we're not the master by mistake
3674
  master, myself = ssconf.GetMasterAndMyself()
3675
  if master == myself:
3676
    _Fail("ssconf status shows I'm the master node, will not demote")
3677

    
3678
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
3679
  if not result.failed:
3680
    _Fail("The master daemon is running, will not demote")
3681

    
3682
  try:
3683
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
3684
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
3685
  except EnvironmentError, err:
3686
    if err.errno != errno.ENOENT:
3687
      _Fail("Error while backing up cluster file: %s", err, exc=True)
3688

    
3689
  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
3690

    
3691

    
3692
def _GetX509Filenames(cryptodir, name):
3693
  """Returns the full paths for the private key and certificate.
3694

3695
  """
3696
  return (utils.PathJoin(cryptodir, name),
3697
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
3698
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
3699

    
3700

    
3701
def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3702
  """Creates a new X509 certificate for SSL/TLS.
3703

3704
  @type validity: int
3705
  @param validity: Validity in seconds
3706
  @rtype: tuple; (string, string)
3707
  @return: Certificate name and public part
3708

3709
  """
3710
  (key_pem, cert_pem) = \
3711
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
3712
                                     min(validity, _MAX_SSL_CERT_VALIDITY), 1)
3713

    
3714
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
3715
                              prefix="x509-%s-" % utils.TimestampForFilename())
3716
  try:
3717
    name = os.path.basename(cert_dir)
3718
    assert len(name) > 5
3719

    
3720
    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3721

    
3722
    utils.WriteFile(key_file, mode=0400, data=key_pem)
3723
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)
3724

    
3725
    # Never return private key as it shouldn't leave the node
3726
    return (name, cert_pem)
3727
  except Exception:
3728
    shutil.rmtree(cert_dir, ignore_errors=True)
3729
    raise
3730

    
3731

    
3732
def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3733
  """Removes a X509 certificate.
3734

3735
  @type name: string
3736
  @param name: Certificate name
3737

3738
  """
3739
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3740

    
3741
  utils.RemoveFile(key_file)
3742
  utils.RemoveFile(cert_file)
3743

    
3744
  try:
3745
    os.rmdir(cert_dir)
3746
  except EnvironmentError, err:
3747
    _Fail("Cannot remove certificate directory '%s': %s",
3748
          cert_dir, err)
3749

    
3750

    
3751
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3752
  """Returns the command for the requested input/output.
3753

3754
  @type instance: L{objects.Instance}
3755
  @param instance: The instance object
3756
  @param mode: Import/export mode
3757
  @param ieio: Input/output type
3758
  @param ieargs: Input/output arguments
3759

3760
  """
3761
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3762

    
3763
  env = None
3764
  prefix = None
3765
  suffix = None
3766
  exp_size = None
3767

    
3768
  if ieio == constants.IEIO_FILE:
3769
    (filename, ) = ieargs
3770

    
3771
    if not utils.IsNormAbsPath(filename):
3772
      _Fail("Path '%s' is not normalized or absolute", filename)
3773

    
3774
    real_filename = os.path.realpath(filename)
3775
    directory = os.path.dirname(real_filename)
3776

    
3777
    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3778
      _Fail("File '%s' is not under exports directory '%s': %s",
3779
            filename, pathutils.EXPORT_DIR, real_filename)
3780

    
3781
    # Create directory
3782
    utils.Makedirs(directory, mode=0750)
3783

    
3784
    quoted_filename = utils.ShellQuote(filename)
3785

    
3786
    if mode == constants.IEM_IMPORT:
3787
      suffix = "> %s" % quoted_filename
3788
    elif mode == constants.IEM_EXPORT:
3789
      suffix = "< %s" % quoted_filename
3790

    
3791
      # Retrieve file size
3792
      try:
3793
        st = os.stat(filename)
3794
      except EnvironmentError, err:
3795
        logging.error("Can't stat(2) %s: %s", filename, err)
3796
      else:
3797
        exp_size = utils.BytesToMebibyte(st.st_size)
3798

    
3799
  elif ieio == constants.IEIO_RAW_DISK:
3800
    (disk, ) = ieargs
3801

    
3802
    real_disk = _OpenRealBD(disk)
3803

    
3804
    if mode == constants.IEM_IMPORT:
3805
      # we use nocreat to fail if the device is not already there or we pass a
3806
      # wrong path; we use notrunc to no attempt truncate on an LV device
3807
      suffix = utils.BuildShellCmd("| dd of=%s conv=nocreat,notrunc bs=%s",
3808
                                   real_disk.dev_path,
3809
                                   str(1024 * 1024)) # 1 MB
3810

    
3811
    elif mode == constants.IEM_EXPORT:
3812
      # the block size on the read dd is 1MiB to match our units
3813
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3814
                                   real_disk.dev_path,
3815
                                   str(1024 * 1024), # 1 MB
3816
                                   str(disk.size))
3817
      exp_size = disk.size
3818

    
3819
  elif ieio == constants.IEIO_SCRIPT:
3820
    (disk, disk_index, ) = ieargs
3821

    
3822
    assert isinstance(disk_index, (int, long))
3823

    
3824
    inst_os = OSFromDisk(instance.os)
3825
    env = OSEnvironment(instance, inst_os)
3826

    
3827
    if mode == constants.IEM_IMPORT:
3828
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3829
      env["IMPORT_INDEX"] = str(disk_index)
3830
      script = inst_os.import_script
3831

    
3832
    elif mode == constants.IEM_EXPORT:
3833
      real_disk = _OpenRealBD(disk)
3834
      env["EXPORT_DEVICE"] = real_disk.dev_path
3835
      env["EXPORT_INDEX"] = str(disk_index)
3836
      script = inst_os.export_script
3837

    
3838
    # TODO: Pass special environment only to script
3839
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3840

    
3841
    if mode == constants.IEM_IMPORT:
3842
      suffix = "| %s" % script_cmd
3843

    
3844
    elif mode == constants.IEM_EXPORT:
3845
      prefix = "%s |" % script_cmd
3846

    
3847
    # Let script predict size
3848
    exp_size = constants.IE_CUSTOM_SIZE
3849

    
3850
  else:
3851
    _Fail("Invalid %s I/O mode %r", mode, ieio)
3852

    
3853
  return (env, prefix, suffix, exp_size)
3854

    
3855

    
3856
def _CreateImportExportStatusDir(prefix):
3857
  """Creates status directory for import/export.
3858

3859
  """
3860
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
3861
                          prefix=("%s-%s-" %
3862
                                  (prefix, utils.TimestampForFilename())))
3863

    
3864

    
3865
def StartImportExportDaemon(mode, opts, host, port, instance, component,
3866
                            ieio, ieioargs):
3867
  """Starts an import or export daemon.
3868

3869
  @param mode: Import/output mode
3870
  @type opts: L{objects.ImportExportOptions}
3871
  @param opts: Daemon options
3872
  @type host: string
3873
  @param host: Remote host for export (None for import)
3874
  @type port: int
3875
  @param port: Remote port for export (None for import)
3876
  @type instance: L{objects.Instance}
3877
  @param instance: Instance object
3878
  @type component: string
3879
  @param component: which part of the instance is transferred now,
3880
      e.g. 'disk/0'
3881
  @param ieio: Input/output type
3882
  @param ieioargs: Input/output arguments
3883

3884
  """
3885
  if mode == constants.IEM_IMPORT:
3886
    prefix = "import"
3887

    
3888
    if not (host is None and port is None):
3889
      _Fail("Can not specify host or port on import")
3890

    
3891
  elif mode == constants.IEM_EXPORT:
3892
    prefix = "export"
3893

    
3894
    if host is None or port is None:
3895
      _Fail("Host and port must be specified for an export")
3896

    
3897
  else:
3898
    _Fail("Invalid mode %r", mode)
3899

    
3900
  if (opts.key_name is None) ^ (opts.ca_pem is None):
3901
    _Fail("Cluster certificate can only be used for both key and CA")
3902

    
3903
  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3904
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3905

    
3906
  if opts.key_name is None:
3907
    # Use server.pem
3908
    key_path = pathutils.NODED_CERT_FILE
3909
    cert_path = pathutils.NODED_CERT_FILE
3910
    assert opts.ca_pem is None
3911
  else:
3912
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
3913
                                                 opts.key_name)
3914
    assert opts.ca_pem is not None
3915

    
3916
  for i in [key_path, cert_path]:
3917
    if not os.path.exists(i):
3918
      _Fail("File '%s' does not exist" % i)
3919

    
3920
  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3921
  try:
3922
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3923
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3924
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3925

    
3926
    if opts.ca_pem is None:
3927
      # Use server.pem
3928
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
3929
    else:
3930
      ca = opts.ca_pem
3931

    
3932
    # Write CA file
3933
    utils.WriteFile(ca_file, data=ca, mode=0400)
3934

    
3935
    cmd = [
3936
      pathutils.IMPORT_EXPORT_DAEMON,
3937
      status_file, mode,
3938
      "--key=%s" % key_path,
3939
      "--cert=%s" % cert_path,
3940
      "--ca=%s" % ca_file,
3941
      ]
3942

    
3943
    if host:
3944
      cmd.append("--host=%s" % host)
3945

    
3946
    if port:
3947
      cmd.append("--port=%s" % port)
3948

    
3949
    if opts.ipv6:
3950
      cmd.append("--ipv6")
3951
    else:
3952
      cmd.append("--ipv4")
3953

    
3954
    if opts.compress:
3955
      cmd.append("--compress=%s" % opts.compress)
3956

    
3957
    if opts.magic:
3958
      cmd.append("--magic=%s" % opts.magic)
3959

    
3960
    if exp_size is not None:
3961
      cmd.append("--expected-size=%s" % exp_size)
3962

    
3963
    if cmd_prefix:
3964
      cmd.append("--cmd-prefix=%s" % cmd_prefix)
3965

    
3966
    if cmd_suffix:
3967
      cmd.append("--cmd-suffix=%s" % cmd_suffix)
3968

    
3969
    if mode == constants.IEM_EXPORT:
3970
      # Retry connection a few times when connecting to remote peer
3971
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3972
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3973
    elif opts.connect_timeout is not None:
3974
      assert mode == constants.IEM_IMPORT
3975
      # Overall timeout for establishing connection while listening
3976
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3977

    
3978
    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3979

    
3980
    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3981
    # support for receiving a file descriptor for output
3982
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3983
                      output=logfile)
3984

    
3985
    # The import/export name is simply the status directory name
3986
    return os.path.basename(status_dir)
3987

    
3988
  except Exception:
3989
    shutil.rmtree(status_dir, ignore_errors=True)
3990
    raise
3991

    
3992

    
3993
def GetImportExportStatus(names):
3994
  """Returns import/export daemon status.
3995

3996
  @type names: sequence
3997
  @param names: List of names
3998
  @rtype: List of dicts
3999
  @return: Returns a list of the state of each named import/export or None if a
4000
           status couldn't be read
4001

4002
  """
4003
  result = []
4004

    
4005
  for name in names:
4006
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
4007
                                 _IES_STATUS_FILE)
4008

    
4009
    try:
4010
      data = utils.ReadFile(status_file)
4011
    except EnvironmentError, err:
4012
      if err.errno != errno.ENOENT:
4013
        raise
4014
      data = None
4015

    
4016
    if not data:
4017
      result.append(None)
4018
      continue
4019

    
4020
    result.append(serializer.LoadJson(data))
4021

    
4022
  return result
4023

    
4024

    
4025
def AbortImportExport(name):
4026
  """Sends SIGTERM to a running import/export daemon.
4027

4028
  """
4029
  logging.info("Abort import/export %s", name)
4030

    
4031
  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
4032
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
4033

    
4034
  if pid:
4035
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
4036
                 name, pid)
4037
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
4038

    
4039

    
4040
def CleanupImportExport(name):
4041
  """Cleanup after an import or export.
4042

4043
  If the import/export daemon is still running it's killed. Afterwards the
4044
  whole status directory is removed.
4045

4046
  """
4047
  logging.info("Finalizing import/export %s", name)
4048

    
4049
  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
4050

    
4051
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
4052

    
4053
  if pid:
4054
    logging.info("Import/export %s is still running with PID %s",
4055
                 name, pid)
4056
    utils.KillProcess(pid, waitpid=False)
4057

    
4058
  shutil.rmtree(status_dir, ignore_errors=True)
4059

    
4060

    
4061
def _FindDisks(disks):
4062
  """Finds attached L{BlockDev}s for the given disks.
4063

4064
  @type disks: list of L{objects.Disk}
4065
  @param disks: the disk objects we need to find
4066

4067
  @return: list of L{BlockDev} objects or C{None} if a given disk
4068
           was not found or was no attached.
4069

4070
  """
4071
  bdevs = []
4072

    
4073
  for disk in disks:
4074
    rd = _RecursiveFindBD(disk)
4075
    if rd is None:
4076
      _Fail("Can't find device %s", disk)
4077
    bdevs.append(rd)
4078
  return bdevs
4079

    
4080

    
4081
def DrbdDisconnectNet(disks):
4082