lib/backend.py @ 19930d75
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27
     in the L{_CleanDirectory} function
28

29
"""
30

    
31
# pylint: disable=E1103
32

    
33
# E1103: %s %r has no %r member (but some types could not be
34
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
35
# or (False, "string") which confuses pylint
36

    
37

    
38
import os
39
import os.path
40
import shutil
41
import time
42
import stat
43
import errno
44
import re
45
import random
46
import logging
47
import tempfile
48
import zlib
49
import base64
50
import signal
51

    
52
from ganeti import errors
53
from ganeti import utils
54
from ganeti import ssh
55
from ganeti import hypervisor
56
from ganeti import constants
57
from ganeti import bdev
58
from ganeti import objects
59
from ganeti import ssconf
60
from ganeti import serializer
61
from ganeti import netutils
62
from ganeti import runtime
63
from ganeti import mcpu
64
from ganeti import compat
65
from ganeti import pathutils
66

    
67

    
68
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
69
_ALLOWED_CLEAN_DIRS = frozenset([
70
  pathutils.DATA_DIR,
71
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
72
  pathutils.QUEUE_DIR,
73
  pathutils.CRYPTO_KEYS_DIR,
74
  ])
75
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
76
_X509_KEY_FILE = "key"
77
_X509_CERT_FILE = "cert"
78
_IES_STATUS_FILE = "status"
79
_IES_PID_FILE = "pid"
80
_IES_CA_FILE = "ca"
81

    
82
#: Valid LVS output line regex
83
_LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
84

    
85
# Actions for the master setup script
86
_MASTER_START = "start"
87
_MASTER_STOP = "stop"
88

    
89

    
90
class RPCFail(Exception):
91
  """Class denoting RPC failure.
92

93
  Its argument is the error message.
94

95
  """
96

    
97

    
98
def _Fail(msg, *args, **kwargs):
99
  """Log an error and the raise an RPCFail exception.
100

101
  This exception is then handled specially in the ganeti daemon and
102
  turned into a 'failed' return type. As such, this function is a
103
  useful shortcut for logging the error and returning it to the master
104
  daemon.
105

106
  @type msg: string
107
  @param msg: the text of the exception
108
  @raise RPCFail
109

110
  """
111
  if args:
112
    msg = msg % args
113
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
114
    if "exc" in kwargs and kwargs["exc"]:
115
      logging.exception(msg)
116
    else:
117
      logging.error(msg)
118
  raise RPCFail(msg)
119
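
# Illustrative usage sketch for _Fail (not part of the module logic): the
# message is %-formatted with the positional arguments, logged, and then
# raised as RPCFail; exc=True additionally logs the current traceback.  The
# variable names below are hypothetical.
#
#   _Fail("Can't read file %s: %s", file_name, err, exc=True)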

    
120

    
121
def _GetConfig():
122
  """Simple wrapper to return a SimpleStore.
123

124
  @rtype: L{ssconf.SimpleStore}
125
  @return: a SimpleStore instance
126

127
  """
128
  return ssconf.SimpleStore()
129

    
130

    
131
def _GetSshRunner(cluster_name):
132
  """Simple wrapper to return an SshRunner.
133

134
  @type cluster_name: str
135
  @param cluster_name: the cluster name, which is needed
136
      by the SshRunner constructor
137
  @rtype: L{ssh.SshRunner}
138
  @return: an SshRunner instance
139

140
  """
141
  return ssh.SshRunner(cluster_name)
142

    
143

    
144
def _Decompress(data):
145
  """Unpacks data compressed by the RPC client.
146

147
  @type data: list or tuple
148
  @param data: Data sent by RPC client
149
  @rtype: str
150
  @return: Decompressed data
151

152
  """
153
  assert isinstance(data, (list, tuple))
154
  assert len(data) == 2
155
  (encoding, content) = data
156
  if encoding == constants.RPC_ENCODING_NONE:
157
    return content
158
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
159
    return zlib.decompress(base64.b64decode(content))
160
  else:
161
    raise AssertionError("Unknown data encoding")
162
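
# A minimal round-trip sketch for _Decompress, using only imports already
# present in this module.  The helper below is illustrative and is never
# called by the daemon itself.
def _ExampleDecompressRoundTrip():
  data = "some payload"
  packed = (constants.RPC_ENCODING_ZLIB_BASE64,
            base64.b64encode(zlib.compress(data)))
  assert _Decompress(packed) == data
  # payloads sent without compression are returned unchanged
  assert _Decompress((constants.RPC_ENCODING_NONE, data)) == data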

    
163

    
164
def _CleanDirectory(path, exclude=None):
165
  """Removes all regular files in a directory.
166

167
  @type path: str
168
  @param path: the directory to clean
169
  @type exclude: list
170
  @param exclude: list of files to be excluded, defaults
171
      to the empty list
172

173
  """
174
  if path not in _ALLOWED_CLEAN_DIRS:
175
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
176
          path)
177

    
178
  if not os.path.isdir(path):
179
    return
180
  if exclude is None:
181
    exclude = []
182
  else:
183
    # Normalize excluded paths
184
    exclude = [os.path.normpath(i) for i in exclude]
185

    
186
  for rel_name in utils.ListVisibleFiles(path):
187
    full_name = utils.PathJoin(path, rel_name)
188
    if full_name in exclude:
189
      continue
190
    if os.path.isfile(full_name) and not os.path.islink(full_name):
191
      utils.RemoveFile(full_name)
192

    
193

    
194
def _BuildUploadFileList():
195
  """Build the list of allowed upload files.
196

197
  This is abstracted so that it's built only once at module import time.
198

199
  """
200
  allowed_files = set([
201
    pathutils.CLUSTER_CONF_FILE,
202
    constants.ETC_HOSTS,
203
    pathutils.SSH_KNOWN_HOSTS_FILE,
204
    pathutils.VNC_PASSWORD_FILE,
205
    pathutils.RAPI_CERT_FILE,
206
    pathutils.SPICE_CERT_FILE,
207
    pathutils.SPICE_CACERT_FILE,
208
    pathutils.RAPI_USERS_FILE,
209
    pathutils.CONFD_HMAC_KEY,
210
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
211
    ])
212

    
213
  for hv_name in constants.HYPER_TYPES:
214
    hv_class = hypervisor.GetHypervisorClass(hv_name)
215
    allowed_files.update(hv_class.GetAncillaryFiles()[0])
216

    
217
  return frozenset(allowed_files)
218

    
219

    
220
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
221

    
222

    
223
def JobQueuePurge():
224
  """Removes job queue files and archived jobs.
225

226
  @rtype: None
228

229
  """
230
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
231
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
232

    
233

    
234
def GetMasterInfo():
235
  """Returns master information.
236

237
  This is a utility function to compute master information, either
238
  for consumption here or from the node daemon.
239

240
  @rtype: tuple
241
  @return: master_netdev, master_ip, master_name, primary_ip_family,
242
    master_netmask
243
  @raise RPCFail: in case of errors
244

245
  """
246
  try:
247
    cfg = _GetConfig()
248
    master_netdev = cfg.GetMasterNetdev()
249
    master_ip = cfg.GetMasterIP()
250
    master_netmask = cfg.GetMasterNetmask()
251
    master_node = cfg.GetMasterNode()
252
    primary_ip_family = cfg.GetPrimaryIPFamily()
253
  except errors.ConfigurationError, err:
254
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
255
  return (master_netdev, master_ip, master_node, primary_ip_family,
256
          master_netmask)
257

    
258

    
259
def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
260
  """Decorator that runs hooks before and after the decorated function.
261

262
  @type hook_opcode: string
263
  @param hook_opcode: opcode of the hook
264
  @type hooks_path: string
265
  @param hooks_path: path of the hooks
266
  @type env_builder_fn: function
267
  @param env_builder_fn: function that returns a dictionary containing the
268
    environment variables for the hooks. Will get all the parameters of the
269
    decorated function.
270
  @raise RPCFail: in case of pre-hook failure
271

272
  """
273
  def decorator(fn):
274
    def wrapper(*args, **kwargs):
275
      _, myself = ssconf.GetMasterAndMyself()
276
      nodes = ([myself], [myself])  # these hooks run locally
277

    
278
      env_fn = compat.partial(env_builder_fn, *args, **kwargs)
279

    
280
      cfg = _GetConfig()
281
      hr = HooksRunner()
282
      hm = mcpu.HooksMaster(hook_opcode, hooks_path, nodes, hr.RunLocalHooks,
283
                            None, env_fn, logging.warning, cfg.GetClusterName(),
284
                            cfg.GetMasterNode())
285

    
286
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
287
      result = fn(*args, **kwargs)
288
      hm.RunPhase(constants.HOOKS_PHASE_POST)
289

    
290
      return result
291
    return wrapper
292
  return decorator
293

    
294

    
295
def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
296
  """Builds environment variables for master IP hooks.
297

298
  @type master_params: L{objects.MasterNetworkParameters}
299
  @param master_params: network parameters of the master
300
  @type use_external_mip_script: boolean
301
  @param use_external_mip_script: whether to use an external master IP
302
    address setup script (unused, but necessary per the implementation of the
303
    _RunLocalHooks decorator)
304

305
  """
306
  # pylint: disable=W0613
307
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
308
  env = {
309
    "MASTER_NETDEV": master_params.netdev,
310
    "MASTER_IP": master_params.ip,
311
    "MASTER_NETMASK": str(master_params.netmask),
312
    "CLUSTER_IP_VERSION": str(ver),
313
  }
314

    
315
  return env
316
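
# For a hypothetical IPv4 master on eth0, the hooks environment built above
# would look roughly like this (all values are made-up examples):
#
#   {"MASTER_NETDEV": "eth0", "MASTER_IP": "192.0.2.1",
#    "MASTER_NETMASK": "32", "CLUSTER_IP_VERSION": "4"}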

    
317

    
318
def _RunMasterSetupScript(master_params, action, use_external_mip_script):
319
  """Execute the master IP address setup script.
320

321
  @type master_params: L{objects.MasterNetworkParameters}
322
  @param master_params: network parameters of the master
323
  @type action: string
324
  @param action: action to pass to the script. Must be one of
325
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
326
  @type use_external_mip_script: boolean
327
  @param use_external_mip_script: whether to use an external master IP
328
    address setup script
329
  @raise backend.RPCFail: if there are errors during the execution of the
330
    script
331

332
  """
333
  env = _BuildMasterIpEnv(master_params)
334

    
335
  if use_external_mip_script:
336
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
337
  else:
338
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT
339

    
340
  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)
341

    
342
  if result.failed:
343
    _Fail("Failed to %s the master IP. Script return value: %s" %
344
          (action, result.exit_code), log=True)
345

    
346

    
347
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
348
               _BuildMasterIpEnv)
349
def ActivateMasterIp(master_params, use_external_mip_script):
350
  """Activate the IP address of the master daemon.
351

352
  @type master_params: L{objects.MasterNetworkParameters}
353
  @param master_params: network parameters of the master
354
  @type use_external_mip_script: boolean
355
  @param use_external_mip_script: whether to use an external master IP
356
    address setup script
357
  @raise RPCFail: in case of errors during the IP startup
358

359
  """
360
  _RunMasterSetupScript(master_params, _MASTER_START,
361
                        use_external_mip_script)
362

    
363

    
364
def StartMasterDaemons(no_voting):
365
  """Activate local node as master node.
366

367
  The function will start the master daemons (ganeti-masterd and ganeti-rapi).
368

369
  @type no_voting: boolean
370
  @param no_voting: whether to start ganeti-masterd without a node vote
371
      but still non-interactively
372
  @rtype: None
373

374
  """
375

    
376
  if no_voting:
377
    masterd_args = "--no-voting --yes-do-it"
378
  else:
379
    masterd_args = ""
380

    
381
  env = {
382
    "EXTRA_MASTERD_ARGS": masterd_args,
383
    }
384

    
385
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
386
  if result.failed:
387
    msg = "Can't start Ganeti master: %s" % result.output
388
    logging.error(msg)
389
    _Fail(msg)
390

    
391

    
392
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
393
               _BuildMasterIpEnv)
394
def DeactivateMasterIp(master_params, use_external_mip_script):
395
  """Deactivate the master IP on this node.
396

397
  @type master_params: L{objects.MasterNetworkParameters}
398
  @param master_params: network parameters of the master
399
  @type use_external_mip_script: boolean
400
  @param use_external_mip_script: whether to use an external master IP
401
    address setup script
402
  @raise RPCFail: in case of errors during the IP turndown
403

404
  """
405
  _RunMasterSetupScript(master_params, _MASTER_STOP,
406
                        use_external_mip_script)
407

    
408

    
409
def StopMasterDaemons():
410
  """Stop the master daemons on this node.
411

412
  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.
413

414
  @rtype: None
415

416
  """
417
  # TODO: log and report back to the caller the error failures; we
418
  # need to decide in which case we fail the RPC for this
419

    
420
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
421
  if result.failed:
422
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
423
                  " and error %s",
424
                  result.cmd, result.exit_code, result.output)
425

    
426

    
427
def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
428
  """Change the netmask of the master IP.
429

430
  @param old_netmask: the old value of the netmask
431
  @param netmask: the new value of the netmask
432
  @param master_ip: the master IP
433
  @param master_netdev: the master network device
434

435
  """
436
  if old_netmask == netmask:
437
    return
438

    
439
  if not netutils.IPAddress.Own(master_ip):
440
    _Fail("The master IP address is not up, not attempting to change its"
441
          " netmask")
442

    
443
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
444
                         "%s/%s" % (master_ip, netmask),
445
                         "dev", master_netdev, "label",
446
                         "%s:0" % master_netdev])
447
  if result.failed:
448
    _Fail("Could not set the new netmask on the master IP address")
449

    
450
  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
451
                         "%s/%s" % (master_ip, old_netmask),
452
                         "dev", master_netdev, "label",
453
                         "%s:0" % master_netdev])
454
  if result.failed:
455
    _Fail("Could not bring down the master IP address with the old netmask")
456

    
457

    
458
def EtcHostsModify(mode, host, ip):
459
  """Modify a host entry in /etc/hosts.
460

461
  @param mode: The mode to operate. Either add or remove entry
462
  @param host: The host to operate on
463
  @param ip: The ip associated with the entry
464

465
  """
466
  if mode == constants.ETC_HOSTS_ADD:
467
    if not ip:
468
      RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
469
              " present")
470
    utils.AddHostToEtcHosts(host, ip)
471
  elif mode == constants.ETC_HOSTS_REMOVE:
472
    if ip:
473
      RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
474
              " parameter is present")
475
    utils.RemoveHostFromEtcHosts(host)
476
  else:
477
    RPCFail("Mode not supported")
478
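
# Hedged usage sketch for EtcHostsModify; the host name and address below are
# made up:
#
#   EtcHostsModify(constants.ETC_HOSTS_ADD, "node1.example.com", "192.0.2.10")
#   EtcHostsModify(constants.ETC_HOSTS_REMOVE, "node1.example.com", None)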

    
479

    
480
def LeaveCluster(modify_ssh_setup):
481
  """Cleans up and remove the current node.
482

483
  This function cleans up and prepares the current node to be removed
484
  from the cluster.
485

486
  If processing is successful, then it raises an
487
  L{errors.QuitGanetiException} which is used as a special case to
488
  shutdown the node daemon.
489

490
  @param modify_ssh_setup: boolean
491

492
  """
493
  _CleanDirectory(pathutils.DATA_DIR)
494
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
495
  JobQueuePurge()
496

    
497
  if modify_ssh_setup:
498
    try:
499
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
500

    
501
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
502

    
503
      utils.RemoveFile(priv_key)
504
      utils.RemoveFile(pub_key)
505
    except errors.OpExecError:
506
      logging.exception("Error while processing ssh files")
507

    
508
  try:
509
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
510
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
511
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
512
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
513
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
514
  except: # pylint: disable=W0702
515
    logging.exception("Error while removing cluster secrets")
516

    
517
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
518
  if result.failed:
519
    logging.error("Command %s failed with exitcode %s and error %s",
520
                  result.cmd, result.exit_code, result.output)
521

    
522
  # Raise a custom exception (handled in ganeti-noded)
523
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
524

    
525

    
526
def _GetVgInfo(name):
527
  """Retrieves information about a LVM volume group.
528

529
  """
530
  # TODO: GetVGInfo supports returning information for multiple VGs at once
531
  vginfo = bdev.LogicalVolume.GetVGInfo([name])
532
  if vginfo:
533
    vg_free = int(round(vginfo[0][0], 0))
534
    vg_size = int(round(vginfo[0][1], 0))
535
  else:
536
    vg_free = None
537
    vg_size = None
538

    
539
  return {
540
    "name": name,
541
    "vg_free": vg_free,
542
    "vg_size": vg_size,
543
    }
544

    
545

    
546
def _GetHvInfo(name):
547
  """Retrieves node information from a hypervisor.
548

549
  The information returned depends on the hypervisor. Common items:
550

551
    - vg_size is the size of the configured volume group in MiB
552
    - vg_free is the free size of the volume group in MiB
553
    - memory_dom0 is the memory allocated for domain0 in MiB
554
    - memory_free is the currently available (free) RAM in MiB
    - memory_total is the total amount of RAM in MiB
556
    - hv_version: the hypervisor version, if available
557

558
  """
559
  return hypervisor.GetHypervisor(name).GetNodeInfo()
560

    
561

    
562
def _GetNamedNodeInfo(names, fn):
563
  """Calls C{fn} for all names in C{names} and returns a dictionary.
564

565
  @rtype: None or list
566

567
  """
568
  if names is None:
569
    return None
570
  else:
571
    return map(fn, names)
572

    
573

    
574
def GetNodeInfo(vg_names, hv_names):
575
  """Gives back a hash with different information about the node.
576

577
  @type vg_names: list of string
578
  @param vg_names: Names of the volume groups to ask for disk space information
579
  @type hv_names: list of string
580
  @param hv_names: Names of the hypervisors to ask for node information
581
  @rtype: tuple; (string, None/dict, None/dict)
582
  @return: Tuple containing boot ID, volume group information and hypervisor
583
    information
584

585
  """
586
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
587
  vg_info = _GetNamedNodeInfo(vg_names, _GetVgInfo)
588
  hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)
589

    
590
  return (bootid, vg_info, hv_info)
591
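
# Illustrative call sketch for GetNodeInfo; the volume group and hypervisor
# names are assumptions.  Either argument may also be None, in which case the
# corresponding element of the returned tuple is None.
#
#   (boot_id, vg_info, hv_info) = GetNodeInfo(["xenvg"], [constants.HT_XEN_PVM])
#   # vg_info -> [{"name": "xenvg", "vg_free": ..., "vg_size": ...}]
#   # hv_info -> [{"memory_free": ..., "memory_total": ..., ...}]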

    
592

    
593
def VerifyNode(what, cluster_name):
594
  """Verify the status of the local node.
595

596
  Based on the input L{what} parameter, various checks are done on the
597
  local node.
598

599
  If the I{filelist} key is present, this list of
600
  files is checksummed and the file/checksum pairs are returned.
601

602
  If the I{nodelist} key is present, we check that we have
603
  connectivity via ssh with the target nodes (and check the hostname
604
  report).
605

606
  If the I{node-net-test} key is present, we check that we have
607
  connectivity to the given nodes via both primary IP and, if
608
  applicable, secondary IPs.
609

610
  @type what: C{dict}
611
  @param what: a dictionary of things to check:
612
      - filelist: list of files for which to compute checksums
613
      - nodelist: list of nodes we should check ssh communication with
614
      - node-net-test: list of nodes we should check node daemon port
615
        connectivity with
616
      - hypervisor: list with hypervisors to run the verify for
617
  @rtype: dict
618
  @return: a dictionary with the same keys as the input dict, and
619
      values representing the result of the checks
620

621
  """
622
  result = {}
623
  my_name = netutils.Hostname.GetSysName()
624
  port = netutils.GetDaemonPort(constants.NODED)
625
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
626

    
627
  if constants.NV_HYPERVISOR in what and vm_capable:
628
    result[constants.NV_HYPERVISOR] = tmp = {}
629
    for hv_name in what[constants.NV_HYPERVISOR]:
630
      try:
631
        val = hypervisor.GetHypervisor(hv_name).Verify()
632
      except errors.HypervisorError, err:
633
        val = "Error while checking hypervisor: %s" % str(err)
634
      tmp[hv_name] = val
635

    
636
  if constants.NV_HVPARAMS in what and vm_capable:
637
    result[constants.NV_HVPARAMS] = tmp = []
638
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
639
      try:
640
        logging.info("Validating hv %s, %s", hv_name, hvparms)
641
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
642
      except errors.HypervisorError, err:
643
        tmp.append((source, hv_name, str(err)))
644

    
645
  if constants.NV_FILELIST in what:
646
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
647
      what[constants.NV_FILELIST])
648

    
649
  if constants.NV_NODELIST in what:
650
    (nodes, bynode) = what[constants.NV_NODELIST]
651

    
652
    # Add nodes from other groups (different for each node)
653
    try:
654
      nodes.extend(bynode[my_name])
655
    except KeyError:
656
      pass
657

    
658
    # Use a random order
659
    random.shuffle(nodes)
660

    
661
    # Try to contact all nodes
662
    val = {}
663
    for node in nodes:
664
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
665
      if not success:
666
        val[node] = message
667

    
668
    result[constants.NV_NODELIST] = val
669

    
670
  if constants.NV_NODENETTEST in what:
671
    result[constants.NV_NODENETTEST] = tmp = {}
672
    my_pip = my_sip = None
673
    for name, pip, sip in what[constants.NV_NODENETTEST]:
674
      if name == my_name:
675
        my_pip = pip
676
        my_sip = sip
677
        break
678
    if not my_pip:
679
      tmp[my_name] = ("Can't find my own primary/secondary IP"
680
                      " in the node list")
681
    else:
682
      for name, pip, sip in what[constants.NV_NODENETTEST]:
683
        fail = []
684
        if not netutils.TcpPing(pip, port, source=my_pip):
685
          fail.append("primary")
686
        if sip != pip:
687
          if not netutils.TcpPing(sip, port, source=my_sip):
688
            fail.append("secondary")
689
        if fail:
690
          tmp[name] = ("failure using the %s interface(s)" %
691
                       " and ".join(fail))
692

    
693
  if constants.NV_MASTERIP in what:
694
    # FIXME: add checks on incoming data structures (here and in the
695
    # rest of the function)
696
    master_name, master_ip = what[constants.NV_MASTERIP]
697
    if master_name == my_name:
698
      source = constants.IP4_ADDRESS_LOCALHOST
699
    else:
700
      source = None
701
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
702
                                                     source=source)
703

    
704
  if constants.NV_USERSCRIPTS in what:
705
    result[constants.NV_USERSCRIPTS] = \
706
      [script for script in what[constants.NV_USERSCRIPTS]
707
       if not (os.path.exists(script) and os.access(script, os.X_OK))]
708

    
709
  if constants.NV_OOB_PATHS in what:
710
    result[constants.NV_OOB_PATHS] = tmp = []
711
    for path in what[constants.NV_OOB_PATHS]:
712
      try:
713
        st = os.stat(path)
714
      except OSError, err:
715
        tmp.append("error stating out of band helper: %s" % err)
716
      else:
717
        if stat.S_ISREG(st.st_mode):
718
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
719
            tmp.append(None)
720
          else:
721
            tmp.append("out of band helper %s is not executable" % path)
722
        else:
723
          tmp.append("out of band helper %s is not a file" % path)
724

    
725
  if constants.NV_LVLIST in what and vm_capable:
726
    try:
727
      val = GetVolumeList(utils.ListVolumeGroups().keys())
728
    except RPCFail, err:
729
      val = str(err)
730
    result[constants.NV_LVLIST] = val
731

    
732
  if constants.NV_INSTANCELIST in what and vm_capable:
733
    # GetInstanceList can fail
734
    try:
735
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
736
    except RPCFail, err:
737
      val = str(err)
738
    result[constants.NV_INSTANCELIST] = val
739

    
740
  if constants.NV_VGLIST in what and vm_capable:
741
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
742

    
743
  if constants.NV_PVLIST in what and vm_capable:
744
    result[constants.NV_PVLIST] = \
745
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
746
                                   filter_allocatable=False)
747

    
748
  if constants.NV_VERSION in what:
749
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
750
                                    constants.RELEASE_VERSION)
751

    
752
  if constants.NV_HVINFO in what and vm_capable:
753
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
754
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
755

    
756
  if constants.NV_DRBDLIST in what and vm_capable:
757
    try:
758
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
759
    except errors.BlockDeviceError, err:
760
      logging.warning("Can't get used minors list", exc_info=True)
761
      used_minors = str(err)
762
    result[constants.NV_DRBDLIST] = used_minors
763

    
764
  if constants.NV_DRBDHELPER in what and vm_capable:
765
    status = True
766
    try:
767
      payload = bdev.BaseDRBD.GetUsermodeHelper()
768
    except errors.BlockDeviceError, err:
769
      logging.error("Can't get DRBD usermode helper: %s", str(err))
770
      status = False
771
      payload = str(err)
772
    result[constants.NV_DRBDHELPER] = (status, payload)
773

    
774
  if constants.NV_NODESETUP in what:
775
    result[constants.NV_NODESETUP] = tmpr = []
776
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
777
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
778
                  " under /sys, missing required directories /sys/block"
779
                  " and /sys/class/net")
780
    if (not os.path.isdir("/proc/sys") or
781
        not os.path.isfile("/proc/sysrq-trigger")):
782
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
783
                  " under /proc, missing required directory /proc/sys and"
784
                  " the file /proc/sysrq-trigger")
785

    
786
  if constants.NV_TIME in what:
787
    result[constants.NV_TIME] = utils.SplitTime(time.time())
788

    
789
  if constants.NV_OSLIST in what and vm_capable:
790
    result[constants.NV_OSLIST] = DiagnoseOS()
791

    
792
  if constants.NV_BRIDGES in what and vm_capable:
793
    result[constants.NV_BRIDGES] = [bridge
794
                                    for bridge in what[constants.NV_BRIDGES]
795
                                    if not utils.BridgeExists(bridge)]
796
  return result
797
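
# A minimal sketch of a VerifyNode call, limited to checks that can be
# answered locally.  The cluster name and file list are assumptions; the
# helper is illustrative and never called.
def _ExampleVerifyNodeCall():
  what = {
    constants.NV_FILELIST: [pathutils.CLUSTER_CONF_FILE],
    constants.NV_VERSION: None,
    constants.NV_TIME: None,
    }
  result = VerifyNode(what, "cluster.example.com")
  # the result dictionary mirrors the keys that were requested
  return (result[constants.NV_VERSION], result[constants.NV_TIME])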

    
798

    
799
def GetBlockDevSizes(devices):
800
  """Return the size of the given block devices
801

802
  @type devices: list
803
  @param devices: list of block device nodes to query
804
  @rtype: dict
805
  @return:
806
    dictionary of all block devices under /dev (key). The value is their
807
    size in MiB.
808

809
    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
810

811
  """
812
  DEV_PREFIX = "/dev/"
813
  blockdevs = {}
814

    
815
  for devpath in devices:
816
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
817
      continue
818

    
819
    try:
820
      st = os.stat(devpath)
821
    except EnvironmentError, err:
822
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
823
      continue
824

    
825
    if stat.S_ISBLK(st.st_mode):
826
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
827
      if result.failed:
828
        # We don't want to fail, just do not list this device as available
829
        logging.warning("Cannot get size for block device %s", devpath)
830
        continue
831

    
832
      size = int(result.stdout) / (1024 * 1024)
833
      blockdevs[devpath] = size
834
  return blockdevs
835

    
836

    
837
def GetVolumeList(vg_names):
838
  """Compute list of logical volumes and their size.
839

840
  @type vg_names: list
841
  @param vg_names: the volume groups whose LVs we should list, or
842
      empty for all volume groups
843
  @rtype: dict
844
  @return:
845
      dictionary of all partitions (key) with value being a tuple of
846
      their size (in MiB), inactive and online status::
847

848
        {'xenvg/test1': ('20.06', True, True)}
849

850
      in case of errors, a string is returned with the error
851
      details.
852

853
  """
854
  lvs = {}
855
  sep = "|"
856
  if not vg_names:
857
    vg_names = []
858
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
859
                         "--separator=%s" % sep,
860
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
861
  if result.failed:
862
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
863

    
864
  for line in result.stdout.splitlines():
865
    line = line.strip()
866
    match = _LVSLINE_REGEX.match(line)
867
    if not match:
868
      logging.error("Invalid line returned from lvs output: '%s'", line)
869
      continue
870
    vg_name, name, size, attr = match.groups()
871
    inactive = attr[4] == "-"
872
    online = attr[5] == "o"
873
    virtual = attr[0] == "v"
874
    if virtual:
875
      # we don't want to report such volumes as existing, since they
876
      # don't really hold data
877
      continue
878
    lvs[vg_name + "/" + name] = (size, inactive, online)
879

    
880
  return lvs
881

    
882

    
883
def ListVolumeGroups():
884
  """List the volume groups and their size.
885

886
  @rtype: dict
887
  @return: dictionary with volume group names as keys and the
      size of each volume group as values
889

890
  """
891
  return utils.ListVolumeGroups()
892

    
893

    
894
def NodeVolumes():
895
  """List all volumes on this node.
896

897
  @rtype: list
898
  @return:
899
    A list of dictionaries, each having four keys:
900
      - name: the logical volume name,
901
      - size: the size of the logical volume
902
      - dev: the physical device on which the LV lives
903
      - vg: the volume group to which it belongs
904

905
    In case of errors, we return an empty list and log the
906
    error.
907

908
    Note that since a logical volume can live on multiple physical
909
    volumes, the resulting list might include a logical volume
910
    multiple times.
911

912
  """
913
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
914
                         "--separator=|",
915
                         "--options=lv_name,lv_size,devices,vg_name"])
916
  if result.failed:
917
    _Fail("Failed to list logical volumes, lvs output: %s",
918
          result.output)
919

    
920
  def parse_dev(dev):
921
    return dev.split("(")[0]
922

    
923
  def handle_dev(dev):
924
    return [parse_dev(x) for x in dev.split(",")]
925

    
926
  def map_line(line):
927
    line = [v.strip() for v in line]
928
    return [{"name": line[0], "size": line[1],
929
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
930

    
931
  all_devs = []
932
  for line in result.stdout.splitlines():
933
    if line.count("|") >= 3:
934
      all_devs.extend(map_line(line.split("|")))
935
    else:
936
      logging.warning("Strange line in the output from lvs: '%s'", line)
937
  return all_devs
938

    
939

    
940
def BridgesExist(bridges_list):
941
  """Check if a list of bridges exist on the current node.
942

943
  @rtype: boolean
944
  @return: C{True} if all of them exist, C{False} otherwise
945

946
  """
947
  missing = []
948
  for bridge in bridges_list:
949
    if not utils.BridgeExists(bridge):
950
      missing.append(bridge)
951

    
952
  if missing:
953
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
954

    
955

    
956
def GetInstanceList(hypervisor_list):
957
  """Provides a list of instances.
958

959
  @type hypervisor_list: list
960
  @param hypervisor_list: the list of hypervisors to query information
961

962
  @rtype: list
963
  @return: a list of all running instances on the current node
964
    - instance1.example.com
965
    - instance2.example.com
966

967
  """
968
  results = []
969
  for hname in hypervisor_list:
970
    try:
971
      names = hypervisor.GetHypervisor(hname).ListInstances()
972
      results.extend(names)
973
    except errors.HypervisorError, err:
974
      _Fail("Error enumerating instances (hypervisor %s): %s",
975
            hname, err, exc=True)
976

    
977
  return results
978

    
979

    
980
def GetInstanceInfo(instance, hname):
981
  """Gives back the information about an instance as a dictionary.
982

983
  @type instance: string
984
  @param instance: the instance name
985
  @type hname: string
986
  @param hname: the hypervisor type of the instance
987

988
  @rtype: dict
989
  @return: dictionary with the following keys:
990
      - memory: memory size of instance (int)
991
      - state: xen state of instance (string)
992
      - time: cpu time of instance (float)
993

994
  """
995
  output = {}
996

    
997
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
998
  if iinfo is not None:
999
    output["memory"] = iinfo[2]
1000
    output["state"] = iinfo[4]
1001
    output["time"] = iinfo[5]
1002

    
1003
  return output
1004

    
1005

    
1006
def GetInstanceMigratable(instance):
1007
  """Gives whether an instance can be migrated.
1008

1009
  @type instance: L{objects.Instance}
1010
  @param instance: object representing the instance to be checked.
1011

1012
  @raise RPCFail: if the instance is not running on this node;
      missing block device symlinks are only logged as warnings
1016

1017
  """
1018
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1019
  iname = instance.name
1020
  if iname not in hyper.ListInstances():
1021
    _Fail("Instance %s is not running", iname)
1022

    
1023
  for idx in range(len(instance.disks)):
1024
    link_name = _GetBlockDevSymlinkPath(iname, idx)
1025
    if not os.path.islink(link_name):
1026
      logging.warning("Instance %s is missing symlink %s for disk %d",
1027
                      iname, link_name, idx)
1028

    
1029

    
1030
def GetAllInstancesInfo(hypervisor_list):
1031
  """Gather data about all instances.
1032

1033
  This is the equivalent of L{GetInstanceInfo}, except that it
1034
  computes data for all instances at once, thus being faster if one
1035
  needs data about more than one instance.
1036

1037
  @type hypervisor_list: list
1038
  @param hypervisor_list: list of hypervisors to query for instance data
1039

1040
  @rtype: dict
1041
  @return: dictionary of instance: data, with data having the following keys:
1042
      - memory: memory size of instance (int)
1043
      - state: xen state of instance (string)
1044
      - time: cpu time of instance (float)
1045
      - vcpus: the number of vcpus
1046

1047
  """
1048
  output = {}
1049

    
1050
  for hname in hypervisor_list:
1051
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
1052
    if iinfo:
1053
      for name, _, memory, vcpus, state, times in iinfo:
1054
        value = {
1055
          "memory": memory,
1056
          "vcpus": vcpus,
1057
          "state": state,
1058
          "time": times,
1059
          }
1060
        if name in output:
1061
          # we only check static parameters, like memory and vcpus,
1062
          # and not state and time which can change between the
1063
          # invocations of the different hypervisors
1064
          for key in "memory", "vcpus":
1065
            if value[key] != output[name][key]:
1066
              _Fail("Instance %s is running twice"
1067
                    " with different parameters", name)
1068
        output[name] = value
1069

    
1070
  return output
1071

    
1072

    
1073
def _InstanceLogName(kind, os_name, instance, component):
1074
  """Compute the OS log filename for a given instance and operation.
1075

1076
  The instance name and os name are passed in as strings since not all
1077
  operations have these as part of an instance object.
1078

1079
  @type kind: string
1080
  @param kind: the operation type (e.g. add, import, etc.)
1081
  @type os_name: string
1082
  @param os_name: the os name
1083
  @type instance: string
1084
  @param instance: the name of the instance being imported/added/etc.
1085
  @type component: string or None
1086
  @param component: the name of the component of the instance being
1087
      transferred
1088

1089
  """
1090
  # TODO: Use tempfile.mkstemp to create unique filename
1091
  if component:
1092
    assert "/" not in component
1093
    c_msg = "-%s" % component
1094
  else:
1095
    c_msg = ""
1096
  base = ("%s-%s-%s%s-%s.log" %
1097
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
1098
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
1099
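
# Example of the names this helper produces (the directory and the exact
# timestamp format come from pathutils.LOG_OS_DIR and
# utils.TimestampForFilename, so the values below are only indicative):
#
#   _InstanceLogName("add", "debootstrap", "inst1.example.com", None)
#     -> <LOG_OS_DIR>/add-debootstrap-inst1.example.com-<timestamp>.log
#   _InstanceLogName("import", "debootstrap", "inst1.example.com", "disk0")
#     -> <LOG_OS_DIR>/import-debootstrap-inst1.example.com-disk0-<timestamp>.log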

    
1100

    
1101
def InstanceOsAdd(instance, reinstall, debug):
1102
  """Add an OS to an instance.
1103

1104
  @type instance: L{objects.Instance}
1105
  @param instance: Instance whose OS is to be installed
1106
  @type reinstall: boolean
1107
  @param reinstall: whether this is an instance reinstall
1108
  @type debug: integer
1109
  @param debug: debug level, passed to the OS scripts
1110
  @rtype: None
1111

1112
  """
1113
  inst_os = OSFromDisk(instance.os)
1114

    
1115
  create_env = OSEnvironment(instance, inst_os, debug)
1116
  if reinstall:
1117
    create_env["INSTANCE_REINSTALL"] = "1"
1118

    
1119
  logfile = _InstanceLogName("add", instance.os, instance.name, None)
1120

    
1121
  result = utils.RunCmd([inst_os.create_script], env=create_env,
1122
                        cwd=inst_os.path, output=logfile, reset_env=True)
1123
  if result.failed:
1124
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
1125
                  " output: %s", result.cmd, result.fail_reason, logfile,
1126
                  result.output)
1127
    lines = [utils.SafeEncode(val)
1128
             for val in utils.TailFile(logfile, lines=20)]
1129
    _Fail("OS create script failed (%s), last lines in the"
1130
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1131

    
1132

    
1133
def RunRenameInstance(instance, old_name, debug):
1134
  """Run the OS rename script for an instance.
1135

1136
  @type instance: L{objects.Instance}
1137
  @param instance: Instance whose OS is to be installed
1138
  @type old_name: string
1139
  @param old_name: previous instance name
1140
  @type debug: integer
1141
  @param debug: debug level, passed to the OS scripts
1142
  @rtype: boolean
1143
  @return: the success of the operation
1144

1145
  """
1146
  inst_os = OSFromDisk(instance.os)
1147

    
1148
  rename_env = OSEnvironment(instance, inst_os, debug)
1149
  rename_env["OLD_INSTANCE_NAME"] = old_name
1150

    
1151
  logfile = _InstanceLogName("rename", instance.os,
1152
                             "%s-%s" % (old_name, instance.name), None)
1153

    
1154
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
1155
                        cwd=inst_os.path, output=logfile, reset_env=True)
1156

    
1157
  if result.failed:
1158
    logging.error("os create command '%s' returned error: %s output: %s",
1159
                  result.cmd, result.fail_reason, result.output)
1160
    lines = [utils.SafeEncode(val)
1161
             for val in utils.TailFile(logfile, lines=20)]
1162
    _Fail("OS rename script failed (%s), last lines in the"
1163
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1164

    
1165

    
1166
def _GetBlockDevSymlinkPath(instance_name, idx):
1167
  return utils.PathJoin(pathutils.DISK_LINKS_DIR, "%s%s%d" %
1168
                        (instance_name, constants.DISK_SEPARATOR, idx))
1169

    
1170

    
1171
def _SymlinkBlockDev(instance_name, device_path, idx):
1172
  """Set up symlinks to a instance's block device.
1173

1174
  This is an auxiliary function run when an instance is started (on the primary
1175
  node) or when an instance is migrated (on the target node).
1176

1177

1178
  @param instance_name: the name of the target instance
1179
  @param device_path: path of the physical block device, on the node
1180
  @param idx: the disk index
1181
  @return: absolute path to the disk's symlink
1182

1183
  """
1184
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1185
  try:
1186
    os.symlink(device_path, link_name)
1187
  except OSError, err:
1188
    if err.errno == errno.EEXIST:
1189
      if (not os.path.islink(link_name) or
1190
          os.readlink(link_name) != device_path):
1191
        os.remove(link_name)
1192
        os.symlink(device_path, link_name)
1193
    else:
1194
      raise
1195

    
1196
  return link_name
1197

    
1198

    
1199
def _RemoveBlockDevLinks(instance_name, disks):
1200
  """Remove the block device symlinks belonging to the given instance.
1201

1202
  """
1203
  for idx, _ in enumerate(disks):
1204
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1205
    if os.path.islink(link_name):
1206
      try:
1207
        os.remove(link_name)
1208
      except OSError:
1209
        logging.exception("Can't remove symlink '%s'", link_name)
1210

    
1211

    
1212
def _GatherAndLinkBlockDevs(instance):
1213
  """Set up an instance's block device(s).
1214

1215
  This is run on the primary node at instance startup. The block
1216
  devices must be already assembled.
1217

1218
  @type instance: L{objects.Instance}
1219
  @param instance: the instance whose disks we should assemble
1220
  @rtype: list
1221
  @return: list of (disk_object, device_path)
1222

1223
  """
1224
  block_devices = []
1225
  for idx, disk in enumerate(instance.disks):
1226
    device = _RecursiveFindBD(disk)
1227
    if device is None:
1228
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
1229
                                    str(disk))
1230
    device.Open()
1231
    try:
1232
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1233
    except OSError, e:
1234
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1235
                                    e.strerror)
1236

    
1237
    block_devices.append((disk, link_name))
1238

    
1239
  return block_devices
1240

    
1241

    
1242
def StartInstance(instance, startup_paused):
1243
  """Start an instance.
1244

1245
  @type instance: L{objects.Instance}
1246
  @param instance: the instance object
1247
  @type startup_paused: bool
1248
  @param startup_paused: whether to pause the instance at startup
1249
  @rtype: None
1250

1251
  """
1252
  running_instances = GetInstanceList([instance.hypervisor])
1253

    
1254
  if instance.name in running_instances:
1255
    logging.info("Instance %s already running, not starting", instance.name)
1256
    return
1257

    
1258
  try:
1259
    block_devices = _GatherAndLinkBlockDevs(instance)
1260
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
1261
    hyper.StartInstance(instance, block_devices, startup_paused)
1262
  except errors.BlockDeviceError, err:
1263
    _Fail("Block device error: %s", err, exc=True)
1264
  except errors.HypervisorError, err:
1265
    _RemoveBlockDevLinks(instance.name, instance.disks)
1266
    _Fail("Hypervisor error: %s", err, exc=True)
1267

    
1268

    
1269
def InstanceShutdown(instance, timeout):
1270
  """Shut an instance down.
1271

1272
  @note: this function uses polling with a hardcoded timeout.
1273

1274
  @type instance: L{objects.Instance}
1275
  @param instance: the instance object
1276
  @type timeout: integer
1277
  @param timeout: maximum timeout for soft shutdown
1278
  @rtype: None
1279

1280
  """
1281
  hv_name = instance.hypervisor
1282
  hyper = hypervisor.GetHypervisor(hv_name)
1283
  iname = instance.name
1284

    
1285
  if instance.name not in hyper.ListInstances():
1286
    logging.info("Instance %s not running, doing nothing", iname)
1287
    return
1288

    
1289
  class _TryShutdown:
1290
    def __init__(self):
1291
      self.tried_once = False
1292

    
1293
    def __call__(self):
1294
      if iname not in hyper.ListInstances():
1295
        return
1296

    
1297
      try:
1298
        hyper.StopInstance(instance, retry=self.tried_once)
1299
      except errors.HypervisorError, err:
1300
        if iname not in hyper.ListInstances():
1301
          # if the instance is no longer existing, consider this a
1302
          # success and go to cleanup
1303
          return
1304

    
1305
        _Fail("Failed to stop instance %s: %s", iname, err)
1306

    
1307
      self.tried_once = True
1308

    
1309
      raise utils.RetryAgain()
1310

    
1311
  try:
1312
    utils.Retry(_TryShutdown(), 5, timeout)
1313
  except utils.RetryTimeout:
1314
    # the shutdown did not succeed
1315
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1316

    
1317
    try:
1318
      hyper.StopInstance(instance, force=True)
1319
    except errors.HypervisorError, err:
1320
      if iname in hyper.ListInstances():
1321
        # only raise an error if the instance still exists, otherwise
1322
        # the error could simply be "instance ... unknown"!
1323
        _Fail("Failed to force stop instance %s: %s", iname, err)
1324

    
1325
    time.sleep(1)
1326

    
1327
    if iname in hyper.ListInstances():
1328
      _Fail("Could not shutdown instance %s even by destroy", iname)
1329

    
1330
  try:
1331
    hyper.CleanupInstance(instance.name)
1332
  except errors.HypervisorError, err:
1333
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1334

    
1335
  _RemoveBlockDevLinks(iname, instance.disks)
1336

    
1337

    
1338
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1339
  """Reboot an instance.
1340

1341
  @type instance: L{objects.Instance}
1342
  @param instance: the instance object to reboot
1343
  @type reboot_type: str
1344
  @param reboot_type: the type of reboot, one of the following
1345
    constants:
1346
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1347
        instance OS, do not recreate the VM
1348
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1349
        restart the VM (at the hypervisor level)
1350
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1351
        not accepted here, since that mode is handled differently, in
1352
        cmdlib, and translates into full stop and start of the
1353
        instance (instead of a call_instance_reboot RPC)
1354
  @type shutdown_timeout: integer
1355
  @param shutdown_timeout: maximum timeout for soft shutdown
1356
  @rtype: None
1357

1358
  """
1359
  running_instances = GetInstanceList([instance.hypervisor])
1360

    
1361
  if instance.name not in running_instances:
1362
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1363

    
1364
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1365
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1366
    try:
1367
      hyper.RebootInstance(instance)
1368
    except errors.HypervisorError, err:
1369
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1370
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1371
    try:
1372
      InstanceShutdown(instance, shutdown_timeout)
1373
      return StartInstance(instance, False)
1374
    except errors.HypervisorError, err:
1375
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1376
  else:
1377
    _Fail("Invalid reboot_type received: %s", reboot_type)
1378

    
1379

    
1380
def InstanceBalloonMemory(instance, memory):
1381
  """Resize an instance's memory.
1382

1383
  @type instance: L{objects.Instance}
1384
  @param instance: the instance object
1385
  @type memory: int
1386
  @param memory: new memory amount in MB
1387
  @rtype: None
1388

1389
  """
1390
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1391
  running = hyper.ListInstances()
1392
  if instance.name not in running:
1393
    logging.info("Instance %s is not running, cannot balloon", instance.name)
1394
    return
1395
  try:
1396
    hyper.BalloonInstanceMemory(instance, memory)
1397
  except errors.HypervisorError, err:
1398
    _Fail("Failed to balloon instance memory: %s", err, exc=True)
1399

    
1400

    
1401
def MigrationInfo(instance):
1402
  """Gather information about an instance to be migrated.
1403

1404
  @type instance: L{objects.Instance}
1405
  @param instance: the instance definition
1406

1407
  """
1408
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1409
  try:
1410
    info = hyper.MigrationInfo(instance)
1411
  except errors.HypervisorError, err:
1412
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1413
  return info
1414

    
1415

    
1416
def AcceptInstance(instance, info, target):
1417
  """Prepare the node to accept an instance.
1418

1419
  @type instance: L{objects.Instance}
1420
  @param instance: the instance definition
1421
  @type info: string/data (opaque)
1422
  @param info: migration information, from the source node
1423
  @type target: string
1424
  @param target: target host (usually ip), on this node
1425

1426
  """
1427
  # TODO: why is this required only for DTS_EXT_MIRROR?
1428
  if instance.disk_template in constants.DTS_EXT_MIRROR:
1429
    # Create the symlinks, as the disks are not active
1430
    # in any way
1431
    try:
1432
      _GatherAndLinkBlockDevs(instance)
1433
    except errors.BlockDeviceError, err:
1434
      _Fail("Block device error: %s", err, exc=True)
1435

    
1436
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1437
  try:
1438
    hyper.AcceptInstance(instance, info, target)
1439
  except errors.HypervisorError, err:
1440
    if instance.disk_template in constants.DTS_EXT_MIRROR:
1441
      _RemoveBlockDevLinks(instance.name, instance.disks)
1442
    _Fail("Failed to accept instance: %s", err, exc=True)
1443

    
1444

    
1445
def FinalizeMigrationDst(instance, info, success):
1446
  """Finalize any preparation to accept an instance.
1447

1448
  @type instance: L{objects.Instance}
1449
  @param instance: the instance definition
1450
  @type info: string/data (opaque)
1451
  @param info: migration information, from the source node
1452
  @type success: boolean
1453
  @param success: whether the migration was a success or a failure
1454

1455
  """
1456
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1457
  try:
1458
    hyper.FinalizeMigrationDst(instance, info, success)
1459
  except errors.HypervisorError, err:
1460
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
1461

    
1462

    
1463
def MigrateInstance(instance, target, live):
1464
  """Migrates an instance to another node.
1465

1466
  @type instance: L{objects.Instance}
1467
  @param instance: the instance definition
1468
  @type target: string
1469
  @param target: the target node name
1470
  @type live: boolean
1471
  @param live: whether the migration should be done live or not (the
1472
      interpretation of this parameter is left to the hypervisor)
1473
  @raise RPCFail: if migration fails for some reason
1474

1475
  """
1476
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1477

    
1478
  try:
1479
    hyper.MigrateInstance(instance, target, live)
1480
  except errors.HypervisorError, err:
1481
    _Fail("Failed to migrate instance: %s", err, exc=True)
1482

    
1483

    
1484
def FinalizeMigrationSource(instance, success, live):
1485
  """Finalize the instance migration on the source node.
1486

1487
  @type instance: L{objects.Instance}
1488
  @param instance: the instance definition of the migrated instance
1489
  @type success: bool
1490
  @param success: whether the migration succeeded or not
1491
  @type live: bool
1492
  @param live: whether the user requested a live migration or not
1493
  @raise RPCFail: If the execution fails for some reason
1494

1495
  """
1496
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1497

    
1498
  try:
1499
    hyper.FinalizeMigrationSource(instance, success, live)
1500
  except Exception, err:  # pylint: disable=W0703
1501
    _Fail("Failed to finalize the migration on the source node: %s", err,
1502
          exc=True)
1503

    
1504

    
1505
def GetMigrationStatus(instance):
1506
  """Get the migration status
1507

1508
  @type instance: L{objects.Instance}
1509
  @param instance: the instance that is being migrated
1510
  @rtype: L{objects.MigrationStatus}
1511
  @return: the status of the current migration (one of
1512
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
1513
           progress info that can be retrieved from the hypervisor
1514
  @raise RPCFail: If the migration status cannot be retrieved
1515

1516
  """
1517
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1518
  try:
1519
    return hyper.GetMigrationStatus(instance)
1520
  except Exception, err:  # pylint: disable=W0703
1521
    _Fail("Failed to get migration status: %s", err, exc=True)
1522

    
1523

    
1524
def BlockdevCreate(disk, size, owner, on_primary, info):
1525
  """Creates a block device for an instance.
1526

1527
  @type disk: L{objects.Disk}
1528
  @param disk: the object describing the disk we should create
1529
  @type size: int
1530
  @param size: the size of the physical underlying device, in MiB
1531
  @type owner: str
1532
  @param owner: the name of the instance for which disk is created,
1533
      used for device cache data
1534
  @type on_primary: boolean
1535
  @param on_primary:  indicates if it is the primary node or not
1536
  @type info: string
1537
  @param info: string that will be sent to the physical device
1538
      creation, used for example to set (LVM) tags on LVs
1539

1540
  @return: the new unique_id of the device (this can sometimes be
1541
      computed only after creation), or None. On secondary nodes,
1542
      it's not required to return anything.
1543

1544
  """
1545
  # TODO: remove the obsolete "size" argument
1546
  # pylint: disable=W0613
1547
  clist = []
1548
  if disk.children:
1549
    for child in disk.children:
1550
      try:
1551
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1552
      except errors.BlockDeviceError, err:
1553
        _Fail("Can't assemble device %s: %s", child, err)
1554
      if on_primary or disk.AssembleOnSecondary():
1555
        # we need the children open in case the device itself has to
1556
        # be assembled
1557
        try:
1558
          # pylint: disable=E1103
1559
          crdev.Open()
1560
        except errors.BlockDeviceError, err:
1561
          _Fail("Can't make child '%s' read-write: %s", child, err)
1562
      clist.append(crdev)
1563

    
1564
  try:
1565
    device = bdev.Create(disk, clist)
1566
  except errors.BlockDeviceError, err:
1567
    _Fail("Can't create block device: %s", err)
1568

    
1569
  if on_primary or disk.AssembleOnSecondary():
1570
    try:
1571
      device.Assemble()
1572
    except errors.BlockDeviceError, err:
1573
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1574
    if on_primary or disk.OpenOnSecondary():
1575
      try:
1576
        device.Open(force=True)
1577
      except errors.BlockDeviceError, err:
1578
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1579
    DevCacheManager.UpdateCache(device.dev_path, owner,
1580
                                on_primary, disk.iv_name)
1581

    
1582
  device.SetInfo(info)
1583

    
1584
  return device.unique_id
1585

    
1586

    
1587
def _WipeDevice(path, offset, size):
  """This function actually wipes the device.

  @param path: The path to the device to wipe
  @param offset: The offset in MiB in the file
  @param size: The size in MiB to write

  """
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
         "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
         "count=%d" % size]
  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)


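# Illustrative note (editor's addition, not part of the original module): the
# command list built by _WipeDevice above renders to a plain dd invocation.
# Assuming constants.WIPE_BLOCK_SIZE is 1 MiB and a hypothetical LV path, a
# call like _WipeDevice("/dev/xenvg/disk0", 0, 128) would run roughly:
#
#   dd if=/dev/zero seek=0 bs=1048576 oflag=direct of=/dev/xenvg/disk0 count=128
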
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success


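# Illustrative note (editor's addition, not part of the original module):
# BlockdevPauseResumeSync above returns one (success, error) pair per disk,
# e.g. for two disks where only the first could be paused:
#
#   [(True, None), (False, "Pause for device disk/1 failed")]
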
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, bdev.BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


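# Illustrative note (editor's addition, not part of the original module): a
# caller of BlockdevAssemble above gets a device path on the primary node and
# simply True on a secondary, so a hypothetical use looks like:
#
#   result = BlockdevAssemble(disk, "inst1.example.com", True, 0)
#   # primary node: e.g. "/dev/drbd0" (also symlinked via _SymlinkBlockDev)
#   # secondary node (as_primary=False): True
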
def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shut down. Then the children of the device are
  shut down.

  This function is called recursively. Note that, as opposed to assemble, we
  don't cache the children, since shutting down a device doesn't require
  that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result


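# Illustrative note (editor's addition, not part of the original module): the
# per-disk results of BlockdevGetmirrorstatusMulti above look roughly like:
#
#   [(True, <objects.BlockDevStatus>), (False, "Can't find device disk/1")]
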
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly be ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt to truncate an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


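# Illustrative note (editor's addition, not part of the original module): the
# pipeline assembled by BlockdevExport above has roughly this shape (the exact
# ssh invocation is produced by _GetSshRunner(...).BuildCmd):
#
#   set -e; set -o pipefail; dd if=/dev/<src> bs=1048576 count=<size_mib> | \
#     ssh <dest_node> "dd of=<dest_path> conv=nocreat,notrunc bs=65536 oflag=dsync"
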
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout


def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  ssconf.SimpleStore().WriteFiles(values)


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions


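# Illustrative note (editor's addition, not part of the original module): the
# API file read by _OSOndiskAPIVersion above (constants.OS_API_FILE) simply
# lists one supported API version per line; a file containing "20" and "15"
# would make the function return (True, [20, 15]).
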
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


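# Illustrative note (editor's addition, not part of the original module): a
# hypothetical entry in the list returned by DiagnoseOS above could look like:
#
#   ("debootstrap", "/srv/ganeti/os/debootstrap", True, "", ["default"],
#    [("dhcp", "whether to enable DHCP")], [20])
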
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result


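# Illustrative note (editor's addition, not part of the original module): for
# a hypothetical OS named "debootstrap+default" with an OS parameter "dhcp",
# OSCoreEnv above would produce entries along the lines of:
#
#   {"OS_API_VERSION": "20", "OS_NAME": "debootstrap", "OS_VARIANT": "default",
#    "DEBUG_LEVEL": "0", "OSP_DHCP": "yes", "PATH": constants.HOOKS_PATH}
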
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result


def BlockdevGrow(disk, amount, dryrun, backingstore):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


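# Illustrative note (editor's addition, not part of the original module): for
# an LVM-backed (or DRBD-over-LVM) disk, BlockdevSnapshot above ends up
# returning the new snapshot's (vg, lv) pair, e.g. something along the lines
# of ("xenvg", "<uuid>.disk0.snap").
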
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)


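# Illustrative note (editor's addition, not part of the original module): the
# file written by FinalizeExport above is an INI-style document stored under
# pathutils.EXPORT_DIR/<instance>/ (filename constants.EXPORT_CONF_FILE), with
# roughly one section per constants.INISECT_* value: an export section
# (version, timestamp, source, os, compression), an instance section (name,
# memory, vcpus, nic*/disk* entries, ...) and one section each for the
# hypervisor, backend and OS parameters.
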
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk,
      new_logical_id, new_physical_id); disk is an
      L{objects.Disk} object describing the current disk,
      and new logical_id/physical_id is the name we
      rename it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not constants.ENABLE_FILE_STORAGE:
    _Fail("File storage disabled at configure time")
  cfg = _GetConfig()
  fs_dir = os.path.normpath(fs_dir)
  base_fstore = cfg.GetFileStorageDir()
  base_shared = cfg.GetSharedFileStorageDir()
  if not (utils.IsBelowDir(base_fstore, fs_dir) or
          utils.IsBelowDir(base_shared, fs_dir)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s' or shared storage directory '%s'",
          fs_dir, base_fstore, base_shared)
  return fs_dir


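# Illustrative note (editor's addition, not part of the original module):
# assuming a cluster file storage directory of "/srv/ganeti/file-storage",
# _TransformFileStorageDir above would accept a path such as
# "/srv/ganeti/file-storage/inst1" and reject e.g. "/tmp/inst1" via _Fail.
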
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified storage dir '%s' is not a directory",
            file_storage_dir)
  else:
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      _Fail("Cannot create file storage directory '%s': %s",
            file_storage_dir, err, exc=True)


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should clean up
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      _Fail("Specified Storage directory '%s' is not a directory",
            file_storage_dir)
    # deletes dir only if empty, otherwise we want to fail the rpc call
    try:
      os.rmdir(file_storage_dir)
    except OSError, err:
      _Fail("Cannot remove file storage directory '%s': %s",
            file_storage_dir, err)


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not os.path.exists(new_file_storage_dir):
    if os.path.isdir(old_file_storage_dir):
      try:
        os.rename(old_file_storage_dir, new_file_storage_dir)
      except OSError, err:
        _Fail("Cannot rename '%s' to '%s': %s",
              old_file_storage_dir, new_file_storage_dir, err)
    else:
      _Fail("Specified storage dir '%s' is not a directory",
            old_file_storage_dir)
  else:
    if os.path.exists(old_file_storage_dir):
      _Fail("Cannot rename '%s' to '%s': both locations exist",
            old_file_storage_dir, new_file_storage_dir)


def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(pathutils.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


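# Illustrative note (editor's addition, not part of the original module):
# _EnsureJobQueueFile above only checks that the path shares the queue
# directory prefix; with a queue directory of e.g. "/var/lib/ganeti/queue",
# "/var/lib/ganeti/queue/job-42" passes while "/tmp/job-42" raises RPCFail.
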
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.io.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  _EnsureJobQueueFile(file_name)
  getents = runtime.GetEnts()

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
                  gid=getents.masterd_gid)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: tuple
  @return: the success of the operation and payload

  """
  _EnsureJobQueueFile(old)
  _EnsureJobQueueFile(new)

  getents = runtime.GetEnts()

  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0700,
                   dir_uid=getents.masterd_uid, dir_gid=getents.masterd_gid)


def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      will contain the error details in case we failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


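# Illustrative note (editor's addition, not part of the original module): a
# hypothetical validation call for an OS definition named
# "debootstrap+default" could look like:
#
#   ValidateOS(True, "debootstrap+default", [constants.OS_VALIDATE_PARAMETERS],
#              {"dhcp": "yes"})
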
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


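# Illustrative note (editor's addition, not part of the original module):
# CreateX509Certificate above leaves behind a per-certificate directory of
# the form <cryptodir>/x509-<timestamp>-<random>/ holding the private key and
# certificate files (named after _X509_KEY_FILE and _X509_CERT_FILE), and only
# the directory's basename plus the PEM certificate are returned to the caller.
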
def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set here a smaller block size as, due to transport buffering, more
      # than 64-128k will mostly be ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc to
      # not attempt truncating an LV device; we use oflag=dsync to not buffer
      # too much memory; this means that at best, we flush every 64k, which
      # will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


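# Note (hedged, editorial): _GetImportExportIoCommand above only builds shell
# fragments; StartImportExportDaemon below hands them to the import/export
# daemon as --cmd-prefix/--cmd-suffix, which wraps them around its transport.
# An illustrative value for the raw-disk export case (device path assumed):
#
#   prefix = "dd if=/dev/xenvg/inst1-disk0 bs=1048576 count=1024 |"
#   suffix = None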
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Can not specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
           status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


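# Usage sketch (illustrative only; the polling step is hypothetical): the
# import/export helpers above are normally driven from the master side via
# RPC in roughly this order:
#
#   name = StartImportExportDaemon(constants.IEM_EXPORT, opts, host, port,
#                                  instance, "disk/0", ieio, ieioargs)
#   status = GetImportExportStatus([name])[0]   # poll until finished
#   CleanupImportExport(name)                   # or AbortImportExport(name)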
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


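# Note (hedged, editorial): during a live migration the DRBD helpers above are
# typically invoked in this order on the affected nodes (error handling and
# RPC plumbing omitted; argument names are illustrative):
#
#   DrbdDisconnectNet(nodes_ip, disks)             # drop to standalone
#   DrbdAttachNet(nodes_ip, disks, iname, True)    # reconnect, dual-primary
#   # ... migrate the instance ...
#   DrbdAttachNet(nodes_ip, disks, iname, False)   # back to single-primary
#   DrbdWaitSync(nodes_ip, disks)                  # wait for resync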
def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return bdev.BaseDRBD.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


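# Usage sketch (illustrative only; hook path and environment are assumptions):
# ganeti-noded instantiates the HooksRunner above and calls RunHooks with the
# environment prepared by the master, e.g.
#
#   runner = HooksRunner()
#   results = runner.RunHooks("instance-start", constants.HOOKS_PHASE_POST,
#                             {"GANETI_INSTANCE_NAME": "inst1.example.com"})
#   for (script, status, output) in results:
#     logging.debug("hook %s finished with %s", script, status)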
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: two element tuple of:
       - status
       - either error message or stdout of allocator (for success)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


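# Usage sketch (illustrative only; "hail" as the allocator name is an
# assumption): the IAllocatorRunner above is normally reached through the
# noded RPC layer, roughly equivalent to
#
#   stdout = IAllocatorRunner.Run("hail", serializer.DumpJson(request))
#   response = serializer.LoadJson(stdout)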
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
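

# Usage sketch (illustrative only; device path, owner and iv_name are
# assumptions): the DevCacheManager above is used fire-and-forget around
# block device (de)activation, and failures are only logged, e.g.
#
#   DevCacheManager.UpdateCache("/dev/drbd0", "inst1.example.com",
#                               True, "disk/0")
#   DevCacheManager.RemoveCache("/dev/drbd0")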