
lib/backend.py @ 0188611b


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable=E1103
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37

    
38
import os
39
import os.path
40
import shutil
41
import time
42
import stat
43
import errno
44
import re
45
import random
46
import logging
47
import tempfile
48
import zlib
49
import base64
50
import signal
51

    
52
from ganeti import errors
53
from ganeti import utils
54
from ganeti import ssh
55
from ganeti import hypervisor
56
from ganeti import constants
57
from ganeti import bdev
58
from ganeti import objects
59
from ganeti import ssconf
60
from ganeti import serializer
61
from ganeti import netutils
62
from ganeti import runtime
63
from ganeti import mcpu
64
from ganeti import compat
65
from ganeti import pathutils
66
from ganeti import vcluster
67

    
68

    
69
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
70
_ALLOWED_CLEAN_DIRS = frozenset([
71
  pathutils.DATA_DIR,
72
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
73
  pathutils.QUEUE_DIR,
74
  pathutils.CRYPTO_KEYS_DIR,
75
  ])
76
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
77
_X509_KEY_FILE = "key"
78
_X509_CERT_FILE = "cert"
79
_IES_STATUS_FILE = "status"
80
_IES_PID_FILE = "pid"
81
_IES_CA_FILE = "ca"
82

    
83
#: Valid LVS output line regex
84
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
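# For illustration only: with the "lvs" invocation used in GetVolumeList below
# ("--separator=| -ovg_name,lv_name,lv_size,lv_attr"), a matching output line
# looks roughly like "  xenvg|test1|20.06|-wi-ao---", captured as the groups
# (vg_name, lv_name, size, attr).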
85

    
86
# Actions for the master setup script
87
_MASTER_START = "start"
88
_MASTER_STOP = "stop"
89

    
90

    
91
class RPCFail(Exception):
92
  """Class denoting RPC failure.
93

94
  Its argument is the error message.
95

96
  """
97

    
98

    
99
def _Fail(msg, *args, **kwargs):
100
  """Log an error and the raise an RPCFail exception.
101

102
  This exception is then handled specially in the ganeti daemon and
103
  turned into a 'failed' return type. As such, this function is a
104
  useful shortcut for logging the error and returning it to the master
105
  daemon.
106

107
  @type msg: string
108
  @param msg: the text of the exception
109
  @raise RPCFail
110

111
  """
112
  if args:
113
    msg = msg % args
114
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
115
    if "exc" in kwargs and kwargs["exc"]:
116
      logging.exception(msg)
117
    else:
118
      logging.error(msg)
119
  raise RPCFail(msg)
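  # Illustrative call (the names used here are made up):
  #   _Fail("Can't reach node %s: %s", node_name, err, exc=True)
  # logs the formatted message with the current traceback and then raises
  # RPCFail, which the node daemon turns into a 'failed' RPC result.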
120

    
121

    
122
def _GetConfig():
123
  """Simple wrapper to return a SimpleStore.
124

125
  @rtype: L{ssconf.SimpleStore}
126
  @return: a SimpleStore instance
127

128
  """
129
  return ssconf.SimpleStore()
130

    
131

    
132
def _GetSshRunner(cluster_name):
133
  """Simple wrapper to return an SshRunner.
134

135
  @type cluster_name: str
136
  @param cluster_name: the cluster name, which is needed
137
      by the SshRunner constructor
138
  @rtype: L{ssh.SshRunner}
139
  @return: an SshRunner instance
140

141
  """
142
  return ssh.SshRunner(cluster_name)
143

    
144

    
145
def _Decompress(data):
146
  """Unpacks data compressed by the RPC client.
147

148
  @type data: list or tuple
149
  @param data: Data sent by RPC client
150
  @rtype: str
151
  @return: Decompressed data
152

153
  """
154
  assert isinstance(data, (list, tuple))
155
  assert len(data) == 2
156
  (encoding, content) = data
157
  if encoding == constants.RPC_ENCODING_NONE:
158
    return content
159
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
160
    return zlib.decompress(base64.b64decode(content))
161
  else:
162
    raise AssertionError("Unknown data encoding")
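  # Sketch of the expected wire format, assuming the RPC client mirrors this
  # logic: either (constants.RPC_ENCODING_NONE, data) for plain payloads or
  # (constants.RPC_ENCODING_ZLIB_BASE64, base64.b64encode(zlib.compress(data)))
  # for compressed ones.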
163

    
164

    
165
def _CleanDirectory(path, exclude=None):
166
  """Removes all regular files in a directory.
167

168
  @type path: str
169
  @param path: the directory to clean
170
  @type exclude: list
171
  @param exclude: list of files to be excluded, defaults
172
      to the empty list
173

174
  """
175
  if path not in _ALLOWED_CLEAN_DIRS:
176
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
177
          path)
178

    
179
  if not os.path.isdir(path):
180
    return
181
  if exclude is None:
182
    exclude = []
183
  else:
184
    # Normalize excluded paths
185
    exclude = [os.path.normpath(i) for i in exclude]
186

    
187
  for rel_name in utils.ListVisibleFiles(path):
188
    full_name = utils.PathJoin(path, rel_name)
189
    if full_name in exclude:
190
      continue
191
    if os.path.isfile(full_name) and not os.path.islink(full_name):
192
      utils.RemoveFile(full_name)
193

    
194

    
195
def _BuildUploadFileList():
196
  """Build the list of allowed upload files.
197

198
  This is abstracted so that it's built only once at module import time.
199

200
  """
201
  allowed_files = set([
202
    pathutils.CLUSTER_CONF_FILE,
203
    constants.ETC_HOSTS,
204
    pathutils.SSH_KNOWN_HOSTS_FILE,
205
    pathutils.VNC_PASSWORD_FILE,
206
    pathutils.RAPI_CERT_FILE,
207
    pathutils.SPICE_CERT_FILE,
208
    pathutils.SPICE_CACERT_FILE,
209
    pathutils.RAPI_USERS_FILE,
210
    pathutils.CONFD_HMAC_KEY,
211
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
212
    ])
213

    
214
  for hv_name in constants.HYPER_TYPES:
215
    hv_class = hypervisor.GetHypervisorClass(hv_name)
216
    allowed_files.update(hv_class.GetAncillaryFiles()[0])
217

    
218
  return frozenset(allowed_files)
219

    
220

    
221
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
222

    
223

    
224
def JobQueuePurge():
225
  """Removes job queue files and archived jobs.
226

227
  @rtype: tuple
228
  @return: True, None
229

230
  """
231
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
232
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
233

    
234

    
235
def GetMasterInfo():
236
  """Returns master information.
237

238
  This is a utility function to compute master information, either
239
  for consumption here or from the node daemon.
240

241
  @rtype: tuple
242
  @return: master_netdev, master_ip, master_name, primary_ip_family,
243
    master_netmask
244
  @raise RPCFail: in case of errors
245

246
  """
247
  try:
248
    cfg = _GetConfig()
249
    master_netdev = cfg.GetMasterNetdev()
250
    master_ip = cfg.GetMasterIP()
251
    master_netmask = cfg.GetMasterNetmask()
252
    master_node = cfg.GetMasterNode()
253
    primary_ip_family = cfg.GetPrimaryIPFamily()
254
  except errors.ConfigurationError, err:
255
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
256
  return (master_netdev, master_ip, master_node, primary_ip_family,
257
          master_netmask)
258

    
259

    
260
def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
261
  """Decorator that runs hooks before and after the decorated function.
262

263
  @type hook_opcode: string
264
  @param hook_opcode: opcode of the hook
265
  @type hooks_path: string
266
  @param hooks_path: path of the hooks
267
  @type env_builder_fn: function
268
  @param env_builder_fn: function that returns a dictionary containing the
269
    environment variables for the hooks. Will get all the parameters of the
270
    decorated function.
271
  @raise RPCFail: in case of pre-hook failure
272

273
  """
274
  def decorator(fn):
275
    def wrapper(*args, **kwargs):
276
      _, myself = ssconf.GetMasterAndMyself()
277
      nodes = ([myself], [myself])  # these hooks run locally
278

    
279
      env_fn = compat.partial(env_builder_fn, *args, **kwargs)
280

    
281
      cfg = _GetConfig()
282
      hr = HooksRunner()
283
      hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
284
                            None, env_fn, logging.warning, cfg.GetClusterName(),
285
                            cfg.GetMasterNode())
286

    
287
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
288
      result = fn(*args, **kwargs)
289
      hm.RunPhase(constants.HOOKS_PHASE_POST)
290

    
291
      return result
292
    return wrapper
293
  return decorator
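  # The decorator is applied further down in this module, for example:
  #   @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
  #                  _BuildMasterIpEnv)
  #   def ActivateMasterIp(master_params, use_external_mip_script): ...
  # so the pre-hooks run, then the wrapped function, then the post-hooks.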
294

    
295

    
296
def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
297
  """Builds environment variables for master IP hooks.
298

299
  @type master_params: L{objects.MasterNetworkParameters}
300
  @param master_params: network parameters of the master
301
  @type use_external_mip_script: boolean
302
  @param use_external_mip_script: whether to use an external master IP
303
    address setup script (unused, but necessary per the implementation of the
304
    _RunLocalHooks decorator)
305

306
  """
307
  # pylint: disable=W0613
308
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
309
  env = {
310
    "MASTER_NETDEV": master_params.netdev,
311
    "MASTER_IP": master_params.ip,
312
    "MASTER_NETMASK": str(master_params.netmask),
313
    "CLUSTER_IP_VERSION": str(ver),
314
  }
315

    
316
  return env
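  # With illustrative parameters (netdev "eth0", IP "192.0.2.1", netmask 24,
  # IPv4 address family) the resulting environment would be:
  #   {"MASTER_NETDEV": "eth0", "MASTER_IP": "192.0.2.1",
  #    "MASTER_NETMASK": "24", "CLUSTER_IP_VERSION": "4"}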
317

    
318

    
319
def _RunMasterSetupScript(master_params, action, use_external_mip_script):
320
  """Execute the master IP address setup script.
321

322
  @type master_params: L{objects.MasterNetworkParameters}
323
  @param master_params: network parameters of the master
324
  @type action: string
325
  @param action: action to pass to the script. Must be one of
326
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
327
  @type use_external_mip_script: boolean
328
  @param use_external_mip_script: whether to use an external master IP
329
    address setup script
330
  @raise backend.RPCFail: if there are errors during the execution of the
331
    script
332

333
  """
334
  env = _BuildMasterIpEnv(master_params)
335

    
336
  if use_external_mip_script:
337
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
338
  else:
339
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
340

    
341
  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
342

    
343
  if result.failed:
344
    _Fail("Failed to %s the master IP. Script return value: %s" %
345
          (action, result.exit_code), log=True)
346

    
347

    
348
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
349
               _BuildMasterIpEnv)
350
def ActivateMasterIp(master_params, use_external_mip_script):
351
  """Activate the IP address of the master daemon.
352

353
  @type master_params: L{objects.MasterNetworkParameters}
354
  @param master_params: network parameters of the master
355
  @type use_external_mip_script: boolean
356
  @param use_external_mip_script: whether to use an external master IP
357
    address setup script
358
  @raise RPCFail: in case of errors during the IP startup
359

360
  """
361
  _RunMasterSetupScript(master_params, _MASTER_START,
362
                        use_external_mip_script)
363

    
364

    
365
def StartMasterDaemons(no_voting):
366
  """Activate local node as master node.
367

368
  The function will start the master daemons (ganeti-masterd and ganeti-rapi).
369

370
  @type no_voting: boolean
371
  @param no_voting: whether to start ganeti-masterd without a node vote
372
      but still non-interactively
373
  @rtype: None
374

375
  """
376

    
377
  if no_voting:
378
    masterd_args = "--no-voting --yes-do-it"
379
  else:
380
    masterd_args = ""
381

    
382
  env = {
383
    "EXTRA_MASTERD_ARGS": masterd_args,
384
    }
385

    
386
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
387
  if result.failed:
388
    msg = "Can't start Ganeti master: %s" % result.output
389
    logging.error(msg)
390
    _Fail(msg)
391

    
392

    
393
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
394
               _BuildMasterIpEnv)
395
def DeactivateMasterIp(master_params, use_external_mip_script):
396
  """Deactivate the master IP on this node.
397

398
  @type master_params: L{objects.MasterNetworkParameters}
399
  @param master_params: network parameters of the master
400
  @type use_external_mip_script: boolean
401
  @param use_external_mip_script: whether to use an external master IP
402
    address setup script
403
  @raise RPCFail: in case of errors during the IP turndown
404

405
  """
406
  _RunMasterSetupScript(master_params, _MASTER_STOP,
407
                        use_external_mip_script)
408

    
409

    
410
def StopMasterDaemons():
411
  """Stop the master daemons on this node.
412

413
  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
414

415
  @rtype: None
416

417
  """
418
  # TODO: log and report back to the caller the failures; we
419
  # need to decide in which case we fail the RPC for this
420

    
421
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
422
  if result.failed:
423
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
424
                  " and error %s",
425
                  result.cmd, result.exit_code, result.output)
426

    
427

    
428
def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
429
  """Change the netmask of the master IP.
430

431
  @param old_netmask: the old value of the netmask
432
  @param netmask: the new value of the netmask
433
  @param master_ip: the master IP
434
  @param master_netdev: the master network device
435

436
  """
437
  if old_netmask == netmask:
438
    return
439

    
440
  if not netutils.IPAddress.Own(master_ip):
441
    _Fail("The master IP address is not up, not attempting to change its"
442
          " netmask")
443

    
444
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
445
                         "%s/%s" % (master_ip, netmask),
446
                         "dev", master_netdev, "label",
447
                         "%s:0" % master_netdev])
448
  if result.failed:
449
    _Fail("Could not set the new netmask on the master IP address")
450

    
451
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
452
                         "%s/%s" % (master_ip, old_netmask),
453
                         "dev", master_netdev, "label",
454
                         "%s:0" % master_netdev])
455
  if result.failed:
456
    _Fail("Could not bring down the master IP address with the old netmask")
457

    
458

    
459
def EtcHostsModify(mode, host, ip):
460
  """Modify a host entry in /etc/hosts.
461

462
  @param mode: the mode of operation; either add or remove an entry
463
  @param host: The host to operate on
464
  @param ip: The ip associated with the entry
465

466
  """
467
  if mode == constants.ETC_HOSTS_ADD:
468
    if not ip:
469
      RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
470
              " present")
471
    utils.AddHostToEtcHosts(host, ip)
472
  elif mode == constants.ETC_HOSTS_REMOVE:
473
    if ip:
474
      RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
475
              " parameter is present")
476
    utils.RemoveHostFromEtcHosts(host)
477
  else:
478
    RPCFail("Mode not supported")
479

    
480

    
481
def LeaveCluster(modify_ssh_setup):
482
  """Cleans up and remove the current node.
483

484
  This function cleans up and prepares the current node to be removed
485
  from the cluster.
486

487
  If processing is successful, then it raises an
488
  L{errors.QuitGanetiException} which is used as a special case to
489
  shutdown the node daemon.
490

491
  @param modify_ssh_setup: boolean
492

493
  """
494
  _CleanDirectory(pathutils.DATA_DIR)
495
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
496
  JobQueuePurge()
497

    
498
  if modify_ssh_setup:
499
    try:
500
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
501

    
502
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
503

    
504
      utils.RemoveFile(priv_key)
505
      utils.RemoveFile(pub_key)
506
    except errors.OpExecError:
507
      logging.exception("Error while processing ssh files")
508

    
509
  try:
510
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
511
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
512
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
513
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
514
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
515
  except: # pylint: disable=W0702
516
    logging.exception("Error while removing cluster secrets")
517

    
518
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
519
  if result.failed:
520
    logging.error("Command %s failed with exitcode %s and error %s",
521
                  result.cmd, result.exit_code, result.output)
522

    
523
  # Raise a custom exception (handled in ganeti-noded)
524
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
525

    
526

    
527
def _GetVgInfo(name):
528
  """Retrieves information about a LVM volume group.
529

530
  """
531
  # TODO: GetVGInfo supports returning information for multiple VGs at once
532
  vginfo = bdev.LogicalVolume.GetVGInfo([name])
533
  if vginfo:
534
    vg_free = int(round(vginfo[0][0], 0))
535
    vg_size = int(round(vginfo[0][1], 0))
536
  else:
537
    vg_free = None
538
    vg_size = None
539

    
540
  return {
541
    "name": name,
542
    "vg_free": vg_free,
543
    "vg_size": vg_size,
544
    }
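  # Illustrative result for a 20 GiB volume group with 4 GiB free (sizes are
  # in MiB, as elsewhere in this module):
  #   {"name": "xenvg", "vg_free": 4096, "vg_size": 20480}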
545

    
546

    
547
def _GetHvInfo(name):
548
  """Retrieves node information from a hypervisor.
549

550
  The information returned depends on the hypervisor. Common items:
551

552
    - vg_size is the size of the configured volume group in MiB
553
    - vg_free is the free size of the volume group in MiB
554
    - memory_dom0 is the memory allocated for domain0 in MiB
555
    - memory_free is the currently available (free) ram in MiB
556
    - memory_total is the total amount of RAM in MiB
557
    - hv_version: the hypervisor version, if available
558

559
  """
560
  return hypervisor.GetHypervisor(name).GetNodeInfo()
561

    
562

    
563
def _GetNamedNodeInfo(names, fn):
564
  """Calls C{fn} for all names in C{names} and returns a dictionary.
565

566
  @rtype: None or list
567

568
  """
569
  if names is None:
570
    return None
571
  else:
572
    return map(fn, names)
573

    
574

    
575
def GetNodeInfo(vg_names, hv_names):
576
  """Gives back a hash with different information about the node.
577

578
  @type vg_names: list of string
579
  @param vg_names: Names of the volume groups to ask for disk space information
580
  @type hv_names: list of string
581
  @param hv_names: Names of the hypervisors to ask for node information
582
  @rtype: tuple; (string, None/list, None/list)
583
  @return: Tuple containing boot ID, volume group information and hypervisor
584
    information
585

586
  """
587
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
588
  vg_info = _GetNamedNodeInfo(vg_names, _GetVgInfo)
589
  hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)
590

    
591
  return (bootid, vg_info, hv_info)
592

    
593

    
594
def VerifyNode(what, cluster_name):
595
  """Verify the status of the local node.
596

597
  Based on the input L{what} parameter, various checks are done on the
598
  local node.
599

600
  If the I{filelist} key is present, this list of
601
  files is checksummed and the file/checksum pairs are returned.
602

603
  If the I{nodelist} key is present, we check that we have
604
  connectivity via ssh with the target nodes (and check the hostname
605
  report).
606

607
  If the I{node-net-test} key is present, we check that we have
608
  connectivity to the given nodes via both primary IP and, if
609
  applicable, secondary IPs.
610

611
  @type what: C{dict}
612
  @param what: a dictionary of things to check:
613
      - filelist: list of files for which to compute checksums
614
      - nodelist: list of nodes we should check ssh communication with
615
      - node-net-test: list of nodes we should check node daemon port
616
        connectivity with
617
      - hypervisor: list with hypervisors to run the verify for
618
  @rtype: dict
619
  @return: a dictionary with the same keys as the input dict, and
620
      values representing the result of the checks
621

622
  """
623
  result = {}
624
  my_name = netutils.Hostname.GetSysName()
625
  port = netutils.GetDaemonPort(constants.NODED)
626
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
627

    
628
  if constants.NV_HYPERVISOR in what and vm_capable:
629
    result[constants.NV_HYPERVISOR] = tmp = {}
630
    for hv_name in what[constants.NV_HYPERVISOR]:
631
      try:
632
        val = hypervisor.GetHypervisor(hv_name).Verify()
633
      except errors.HypervisorError, err:
634
        val = "Error while checking hypervisor: %s" % str(err)
635
      tmp[hv_name] = val
636

    
637
  if constants.NV_HVPARAMS in what and vm_capable:
638
    result[constants.NV_HVPARAMS] = tmp = []
639
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
640
      try:
641
        logging.info("Validating hv %s, %s", hv_name, hvparms)
642
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
643
      except errors.HypervisorError, err:
644
        tmp.append((source, hv_name, str(err)))
645

    
646
  if constants.NV_FILELIST in what:
647
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
648
      what[constants.NV_FILELIST])
649

    
650
  if constants.NV_NODELIST in what:
651
    (nodes, bynode) = what[constants.NV_NODELIST]
652

    
653
    # Add nodes from other groups (different for each node)
654
    try:
655
      nodes.extend(bynode[my_name])
656
    except KeyError:
657
      pass
658

    
659
    # Use a random order
660
    random.shuffle(nodes)
661

    
662
    # Try to contact all nodes
663
    val = {}
664
    for node in nodes:
665
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
666
      if not success:
667
        val[node] = message
668

    
669
    result[constants.NV_NODELIST] = val
670

    
671
  if constants.NV_NODENETTEST in what:
672
    result[constants.NV_NODENETTEST] = tmp = {}
673
    my_pip = my_sip = None
674
    for name, pip, sip in what[constants.NV_NODENETTEST]:
675
      if name == my_name:
676
        my_pip = pip
677
        my_sip = sip
678
        break
679
    if not my_pip:
680
      tmp[my_name] = ("Can't find my own primary/secondary IP"
681
                      " in the node list")
682
    else:
683
      for name, pip, sip in what[constants.NV_NODENETTEST]:
684
        fail = []
685
        if not netutils.TcpPing(pip, port, source=my_pip):
686
          fail.append("primary")
687
        if sip != pip:
688
          if not netutils.TcpPing(sip, port, source=my_sip):
689
            fail.append("secondary")
690
        if fail:
691
          tmp[name] = ("failure using the %s interface(s)" %
692
                       " and ".join(fail))
693

    
694
  if constants.NV_MASTERIP in what:
695
    # FIXME: add checks on incoming data structures (here and in the
696
    # rest of the function)
697
    master_name, master_ip = what[constants.NV_MASTERIP]
698
    if master_name == my_name:
699
      source = constants.IP4_ADDRESS_LOCALHOST
700
    else:
701
      source = None
702
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
703
                                                     source=source)
704

    
705
  if constants.NV_USERSCRIPTS in what:
706
    result[constants.NV_USERSCRIPTS] = \
707
      [script for script in what[constants.NV_USERSCRIPTS]
708
       if not (os.path.exists(script) and os.access(script, os.X_OK))]
709

    
710
  if constants.NV_OOB_PATHS in what:
711
    result[constants.NV_OOB_PATHS] = tmp = []
712
    for path in what[constants.NV_OOB_PATHS]:
713
      try:
714
        st = os.stat(path)
715
      except OSError, err:
716
        tmp.append("error stating out of band helper: %s" % err)
717
      else:
718
        if stat.S_ISREG(st.st_mode):
719
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
720
            tmp.append(None)
721
          else:
722
            tmp.append("out of band helper %s is not executable" % path)
723
        else:
724
          tmp.append("out of band helper %s is not a file" % path)
725

    
726
  if constants.NV_LVLIST in what and vm_capable:
727
    try:
728
      val = GetVolumeList(utils.ListVolumeGroups().keys())
729
    except RPCFail, err:
730
      val = str(err)
731
    result[constants.NV_LVLIST] = val
732

    
733
  if constants.NV_INSTANCELIST in what and vm_capable:
734
    # GetInstanceList can fail
735
    try:
736
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
737
    except RPCFail, err:
738
      val = str(err)
739
    result[constants.NV_INSTANCELIST] = val
740

    
741
  if constants.NV_VGLIST in what and vm_capable:
742
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
743

    
744
  if constants.NV_PVLIST in what and vm_capable:
745
    result[constants.NV_PVLIST] = \
746
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
747
                                   filter_allocatable=False)
748

    
749
  if constants.NV_VERSION in what:
750
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
751
                                    constants.RELEASE_VERSION)
752

    
753
  if constants.NV_HVINFO in what and vm_capable:
754
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
755
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
756

    
757
  if constants.NV_DRBDLIST in what and vm_capable:
758
    try:
759
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
760
    except errors.BlockDeviceError, err:
761
      logging.warning("Can't get used minors list", exc_info=True)
762
      used_minors = str(err)
763
    result[constants.NV_DRBDLIST] = used_minors
764

    
765
  if constants.NV_DRBDHELPER in what and vm_capable:
766
    status = True
767
    try:
768
      payload = bdev.BaseDRBD.GetUsermodeHelper()
769
    except errors.BlockDeviceError, err:
770
      logging.error("Can't get DRBD usermode helper: %s", str(err))
771
      status = False
772
      payload = str(err)
773
    result[constants.NV_DRBDHELPER] = (status, payload)
774

    
775
  if constants.NV_NODESETUP in what:
776
    result[constants.NV_NODESETUP] = tmpr = []
777
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
778
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
779
                  " under /sys, missing required directories /sys/block"
780
                  " and /sys/class/net")
781
    if (not os.path.isdir("/proc/sys") or
782
        not os.path.isfile("/proc/sysrq-trigger")):
783
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
784
                  " under /proc, missing required directory /proc/sys and"
785
                  " the file /proc/sysrq-trigger")
786

    
787
  if constants.NV_TIME in what:
788
    result[constants.NV_TIME] = utils.SplitTime(time.time())
789

    
790
  if constants.NV_OSLIST in what and vm_capable:
791
    result[constants.NV_OSLIST] = DiagnoseOS()
792

    
793
  if constants.NV_BRIDGES in what and vm_capable:
794
    result[constants.NV_BRIDGES] = [bridge
795
                                    for bridge in what[constants.NV_BRIDGES]
796
                                    if not utils.BridgeExists(bridge)]
797
  return result
798

    
799

    
800
def GetBlockDevSizes(devices):
801
  """Return the size of the given block devices
802

803
  @type devices: list
804
  @param devices: list of block device nodes to query
805
  @rtype: dict
806
  @return:
807
    dictionary of the queried block devices under /dev (key). The value is
808
    their size in MiB.
809

810
    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
811

812
  """
813
  DEV_PREFIX = "/dev/"
814
  blockdevs = {}
815

    
816
  for devpath in devices:
817
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
818
      continue
819

    
820
    try:
821
      st = os.stat(devpath)
822
    except EnvironmentError, err:
823
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
824
      continue
825

    
826
    if stat.S_ISBLK(st.st_mode):
827
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
828
      if result.failed:
829
        # We don't want to fail, just do not list this device as available
830
        logging.warning("Cannot get size for block device %s", devpath)
831
        continue
832

    
833
      size = int(result.stdout) / (1024 * 1024)
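      # "blockdev --getsize64" prints the size in bytes, hence the division
      # above to obtain MiB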
834
      blockdevs[devpath] = size
835
  return blockdevs
836

    
837

    
838
def GetVolumeList(vg_names):
839
  """Compute list of logical volumes and their size.
840

841
  @type vg_names: list
842
  @param vg_names: the volume groups whose LVs we should list, or
843
      empty for all volume groups
844
  @rtype: dict
845
  @return:
846
      dictionary of all partitions (key) with value being a tuple of
847
      their size (in MiB), inactive and online status::
848

849
        {'xenvg/test1': ('20.06', True, True)}
850

851
      in case of errors, a string is returned with the error
852
      details.
853

854
  """
855
  lvs = {}
856
  sep = "|"
857
  if not vg_names:
858
    vg_names = []
859
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
860
                         "--separator=%s" % sep,
861
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
862
  if result.failed:
863
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
864

    
865
  for line in result.stdout.splitlines():
866
    line = line.strip()
867
    match = _LVSLINE_REGEX.match(line)
868
    if not match:
869
      logging.error("Invalid line returned from lvs output: '%s'", line)
870
      continue
871
    vg_name, name, size, attr = match.groups()
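    # lv_attr is a string such as "-wi-ao": position 0 is the volume type
    # ("v" marks virtual volumes), position 4 the state ("a" when active) and
    # position 5 whether the device is open ("o")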
872
    inactive = attr[4] == "-"
873
    online = attr[5] == "o"
874
    virtual = attr[0] == "v"
875
    if virtual:
876
      # we don't want to report such volumes as existing, since they
877
      # don't really hold data
878
      continue
879
    lvs[vg_name + "/" + name] = (size, inactive, online)
880

    
881
  return lvs
882

    
883

    
884
def ListVolumeGroups():
885
  """List the volume groups and their size.
886

887
  @rtype: dict
888
  @return: dictionary with keys volume name and values the
889
      size of the volume
890

891
  """
892
  return utils.ListVolumeGroups()
893

    
894

    
895
def NodeVolumes():
896
  """List all volumes on this node.
897

898
  @rtype: list
899
  @return:
900
    A list of dictionaries, each having four keys:
901
      - name: the logical volume name,
902
      - size: the size of the logical volume
903
      - dev: the physical device on which the LV lives
904
      - vg: the volume group to which it belongs
905

906
    In case of errors, we return an empty list and log the
907
    error.
908

909
    Note that since a logical volume can live on multiple physical
910
    volumes, the resulting list might include a logical volume
911
    multiple times.
912

913
  """
914
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
915
                         "--separator=|",
916
                         "--options=lv_name,lv_size,devices,vg_name"])
917
  if result.failed:
918
    _Fail("Failed to list logical volumes, lvs output: %s",
919
          result.output)
920

    
921
  def parse_dev(dev):
922
    return dev.split("(")[0]
923

    
924
  def handle_dev(dev):
925
    return [parse_dev(x) for x in dev.split(",")]
926

    
927
  def map_line(line):
928
    line = [v.strip() for v in line]
929
    return [{"name": line[0], "size": line[1],
930
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
931

    
932
  all_devs = []
933
  for line in result.stdout.splitlines():
934
    if line.count("|") >= 3:
935
      all_devs.extend(map_line(line.split("|")))
936
    else:
937
      logging.warning("Strange line in the output from lvs: '%s'", line)
938
  return all_devs
939

    
940

    
941
def BridgesExist(bridges_list):
942
  """Check if a list of bridges exist on the current node.
943

944
  @rtype: None
945
  @raise RPCFail: if any of the given bridges is missing
946

947
  """
948
  missing = []
949
  for bridge in bridges_list:
950
    if not utils.BridgeExists(bridge):
951
      missing.append(bridge)
952

    
953
  if missing:
954
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
955

    
956

    
957
def GetInstanceList(hypervisor_list):
958
  """Provides a list of instances.
959

960
  @type hypervisor_list: list
961
  @param hypervisor_list: the list of hypervisors to query information
962

963
  @rtype: list
964
  @return: a list of all running instances on the current node
965
    - instance1.example.com
966
    - instance2.example.com
967

968
  """
969
  results = []
970
  for hname in hypervisor_list:
971
    try:
972
      names = hypervisor.GetHypervisor(hname).ListInstances()
973
      results.extend(names)
974
    except errors.HypervisorError, err:
975
      _Fail("Error enumerating instances (hypervisor %s): %s",
976
            hname, err, exc=True)
977

    
978
  return results
979

    
980

    
981
def GetInstanceInfo(instance, hname):
982
  """Gives back the information about an instance as a dictionary.
983

984
  @type instance: string
985
  @param instance: the instance name
986
  @type hname: string
987
  @param hname: the hypervisor type of the instance
988

989
  @rtype: dict
990
  @return: dictionary with the following keys:
991
      - memory: memory size of instance (int)
992
      - state: xen state of instance (string)
993
      - time: cpu time of instance (float)
994
      - vcpus: the number of vcpus (int)
995

996
  """
997
  output = {}
998

    
999
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
1000
  if iinfo is not None:
1001
    output["memory"] = iinfo[2]
1002
    output["vcpus"] = iinfo[3]
1003
    output["state"] = iinfo[4]
1004
    output["time"] = iinfo[5]
1005

    
1006
  return output
1007

    
1008

    
1009
def GetInstanceMigratable(instance):
1010
  """Gives whether an instance can be migrated.
1011

1012
  @type instance: L{objects.Instance}
1013
  @param instance: object representing the instance to be checked.
1014

1015
  @rtype: None
1016
  @raise RPCFail: if the instance is not running
1017
  @note: missing disk symlinks are only logged as warnings; they are not
1018
    treated as fatal here
1019

1020
  """
1021
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1022
  iname = instance.name
1023
  if iname not in hyper.ListInstances():
1024
    _Fail("Instance %s is not running", iname)
1025

    
1026
  for idx in range(len(instance.disks)):
1027
    link_name = _GetBlockDevSymlinkPath(iname, idx)
1028
    if not os.path.islink(link_name):
1029
      logging.warning("Instance %s is missing symlink %s for disk %d",
1030
                      iname, link_name, idx)
1031

    
1032

    
1033
def GetAllInstancesInfo(hypervisor_list):
1034
  """Gather data about all instances.
1035

1036
  This is the equivalent of L{GetInstanceInfo}, except that it
1037
  computes data for all instances at once, thus being faster if one
1038
  needs data about more than one instance.
1039

1040
  @type hypervisor_list: list
1041
  @param hypervisor_list: list of hypervisors to query for instance data
1042

1043
  @rtype: dict
1044
  @return: dictionary of instance: data, with data having the following keys:
1045
      - memory: memory size of instance (int)
1046
      - state: xen state of instance (string)
1047
      - time: cpu time of instance (float)
1048
      - vcpus: the number of vcpus
1049

1050
  """
1051
  output = {}
1052

    
1053
  for hname in hypervisor_list:
1054
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1055
    if iinfo:
1056
      for name, _, memory, vcpus, state, times in iinfo:
1057
        value = {
1058
          "memory": memory,
1059
          "vcpus": vcpus,
1060
          "state": state,
1061
          "time": times,
1062
          }
1063
        if name in output:
1064
          # we only check static parameters, like memory and vcpus,
1065
          # and not state and time which can change between the
1066
          # invocations of the different hypervisors
1067
          for key in "memory", "vcpus":
1068
            if value[key] != output[name][key]:
1069
              _Fail("Instance %s is running twice"
1070
                    " with different parameters", name)
1071
        output[name] = value
1072

    
1073
  return output
1074

    
1075

    
1076
def _InstanceLogName(kind, os_name, instance, component):
1077
  """Compute the OS log filename for a given instance and operation.
1078

1079
  The instance name and os name are passed in as strings since not all
1080
  operations have these as part of an instance object.
1081

1082
  @type kind: string
1083
  @param kind: the operation type (e.g. add, import, etc.)
1084
  @type os_name: string
1085
  @param os_name: the os name
1086
  @type instance: string
1087
  @param instance: the name of the instance being imported/added/etc.
1088
  @type component: string or None
1089
  @param component: the name of the component of the instance being
1090
      transferred
1091

1092
  """
1093
  # TODO: Use tempfile.mkstemp to create unique filename
1094
  if component:
1095
    assert "/" not in component
1096
    c_msg = "-%s" % component
1097
  else:
1098
    c_msg = ""
1099
  base = ("%s-%s-%s%s-%s.log" %
1100
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1101
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1102

    
1103

    
1104
def InstanceOsAdd(instance, reinstall, debug):
1105
  """Add an OS to an instance.
1106

1107
  @type instance: L{objects.Instance}
1108
  @param instance: Instance whose OS is to be installed
1109
  @type reinstall: boolean
1110
  @param reinstall: whether this is an instance reinstall
1111
  @type debug: integer
1112
  @param debug: debug level, passed to the OS scripts
1113
  @rtype: None
1114

1115
  """
1116
  inst_os = OSFromDisk(instance.os)
1117

    
1118
  create_env = OSEnvironment(instance, inst_os, debug)
1119
  if reinstall:
1120
    create_env["INSTANCE_REINSTALL"] = "1"
1121

    
1122
  logfile = _InstanceLogName("add", instance.os, instance.name, None)
1123

    
1124
  result = utils.RunCmd([inst_os.create_script], env=create_env,
1125
                        cwd=inst_os.path, output=logfile, reset_env=True)
1126
  if result.failed:
1127
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
1128
                  " output: %s", result.cmd, result.fail_reason, logfile,
1129
                  result.output)
1130
    lines = [utils.SafeEncode(val)
1131
             for val in utils.TailFile(logfile, lines=20)]
1132
    _Fail("OS create script failed (%s), last lines in the"
1133
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1134

    
1135

    
1136
def RunRenameInstance(instance, old_name, debug):
1137
  """Run the OS rename script for an instance.
1138

1139
  @type instance: L{objects.Instance}
1140
  @param instance: Instance being renamed
1141
  @type old_name: string
1142
  @param old_name: previous instance name
1143
  @type debug: integer
1144
  @param debug: debug level, passed to the OS scripts
1145
  @rtype: boolean
1146
  @return: the success of the operation
1147

1148
  """
1149
  inst_os = OSFromDisk(instance.os)
1150

    
1151
  rename_env = OSEnvironment(instance, inst_os, debug)
1152
  rename_env["OLD_INSTANCE_NAME"] = old_name
1153

    
1154
  logfile = _InstanceLogName("rename", instance.os,
1155
                             "%s-%s" % (old_name, instance.name), None)
1156

    
1157
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1158
                        cwd=inst_os.path, output=logfile, reset_env=True)
1159

    
1160
  if result.failed:
1161
    logging.error("os create command '%s' returned error: %s output: %s",
1162
                  result.cmd, result.fail_reason, result.output)
1163
    lines = [utils.SafeEncode(val)
1164
             for val in utils.TailFile(logfile, lines=20)]
1165
    _Fail("OS rename script failed (%s), last lines in the"
1166
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1167

    
1168

    
1169
def _GetBlockDevSymlinkPath(instance_name, idx):
1170
  return utils.PathJoin(pathutils.DISK_LINKS_DIR, "%s%s%d" %
1171
                        (instance_name, constants.DISK_SEPARATOR, idx))
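  # Illustrative result, assuming the usual defaults for
  # pathutils.DISK_LINKS_DIR and constants.DISK_SEPARATOR:
  #   "/var/run/ganeti/instance-disks/instance1.example.com:0"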
1172

    
1173

    
1174
def _SymlinkBlockDev(instance_name, device_path, idx):
1175
  """Set up symlinks to a instance's block device.
1176

1177
  This is an auxiliary function run when an instance is started (on the primary
1178
  node) or when an instance is migrated (on the target node).
1179

1180

1181
  @param instance_name: the name of the target instance
1182
  @param device_path: path of the physical block device, on the node
1183
  @param idx: the disk index
1184
  @return: absolute path to the disk's symlink
1185

1186
  """
1187
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1188
  try:
1189
    os.symlink(device_path, link_name)
1190
  except OSError, err:
1191
    if err.errno == errno.EEXIST:
1192
      if (not os.path.islink(link_name) or
1193
          os.readlink(link_name) != device_path):
1194
        os.remove(link_name)
1195
        os.symlink(device_path, link_name)
1196
    else:
1197
      raise
1198

    
1199
  return link_name
1200

    
1201

    
1202
def _RemoveBlockDevLinks(instance_name, disks):
1203
  """Remove the block device symlinks belonging to the given instance.
1204

1205
  """
1206
  for idx, _ in enumerate(disks):
1207
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1208
    if os.path.islink(link_name):
1209
      try:
1210
        os.remove(link_name)
1211
      except OSError:
1212
        logging.exception("Can't remove symlink '%s'", link_name)
1213

    
1214

    
1215
def _GatherAndLinkBlockDevs(instance):
1216
  """Set up an instance's block device(s).
1217

1218
  This is run on the primary node at instance startup. The block
1219
  devices must be already assembled.
1220

1221
  @type instance: L{objects.Instance}
1222
  @param instance: the instance whose disks we should assemble
1223
  @rtype: list
1224
  @return: list of (disk_object, device_path)
1225

1226
  """
1227
  block_devices = []
1228
  for idx, disk in enumerate(instance.disks):
1229
    device = _RecursiveFindBD(disk)
1230
    if device is None:
1231
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1232
                                    str(disk))
1233
    device.Open()
1234
    try:
1235
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1236
    except OSError, e:
1237
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1238
                                    e.strerror)
1239

    
1240
    block_devices.append((disk, link_name))
1241

    
1242
  return block_devices
1243

    
1244

    
1245
def StartInstance(instance, startup_paused):
1246
  """Start an instance.
1247

1248
  @type instance: L{objects.Instance}
1249
  @param instance: the instance object
1250
  @type startup_paused: bool
1251
  @param startup_paused: whether to pause the instance at startup
1252
  @rtype: None
1253

1254
  """
1255
  running_instances = GetInstanceList([instance.hypervisor])
1256

    
1257
  if instance.name in running_instances:
1258
    logging.info("Instance %s already running, not starting", instance.name)
1259
    return
1260

    
1261
  try:
1262
    block_devices = _GatherAndLinkBlockDevs(instance)
1263
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1264
    hyper.StartInstance(instance, block_devices, startup_paused)
1265
  except errors.BlockDeviceError, err:
1266
    _Fail("Block device error: %s", err, exc=True)
1267
  except errors.HypervisorError, err:
1268
    _RemoveBlockDevLinks(instance.name, instance.disks)
1269
    _Fail("Hypervisor error: %s", err, exc=True)
1270

    
1271

    
1272
def InstanceShutdown(instance, timeout):
1273
  """Shut an instance down.
1274

1275
  @note: this function uses polling with a hardcoded timeout.
1276

1277
  @type instance: L{objects.Instance}
1278
  @param instance: the instance object
1279
  @type timeout: integer
1280
  @param timeout: maximum timeout for soft shutdown
1281
  @rtype: None
1282

1283
  """
1284
  hv_name = instance.hypervisor
1285
  hyper = hypervisor.GetHypervisor(hv_name)
1286
  iname = instance.name
1287

    
1288
  if instance.name not in hyper.ListInstances():
1289
    logging.info("Instance %s not running, doing nothing", iname)
1290
    return
1291

    
1292
  class _TryShutdown:
1293
    def __init__(self):
1294
      self.tried_once = False
1295

    
1296
    def __call__(self):
1297
      if iname not in hyper.ListInstances():
1298
        return
1299

    
1300
      try:
1301
        hyper.StopInstance(instance, retry=self.tried_once)
1302
      except errors.HypervisorError, err:
1303
        if iname not in hyper.ListInstances():
1304
          # if the instance is no longer existing, consider this a
1305
          # success and go to cleanup
1306
          return
1307

    
1308
        _Fail("Failed to stop instance %s: %s", iname, err)
1309

    
1310
      self.tried_once = True
1311

    
1312
      raise utils.RetryAgain()
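  # utils.Retry below calls _TryShutdown() repeatedly (roughly every 5
  # seconds, per the arguments passed to it) until the call returns without
  # raising RetryAgain or the timeout expires.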
1313

    
1314
  try:
1315
    utils.Retry(_TryShutdown(), 5, timeout)
1316
  except utils.RetryTimeout:
1317
    # the shutdown did not succeed
1318
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1319

    
1320
    try:
1321
      hyper.StopInstance(instance, force=True)
1322
    except errors.HypervisorError, err:
1323
      if iname in hyper.ListInstances():
1324
        # only raise an error if the instance still exists, otherwise
1325
        # the error could simply be "instance ... unknown"!
1326
        _Fail("Failed to force stop instance %s: %s", iname, err)
1327

    
1328
    time.sleep(1)
1329

    
1330
    if iname in hyper.ListInstances():
1331
      _Fail("Could not shutdown instance %s even by destroy", iname)
1332

    
1333
  try:
1334
    hyper.CleanupInstance(instance.name)
1335
  except errors.HypervisorError, err:
1336
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1337

    
1338
  _RemoveBlockDevLinks(iname, instance.disks)
1339

    
1340

    
1341
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1342
  """Reboot an instance.
1343

1344
  @type instance: L{objects.Instance}
1345
  @param instance: the instance object to reboot
1346
  @type reboot_type: str
1347
  @param reboot_type: the type of reboot, one of the following
1348
    constants:
1349
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1350
        instance OS, do not recreate the VM
1351
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1352
        restart the VM (at the hypervisor level)
1353
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1354
        not accepted here, since that mode is handled differently, in
1355
        cmdlib, and translates into full stop and start of the
1356
        instance (instead of a call_instance_reboot RPC)
1357
  @type shutdown_timeout: integer
1358
  @param shutdown_timeout: maximum timeout for soft shutdown
1359
  @rtype: None
1360

1361
  """
1362
  running_instances = GetInstanceList([instance.hypervisor])
1363

    
1364
  if instance.name not in running_instances:
1365
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1366

    
1367
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1368
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1369
    try:
1370
      hyper.RebootInstance(instance)
1371
    except errors.HypervisorError, err:
1372
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1373
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1374
    try:
1375
      InstanceShutdown(instance, shutdown_timeout)
1376
      return StartInstance(instance, False)
1377
    except errors.HypervisorError, err:
1378
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1379
  else:
1380
    _Fail("Invalid reboot_type received: %s", reboot_type)
1381

    
1382

    
1383
def InstanceBalloonMemory(instance, memory):
1384
  """Resize an instance's memory.
1385

1386
  @type instance: L{objects.Instance}
1387
  @param instance: the instance object
1388
  @type memory: int
1389
  @param memory: new memory amount in MB
1390
  @rtype: None
1391

1392
  """
1393
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1394
  running = hyper.ListInstances()
1395
  if instance.name not in running:
1396
    logging.info("Instance %s is not running, cannot balloon", instance.name)
1397
    return
1398
  try:
1399
    hyper.BalloonInstanceMemory(instance, memory)
1400
  except errors.HypervisorError, err:
1401
    _Fail("Failed to balloon instance memory: %s", err, exc=True)
1402

    
1403

    
1404
def MigrationInfo(instance):
1405
  """Gather information about an instance to be migrated.
1406

1407
  @type instance: L{objects.Instance}
1408
  @param instance: the instance definition
1409

1410
  """
1411
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1412
  try:
1413
    info = hyper.MigrationInfo(instance)
1414
  except errors.HypervisorError, err:
1415
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1416
  return info
1417

    
1418

    
1419
def AcceptInstance(instance, info, target):
1420
  """Prepare the node to accept an instance.
1421

1422
  @type instance: L{objects.Instance}
1423
  @param instance: the instance definition
1424
  @type info: string/data (opaque)
1425
  @param info: migration information, from the source node
1426
  @type target: string
1427
  @param target: target host (usually ip), on this node
1428

1429
  """
1430
  # TODO: why is this required only for DTS_EXT_MIRROR?
1431
  if instance.disk_template in constants.DTS_EXT_MIRROR:
1432
    # Create the symlinks, as the disks are not active
1433
    # in any way
1434
    try:
1435
      _GatherAndLinkBlockDevs(instance)
1436
    except errors.BlockDeviceError, err:
1437
      _Fail("Block device error: %s", err, exc=True)
1438

    
1439
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1440
  try:
1441
    hyper.AcceptInstance(instance, info, target)
1442
  except errors.HypervisorError, err:
1443
    if instance.disk_template in constants.DTS_EXT_MIRROR:
1444
      _RemoveBlockDevLinks(instance.name, instance.disks)
1445
    _Fail("Failed to accept instance: %s", err, exc=True)
1446

    
1447

    
1448
def FinalizeMigrationDst(instance, info, success):
1449
  """Finalize any preparation to accept an instance.
1450

1451
  @type instance: L{objects.Instance}
1452
  @param instance: the instance definition
1453
  @type info: string/data (opaque)
1454
  @param info: migration information, from the source node
1455
  @type success: boolean
1456
  @param success: whether the migration was a success or a failure
1457

1458
  """
1459
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1460
  try:
1461
    hyper.FinalizeMigrationDst(instance, info, success)
1462
  except errors.HypervisorError, err:
1463
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1464

    
1465

    
1466
def MigrateInstance(instance, target, live):
1467
  """Migrates an instance to another node.
1468

1469
  @type instance: L{objects.Instance}
1470
  @param instance: the instance definition
1471
  @type target: string
1472
  @param target: the target node name
1473
  @type live: boolean
1474
  @param live: whether the migration should be done live or not (the
1475
      interpretation of this parameter is left to the hypervisor)
1476
  @raise RPCFail: if migration fails for some reason
1477

1478
  """
1479
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1480

    
1481
  try:
1482
    hyper.MigrateInstance(instance, target, live)
1483
  except errors.HypervisorError, err:
1484
    _Fail("Failed to migrate instance: %s", err, exc=True)
1485

    
1486

    
1487
def FinalizeMigrationSource(instance, success, live):
1488
  """Finalize the instance migration on the source node.
1489

1490
  @type instance: L{objects.Instance}
1491
  @param instance: the instance definition of the migrated instance
1492
  @type success: bool
1493
  @param success: whether the migration succeeded or not
1494
  @type live: bool
1495
  @param live: whether the user requested a live migration or not
1496
  @raise RPCFail: If the execution fails for some reason
1497

1498
  """
1499
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1500

    
1501
  try:
1502
    hyper.FinalizeMigrationSource(instance, success, live)
1503
  except Exception, err:  # pylint: disable=W0703
1504
    _Fail("Failed to finalize the migration on the source node: %s", err,
1505
          exc=True)
1506

    
1507

    
1508
def GetMigrationStatus(instance):
1509
  """Get the migration status
1510

1511
  @type instance: L{objects.Instance}
1512
  @param instance: the instance that is being migrated
1513
  @rtype: L{objects.MigrationStatus}
1514
  @return: the status of the current migration (one of
1515
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1516
           progress info that can be retrieved from the hypervisor
1517
  @raise RPCFail: If the migration status cannot be retrieved
1518

1519
  """
1520
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1521
  try:
1522
    return hyper.GetMigrationStatus(instance)
1523
  except Exception, err:  # pylint: disable=W0703
1524
    _Fail("Failed to get migration status: %s", err, exc=True)
1525

    
1526

    
1527
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometimes be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = 1024 * 1024

  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)


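# _WipeDevice example (illustrative sketch only, not called from this module;
# the device path is made up and constants.DD_CMD is assumed to be plain
# "dd"): a call such as _WipeDevice("/dev/xenvg/disk0.data", 0, 128) runs
# roughly:
#
#   dd if=/dev/zero seek=0 bs=1048576 oflag=direct of=/dev/xenvg/disk0.data \
#      count=128
#
# Both seek and count are expressed in 1 MiB blocks, matching the MiB-based
# offset/size arguments documented above.

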
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Do some cross-verification of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disks object we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


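# Example of the child-failure tolerance computed above (hypothetical
# numbers, for illustration only): for a disk with three children where
# disk.ChildrenNeeded() returns 2, mcn becomes 3 - 2 = 1, so at most one
# child may fail to assemble before the error is re-raised; a
# ChildrenNeeded() of -1 gives mcn = 0, i.e. every child must assemble.

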
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble; shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


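# Sketch of the per-disk result shape returned by
# BlockdevGetmirrorstatusMulti (values are illustrative only):
#
#   [(True, <objects.BlockDevStatus instance>),
#    (False, "Can't find device disk/1")]
#
# The list always has one entry per input disk, in the same order.

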
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           sync status information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.SSH_LOGIN_USER,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


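# For reference, the pipeline assembled above looks roughly like the
# following (host, device and destination path are made up, and the exact
# ssh options/quoting come from the cluster SSH runner):
#
#   set -e; set -o pipefail; dd if=/dev/xenvg/disk0 bs=1048576 count=1024 \
#     | ssh <opts> node2.example.com 'dd of=/tmp/disk0.img \
#         conv=nocreat,notrunc bs=65536 oflag=dsync'
#
# i.e. a local 1 MiB-blocked read piped into dd running on the destination
# node.

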
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


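# Example (assuming constants.OS_API_FILE is the usual "ganeti_api_version"
# file name): an OS directory whose version file contains the two lines "20"
# and "15" makes _OSOndiskAPIVersion return (True, [20, 15]); non-numeric
# content yields (False, "API version(s) can't be converted to integer: ...").

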
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result


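# Sketch of a typical OSCoreEnv() result (the values shown are illustrative;
# the OSP_* keys depend entirely on the os_params passed in):
#
#   {
#     "OS_API_VERSION": "20",
#     "OS_NAME": "debootstrap",
#     "DEBUG_LEVEL": "0",
#     "OS_VARIANT": "default",
#     "OSP_DHCP": "yes",
#     "PATH": constants.HOOKS_PATH,
#   }

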
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def BlockdevGrow(disk, amount, dryrun, backingstore):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


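# The export configuration written above ends up with one section per
# constant (constants.INISECT_EXP, INISECT_INS, INISECT_HYP, INISECT_BEP,
# INISECT_OSP); the instance section carries, among others, the
# "nic%d_*"/"nic_count" and "disk%d_*"/"disk_count" keys set in the loops
# above, and ExportInfo() below reads the same file back and checks that the
# export and instance sections are present.

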
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form  (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
  return fs_dir


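# Example (the base directory is hypothetical; the real value comes from the
# cluster configuration): with a file storage dir of
# /srv/ganeti/file-storage, a path such as
# /srv/ganeti/file-storage/inst1.example.com passes the check above, while
# /tmp/inst1 fails with "is not under base file storage directory".

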
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should cleanup
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified Storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(pathutils.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


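# Example (the queue path shown is only the common default;
# pathutils.QUEUE_DIR is authoritative): with a queue directory of
# /var/lib/ganeti/queue, a name like /var/lib/ganeti/queue/job-123 passes the
# commonprefix check above, while /tmp/job-123 makes the function fail the
# RPC.

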
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  old = vcluster.LocalizeVirtualPath(old)
  new = vcluster.LocalizeVirtualPath(new)

  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
                   dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      which will contain the error details in case we
      failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3074
  """Returns the command for the requested input/output.
3075

3076
  @type instance: L{objects.Instance}
3077
  @param instance: The instance object
3078
  @param mode: Import/export mode
3079
  @param ieio: Input/output type
3080
  @param ieargs: Input/output arguments
3081

3082
  """
3083
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3084

    
3085
  env = None
3086
  prefix = None
3087
  suffix = None
3088
  exp_size = None
3089

    
3090
  if ieio == constants.IEIO_FILE:
3091
    (filename, ) = ieargs
3092

    
3093
    if not utils.IsNormAbsPath(filename):
3094
      _Fail("Path '%s' is not normalized or absolute", filename)
3095

    
3096
    real_filename = os.path.realpath(filename)
3097
    directory = os.path.dirname(real_filename)
3098

    
3099
    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3100
      _Fail("File '%s' is not under exports directory '%s': %s",
3101
            filename, pathutils.EXPORT_DIR, real_filename)
3102

    
3103
    # Create directory
3104
    utils.Makedirs(directory, mode=0750)
3105

    
3106
    quoted_filename = utils.ShellQuote(filename)
3107

    
3108
    if mode == constants.IEM_IMPORT:
3109
      suffix = "> %s" % quoted_filename
3110
    elif mode == constants.IEM_EXPORT:
3111
      suffix = "< %s" % quoted_filename
3112

    
3113
      # Retrieve file size
3114
      try:
3115
        st = os.stat(filename)
3116
      except EnvironmentError, err:
3117
        logging.error("Can't stat(2) %s: %s", filename, err)
3118
      else:
3119
        exp_size = utils.BytesToMebibyte(st.st_size)
3120

    
3121
  elif ieio == constants.IEIO_RAW_DISK:
3122
    (disk, ) = ieargs
3123

    
3124
    real_disk = _OpenRealBD(disk)
3125

    
3126
    if mode == constants.IEM_IMPORT:
3127
      # we set here a smaller block size as, due to transport buffering, more
3128
      # than 64-128k will mostly ignored; we use nocreat to fail if the device
3129
      # is not already there or we pass a wrong path; we use notrunc to no
3130
      # attempt truncate on an LV device; we use oflag=dsync to not buffer too
3131
      # much memory; this means that at best, we flush every 64k, which will
3132
      # not be very fast
3133
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3134
                                    " bs=%s oflag=dsync"),
3135
                                    real_disk.dev_path,
3136
                                    str(64 * 1024))
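
      # For illustration only (the device path is hypothetical), the suffix
      # built above ends up looking like:
      #   | dd of=/dev/xenvg/disk0 conv=nocreat,notrunc bs=65536 oflag=dsync
      # i.e. the incoming data stream is piped directly onto the target
      # block device.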

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)
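
# Summary of the helper above (values are illustrative): the returned tuple is
# (env, prefix, suffix, exp_size); prefix/suffix are shell fragments that the
# caller hands to the import/export daemon via --cmd-prefix/--cmd-suffix, e.g.
#   IEIO_FILE export:     suffix = "< <EXPORT_DIR>/instance1/disk0"
#   IEIO_RAW_DISK import: suffix = "| dd of=<device> conv=nocreat,notrunc ..."
#   IEIO_SCRIPT export:   prefix = "( cd <os-dir> && <export-script>; ) |"
# and exp_size is roughly a size in MiB, IE_CUSTOM_SIZE or None.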


def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise
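
# Very roughly (paths, flags and values below are illustrative only), the
# daemon started above is invoked as
#   <IMPORT_EXPORT_DAEMON> <status_dir>/<status file> <mode> --key=... \
#     --cert=... --ca=... --ipv4 --cmd-suffix='| dd of=... bs=65536 ...'
# and the function returns the basename of the status directory, which is the
# handle later passed to GetImportExportStatus, AbortImportExport and
# CleanupImportExport.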


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: list of dicts
  @return: a list with the state of each named import/export, or None for
           entries whose status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result
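
# Note on the function above: a None entry in the result means the status file
# does not exist yet or is still empty; any other read error is propagated to
# the caller.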


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)
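
# Note: AbortImportExport only signals the daemon (SIGTERM) and leaves the
# status directory in place for inspection, while CleanupImportExport kills a
# still-running daemon and removes the whole status directory.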


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to the new master configuration and, if
  # needed, to primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
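
# The (alldone, min_resync) pair returned above means: alldone is True only if
# no device is still resyncing, and min_resync is the lowest sync percentage
# reported by any device (it stays at 100 when none reports one).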


def GetDrbdUsermodeHelper():
  """Returns the DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()
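
# The fork in PowercycleNode exists so the RPC can return before the node goes
# down: the parent immediately answers "Reboot scheduled in 5 seconds", while
# the child locks itself into memory, sleeps five seconds and then asks the
# hypervisor layer to powercycle the machine.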


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results
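
  # Example (illustrative names): for hpath "instance-start" and the PRE
  # phase, RunHooks executes everything under
  # "<hooks_base_dir>/instance-start-pre.d/" through utils.RunParts and
  # returns tuples such as
  #   ("instance-start-pre.d/01-check", constants.HKR_SUCCESS, "<output>")
  # with HKR_SKIP / HKR_FAIL used for skipped or failed scripts.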


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: string
    @return: the stdout of the allocator script on success; errors are
       reported by raising an exception

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
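
  # Operational note: the allocator input is written to a temporary file and
  # the script is invoked with that file name as its single argument; its
  # stdout (normally the JSON response) is returned, while failures surface
  # as an RPC failure via _Fail.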


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath
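
  # Example (illustrative device names):
  #   _ConvertPath("/dev/drbd0")     -> "<BDEV_CACHE_DIR>/bdev_drbd0"
  #   _ConvertPath("/dev/xenvg/lv1") -> "<BDEV_CACHE_DIR>/bdev_xenvg_lv1"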

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache entry for %s: %s",
                        dev_path, err)