# lib/backend.py @ 1cb97324
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import mcpu
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = frozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """

def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
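
# Usage sketch (illustrative, based on the calls below in this module): extra
# positional arguments are %-formatted into the message, exc=True additionally
# logs the traceback, and log=False skips logging entirely, e.g.:
#   _Fail("Error enumerating instances (hypervisor %s): %s", hname, err, exc=True)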


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
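
# Wire-format sketch (illustrative only): the RPC client is expected to send a
# two-element (encoding, content) pair, so a matching sender side would look
# roughly like
#   (constants.RPC_ENCODING_ZLIB_BASE64, base64.b64encode(zlib.compress(data)))
# or (constants.RPC_ENCODING_NONE, data) for uncompressed payloads.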


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: tuple
  @return: True, None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
                            None, env_fn, logging.warning, cfg.GetClusterName(),
                            cfg.GetMasterNode())

      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
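
# Application sketch: this decorator is used further down in this module, e.g.
#   @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
#                  _BuildMasterIpEnv)
#   def ActivateMasterIp(master_params, use_external_mip_script):
#     ...
# so the pre/post hooks run locally around the wrapped RPC implementation.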


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    _RunLocalHooks decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s" %
          (action, result.exit_code), log=True)
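
# Invocation sketch (illustrative): the chosen setup script is run with a clean
# environment as "<setup_script> start" or "<setup_script> stop", with only the
# MASTER_NETDEV/MASTER_IP/MASTER_NETMASK/CLUSTER_IP_VERSION variables from
# _BuildMasterIpEnv() exported to it.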


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _GetVgInfo(name):
  """Retrieves information about a LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = bdev.LogicalVolume.GetVGInfo([name])
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }


def _GetHvInfo(name):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total amount of RAM in MiB
    - hv_version: the hypervisor version, if available

  """
  return hypervisor.GetHypervisor(name).GetNodeInfo()


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a dictionary.

  @rtype: None or dict

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(vg_names, hv_names):
  """Gives back a hash with different information about the node.

  @type vg_names: list of string
  @param vg_names: Names of the volume groups to ask for disk space information
  @type hv_names: list of string
  @param hv_names: Names of the hypervisors to ask for node information
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  vg_info = _GetNamedNodeInfo(vg_names, _GetVgInfo)
  hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)

  return (bootid, vg_info, hv_info)
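
# Result-shape sketch (illustrative): the return value is a tuple
#   (boot_id, [{"name": ..., "vg_free": ..., "vg_size": ...}, ...], [hv_info, ...])
# where the second and third elements are None when vg_names/hv_names is None,
# per _GetNamedNodeInfo() above.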


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not (os.path.exists(script) and os.access(script, os.X_OK))]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stating out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]
  return result


def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs
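
# Parsing sketch (illustrative): _LVSLINE_REGEX expects "lvs" output lines
# shaped like
#   "  xenvg|test1|20.06|-wi-ao---"
# i.e. vg_name|lv_name|lv_size|lv_attr, where attr[0] == "v" marks virtual
# volumes (skipped), attr[4] reflects the active state and attr[5] whether the
# volume is open.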


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["vcpus"] = iinfo[3]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os create command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx):
  return utils.PathJoin(pathutils.DISK_LINKS_DIR, "%s%s%d" %
                        (instance_name, constants.DISK_SEPARATOR, idx))


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the primary
  node) or when an instance is migrated (on the target node).


  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name
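
# Path-layout sketch (illustrative): for an instance "instance1.example.com"
# and disk index 0 the symlink is created as
#   <pathutils.DISK_LINKS_DIR>/instance1.example.com<constants.DISK_SEPARATOR>0
# pointing at the node-local block device path, and is recreated if it already
# exists but points elsewhere.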


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance, startup_paused):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)
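
# Timing sketch (illustrative): utils.Retry() re-invokes _TryShutdown roughly
# every 5 seconds until `timeout` expires; only then is a forced
# hyper.StopInstance(instance, force=True) attempted, followed by the cleanup
# step and removal of the block device symlinks.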


def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance, False)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances()
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id


def _WipeDevice(path, offset, size):
1591
  """This function actually wipes the device.
1592

1593
  @param path: The path to the device to wipe
1594
  @param offset: The offset in MiB in the file
1595
  @param size: The size in MiB to write
1596

1597
  """
1598
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1599
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1600
         "count=%d" % size]
1601
  result = utils.RunCmd(cmd)
1602

    
1603
  if result.failed:
1604
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1605
          result.fail_reason, result.output)
1606

    
1607

    
1608
def BlockdevWipe(disk, offset, size):
1609
  """Wipes a block device.
1610

1611
  @type disk: L{objects.Disk}
1612
  @param disk: the disk object we want to wipe
1613
  @type offset: int
1614
  @param offset: The offset in MiB in the file
1615
  @type size: int
1616
  @param size: The size in MiB to write
1617

1618
  """
1619
  try:
1620
    rdev = _RecursiveFindBD(disk)
1621
  except errors.BlockDeviceError:
1622
    rdev = None
1623

    
1624
  if not rdev:
1625
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1626

    
1627
  # Do cross verify some of the parameters
1628
  if offset > rdev.size:
1629
    _Fail("Offset is bigger than device size")
1630
  if (offset + size) > rdev.size:
1631
    _Fail("The provided offset and size to wipe is bigger than device size")
1632

    
1633
  _WipeDevice(rdev.dev_path, offset, size)
1634

    
1635

    
1636
def BlockdevPauseResumeSync(disks, pause):
1637
  """Pause or resume the sync of the block device.
1638

1639
  @type disks: list of L{objects.Disk}
1640
  @param disks: the disks object we want to pause/resume
1641
  @type pause: bool
1642
  @param pause: Wheater to pause or resume
1643

1644
  """
1645
  success = []
1646
  for disk in disks:
1647
    try:
1648
      rdev = _RecursiveFindBD(disk)
1649
    except errors.BlockDeviceError:
1650
      rdev = None
1651

    
1652
    if not rdev:
1653
      success.append((False, ("Cannot change sync for device %s:"
1654
                              " device not found" % disk.iv_name)))
1655
      continue
1656

    
1657
    result = rdev.PauseResumeSync(pause)
1658

    
1659
    if result:
1660
      success.append((result, None))
1661
    else:
1662
      if pause:
1663
        msg = "Pause"
1664
      else:
1665
        msg = "Resume"
1666
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1667

    
1668
  return success
1669

    
1670

    
1671
def BlockdevRemove(disk):
1672
  """Remove a block device.
1673

1674
  @note: This is intended to be called recursively.
1675

1676
  @type disk: L{objects.Disk}
1677
  @param disk: the disk object we should remove
1678
  @rtype: boolean
1679
  @return: the success of the operation
1680

1681
  """
1682
  msgs = []
1683
  try:
1684
    rdev = _RecursiveFindBD(disk)
1685
  except errors.BlockDeviceError, err:
1686
    # probably can't attach
1687
    logging.info("Can't attach to device %s in remove", disk)
1688
    rdev = None
1689
  if rdev is not None:
1690
    r_path = rdev.dev_path
1691
    try:
1692
      rdev.Remove()
1693
    except errors.BlockDeviceError, err:
1694
      msgs.append(str(err))
1695
    if not msgs:
1696
      DevCacheManager.RemoveCache(r_path)
1697

    
1698
  if disk.children:
1699
    for child in disk.children:
1700
      try:
1701
        BlockdevRemove(child)
1702
      except RPCFail, err:
1703
        msgs.append(str(err))
1704

    
1705
  if msgs:
1706
    _Fail("; ".join(msgs))
1707

    
1708

    
1709
def _RecursiveAssembleBD(disk, owner, as_primary):
1710
  """Activate a block device for an instance.
1711

1712
  This is run on the primary and secondary nodes for an instance.
1713

1714
  @note: this function is called recursively.
1715

1716
  @type disk: L{objects.Disk}
1717
  @param disk: the disk we try to assemble
1718
  @type owner: str
1719
  @param owner: the name of the instance which owns the disk
1720
  @type as_primary: boolean
1721
  @param as_primary: if we should make the block device
1722
      read/write
1723

1724
  @return: the assembled device or None (in case no device
1725
      was assembled)
1726
  @raise errors.BlockDeviceError: in case there is an error
1727
      during the activation of the children or the device
1728
      itself
1729

1730
  """
1731
  children = []
1732
  if disk.children:
1733
    mcn = disk.ChildrenNeeded()
1734
    if mcn == -1:
1735
      mcn = 0 # max number of Nones allowed
1736
    else:
1737
      mcn = len(disk.children) - mcn # max number of Nones
1738
    for chld_disk in disk.children:
1739
      try:
1740
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1741
      except errors.BlockDeviceError, err:
1742
        if children.count(None) >= mcn:
1743
          raise
1744
        cdev = None
1745
        logging.error("Error in child activation (but continuing): %s",
1746
                      str(err))
1747
      children.append(cdev)
1748

    
1749
  if as_primary or disk.AssembleOnSecondary():
1750
    r_dev = bdev.Assemble(disk, children)
1751
    result = r_dev
1752
    if as_primary or disk.OpenOnSecondary():
1753
      r_dev.Open()
1754
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1755
                                as_primary, disk.iv_name)
1756

    
1757
  else:
1758
    result = True
1759
  return result
1760

    
1761

    
1762
def BlockdevAssemble(disk, owner, as_primary, idx):
1763
  """Activate a block device for an instance.
1764

1765
  This is a wrapper over _RecursiveAssembleBD.
1766

1767
  @rtype: str or boolean
1768
  @return: a C{/dev/...} path for primary nodes, and
1769
      C{True} for secondary nodes
1770

1771
  """
1772
  try:
1773
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1774
    if isinstance(result, bdev.BlockDev):
1775
      # pylint: disable=E1103
1776
      result = result.dev_path
1777
      if as_primary:
1778
        _SymlinkBlockDev(owner, result, idx)
1779
  except errors.BlockDeviceError, err:
1780
    _Fail("Error while assembling disk: %s", err, exc=True)
1781
  except OSError, err:
1782
    _Fail("Error while symlinking disk: %s", err, exc=True)
1783

    
1784
  return result
1785

    
1786

    
1787
def BlockdevShutdown(disk):
1788
  """Shut down a block device.
1789

1790
  First, if the device is assembled (Attach() is successful), then
1791
  the device is shutdown. Then the children of the device are
1792
  shutdown.
1793

1794
  This function is called recursively. Note that we don't cache the
1795
  children or such, as oppossed to assemble, shutdown of different
1796
  devices doesn't require that the upper device was active.
1797

1798
  @type disk: L{objects.Disk}
1799
  @param disk: the description of the disk we should
1800
      shutdown
1801
  @rtype: None
1802

1803
  """
1804
  msgs = []
1805
  r_dev = _RecursiveFindBD(disk)
1806
  if r_dev is not None:
1807
    r_path = r_dev.dev_path
1808
    try:
1809
      r_dev.Shutdown()
1810
      DevCacheManager.RemoveCache(r_path)
1811
    except errors.BlockDeviceError, err:
1812
      msgs.append(str(err))
1813

    
1814
  if disk.children:
1815
    for child in disk.children:
1816
      try:
1817
        BlockdevShutdown(child)
1818
      except RPCFail, err:
1819
        msgs.append(str(err))
1820

    
1821
  if msgs:
1822
    _Fail("; ".join(msgs))
1823

    
1824

    
1825
def BlockdevAddchildren(parent_cdev, new_cdevs):
1826
  """Extend a mirrored block device.
1827

1828
  @type parent_cdev: L{objects.Disk}
1829
  @param parent_cdev: the disk to which we should add children
1830
  @type new_cdevs: list of L{objects.Disk}
1831
  @param new_cdevs: the list of children which we should add
1832
  @rtype: None
1833

1834
  """
1835
  parent_bdev = _RecursiveFindBD(parent_cdev)
1836
  if parent_bdev is None:
1837
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
1838
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1839
  if new_bdevs.count(None) > 0:
1840
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1841
  parent_bdev.AddChildren(new_bdevs)
1842

    
1843

    
1844
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1845
  """Shrink a mirrored block device.
1846

1847
  @type parent_cdev: L{objects.Disk}
1848
  @param parent_cdev: the disk from which we should remove children
1849
  @type new_cdevs: list of L{objects.Disk}
1850
  @param new_cdevs: the list of children which we should remove
1851
  @rtype: None
1852

1853
  """
1854
  parent_bdev = _RecursiveFindBD(parent_cdev)
1855
  if parent_bdev is None:
1856
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1857
  devs = []
1858
  for disk in new_cdevs:
1859
    rpath = disk.StaticDevPath()
1860
    if rpath is None:
1861
      bd = _RecursiveFindBD(disk)
1862
      if bd is None:
1863
        _Fail("Can't find device %s while removing children", disk)
1864
      else:
1865
        devs.append(bd.dev_path)
1866
    else:
1867
      if not utils.IsNormAbsPath(rpath):
1868
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1869
      devs.append(rpath)
1870
  parent_bdev.RemoveChildren(devs)
1871

    
1872

    
1873
def BlockdevGetmirrorstatus(disks):
1874
  """Get the mirroring status of a list of devices.
1875

1876
  @type disks: list of L{objects.Disk}
1877
  @param disks: the list of disks which we should query
1878
  @rtype: disk
1879
  @return: List of L{objects.BlockDevStatus}, one for each disk
1880
  @raise errors.BlockDeviceError: if any of the disks cannot be
1881
      found
1882

1883
  """
1884
  stats = []
1885
  for dsk in disks:
1886
    rbd = _RecursiveFindBD(dsk)
1887
    if rbd is None:
1888
      _Fail("Can't find device %s", dsk)
1889

    
1890
    stats.append(rbd.CombinedSyncStatus())
1891

    
1892
  return stats
1893

    
1894

    
1895
def BlockdevGetmirrorstatusMulti(disks):
1896
  """Get the mirroring status of a list of devices.
1897

1898
  @type disks: list of L{objects.Disk}
1899
  @param disks: the list of disks which we should query
1900
  @rtype: disk
1901
  @return: List of tuples, (bool, status), one for each disk; bool denotes
1902
    success/failure, status is L{objects.BlockDevStatus} on success, string
1903
    otherwise
1904

1905
  """
1906
  result = []
1907
  for disk in disks:
1908
    try:
1909
      rbd = _RecursiveFindBD(disk)
1910
      if rbd is None:
1911
        result.append((False, "Can't find device %s" % disk))
1912
        continue
1913

    
1914
      status = rbd.CombinedSyncStatus()
1915
    except errors.BlockDeviceError, err:
1916
      logging.exception("Error while getting disk status")
1917
      result.append((False, str(err)))
1918
    else:
1919
      result.append((True, status))
1920

    
1921
  assert len(disks) == len(result)
1922

    
1923
  return result
1924

    
1925

    
1926
def _RecursiveFindBD(disk):
1927
  """Check if a device is activated.
1928

1929
  If so, return information about the real device.
1930

1931
  @type disk: L{objects.Disk}
1932
  @param disk: the disk object we need to find
1933

1934
  @return: None if the device can't be found,
1935
      otherwise the device instance
1936

1937
  """
1938
  children = []
1939
  if disk.children:
1940
    for chdisk in disk.children:
1941
      children.append(_RecursiveFindBD(chdisk))
1942

    
1943
  return bdev.FindDevice(disk, children)
1944

    
1945

    
1946
def _OpenRealBD(disk):
1947
  """Opens the underlying block device of a disk.
1948

1949
  @type disk: L{objects.Disk}
1950
  @param disk: the disk object we want to open
1951

1952
  """
1953
  real_disk = _RecursiveFindBD(disk)
1954
  if real_disk is None:
1955
    _Fail("Block device '%s' is not set up", disk)
1956

    
1957
  real_disk.Open()
1958

    
1959
  return real_disk
1960

    
1961

    
1962
def BlockdevFind(disk):
1963
  """Check if a device is activated.
1964

1965
  If it is, return information about the real device.
1966

1967
  @type disk: L{objects.Disk}
1968
  @param disk: the disk to find
1969
  @rtype: None or objects.BlockDevStatus
1970
  @return: None if the disk cannot be found, otherwise a the current
1971
           information
1972

1973
  """
1974
  try:
1975
    rbd = _RecursiveFindBD(disk)
1976
  except errors.BlockDeviceError, err:
1977
    _Fail("Failed to find device: %s", err, exc=True)
1978

    
1979
  if rbd is None:
1980
    return None
1981

    
1982
  return rbd.GetSyncStatus()
1983

    
1984

    
1985
def BlockdevGetsize(disks):
1986
  """Computes the size of the given disks.
1987

1988
  If a disk is not found, returns None instead.
1989

1990
  @type disks: list of L{objects.Disk}
1991
  @param disks: the list of disk to compute the size for
1992
  @rtype: list
1993
  @return: list with elements None if the disk cannot be found,
1994
      otherwise the size
1995

1996
  """
1997
  result = []
1998
  for cf in disks:
1999
    try:
2000
      rbd = _RecursiveFindBD(cf)
2001
    except errors.BlockDeviceError:
2002
      result.append(None)
2003
      continue
2004
    if rbd is None:
2005
      result.append(None)
2006
    else:
2007
      result.append(rbd.GetActualSize())
2008
  return result
2009

    
2010

    
2011
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
2012
  """Export a block device to a remote node.
2013

2014
  @type disk: L{objects.Disk}
2015
  @param disk: the description of the disk to export
2016
  @type dest_node: str
2017
  @param dest_node: the destination node to export to
2018
  @type dest_path: str
2019
  @param dest_path: the destination path on the target node
2020
  @type cluster_name: str
2021
  @param cluster_name: the cluster name, needed for SSH hostalias
2022
  @rtype: None
2023

2024
  """
2025
  real_disk = _OpenRealBD(disk)
2026

    
2027
  # the block size on the read dd is 1MiB to match our units
2028
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
2029
                               "dd if=%s bs=1048576 count=%s",
2030
                               real_disk.dev_path, str(disk.size))
2031

    
2032
  # we set here a smaller block size as, due to ssh buffering, more
2033
  # than 64-128k will mostly ignored; we use nocreat to fail if the
2034
  # device is not already there or we pass a wrong path; we use
2035
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
2036
  # to not buffer too much memory; this means that at best, we flush
2037
  # every 64k, which will not be very fast
2038
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
2039
                                " oflag=dsync", dest_path)
2040

    
2041
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2042
                                                   constants.GANETI_RUNAS,
2043
                                                   destcmd)
2044

    
2045
  # all commands have been checked, so we're safe to combine them
2046
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])
2047

    
2048
  result = utils.RunCmd(["bash", "-c", command])
2049

    
2050
  if result.failed:
2051
    _Fail("Disk copy command '%s' returned error: %s"
2052
          " output: %s", command, result.fail_reason, result.output)
2053

    
2054

    
2055
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
2056
  """Write a file to the filesystem.
2057

2058
  This allows the master to overwrite(!) a file. It will only perform
2059
  the operation if the file belongs to a list of configuration files.
2060

2061
  @type file_name: str
2062
  @param file_name: the target file name
2063
  @type data: str
2064
  @param data: the new contents of the file
2065
  @type mode: int
2066
  @param mode: the mode to give the file (can be None)
2067
  @type uid: string
2068
  @param uid: the owner of the file
2069
  @type gid: string
2070
  @param gid: the group of the file
2071
  @type atime: float
2072
  @param atime: the atime to set on the file (can be None)
2073
  @type mtime: float
2074
  @param mtime: the mtime to set on the file (can be None)
2075
  @rtype: None
2076

2077
  """
2078
  file_name = vcluster.LocalizeVirtualPath(file_name)
2079

    
2080
  if not os.path.isabs(file_name):
2081
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
2082

    
2083
  if file_name not in _ALLOWED_UPLOAD_FILES:
2084
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
2085
          file_name)
2086

    
2087
  raw_data = _Decompress(data)
2088

    
2089
  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
2090
    _Fail("Invalid username/groupname type")
2091

    
2092
  getents = runtime.GetEnts()
2093
  uid = getents.LookupUser(uid)
2094
  gid = getents.LookupGroup(gid)
2095

    
2096
  utils.SafeWriteFile(file_name, None,
2097
                      data=raw_data, mode=mode, uid=uid, gid=gid,
2098
                      atime=atime, mtime=mtime)
2099

    
2100

    
2101
def RunOob(oob_program, command, node, timeout):
2102
  """Executes oob_program with given command on given node.
2103

2104
  @param oob_program: The path to the executable oob_program
2105
  @param command: The command to invoke on oob_program
2106
  @param node: The node given as an argument to the program
2107
  @param timeout: Timeout after which we kill the oob program
2108

2109
  @return: stdout
2110
  @raise RPCFail: If execution fails for some reason
2111

2112
  """
2113
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)
2114

    
2115
  if result.failed:
2116
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
2117
          result.fail_reason, result.output)
2118

    
2119
  return result.stdout
2120

    
2121

    
2122
def WriteSsconfFiles(values):
2123
  """Update all ssconf files.
2124

2125
  Wrapper around the SimpleStore.WriteFiles.
2126

2127
  """
2128
  ssconf.SimpleStore().WriteFiles(values)
2129

    
2130

    
2131
def _OSOndiskAPIVersion(os_dir):
2132
  """Compute and return the API version of a given OS.
2133

2134
  This function will try to read the API version of the OS residing in
2135
  the 'os_dir' directory.
2136

2137
  @type os_dir: str
2138
  @param os_dir: the directory in which we should look for the OS
2139
  @rtype: tuple
2140
  @return: tuple (status, data) with status denoting the validity and
2141
      data holding either the vaid versions or an error message
2142

2143
  """
2144
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
2145

    
2146
  try:
2147
    st = os.stat(api_file)
2148
  except EnvironmentError, err:
2149
    return False, ("Required file '%s' not found under path %s: %s" %
2150
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))
2151

    
2152
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2153
    return False, ("File '%s' in %s is not a regular file" %
2154
                   (constants.OS_API_FILE, os_dir))
2155

    
2156
  try:
2157
    api_versions = utils.ReadFile(api_file).splitlines()
2158
  except EnvironmentError, err:
2159
    return False, ("Error while reading the API version file at %s: %s" %
2160
                   (api_file, utils.ErrnoOrStr(err)))
2161

    
2162
  try:
2163
    api_versions = [int(version.strip()) for version in api_versions]
2164
  except (TypeError, ValueError), err:
2165
    return False, ("API version(s) can't be converted to integer: %s" %
2166
                   str(err))
2167

    
2168
  return True, api_versions
2169

    
2170

    
2171
def DiagnoseOS(top_dirs=None):
2172
  """Compute the validity for all OSes.
2173

2174
  @type top_dirs: list
2175
  @param top_dirs: the list of directories in which to
2176
      search (if not given defaults to
2177
      L{pathutils.OS_SEARCH_PATH})
2178
  @rtype: list of L{objects.OS}
2179
  @return: a list of tuples (name, path, status, diagnose, variants,
2180
      parameters, api_version) for all (potential) OSes under all
2181
      search paths, where:
2182
          - name is the (potential) OS name
2183
          - path is the full path to the OS
2184
          - status True/False is the validity of the OS
2185
          - diagnose is the error message for an invalid OS, otherwise empty
2186
          - variants is a list of supported OS variants, if any
2187
          - parameters is a list of (name, help) parameters, if any
2188
          - api_version is a list of support OS API versions
2189

2190
  """
2191
  if top_dirs is None:
2192
    top_dirs = pathutils.OS_SEARCH_PATH
2193

    
2194
  result = []
2195
  for dir_name in top_dirs:
2196
    if os.path.isdir(dir_name):
2197
      try:
2198
        f_names = utils.ListVisibleFiles(dir_name)
2199
      except EnvironmentError, err:
2200
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
2201
        break
2202
      for name in f_names:
2203
        os_path = utils.PathJoin(dir_name, name)
2204
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
2205
        if status:
2206
          diagnose = ""
2207
          variants = os_inst.supported_variants
2208
          parameters = os_inst.supported_parameters
2209
          api_versions = os_inst.api_versions
2210
        else:
2211
          diagnose = os_inst
2212
          variants = parameters = api_versions = []
2213
        result.append((name, os_path, status, diagnose, variants,
2214
                       parameters, api_versions))
2215

    
2216
  return result
2217

    
2218

    
2219
def _TryOSFromDisk(name, base_dir=None):
2220
  """Create an OS instance from disk.
2221

2222
  This function will return an OS instance if the given name is a
2223
  valid OS name.
2224

2225
  @type base_dir: string
2226
  @keyword base_dir: Base directory containing OS installations.
2227
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
2228
  @rtype: tuple
2229
  @return: success and either the OS instance if we find a valid one,
2230
      or error message
2231

2232
  """
2233
  if base_dir is None:
2234
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
2235
  else:
2236
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2237

    
2238
  if os_dir is None:
2239
    return False, "Directory for OS %s not found in search path" % name
2240

    
2241
  status, api_versions = _OSOndiskAPIVersion(os_dir)
2242
  if not status:
2243
    # push the error up
2244
    return status, api_versions
2245

    
2246
  if not constants.OS_API_VERSIONS.intersection(api_versions):
2247
    return False, ("API version mismatch for path '%s': found %s, want %s." %
2248
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
2249

    
2250
  # OS Files dictionary, we will populate it with the absolute path
2251
  # names; if the value is True, then it is a required file, otherwise
2252
  # an optional one
2253
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
2254

    
2255
  if max(api_versions) >= constants.OS_API_V15:
2256
    os_files[constants.OS_VARIANTS_FILE] = False
2257

    
2258
  if max(api_versions) >= constants.OS_API_V20:
2259
    os_files[constants.OS_PARAMETERS_FILE] = True
2260
  else:
2261
    del os_files[constants.OS_SCRIPT_VERIFY]
2262

    
2263
  for (filename, required) in os_files.items():
2264
    os_files[filename] = utils.PathJoin(os_dir, filename)
2265

    
2266
    try:
2267
      st = os.stat(os_files[filename])
2268
    except EnvironmentError, err:
2269
      if err.errno == errno.ENOENT and not required:
2270
        del os_files[filename]
2271
        continue
2272
      return False, ("File '%s' under path '%s' is missing (%s)" %
2273
                     (filename, os_dir, utils.ErrnoOrStr(err)))
2274

    
2275
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2276
      return False, ("File '%s' under path '%s' is not a regular file" %
2277
                     (filename, os_dir))
2278

    
2279
    if filename in constants.OS_SCRIPTS:
2280
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2281
        return False, ("File '%s' under path '%s' is not executable" %
2282
                       (filename, os_dir))
2283

    
2284
  variants = []
2285
  if constants.OS_VARIANTS_FILE in os_files:
2286
    variants_file = os_files[constants.OS_VARIANTS_FILE]
2287
    try:
2288
      variants = utils.ReadFile(variants_file).splitlines()
2289
    except EnvironmentError, err:
2290
      # we accept missing files, but not other errors
2291
      if err.errno != errno.ENOENT:
2292
        return False, ("Error while reading the OS variants file at %s: %s" %
2293
                       (variants_file, utils.ErrnoOrStr(err)))
2294

    
2295
  parameters = []
2296
  if constants.OS_PARAMETERS_FILE in os_files:
2297
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2298
    try:
2299
      parameters = utils.ReadFile(parameters_file).splitlines()
2300
    except EnvironmentError, err:
2301
      return False, ("Error while reading the OS parameters file at %s: %s" %
2302
                     (parameters_file, utils.ErrnoOrStr(err)))
2303
    parameters = [v.split(None, 1) for v in parameters]
2304

    
2305
  os_obj = objects.OS(name=name, path=os_dir,
2306
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
2307
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
2308
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
2309
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
2310
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2311
                                                 None),
2312
                      supported_variants=variants,
2313
                      supported_parameters=parameters,
2314
                      api_versions=api_versions)
2315
  return True, os_obj
2316

    
2317

    
2318
def OSFromDisk(name, base_dir=None):
2319
  """Create an OS instance from disk.
2320

2321
  This function will return an OS instance if the given name is a
2322
  valid OS name. Otherwise, it will raise an appropriate
2323
  L{RPCFail} exception, detailing why this is not a valid OS.
2324

2325
  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2326
  an exception but returns true/false status data.
2327

2328
  @type base_dir: string
2329
  @keyword base_dir: Base directory containing OS installations.
2330
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
2331
  @rtype: L{objects.OS}
2332
  @return: the OS instance if we find a valid one
2333
  @raise RPCFail: if we don't find a valid OS
2334

2335
  """
2336
  name_only = objects.OS.GetName(name)
2337
  status, payload = _TryOSFromDisk(name_only, base_dir)
2338

    
2339
  if not status:
2340
    _Fail(payload)
2341

    
2342
  return payload
2343

    
2344

    
2345
def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2346
  """Calculate the basic environment for an os script.
2347

2348
  @type os_name: str
2349
  @param os_name: full operating system name (including variant)
2350
  @type inst_os: L{objects.OS}
2351
  @param inst_os: operating system for which the environment is being built
2352
  @type os_params: dict
2353
  @param os_params: the OS parameters
2354
  @type debug: integer
2355
  @param debug: debug level (0 or 1, for OS Api 10)
2356
  @rtype: dict
2357
  @return: dict of environment variables
2358
  @raise errors.BlockDeviceError: if the block device
2359
      cannot be found
2360

2361
  """
2362
  result = {}
2363
  api_version = \
2364
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2365
  result["OS_API_VERSION"] = "%d" % api_version
2366
  result["OS_NAME"] = inst_os.name
2367
  result["DEBUG_LEVEL"] = "%d" % debug
2368

    
2369
  # OS variants
2370
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
2371
    variant = objects.OS.GetVariant(os_name)
2372
    if not variant:
2373
      variant = inst_os.supported_variants[0]
2374
  else:
2375
    variant = ""
2376
  result["OS_VARIANT"] = variant
2377

    
2378
  # OS params
2379
  for pname, pvalue in os_params.items():
2380
    result["OSP_%s" % pname.upper()] = pvalue
2381

    
2382
  # Set a default path otherwise programs called by OS scripts (or
2383
  # even hooks called from OS scripts) might break, and we don't want
2384
  # to have each script require setting a PATH variable
2385
  result["PATH"] = constants.HOOKS_PATH
2386

    
2387
  return result
2388

    
2389

    
2390
def OSEnvironment(instance, inst_os, debug=0):
2391
  """Calculate the environment for an os script.
2392

2393
  @type instance: L{objects.Instance}
2394
  @param instance: target instance for the os script run
2395
  @type inst_os: L{objects.OS}
2396
  @param inst_os: operating system for which the environment is being built
2397
  @type debug: integer
2398
  @param debug: debug level (0 or 1, for OS Api 10)
2399
  @rtype: dict
2400
  @return: dict of environment variables
2401
  @raise errors.BlockDeviceError: if the block device
2402
      cannot be found
2403

2404
  """
2405
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2406

    
2407
  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
2408
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2409

    
2410
  result["HYPERVISOR"] = instance.hypervisor
2411
  result["DISK_COUNT"] = "%d" % len(instance.disks)
2412
  result["NIC_COUNT"] = "%d" % len(instance.nics)
2413
  result["INSTANCE_SECONDARY_NODES"] = \
2414
      ("%s" % " ".join(instance.secondary_nodes))
2415

    
2416
  # Disks
2417
  for idx, disk in enumerate(instance.disks):
2418
    real_disk = _OpenRealBD(disk)
2419
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
2420
    result["DISK_%d_ACCESS" % idx] = disk.mode
2421
    if constants.HV_DISK_TYPE in instance.hvparams:
2422
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
2423
        instance.hvparams[constants.HV_DISK_TYPE]
2424
    if disk.dev_type in constants.LDS_BLOCK:
2425
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
2426
    elif disk.dev_type == constants.LD_FILE:
2427
      result["DISK_%d_BACKEND_TYPE" % idx] = \
2428
        "file:%s" % disk.physical_id[0]
2429

    
2430
  # NICs
2431
  for idx, nic in enumerate(instance.nics):
2432
    result["NIC_%d_MAC" % idx] = nic.mac
2433
    if nic.ip:
2434
      result["NIC_%d_IP" % idx] = nic.ip
2435
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
2436
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2437
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
2438
    if nic.nicparams[constants.NIC_LINK]:
2439
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
2440
    if constants.HV_NIC_TYPE in instance.hvparams:
2441
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
2442
        instance.hvparams[constants.HV_NIC_TYPE]
2443

    
2444
  # HV/BE params
2445
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2446
    for key, value in source.items():
2447
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2448

    
2449
  return result
2450

    
2451

    
2452
def BlockdevGrow(disk, amount, dryrun, backingstore):
2453
  """Grow a stack of block devices.
2454

2455
  This function is called recursively, with the childrens being the
2456
  first ones to resize.
2457

2458
  @type disk: L{objects.Disk}
2459
  @param disk: the disk to be grown
2460
  @type amount: integer
2461
  @param amount: the amount (in mebibytes) to grow with
2462
  @type dryrun: boolean
2463
  @param dryrun: whether to execute the operation in simulation mode
2464
      only, without actually increasing the size
2465
  @param backingstore: whether to execute the operation on backing storage
2466
      only, or on "logical" storage only; e.g. DRBD is logical storage,
2467
      whereas LVM, file, RBD are backing storage
2468
  @rtype: (status, result)
2469
  @return: a tuple with the status of the operation (True/False), and
2470
      the errors message if status is False
2471

2472
  """
2473
  r_dev = _RecursiveFindBD(disk)
2474
  if r_dev is None:
2475
    _Fail("Cannot find block device %s", disk)
2476

    
2477
  try:
2478
    r_dev.Grow(amount, dryrun, backingstore)
2479
  except errors.BlockDeviceError, err:
2480
    _Fail("Failed to grow block device: %s", err, exc=True)
2481

    
2482

    
2483
def BlockdevSnapshot(disk):
2484
  """Create a snapshot copy of a block device.
2485

2486
  This function is called recursively, and the snapshot is actually created
2487
  just for the leaf lvm backend device.
2488

2489
  @type disk: L{objects.Disk}
2490
  @param disk: the disk to be snapshotted
2491
  @rtype: string
2492
  @return: snapshot disk ID as (vg, lv)
2493

2494
  """
2495
  if disk.dev_type == constants.LD_DRBD8:
2496
    if not disk.children:
2497
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2498
            disk.unique_id)
2499
    return BlockdevSnapshot(disk.children[0])
2500
  elif disk.dev_type == constants.LD_LV:
2501
    r_dev = _RecursiveFindBD(disk)
2502
    if r_dev is not None:
2503
      # FIXME: choose a saner value for the snapshot size
2504
      # let's stay on the safe side and ask for the full size, for now
2505
      return r_dev.Snapshot(disk.size)
2506
    else:
2507
      _Fail("Cannot find block device %s", disk)
2508
  else:
2509
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2510
          disk.unique_id, disk.dev_type)
2511

    
2512

    
2513
def FinalizeExport(instance, snap_disks):
2514
  """Write out the export configuration information.
2515

2516
  @type instance: L{objects.Instance}
2517
  @param instance: the instance which we export, used for
2518
      saving configuration
2519
  @type snap_disks: list of L{objects.Disk}
2520
  @param snap_disks: list of snapshot block devices, which
2521
      will be used to get the actual name of the dump file
2522

2523
  @rtype: None
2524

2525
  """
2526
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
2527
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)
2528

    
2529
  config = objects.SerializableConfigParser()
2530

    
2531
  config.add_section(constants.INISECT_EXP)
2532
  config.set(constants.INISECT_EXP, "version", "0")
2533
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
2534
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
2535
  config.set(constants.INISECT_EXP, "os", instance.os)
2536
  config.set(constants.INISECT_EXP, "compression", "none")
2537

    
2538
  config.add_section(constants.INISECT_INS)
2539
  config.set(constants.INISECT_INS, "name", instance.name)
2540
  config.set(constants.INISECT_INS, "maxmem", "%d" %
2541
             instance.beparams[constants.BE_MAXMEM])
2542
  config.set(constants.INISECT_INS, "minmem", "%d" %
2543
             instance.beparams[constants.BE_MINMEM])
2544
  # "memory" is deprecated, but useful for exporting to old ganeti versions
2545
  config.set(constants.INISECT_INS, "memory", "%d" %
2546
             instance.beparams[constants.BE_MAXMEM])
2547
  config.set(constants.INISECT_INS, "vcpus", "%d" %
2548
             instance.beparams[constants.BE_VCPUS])
2549
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
2550
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
2551
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))
2552

    
2553
  nic_total = 0
2554
  for nic_count, nic in enumerate(instance.nics):
2555
    nic_total += 1
2556
    config.set(constants.INISECT_INS, "nic%d_mac" %
2557
               nic_count, "%s" % nic.mac)
2558
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
2559
    for param in constants.NICS_PARAMETER_TYPES:
2560
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
2561
                 "%s" % nic.nicparams.get(param, None))
2562
  # TODO: redundant: on load can read nics until it doesn't exist
2563
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)
2564

    
2565
  disk_total = 0
2566
  for disk_count, disk in enumerate(snap_disks):
2567
    if disk:
2568
      disk_total += 1
2569
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
2570
                 ("%s" % disk.iv_name))
2571
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
2572
                 ("%s" % disk.physical_id[1]))
2573
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
2574
                 ("%d" % disk.size))
2575

    
2576
  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)
2577

    
2578
  # New-style hypervisor/backend parameters
2579

    
2580
  config.add_section(constants.INISECT_HYP)
2581
  for name, value in instance.hvparams.items():
2582
    if name not in constants.HVC_GLOBALS:
2583
      config.set(constants.INISECT_HYP, name, str(value))
2584

    
2585
  config.add_section(constants.INISECT_BEP)
2586
  for name, value in instance.beparams.items():
2587
    config.set(constants.INISECT_BEP, name, str(value))
2588

    
2589
  config.add_section(constants.INISECT_OSP)
2590
  for name, value in instance.osparams.items():
2591
    config.set(constants.INISECT_OSP, name, str(value))
2592

    
2593
  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2594
                  data=config.Dumps())
2595
  shutil.rmtree(finaldestdir, ignore_errors=True)
2596
  shutil.move(destdir, finaldestdir)
2597

    
2598

    
2599
def ExportInfo(dest):
2600
  """Get export configuration information.
2601

2602
  @type dest: str
2603
  @param dest: directory containing the export
2604

2605
  @rtype: L{objects.SerializableConfigParser}
2606
  @return: a serializable config file containing the
2607
      export info
2608

2609
  """
2610
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2611

    
2612
  config = objects.SerializableConfigParser()
2613
  config.read(cff)
2614

    
2615
  if (not config.has_section(constants.INISECT_EXP) or
2616
      not config.has_section(constants.INISECT_INS)):
2617
    _Fail("Export info file doesn't have the required fields")
2618

    
2619
  return config.Dumps()
2620

    
2621

    
2622
def ListExports():
2623
  """Return a list of exports currently available on this machine.
2624

2625
  @rtype: list
2626
  @return: list of the exports
2627

2628
  """
2629
  if os.path.isdir(pathutils.EXPORT_DIR):
2630
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
2631
  else:
2632
    _Fail("No exports directory")
2633

    
2634

    
2635
def RemoveExport(export):
2636
  """Remove an existing export from the node.
2637

2638
  @type export: str
2639
  @param export: the name of the export to remove
2640
  @rtype: None
2641

2642
  """
2643
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)
2644

    
2645
  try:
2646
    shutil.rmtree(target)
2647
  except EnvironmentError, err:
2648
    _Fail("Error while removing the export: %s", err, exc=True)
2649

    
2650

    
2651
def BlockdevRename(devlist):
2652
  """Rename a list of block devices.
2653

2654
  @type devlist: list of tuples
2655
  @param devlist: list of tuples of the form  (disk,
2656
      new_logical_id, new_physical_id); disk is an
2657
      L{objects.Disk} object describing the current disk,
2658
      and new logical_id/physical_id is the name we
2659
      rename it to
2660
  @rtype: boolean
2661
  @return: True if all renames succeeded, False otherwise
2662

2663
  """
2664
  msgs = []
2665
  result = True
2666
  for disk, unique_id in devlist:
2667
    dev = _RecursiveFindBD(disk)
2668
    if dev is None:
2669
      msgs.append("Can't find device %s in rename" % str(disk))
2670
      result = False
2671
      continue
2672
    try:
2673
      old_rpath = dev.dev_path
2674
      dev.Rename(unique_id)
2675
      new_rpath = dev.dev_path
2676
      if old_rpath != new_rpath:
2677
        DevCacheManager.RemoveCache(old_rpath)
2678
        # FIXME: we should add the new cache information here, like:
2679
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2680
        # but we don't have the owner here - maybe parse from existing
2681
        # cache? for now, we only lose lvm data when we rename, which
2682
        # is less critical than DRBD or MD
2683
    except errors.BlockDeviceError, err:
2684
      msgs.append("Can't rename device '%s' to '%s': %s" %
2685
                  (dev, unique_id, err))
2686
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2687
      result = False
2688
  if not result:
2689
    _Fail("; ".join(msgs))
2690

    
2691

    
2692
def _TransformFileStorageDir(fs_dir):
2693
  """Checks whether given file_storage_dir is valid.
2694

2695
  Checks wheter the given fs_dir is within the cluster-wide default
2696
  file_storage_dir or the shared_file_storage_dir, which are stored in
2697
  SimpleStore. Only paths under those directories are allowed.
2698

2699
  @type fs_dir: str
2700
  @param fs_dir: the path to check
2701

2702
  @return: the normalized path if valid, None otherwise
2703

2704
  """
2705
  if not constants.ENABLE_FILE_STORAGE:
2706
    _Fail("File storage disabled at configure time")
2707
  cfg = _GetConfig()
2708
  fs_dir = os.path.normpath(fs_dir)
2709
  base_fstore = cfg.GetFileStorageDir()
2710
  base_shared = cfg.GetSharedFileStorageDir()
2711
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
2712
          utils.IsBelowDir(base_shared, fs_dir)):
2713
    _Fail("File storage directory '%s' is not under base file"
2714
          " storage directory '%s' or shared storage directory '%s'",
2715
          fs_dir, base_fstore, base_shared)
2716
  return fs_dir
2717

    
2718

    
2719
def CreateFileStorageDir(file_storage_dir):
2720
  """Create file storage directory.
2721

2722
  @type file_storage_dir: str
2723
  @param file_storage_dir: directory to create
2724

2725
  @rtype: tuple
2726
  @return: tuple with first element a boolean indicating wheter dir
2727
      creation was successful or not
2728

2729
  """
2730
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2731
  if os.path.exists(file_storage_dir):
2732
    if not os.path.isdir(file_storage_dir):
2733
      _Fail("Specified storage dir '%s' is not a directory",
2734
            file_storage_dir)
2735
  else:
2736
    try:
2737
      os.makedirs(file_storage_dir, 0750)
2738
    except OSError, err:
2739
      _Fail("Cannot create file storage directory '%s': %s",
2740
            file_storage_dir, err, exc=True)
2741

    
2742

    
2743
def RemoveFileStorageDir(file_storage_dir):
2744
  """Remove file storage directory.
2745

2746
  Remove it only if it's empty. If not log an error and return.
2747

2748
  @type file_storage_dir: str
2749
  @param file_storage_dir: the directory we should cleanup
2750
  @rtype: tuple (success,)
2751
  @return: tuple of one element, C{success}, denoting
2752
      whether the operation was successful
2753

2754
  """
2755
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2756
  if os.path.exists(file_storage_dir):
2757
    if not os.path.isdir(file_storage_dir):
2758
      _Fail("Specified Storage directory '%s' is not a directory",
2759
            file_storage_dir)
2760
    # deletes dir only if empty, otherwise we want to fail the rpc call
2761
    try:
2762
      os.rmdir(file_storage_dir)
2763
    except OSError, err:
2764
      _Fail("Cannot remove file storage directory '%s': %s",
2765
            file_storage_dir, err)
2766

    
2767

    
2768
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2769
  """Rename the file storage directory.
2770

2771
  @type old_file_storage_dir: str
2772
  @param old_file_storage_dir: the current path
2773
  @type new_file_storage_dir: str
2774
  @param new_file_storage_dir: the name we should rename to
2775
  @rtype: tuple (success,)
2776
  @return: tuple of one element, C{success}, denoting
2777
      whether the operation was successful
2778

2779
  """
2780
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2781
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2782
  if not os.path.exists(new_file_storage_dir):
2783
    if os.path.isdir(old_file_storage_dir):
2784
      try:
2785
        os.rename(old_file_storage_dir, new_file_storage_dir)
2786
      except OSError, err:
2787
        _Fail("Cannot rename '%s' to '%s': %s",
2788
              old_file_storage_dir, new_file_storage_dir, err)
2789
    else:
2790
      _Fail("Specified storage dir '%s' is not a directory",
2791
            old_file_storage_dir)
2792
  else:
2793
    if os.path.exists(old_file_storage_dir):
2794
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2795
            old_file_storage_dir, new_file_storage_dir)
2796

    
2797

    
2798
def _EnsureJobQueueFile(file_name):
2799
  """Checks whether the given filename is in the queue directory.
2800

2801
  @type file_name: str
2802
  @param file_name: the file name we should check
2803
  @rtype: None
2804
  @raises RPCFail: if the file is not valid
2805

2806
  """
2807
  queue_dir = os.path.normpath(pathutils.QUEUE_DIR)
2808
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2809

    
2810
  if not result:
2811
    _Fail("Passed job queue file '%s' does not belong to"
2812
          " the queue directory '%s'", file_name, queue_dir)
2813

    
2814

    
2815
def JobQueueUpdate(file_name, content):
2816
  """Updates a file in the queue directory.
2817

2818
  This is just a wrapper over L{utils.io.WriteFile}, with proper
2819
  checking.
2820

2821
  @type file_name: str
2822
  @param file_name: the job file name
2823
  @type content: str
2824
  @param content: the new job contents
2825
  @rtype: boolean
2826
  @return: the success of the operation
2827

2828
  """
2829
  file_name = vcluster.LocalizeVirtualPath(file_name)
2830

    
2831
  _EnsureJobQueueFile(file_name)
2832
  getents = runtime.GetEnts()
2833

    
2834
  # Write and replace the file atomically
2835
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2836
                  gid=getents.masterd_gid)
2837

    
2838

    
2839
def JobQueueRename(old, new):
2840
  """Renames a job queue file.
2841

2842
  This is just a wrapper over os.rename with proper checking.
2843

2844
  @type old: str
2845
  @param old: the old (actual) file name
2846
  @type new: str
2847
  @param new: the desired file name
2848
  @rtype: tuple
2849
  @return: the success of the operation and payload
2850

2851
  """
2852
  old = vcluster.LocalizeVirtualPath(old)
2853
  new = vcluster.LocalizeVirtualPath(new)
2854

    
2855
  _EnsureJobQueueFile(old)
2856
  _EnsureJobQueueFile(new)
2857

    
2858
  getents = runtime.GetEnts()
2859

    
2860
  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
2861
                   dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)
2862

    
2863

    
2864
def BlockdevClose(instance_name, disks):
2865
  """Closes the given block devices.
2866

2867
  This means they will be switched to secondary mode (in case of
2868
  DRBD).
2869

2870
  @param instance_name: if the argument is not empty, the symlinks
2871
      of this instance will be removed
2872
  @type disks: list of L{objects.Disk}
2873
  @param disks: the list of disks to be closed
2874
  @rtype: tuple (success, message)
2875
  @return: a tuple of success and message, where success
2876
      indicates the succes of the operation, and message
2877
      which will contain the error details in case we
2878
      failed
2879

2880
  """
2881
  bdevs = []
2882
  for cf in disks:
2883
    rd = _RecursiveFindBD(cf)
2884
    if rd is None:
2885
      _Fail("Can't find device %s", cf)
2886
    bdevs.append(rd)
2887

    
2888
  msg = []
2889
  for rd in bdevs:
2890
    try:
2891
      rd.Close()
2892
    except errors.BlockDeviceError, err:
2893
      msg.append(str(err))
2894
  if msg:
2895
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2896
  else:
2897
    if instance_name:
2898
      _RemoveBlockDevLinks(instance_name, disks)
2899

    
2900

    
2901
def ValidateHVParams(hvname, hvparams):
2902
  """Validates the given hypervisor parameters.
2903

2904
  @type hvname: string
2905
  @param hvname: the hypervisor name
2906
  @type hvparams: dict
2907
  @param hvparams: the hypervisor parameters to be validated
2908
  @rtype: None
2909

2910
  """
2911
  try:
2912
    hv_type = hypervisor.GetHypervisor(hvname)
2913
    hv_type.ValidateParameters(hvparams)
2914
  except errors.HypervisorError, err:
2915
    _Fail(str(err), log=False)
2916

    
2917

    
2918
def _CheckOSPList(os_obj, parameters):
2919
  """Check whether a list of parameters is supported by the OS.
2920

2921
  @type os_obj: L{objects.OS}
2922
  @param os_obj: OS object to check
2923
  @type parameters: list
2924
  @param parameters: the list of parameters to check
2925

2926
  """
2927
  supported = [v[0] for v in os_obj.supported_parameters]
2928
  delta = frozenset(parameters).difference(supported)
2929
  if delta:
2930
    _Fail("The following parameters are not supported"
2931
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2932

    
2933

    
2934
def ValidateOS(required, osname, checks, osparams):
2935
  """Validate the given OS' parameters.
2936

2937
  @type required: boolean
2938
  @param required: whether absence of the OS should translate into
2939
      failure or not
2940
  @type osname: string
2941
  @param osname: the OS to be validated
2942
  @type checks: list
2943
  @param checks: list of the checks to run (currently only 'parameters')
2944
  @type osparams: dict
2945
  @param osparams: dictionary with OS parameters
2946
  @rtype: boolean
2947
  @return: True if the validation passed, or False if the OS was not
2948
      found and L{required} was false
2949

2950
  """
2951
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2952
    _Fail("Unknown checks required for OS %s: %s", osname,
2953
          set(checks).difference(constants.OS_VALIDATE_CALLS))
2954

    
2955
  name_only = objects.OS.GetName(osname)
2956
  status, tbv = _TryOSFromDisk(name_only, None)
2957

    
2958
  if not status:
2959
    if required:
2960
      _Fail(tbv)
2961
    else:
2962
      return False
2963

    
2964
  if max(tbv.api_versions) < constants.OS_API_V20:
2965
    return True
2966

    
2967
  if constants.OS_VALIDATE_PARAMETERS in checks:
2968
    _CheckOSPList(tbv, osparams.keys())
2969

    
2970
  validate_env = OSCoreEnv(osname, tbv, osparams)
2971
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2972
                        cwd=tbv.path, reset_env=True)
2973
  if result.failed:
2974
    logging.error("os validate command '%s' returned error: %s output: %s",
2975
                  result.cmd, result.fail_reason, result.output)
2976
    _Fail("OS validation script failed (%s), output: %s",
2977
          result.fail_reason, result.output, log=False)
2978

    
2979
  return True
2980

    
2981

    
2982
def DemoteFromMC():
2983
  """Demotes the current node from master candidate role.
2984

2985
  """
2986
  # try to ensure we're not the master by mistake
2987
  master, myself = ssconf.GetMasterAndMyself()
2988
  if master == myself:
2989
    _Fail("ssconf status shows I'm the master node, will not demote")
2990

    
2991
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
2992
  if not result.failed:
2993
    _Fail("The master daemon is running, will not demote")
2994

    
2995
  try:
2996
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
2997
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
2998
  except EnvironmentError, err:
2999
    if err.errno != errno.ENOENT:
3000
      _Fail("Error while backing up cluster file: %s", err, exc=True)
3001

    
3002
  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)
3003

    
3004

    
3005
def _GetX509Filenames(cryptodir, name):
3006
  """Returns the full paths for the private key and certificate.
3007

3008
  """
3009
  return (utils.PathJoin(cryptodir, name),
3010
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
3011
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
3012

    
3013

    
3014
def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3015
  """Creates a new X509 certificate for SSL/TLS.
3016

3017
  @type validity: int
3018
  @param validity: Validity in seconds
3019
  @rtype: tuple; (string, string)
3020
  @return: Certificate name and public part
3021

3022
  """
3023
  (key_pem, cert_pem) = \
3024
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
3025
                                     min(validity, _MAX_SSL_CERT_VALIDITY))
3026

    
3027
  cert_dir = tempfile.mkdtemp(dir=cryptodir,
3028
                              prefix="x509-%s-" % utils.TimestampForFilename())
3029
  try:
3030
    name = os.path.basename(cert_dir)
3031
    assert len(name) > 5
3032

    
3033
    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3034

    
3035
    utils.WriteFile(key_file, mode=0400, data=key_pem)
3036
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)
3037

    
3038
    # Never return private key as it shouldn't leave the node
3039
    return (name, cert_pem)
3040
  except Exception:
3041
    shutil.rmtree(cert_dir, ignore_errors=True)
3042
    raise
3043

    
3044

    
3045
def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
3046
  """Removes a X509 certificate.
3047

3048
  @type name: string
3049
  @param name: Certificate name
3050

3051
  """
3052
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
3053

    
3054
  utils.RemoveFile(key_file)
3055
  utils.RemoveFile(cert_file)
3056

    
3057
  try:
3058
    os.rmdir(cert_dir)
3059
  except EnvironmentError, err:
3060
    _Fail("Cannot remove certificate directory '%s': %s",
3061
          cert_dir, err)
3062

    
3063

    
3064
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
3065
  """Returns the command for the requested input/output.
3066

3067
  @type instance: L{objects.Instance}
3068
  @param instance: The instance object
3069
  @param mode: Import/export mode
3070
  @param ieio: Input/output type
3071
  @param ieargs: Input/output arguments
3072

3073
  """
3074
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
3075

    
3076
  env = None
3077
  prefix = None
3078
  suffix = None
3079
  exp_size = None
3080

    
3081
  if ieio == constants.IEIO_FILE:
3082
    (filename, ) = ieargs
3083

    
3084
    if not utils.IsNormAbsPath(filename):
3085
      _Fail("Path '%s' is not normalized or absolute", filename)
3086

    
3087
    real_filename = os.path.realpath(filename)
3088
    directory = os.path.dirname(real_filename)
3089

    
3090
    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
3091
      _Fail("File '%s' is not under exports directory '%s': %s",
3092
            filename, pathutils.EXPORT_DIR, real_filename)
3093

    
3094
    # Create directory
3095
    utils.Makedirs(directory, mode=0750)
3096

    
3097
    quoted_filename = utils.ShellQuote(filename)
3098

    
3099
    if mode == constants.IEM_IMPORT:
3100
      suffix = "> %s" % quoted_filename
3101
    elif mode == constants.IEM_EXPORT:
3102
      suffix = "< %s" % quoted_filename
3103

    
3104
      # Retrieve file size
3105
      try:
3106
        st = os.stat(filename)
3107
      except EnvironmentError, err:
3108
        logging.error("Can't stat(2) %s: %s", filename, err)
3109
      else:
3110
        exp_size = utils.BytesToMebibyte(st.st_size)
3111

    
3112
  elif ieio == constants.IEIO_RAW_DISK:
3113
    (disk, ) = ieargs
3114

    
3115
    real_disk = _OpenRealBD(disk)
3116

    
3117
    if mode == constants.IEM_IMPORT:
3118
      # we use a smaller block size here because, due to transport buffering,
3119
      # anything above 64-128k will mostly be ignored; we use nocreat to fail
3120
      # if the device is not already there or we pass a wrong path; we use
3121
      # notrunc so we don't try to truncate an LV device; we use oflag=dsync
3122
      # so we don't buffer too much data in memory; this means that, at best,
3123
      # we flush every 64k, which will not be very fast
3124
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
3125
                                    " bs=%s oflag=dsync"),
3126
                                    real_disk.dev_path,
3127
                                    str(64 * 1024))
3128

    
3129
    elif mode == constants.IEM_EXPORT:
3130
      # the block size on the read dd is 1MiB to match our units
3131
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
3132
                                   real_disk.dev_path,
3133
                                   str(1024 * 1024), # 1 MB
3134
                                   str(disk.size))
3135
      exp_size = disk.size
3136

    
3137
  elif ieio == constants.IEIO_SCRIPT:
3138
    (disk, disk_index, ) = ieargs
3139

    
3140
    assert isinstance(disk_index, (int, long))
3141

    
3142
    real_disk = _OpenRealBD(disk)
3143

    
3144
    inst_os = OSFromDisk(instance.os)
3145
    env = OSEnvironment(instance, inst_os)
3146

    
3147
    if mode == constants.IEM_IMPORT:
3148
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
3149
      env["IMPORT_INDEX"] = str(disk_index)
3150
      script = inst_os.import_script
3151

    
3152
    elif mode == constants.IEM_EXPORT:
3153
      env["EXPORT_DEVICE"] = real_disk.dev_path
3154
      env["EXPORT_INDEX"] = str(disk_index)
3155
      script = inst_os.export_script
3156

    
3157
    # TODO: Pass special environment only to script
3158
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
3159

    
3160
    if mode == constants.IEM_IMPORT:
3161
      suffix = "| %s" % script_cmd
3162

    
3163
    elif mode == constants.IEM_EXPORT:
3164
      prefix = "%s |" % script_cmd
3165

    
3166
    # Let script predict size
3167
    exp_size = constants.IE_CUSTOM_SIZE
3168

    
3169
  else:
3170
    _Fail("Invalid %s I/O mode %r", mode, ieio)
3171

    
3172
  return (env, prefix, suffix, exp_size)
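
# A rough sketch of how the caller combines the values returned above; the
# transport command in the middle is supplied by the import/export daemon and
# everything in angle brackets is a placeholder, not a real value:
#
#   raw-disk export:  dd if=<dev_path> bs=1048576 count=<size> | <transport>
#   file import:      <transport> > <EXPORT_DIR>/<filename>
#   script export:    ( cd <os_dir> && <export_script>; ) | <transport>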
3173

    
3174

    
3175
def _CreateImportExportStatusDir(prefix):
3176
  """Creates status directory for import/export.
3177

3178
  """
3179
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
3180
                          prefix=("%s-%s-" %
3181
                                  (prefix, utils.TimestampForFilename())))
3182

    
3183

    
3184
def StartImportExportDaemon(mode, opts, host, port, instance, component,
3185
                            ieio, ieioargs):
3186
  """Starts an import or export daemon.
3187

3188
  @param mode: Import/export mode
3189
  @type opts: L{objects.ImportExportOptions}
3190
  @param opts: Daemon options
3191
  @type host: string
3192
  @param host: Remote host for export (None for import)
3193
  @type port: int
3194
  @param port: Remote port for export (None for import)
3195
  @type instance: L{objects.Instance}
3196
  @param instance: Instance object
3197
  @type component: string
3198
  @param component: which part of the instance is transferred now,
3199
      e.g. 'disk/0'
3200
  @param ieio: Input/output type
3201
  @param ieioargs: Input/output arguments
3202

3203
  """
3204
  if mode == constants.IEM_IMPORT:
3205
    prefix = "import"
3206

    
3207
    if not (host is None and port is None):
3208
      _Fail("Can not specify host or port on import")
3209

    
3210
  elif mode == constants.IEM_EXPORT:
3211
    prefix = "export"
3212

    
3213
    if host is None or port is None:
3214
      _Fail("Host and port must be specified for an export")
3215

    
3216
  else:
3217
    _Fail("Invalid mode %r", mode)
3218

    
3219
  if (opts.key_name is None) ^ (opts.ca_pem is None):
3220
    _Fail("Cluster certificate can only be used for both key and CA")
3221

    
3222
  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
3223
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
3224

    
3225
  if opts.key_name is None:
3226
    # Use server.pem
3227
    key_path = pathutils.NODED_CERT_FILE
3228
    cert_path = pathutils.NODED_CERT_FILE
3229
    assert opts.ca_pem is None
3230
  else:
3231
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
3232
                                                 opts.key_name)
3233
    assert opts.ca_pem is not None
3234

    
3235
  for i in [key_path, cert_path]:
3236
    if not os.path.exists(i):
3237
      _Fail("File '%s' does not exist" % i)
3238

    
3239
  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
3240
  try:
3241
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
3242
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
3243
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
3244

    
3245
    if opts.ca_pem is None:
3246
      # Use server.pem
3247
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
3248
    else:
3249
      ca = opts.ca_pem
3250

    
3251
    # Write CA file
3252
    utils.WriteFile(ca_file, data=ca, mode=0400)
3253

    
3254
    cmd = [
3255
      pathutils.IMPORT_EXPORT_DAEMON,
3256
      status_file, mode,
3257
      "--key=%s" % key_path,
3258
      "--cert=%s" % cert_path,
3259
      "--ca=%s" % ca_file,
3260
      ]
3261

    
3262
    if host:
3263
      cmd.append("--host=%s" % host)
3264

    
3265
    if port:
3266
      cmd.append("--port=%s" % port)
3267

    
3268
    if opts.ipv6:
3269
      cmd.append("--ipv6")
3270
    else:
3271
      cmd.append("--ipv4")
3272

    
3273
    if opts.compress:
3274
      cmd.append("--compress=%s" % opts.compress)
3275

    
3276
    if opts.magic:
3277
      cmd.append("--magic=%s" % opts.magic)
3278

    
3279
    if exp_size is not None:
3280
      cmd.append("--expected-size=%s" % exp_size)
3281

    
3282
    if cmd_prefix:
3283
      cmd.append("--cmd-prefix=%s" % cmd_prefix)
3284

    
3285
    if cmd_suffix:
3286
      cmd.append("--cmd-suffix=%s" % cmd_suffix)
3287

    
3288
    if mode == constants.IEM_EXPORT:
3289
      # Retry connection a few times when connecting to remote peer
3290
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3291
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3292
    elif opts.connect_timeout is not None:
3293
      assert mode == constants.IEM_IMPORT
3294
      # Overall timeout for establishing connection while listening
3295
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3296

    
3297
    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)
3298

    
3299
    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3300
    # support for receiving a file descriptor for output
3301
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3302
                      output=logfile)
3303

    
3304
    # The import/export name is simply the status directory name
3305
    return os.path.basename(status_dir)
3306

    
3307
  except Exception:
3308
    shutil.rmtree(status_dir, ignore_errors=True)
3309
    raise
3310

    
3311

    
3312
def GetImportExportStatus(names):
3313
  """Returns import/export daemon status.
3314

3315
  @type names: sequence
3316
  @param names: List of names
3317
  @rtype: List of dicts
3318
  @return: a list with the state of each named import/export, or None if a
3319
           status couldn't be read
3320

3321
  """
3322
  result = []
3323

    
3324
  for name in names:
3325
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
3326
                                 _IES_STATUS_FILE)
3327

    
3328
    try:
3329
      data = utils.ReadFile(status_file)
3330
    except EnvironmentError, err:
3331
      if err.errno != errno.ENOENT:
3332
        raise
3333
      data = None
3334

    
3335
    if not data:
3336
      result.append(None)
3337
      continue
3338

    
3339
    result.append(serializer.LoadJson(data))
3340

    
3341
  return result
3342

    
3343

    
3344
def AbortImportExport(name):
3345
  """Sends SIGTERM to a running import/export daemon.
3346

3347
  """
3348
  logging.info("Abort import/export %s", name)
3349

    
3350
  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3351
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3352

    
3353
  if pid:
3354
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3355
                 name, pid)
3356
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3357

    
3358

    
3359
def CleanupImportExport(name):
3360
  """Cleanup after an import or export.
3361

3362
  If the import/export daemon is still running, it's killed. Afterwards the
3363
  whole status directory is removed.
3364

3365
  """
3366
  logging.info("Finalizing import/export %s", name)
3367

    
3368
  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
3369

    
3370
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3371

    
3372
  if pid:
3373
    logging.info("Import/export %s is still running with PID %s",
3374
                 name, pid)
3375
    utils.KillProcess(pid, waitpid=False)
3376

    
3377
  shutil.rmtree(status_dir, ignore_errors=True)
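
# A hedged lifecycle sketch for the daemon helpers above; the arguments are
# whatever the caller already has (options, instance object, I/O description),
# and the returned name, which is simply the status directory basename, is
# the handle used by all follow-up calls:
#
#   name = StartImportExportDaemon(constants.IEM_EXPORT, opts, host, port,
#                                  instance, "disk/0", ieio, ieioargs)
#   [status] = GetImportExportStatus([name])  # poll until the transfer ends
#   AbortImportExport(name)                   # only if it must be stopped
#   CleanupImportExport(name)                 # always run at the end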
3378

    
3379

    
3380
def _FindDisks(nodes_ip, disks):
3381
  """Sets the physical ID on disks and returns the block devices.
3382

3383
  """
3384
  # set the correct physical ID
3385
  my_name = netutils.Hostname.GetSysName()
3386
  for cf in disks:
3387
    cf.SetPhysicalID(my_name, nodes_ip)
3388

    
3389
  bdevs = []
3390

    
3391
  for cf in disks:
3392
    rd = _RecursiveFindBD(cf)
3393
    if rd is None:
3394
      _Fail("Can't find device %s", cf)
3395
    bdevs.append(rd)
3396
  return bdevs
3397

    
3398

    
3399
def DrbdDisconnectNet(nodes_ip, disks):
3400
  """Disconnects the network on a list of drbd devices.
3401

3402
  """
3403
  bdevs = _FindDisks(nodes_ip, disks)
3404

    
3405
  # disconnect disks
3406
  for rd in bdevs:
3407
    try:
3408
      rd.DisconnectNet()
3409
    except errors.BlockDeviceError, err:
3410
      _Fail("Can't change network configuration to standalone mode: %s",
3411
            err, exc=True)
3412

    
3413

    
3414
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3415
  """Attaches the network on a list of drbd devices.
3416

3417
  """
3418
  bdevs = _FindDisks(nodes_ip, disks)
3419

    
3420
  if multimaster:
3421
    for idx, rd in enumerate(bdevs):
3422
      try:
3423
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3424
      except EnvironmentError, err:
3425
        _Fail("Can't create symlink: %s", err)
3426
  # reconnect disks, switch to new master configuration and if
3427
  # needed primary mode
3428
  for rd in bdevs:
3429
    try:
3430
      rd.AttachNet(multimaster)
3431
    except errors.BlockDeviceError, err:
3432
      _Fail("Can't change network configuration: %s", err)
3433

    
3434
  # wait until the disks are connected; we need to retry the re-attach
3435
  # if the device becomes standalone, as this might happen if one
3436
  # node disconnects and reconnects in a different mode before the
3437
  # other node reconnects; in this case, one or both of the nodes will
3438
  # decide it has wrong configuration and switch to standalone
3439

    
3440
  def _Attach():
3441
    all_connected = True
3442

    
3443
    for rd in bdevs:
3444
      stats = rd.GetProcStatus()
3445

    
3446
      all_connected = (all_connected and
3447
                       (stats.is_connected or stats.is_in_resync))
3448

    
3449
      if stats.is_standalone:
3450
        # peer had different config info and this node became
3451
        # standalone, even though this should not happen with the
3452
        # new staged way of changing disk configs
3453
        try:
3454
          rd.AttachNet(multimaster)
3455
        except errors.BlockDeviceError, err:
3456
          _Fail("Can't change network configuration: %s", err)
3457

    
3458
    if not all_connected:
3459
      raise utils.RetryAgain()
3460

    
3461
  try:
3462
    # Start with a delay of 100 milliseconds and go up to 5 seconds
3463
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3464
  except utils.RetryTimeout:
3465
    _Fail("Timeout in disk reconnecting")
3466

    
3467
  if multimaster:
3468
    # change to primary mode
3469
    for rd in bdevs:
3470
      try:
3471
        rd.Open()
3472
      except errors.BlockDeviceError, err:
3473
        _Fail("Can't change to primary mode: %s", err)
3474

    
3475

    
3476
def DrbdWaitSync(nodes_ip, disks):
3477
  """Wait until DRBDs have synchronized.
3478

3479
  """
3480
  def _helper(rd):
3481
    stats = rd.GetProcStatus()
3482
    if not (stats.is_connected or stats.is_in_resync):
3483
      raise utils.RetryAgain()
3484
    return stats
3485

    
3486
  bdevs = _FindDisks(nodes_ip, disks)
3487

    
3488
  min_resync = 100
3489
  alldone = True
3490
  for rd in bdevs:
3491
    try:
3492
      # poll each second for 15 seconds
3493
      stats = utils.Retry(_helper, 1, 15, args=[rd])
3494
    except utils.RetryTimeout:
3495
      stats = rd.GetProcStatus()
3496
      # last check
3497
      if not (stats.is_connected or stats.is_in_resync):
3498
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3499
    alldone = alldone and (not stats.is_in_resync)
3500
    if stats.sync_percent is not None:
3501
      min_resync = min(min_resync, stats.sync_percent)
3502

    
3503
  return (alldone, min_resync)
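
# Reading the return value of DrbdWaitSync() with a small example: if two
# devices are connected and fully synced (no sync_percent reported) and a
# third is still resyncing at 40%, the result is (False, 40); once every
# device is connected and out of resync the result becomes (True, 100),
# since min_resync starts at 100 and is only lowered by devices that still
# report a sync percentage.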
3504

    
3505

    
3506
def GetDrbdUsermodeHelper():
3507
  """Returns DRBD usermode helper currently configured.
3508

3509
  """
3510
  try:
3511
    return bdev.BaseDRBD.GetUsermodeHelper()
3512
  except errors.BlockDeviceError, err:
3513
    _Fail(str(err))
3514

    
3515

    
3516
def PowercycleNode(hypervisor_type):
3517
  """Hard-powercycle the node.
3518

3519
  Because we need to return first and schedule the powercycle in the
3520
  background, we won't be able to report failures nicely.
3521

3522
  """
3523
  hyper = hypervisor.GetHypervisor(hypervisor_type)
3524
  try:
3525
    pid = os.fork()
3526
  except OSError:
3527
    # if we can't fork, we'll pretend that we're in the child process
3528
    pid = 0
3529
  if pid > 0:
3530
    return "Reboot scheduled in 5 seconds"
3531
  # ensure the child stays in RAM and is not swapped out
3532
  try:
3533
    utils.Mlockall()
3534
  except Exception: # pylint: disable=W0703
3535
    pass
3536
  time.sleep(5)
3537
  hyper.PowercycleNode()
3538

    
3539

    
3540
class HooksRunner(object):
3541
  """Hook runner.
3542

3543
  This class is instantiated on the node side (ganeti-noded) and not
3544
  on the master side.
3545

3546
  """
3547
  def __init__(self, hooks_base_dir=None):
3548
    """Constructor for hooks runner.
3549

3550
    @type hooks_base_dir: str or None
3551
    @param hooks_base_dir: if not None, this overrides the
3552
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)
3553

3554
    """
3555
    if hooks_base_dir is None:
3556
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
3557
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
3558
    # constant
3559
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103
3560

    
3561
  def RunLocalHooks(self, node_list, hpath, phase, env):
3562
    """Check that the hooks will be run only locally and then run them.
3563

3564
    """
3565
    assert len(node_list) == 1
3566
    node = node_list[0]
3567
    _, myself = ssconf.GetMasterAndMyself()
3568
    assert node == myself
3569

    
3570
    results = self.RunHooks(hpath, phase, env)
3571

    
3572
    # Return values in the form expected by HooksMaster
3573
    return {node: (None, False, results)}
3574

    
3575
  def RunHooks(self, hpath, phase, env):
3576
    """Run the scripts in the hooks directory.
3577

3578
    @type hpath: str
3579
    @param hpath: the path to the hooks directory which
3580
        holds the scripts
3581
    @type phase: str
3582
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
3583
        L{constants.HOOKS_PHASE_POST}
3584
    @type env: dict
3585
    @param env: dictionary with the environment for the hook
3586
    @rtype: list
3587
    @return: list of 3-element tuples:
3588
      - script path
3589
      - script result, one of L{constants.HKR_SUCCESS},
3590
        L{constants.HKR_FAIL} or L{constants.HKR_SKIP}
3591
      - output of the script
3592

3593
    @raise errors.ProgrammerError: for invalid input
3594
        parameters
3595

3596
    """
3597
    if phase == constants.HOOKS_PHASE_PRE:
3598
      suffix = "pre"
3599
    elif phase == constants.HOOKS_PHASE_POST:
3600
      suffix = "post"
3601
    else:
3602
      _Fail("Unknown hooks phase '%s'", phase)
3603

    
3604
    subdir = "%s-%s.d" % (hpath, suffix)
3605
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3606

    
3607
    results = []
3608

    
3609
    if not os.path.isdir(dir_name):
3610
      # for non-existing/non-dirs, we simply exit instead of logging a
3611
      # warning at every operation
3612
      return results
3613

    
3614
    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3615

    
3616
    for (relname, relstatus, runresult) in runparts_results:
3617
      if relstatus == constants.RUNPARTS_SKIP:
3618
        rrval = constants.HKR_SKIP
3619
        output = ""
3620
      elif relstatus == constants.RUNPARTS_ERR:
3621
        rrval = constants.HKR_FAIL
3622
        output = "Hook script execution error: %s" % runresult
3623
      elif relstatus == constants.RUNPARTS_RUN:
3624
        if runresult.failed:
3625
          rrval = constants.HKR_FAIL
3626
        else:
3627
          rrval = constants.HKR_SUCCESS
3628
        output = utils.SafeEncode(runresult.output.strip())
3629
      results.append(("%s/%s" % (subdir, relname), rrval, output))
3630

    
3631
    return results
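
    # Shape of the result, with made-up hook names for illustration: running
    # the "pre" phase of hooks path "instance-start" could return
    #
    #   [("instance-start-pre.d/10-check", constants.HKR_SUCCESS, "ok"),
    #    ("instance-start-pre.d/20-frob", constants.HKR_FAIL, "disk missing")]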
3632

    
3633

    
3634
class IAllocatorRunner(object):
3635
  """IAllocator runner.
3636

3637
  This class is instantiated on the node side (ganeti-noded) and not on
3638
  the master side.
3639

3640
  """
3641
  @staticmethod
3642
  def Run(name, idata):
3643
    """Run an iallocator script.
3644

3645
    @type name: str
3646
    @param name: the iallocator script name
3647
    @type idata: str
3648
    @param idata: the allocator input data
3649

3650
    @rtype: string
3651
    @return: the standard output of the allocator script; on failure an
3652
       L{RPCFail} exception is raised instead, carrying the error message
3653
       and the script's output
3654

3655
    """
3656
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3657
                                  os.path.isfile)
3658
    if alloc_script is None:
3659
      _Fail("iallocator module '%s' not found in the search path", name)
3660

    
3661
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3662
    try:
3663
      os.write(fd, idata)
3664
      os.close(fd)
3665
      result = utils.RunCmd([alloc_script, fin_name])
3666
      if result.failed:
3667
        _Fail("iallocator module '%s' failed: %s, output '%s'",
3668
              name, result.fail_reason, result.output)
3669
    finally:
3670
      os.unlink(fin_name)
3671

    
3672
    return result.stdout
3673

    
3674

    
3675
class DevCacheManager(object):
3676
  """Simple class for managing a cache of block device information.
3677

3678
  """
3679
  _DEV_PREFIX = "/dev/"
3680
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR
3681

    
3682
  @classmethod
3683
  def _ConvertPath(cls, dev_path):
3684
    """Converts a /dev/name path to the cache file name.
3685

3686
    This replaces slashes with underscores and strips the /dev
3687
    prefix. It then returns the full path to the cache file.
3688

3689
    @type dev_path: str
3690
    @param dev_path: the C{/dev/} path name
3691
    @rtype: str
3692
    @return: the converted path name
3693

3694
    """
3695
    if dev_path.startswith(cls._DEV_PREFIX):
3696
      dev_path = dev_path[len(cls._DEV_PREFIX):]
3697
    dev_path = dev_path.replace("/", "_")
3698
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3699
    return fpath
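
    # Worked example (the cache directory is whatever pathutils.BDEV_CACHE_DIR
    # points to on this node, shown here as a placeholder):
    #
    #   _ConvertPath("/dev/md/0")        -> <BDEV_CACHE_DIR>/bdev_md_0
    #   _ConvertPath("/dev/xenvg/disk0") -> <BDEV_CACHE_DIR>/bdev_xenvg_disk0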
3700

    
3701
  @classmethod
3702
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3703
    """Updates the cache information for a given device.
3704

3705
    @type dev_path: str
3706
    @param dev_path: the pathname of the device
3707
    @type owner: str
3708
    @param owner: the owner (instance name) of the device
3709
    @type on_primary: bool
3710
    @param on_primary: whether this is the primary
3711
        node nor not
3712
    @type iv_name: str
3713
    @param iv_name: the instance-visible name of the
3714
        device, as in objects.Disk.iv_name
3715

3716
    @rtype: None
3717

3718
    """
3719
    if dev_path is None:
3720
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
3721
      return
3722
    fpath = cls._ConvertPath(dev_path)
3723
    if on_primary:
3724
      state = "primary"
3725
    else:
3726
      state = "secondary"
3727
    if iv_name is None:
3728
      iv_name = "not_visible"
3729
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3730
    try:
3731
      utils.WriteFile(fpath, data=fdata)
3732
    except EnvironmentError, err:
3733
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3734

    
3735
  @classmethod
3736
  def RemoveCache(cls, dev_path):
3737
    """Remove data for a dev_path.
3738

3739
    This is just a wrapper over L{utils.io.RemoveFile} with a converted
3740
    path name and logging.
3741

3742
    @type dev_path: str
3743
    @param dev_path: the pathname of the device
3744

3745
    @rtype: None
3746

3747
    """
3748
    if dev_path is None:
3749
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
3750
      return
3751
    fpath = cls._ConvertPath(dev_path)
3752
    try:
3753
      utils.RemoveFile(fpath)
3754
    except EnvironmentError, err:
3755
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)