lib/backend.py @ cde49218
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
     the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
     in the L{_CleanDirectory} function

"""

# pylint: disable=E1103

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint


import os
import os.path
import shutil
import time
import stat
import errno
import re
import random
import logging
import tempfile
import zlib
import base64
import signal

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster
from ganeti import ht
from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
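
# A line matching the regex above, as produced by the "lvs" invocation in
# L{GetVolumeList} below, looks for example like:
#   "  xenvg|test1|20.06|-wi-ao"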

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _GetInstReasonFilename(instance_name):
  """Path of the file containing the reason of the instance status change.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)


def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: None

  """
  json = serializer.DumpJson(trail)
  filename = _GetInstReasonFilename(instance_name)
  utils.WriteFile(filename, data=json)
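
# Example (illustrative): a reason trail is a list of trail entries, each
# typically of the form (source, reason, timestamp); storing one for a
# hypothetical instance could look like:
#
#   _StoreInstReasonTrail("inst1.example.com",
#                         [["gnt:client:gnt-instance", "user shutdown",
#                           1356998400000000000]])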


def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
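
# Examples of how _Fail is used throughout this module: the message is a
# printf-style format string, "exc=True" additionally logs the current
# traceback, and "log=False" skips logging altogether:
#
#   _Fail("Instance %s is not running", iname)
#   _Fail("Hypervisor error: %s", err, exc=True)
#   _Fail("OS create script failed (%s), ...", result.fail_reason, log=False)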


def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")
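
# Example (illustrative sketch of the sending side): each payload is an
# (encoding, content) pair; small payloads are sent verbatim, larger ones
# zlib-compressed and base64-encoded:
#
#   plain = (constants.RPC_ENCODING_NONE, "short payload")
#   packed = (constants.RPC_ENCODING_ZLIB_BASE64,
#             base64.b64encode(zlib.compress("a longer payload")))
#   assert _Decompress(plain) == "short payload"
#   assert _Decompress(packed) == "a longer payload"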


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name, primary_ip_family,
    master_netmask
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_netmask = cfg.GetMasterNetmask()
    master_node = cfg.GetMasterNode()
    primary_ip_family = cfg.GetPrimaryIPFamily()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node, primary_ip_family,
          master_netmask)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
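
# Example (from later in this module): ActivateMasterIp is declared as
#
#   @RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
#                  _BuildMasterIpEnv)
#   def ActivateMasterIp(master_params, use_external_mip_script):
#     ...
#
# i.e. env_builder_fn is invoked with the decorated function's arguments to
# build the hook environment, and the hooks run in the pre and post phases
# around the actual call.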


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    L{RunLocalHooks} decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
  }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate in; either add or remove an entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")


def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.CONFD])
  if result.failed:
    logging.error("Command %s failed with exitcode %s and error %s",
                  result.cmd, result.exit_code, result.output)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _GetVgInfo(name, excl_stor):
  """Retrieves information about an LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = bdev.LogicalVolume.GetVGInfo([name], excl_stor)
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "name": name,
    "vg_free": vg_free,
    "vg_size": vg_size,
    }


def _GetHvInfo(name):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) RAM in MiB
    - memory_total is the total amount of RAM in MiB
    - hv_version: the hypervisor version, if available

  """
  return hypervisor.GetHypervisor(name).GetNodeInfo()


def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of the results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(vg_names, hv_names, excl_stor):
  """Gives back information about the node.

  @type vg_names: list of string
  @param vg_names: Names of the volume groups to ask for disk space information
  @type hv_names: list of string
  @param hv_names: Names of the hypervisors to ask for node information
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  vg_info = _GetNamedNodeInfo(vg_names, (lambda vg: _GetVgInfo(vg, excl_stor)))
  hv_info = _GetNamedNodeInfo(hv_names, _GetHvInfo)

  return (bootid, vg_info, hv_info)


def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  res = []
  for pvi in pvi_list:
    if len(pvi.lv_list) > 1:
      res.append((pvi.name, pvi.lv_list))
  return res
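
# Example (illustrative): with exclusive storage a PV must host at most one
# LV, so a PV carrying two LVs would be reported:
#
#   pvi_list = [objects.LvmPvInfo(name="/dev/sda3", lv_list=["xenvg/lv1"]),
#               objects.LvmPvInfo(name="/dev/sdb3",
#                                 lv_list=["xenvg/lv2", "xenvg/lv3"])]
#   _CheckExclusivePvs(pvi_list)
#   # -> [("/dev/sdb3", ["xenvg/lv2", "xenvg/lv3"])]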


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_VMNODES, [])

  if constants.NV_HYPERVISOR in what and vm_capable:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      try:
        val = hypervisor.GetHypervisor(hv_name).Verify()
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      tmp[hv_name] = val

  if constants.NV_HVPARAMS in what and vm_capable:
    result[constants.NV_HVPARAMS] = tmp = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        tmp.append((source, hv_name, str(err)))

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_NODELIST in what:
    (nodes, bynode) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stat()'ing out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST])
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what and vm_capable:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_FILE_STORAGE_PATHS) == my_name:
    result[constants.NV_FILE_STORAGE_PATHS] = \
      bdev.ComputeWrongFileStoragePaths()

  return result


def GetBlockDevSizes(devices):
  """Return the size of the given block devices.

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      st = os.stat(devpath)
    except EnvironmentError, err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if stat.S_ISBLK(st.st_mode):
      result = utils.RunCmd(["blockdev", "--getsize64", devpath])
      if result.failed:
        # We don't want to fail, just do not list this device as available
        logging.warning("Cannot get size for block device %s", devpath)
        continue

      size = int(result.stdout) / (1024 * 1024)
      blockdevs[devpath] = size
  return blockdevs


def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = "|"
  if not vg_names:
    vg_names = []
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  for line in result.stdout.splitlines():
    line = line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, name, size, attr = match.groups()
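    # lv_attr is a fixed-width flag string (see lvs(8)): character 0 is the
    # volume type ("v" marks a virtual volume), character 4 the state ("a"
    # when active, "-" when inactive) and character 5 the device open state
    # ("o" when open)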
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    virtual = attr[0] == "v"
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[vg_name + "/" + name] = (size, inactive, online)

  return lvs


def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    return dev.split("(")[0]
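  # e.g. parse_dev("/dev/sda1(0)") -> "/dev/sda1"; lvs reports each device
  # with its starting extent in parentheses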

  def handle_dev(dev):
    return [parse_dev(x) for x in dev.split(",")]

  def map_line(line):
    line = [v.strip() for v in line]
    return [{"name": line[0], "size": line[1],
             "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(map_line(line.split("|")))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs


def BridgesExist(bridges_list):
  """Check whether the given bridges exist on the current node.

  @raise RPCFail: if any of the bridges is missing

  """
  missing = []
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      missing.append(bridge)

  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for instance
      information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results


def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output["memory"] = iinfo[2]
    output["vcpus"] = iinfo[3]
    output["state"] = iinfo[4]
    output["time"] = iinfo[5]

  return output


def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @raise RPCFail: if the instance is not running; missing disk symlinks
      are only logged as warnings

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          "memory": memory,
          "vcpus": vcpus,
          "state": state,
          "time": times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in "memory", "vcpus":
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        output[name] = value

  return output


def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
      transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    c_msg = "-%s" % component
  else:
    c_msg = ""
  base = ("%s-%s-%s%s-%s.log" %
          (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
  return utils.PathJoin(pathutils.LOG_OS_DIR, base)
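
# Example (illustrative): adding a hypothetical instance with OS
# "debootstrap+default" would log to something like
#   <LOG_OS_DIR>/add-debootstrap+default-inst1.example.com-<timestamp>.log
# while a transfer of component "disk0" would insert "-disk0" before the
# timestamp.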


def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance being renamed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: boolean
  @return: the success of the operation

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)


def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns symlink path for block device.

  """
  if _dir is None:
    _dir = pathutils.DISK_LINKS_DIR

  return utils.PathJoin(_dir,
                        ("%s%s%s" %
                         (instance_name, constants.DISK_SEPARATOR, idx)))
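
# Example (illustrative): the symlinks live in pathutils.DISK_LINKS_DIR and
# are named <instance_name><DISK_SEPARATOR><disk_index>, e.g.
#   _GetBlockDevSymlinkPath("inst1.example.com", 0)
# returns the link path for the first disk of a hypothetical instance.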


def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to an instance's block device.

  This is an auxiliary function run when an instance is started (on the
  primary node) or when an instance is migrated (on the target node).

  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name


def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if os.path.islink(link_name):
      try:
        os.remove(link_name)
      except OSError:
        logging.exception("Can't remove symlink '%s'", link_name)


def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, device_path)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError, e:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    e.strerror)

    block_devices.append((disk, link_name))

  return block_devices


def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: whether to pause the instance at startup
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the startup reason trail on file
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    logging.info("Instance %s already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Hypervisor error: %s", err, exc=True)


def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this function uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(iname, instance.disks)


def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one of the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this reboot
  @rtype: None

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
      result = StartInstance(instance, False, reason, store_reason=False)
      _StoreInstReasonTrail(instance.name, reason)
      return result
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)


def InstanceBalloonMemory(instance, memory):
  """Resize an instance's memory.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type memory: int
  @param memory: new memory amount in MB
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  running = hyper.ListInstances()
  if instance.name not in running:
    logging.info("Instance %s is not running, cannot balloon", instance.name)
    return
  try:
    hyper.BalloonInstanceMemory(instance, memory)
  except errors.HypervisorError, err:
    _Fail("Failed to balloon instance memory: %s", err, exc=True)


def MigrationInfo(instance):
  """Gather information about an instance to be migrated.

  @type instance: L{objects.Instance}
  @param instance: the instance definition

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    info = hyper.MigrationInfo(instance)
  except errors.HypervisorError, err:
    _Fail("Failed to fetch migration information: %s", err, exc=True)
  return info


def AcceptInstance(instance, info, target):
  """Prepare the node to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type target: string
  @param target: target host (usually ip), on this node

  """
  # TODO: why is this required only for DTS_EXT_MIRROR?
  if instance.disk_template in constants.DTS_EXT_MIRROR:
    # Create the symlinks, as the disks are not active
    # in any way
    try:
      _GatherAndLinkBlockDevs(instance)
    except errors.BlockDeviceError, err:
      _Fail("Block device error: %s", err, exc=True)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.AcceptInstance(instance, info, target)
  except errors.HypervisorError, err:
    if instance.disk_template in constants.DTS_EXT_MIRROR:
      _RemoveBlockDevLinks(instance.name, instance.disks)
    _Fail("Failed to accept instance: %s", err, exc=True)


def FinalizeMigrationDst(instance, info, success):
  """Finalize any preparation to accept an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type info: string/data (opaque)
  @param info: migration information, from the source node
  @type success: boolean
  @param success: whether the migration was a success or a failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    hyper.FinalizeMigrationDst(instance, info, success)
  except errors.HypervisorError, err:
    _Fail("Failed to finalize migration on the target node: %s", err, exc=True)


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @raise RPCFail: if migration fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)


def FinalizeMigrationSource(instance, success, live):
  """Finalize the instance migration on the source node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition of the migrated instance
  @type success: bool
  @param success: whether the migration succeeded or not
  @type live: bool
  @param live: whether the user requested a live migration or not
  @raise RPCFail: If the execution fails for some reason

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.FinalizeMigrationSource(instance, success, live)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to finalize the migration on the source node: %s", err,
          exc=True)


def GetMigrationStatus(instance):
  """Get the migration status.

  @type instance: L{objects.Instance}
  @param instance: the instance that is being migrated
  @rtype: L{objects.MigrationStatus}
  @return: the status of the current migration (one of
           L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
           progress info that can be retrieved from the hypervisor
  @raise RPCFail: If the migration status cannot be retrieved

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  try:
    return hyper.GetMigrationStatus(instance)
  except Exception, err:  # pylint: disable=W0703
    _Fail("Failed to get migration status: %s", err, exc=True)


def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
1649
      used for device cache data
1650
  @type on_primary: boolean
1651
  @param on_primary:  indicates if it is the primary node or not
1652
  @type info: string
1653
  @param info: string that will be sent to the physical device
1654
      creation, used for example to set (LVM) tags on LVs
1655
  @type excl_stor: boolean
1656
  @param excl_stor: Whether exclusive_storage is active
1657

1658
  @return: the new unique_id of the device (this can sometime be
1659
      computed only after creation), or None. On secondary nodes,
1660
      it's not required to return anything.
1661

1662
  """
1663
  # TODO: remove the obsolete "size" argument
1664
  # pylint: disable=W0613
1665
  clist = []
1666
  if disk.children:
1667
    for child in disk.children:
1668
      try:
1669
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1670
      except errors.BlockDeviceError, err:
1671
        _Fail("Can't assemble device %s: %s", child, err)
1672
      if on_primary or disk.AssembleOnSecondary():
1673
        # we need the children open in case the device itself has to
1674
        # be assembled
1675
        try:
1676
          # pylint: disable=E1103
1677
          crdev.Open()
1678
        except errors.BlockDeviceError, err:
1679
          _Fail("Can't make child '%s' read-write: %s", child, err)
1680
      clist.append(crdev)
1681

    
1682
  try:
1683
    device = bdev.Create(disk, clist, excl_stor)
1684
  except errors.BlockDeviceError, err:
1685
    _Fail("Can't create block device: %s", err)
1686

    
1687
  if on_primary or disk.AssembleOnSecondary():
1688
    try:
1689
      device.Assemble()
1690
    except errors.BlockDeviceError, err:
1691
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1692
    if on_primary or disk.OpenOnSecondary():
1693
      try:
1694
        device.Open(force=True)
1695
      except errors.BlockDeviceError, err:
1696
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1697
    DevCacheManager.UpdateCache(device.dev_path, owner,
1698
                                on_primary, disk.iv_name)
1699

    
1700
  device.SetInfo(info)
1701

    
1702
  return device.unique_id
1703

    
1704

    
1705
def _WipeDevice(path, offset, size):
1706
  """This function actually wipes the device.
1707

1708
  @param path: The path to the device to wipe
1709
  @param offset: The offset in MiB in the file
1710
  @param size: The size in MiB to write
1711

1712
  """
1713
  # Internal sizes are always in Mebibytes; if the following "dd" command
1714
  # should use a different block size the offset and size given to this
1715
  # function must be adjusted accordingly before being passed to "dd".
1716
  block_size = 1024 * 1024
1717

    
1718
  cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1719
         "bs=%s" % block_size, "oflag=direct", "of=%s" % path,
1720
         "count=%d" % size]
1721
  result = utils.RunCmd(cmd)
1722

    
1723
  if result.failed:
1724
    _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1725
          result.fail_reason, result.output)
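
# For example (path and sizes are illustrative only),
#   _WipeDevice("/dev/xenvg/disk0", 0, 1024)
# runs roughly:
#   dd if=/dev/zero seek=0 bs=1048576 oflag=direct of=/dev/xenvg/disk0 count=1024
# i.e. it zeroes the first 1024 MiB of the device.
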
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)

  # Cross-verify some of the parameters
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("The provided offset and size to wipe is bigger than device size")

  _WipeDevice(rdev.dev_path, offset, size)


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disk objects we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success
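
# The returned list mirrors the input order; a mixed outcome could look
# like (illustrative): [(True, None), (False, "Pause for device disk/1 failed")]
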
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result
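
# The "mcn" arithmetic above tolerates partial child failures: e.g. for a
# mirrored device with two children where ChildrenNeeded() returns 1, mcn is
# 2 - 1 = 1, so one child may fail to assemble before the error is re-raised.
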
def BlockdevAssemble(disk, owner, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  try:
    result = _RecursiveAssembleBD(disk, owner, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      result = result.dev_path
      if as_primary:
        _SymlinkBlockDev(owner, result, idx)
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return result


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shut down. Then the children of the device are
  shut down.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble: the shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: List of tuples, (bool, status), one for each disk; bool denotes
    success/failure, status is L{objects.BlockDevStatus} on success, string
    otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result
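
# Unlike BlockdevGetmirrorstatus above, this variant never raises for a
# missing disk; a mixed result might look like (illustrative):
#   [(True, <objects.BlockDevStatus>), (False, "Can't find device disk/0")]
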
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
           sync status information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualSize())
  return result


def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _OpenRealBD(disk)

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncation on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.SSH_LOGIN_USER,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = "|".join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
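# The pipeline assembled in BlockdevExport above has the shape (host and
# paths illustrative):
#   dd if=/dev/xenvg/disk0 bs=1048576 count=<size> | \
#     ssh <dest_node> 'dd of=<dest_path> conv=nocreat,notrunc bs=65536 oflag=dsync'
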
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout
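
# Illustrative invocation (the helper path and command are examples only):
#   RunOob("/usr/local/bin/oob-helper", "power-status", "node1.example.com", 60)
# runs "/usr/local/bin/oob-helper power-status node1.example.com" with a
# 60-second timeout and returns its stdout.
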
def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
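
# The API file is expected to hold one integer version per line; e.g. a
# file containing the two lines "15" and "20" yields (True, [15, 20]),
# while a missing or malformed file yields (False, <error message>).
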
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions))

  return result


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary: we will populate it with the absolute path
  # names; if the value is True, then it is a required file, otherwise
  # an optional one
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)

  if max(api_versions) >= constants.OS_API_V15:
    os_files[constants.OS_VARIANTS_FILE] = False

  if max(api_versions) >= constants.OS_API_V20:
    os_files[constants.OS_PARAMETERS_FILE] = True
  else:
    del os_files[constants.OS_SCRIPT_VERIFY]

  for (filename, required) in os_files.items():
    os_files[filename] = utils.PathJoin(os_dir, filename)

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      if err.errno == errno.ENOENT and not required:
        del os_files[filename]
        continue
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, utils.ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = []
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = \
        utils.FilterEmptyLinesAndComments(utils.ReadFile(variants_file))
    except EnvironmentError, err:
      # we accept missing files, but not other errors
      if err.errno != errno.ENOENT:
        return False, ("Error while reading the OS variants file at %s: %s" %
                       (variants_file, utils.ErrnoOrStr(err)))

  parameters = []
  if constants.OS_PARAMETERS_FILE in os_files:
    parameters_file = os_files[constants.OS_PARAMETERS_FILE]
    try:
      parameters = utils.ReadFile(parameters_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS parameters file at %s: %s" %
                     (parameters_file, utils.ErrnoOrStr(err)))
    parameters = [v.split(None, 1) for v in parameters]

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
                                                 None),
                      supported_variants=variants,
                      supported_parameters=parameters,
                      api_versions=api_versions)
  return True, os_obj


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  name_only = objects.OS.GetName(name)
  status, payload = _TryOSFromDisk(name_only, base_dir)

  if not status:
    _Fail(payload)

  return payload


def OSCoreEnv(os_name, inst_os, os_params, debug=0):
  """Calculate the basic environment for an os script.

  @type os_name: str
  @param os_name: full operating system name (including variant)
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type os_params: dict
  @param os_params: the OS parameters
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result["OS_API_VERSION"] = "%d" % api_version
  result["OS_NAME"] = inst_os.name
  result["DEBUG_LEVEL"] = "%d" % debug

  # OS variants
  if api_version >= constants.OS_API_V15 and inst_os.supported_variants:
    variant = objects.OS.GetVariant(os_name)
    if not variant:
      variant = inst_os.supported_variants[0]
  else:
    variant = ""
  result["OS_VARIANT"] = variant

  # OS params
  for pname, pvalue in os_params.items():
    result["OSP_%s" % pname.upper()] = pvalue

  # Set a default path otherwise programs called by OS scripts (or
  # even hooks called from OS scripts) might break, and we don't want
  # to have each script require setting a PATH variable
  result["PATH"] = constants.HOOKS_PATH

  return result
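
# A resulting environment might look like (values illustrative):
#   {"OS_API_VERSION": "20", "OS_NAME": "debootstrap", "DEBUG_LEVEL": "0",
#    "OS_VARIANT": "default", "OSP_DHCP": "yes", "PATH": constants.HOOKS_PATH}
# where the OSP_* entries are derived from the os_params dictionary.
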
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)

  for attr in ["name", "os", "uuid", "ctime", "mtime", "primary_node"]:
    result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))

  result["HYPERVISOR"] = instance.hypervisor
  result["DISK_COUNT"] = "%d" % len(instance.disks)
  result["NIC_COUNT"] = "%d" % len(instance.nics)
  result["INSTANCE_SECONDARY_NODES"] = \
      ("%s" % " ".join(instance.secondary_nodes))

  # Disks
  for idx, disk in enumerate(instance.disks):
    real_disk = _OpenRealBD(disk)
    result["DISK_%d_PATH" % idx] = real_disk.dev_path
    result["DISK_%d_ACCESS" % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result["DISK_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result["DISK_%d_BACKEND_TYPE" % idx] = "block"
    elif disk.dev_type == constants.LD_FILE:
      result["DISK_%d_BACKEND_TYPE" % idx] = \
        "file:%s" % disk.physical_id[0]

  # NICs
  for idx, nic in enumerate(instance.nics):
    result["NIC_%d_MAC" % idx] = nic.mac
    if nic.ip:
      result["NIC_%d_IP" % idx] = nic.ip
    result["NIC_%d_MODE" % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result["NIC_%d_BRIDGE" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result["NIC_%d_LINK" % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.netinfo:
      nobj = objects.Network.FromDict(nic.netinfo)
      result.update(nobj.HooksDict("NIC_%d_" % idx))
    if constants.HV_NIC_TYPE in instance.hvparams:
      result["NIC_%d_FRONTEND_TYPE" % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # HV/BE params
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def DiagnoseExtStorage(top_dirs=None):
  """Compute the validity for all ExtStorage Providers.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.ES_SEARCH_PATH})
  @rtype: list of L{objects.ExtStorage}
  @return: a list of tuples (name, path, status, diagnose, parameters)
      for all (potential) ExtStorage Providers under all
      search paths, where:
          - name is the (potential) ExtStorage Provider
          - path is the full path to the ExtStorage Provider
          - status True/False is the validity of the ExtStorage Provider
          - diagnose is the error message for an invalid ExtStorage Provider,
            otherwise empty
          - parameters is a list of (name, help) parameters, if any

  """
  if top_dirs is None:
    top_dirs = pathutils.ES_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the ExtStorage directory %s: %s",
                          dir_name, err)
        break
      for name in f_names:
        es_path = utils.PathJoin(dir_name, name)
        status, es_inst = bdev.ExtStorageFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          parameters = es_inst.supported_parameters
        else:
          diagnose = es_inst
          parameters = []
        result.append((name, es_path, status, diagnose, parameters))

  return result


def BlockdevGrow(disk, amount, dryrun, backingstore):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones to resize.

  @type disk: L{objects.Disk}
  @param disk: the disk to be grown
  @type amount: integer
  @param amount: the amount (in mebibytes) to grow with
  @type dryrun: boolean
  @param dryrun: whether to execute the operation in simulation mode
      only, without actually increasing the size
  @param backingstore: whether to execute the operation on backing storage
      only, or on "logical" storage only; e.g. DRBD is logical storage,
      whereas LVM, file, RBD are backing storage
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.Grow(amount, dryrun, backingstore)
  except errors.BlockDeviceError, err:
    _Fail("Failed to grow block device: %s", err, exc=True)


def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk ID as (vg, lv)

  """
  if disk.dev_type == constants.LD_DRBD8:
    if not disk.children:
      _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
            disk.unique_id)
    return BlockdevSnapshot(disk.children[0])
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # FIXME: choose a saner value for the snapshot size
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def BlockdevSetInfo(disk, info):
  """Sets 'metadata' information on block devices.

  This function sets 'info' metadata on block devices. Initial
  information is set at device creation; this function should be used
  for example after renames.

  @type disk: L{objects.Disk}
  @param disk: the disk whose metadata should be updated
  @type info: string
  @param info: new 'info' metadata
  @rtype: (status, result)
  @return: a tuple with the status of the operation (True/False), and
      the error message if status is False

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    _Fail("Cannot find block device %s", disk)

  try:
    r_dev.SetInfo(info)
  except errors.BlockDeviceError, err:
    _Fail("Failed to set information on block device: %s", err, exc=True)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name + ".new")
  finaldestdir = utils.PathJoin(pathutils.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, "version", "0")
  config.set(constants.INISECT_EXP, "timestamp", "%d" % int(time.time()))
  config.set(constants.INISECT_EXP, "source", instance.primary_node)
  config.set(constants.INISECT_EXP, "os", instance.os)
  config.set(constants.INISECT_EXP, "compression", "none")

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, "name", instance.name)
  config.set(constants.INISECT_INS, "maxmem", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "minmem", "%d" %
             instance.beparams[constants.BE_MINMEM])
  # "memory" is deprecated, but useful for exporting to old ganeti versions
  config.set(constants.INISECT_INS, "memory", "%d" %
             instance.beparams[constants.BE_MAXMEM])
  config.set(constants.INISECT_INS, "vcpus", "%d" %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, "disk_template", instance.disk_template)
  config.set(constants.INISECT_INS, "hypervisor", instance.hypervisor)
  config.set(constants.INISECT_INS, "tags", " ".join(instance.GetTags()))

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, "nic%d_mac" %
               nic_count, "%s" % nic.mac)
    config.set(constants.INISECT_INS, "nic%d_ip" % nic_count, "%s" % nic.ip)
    config.set(constants.INISECT_INS, "nic%d_network" % nic_count,
               "%s" % nic.network)
    for param in constants.NICS_PARAMETER_TYPES:
      config.set(constants.INISECT_INS, "nic%d_%s" % (nic_count, param),
                 "%s" % nic.nicparams.get(param, None))
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, "nic_count", "%d" % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, "disk%d_ivname" % disk_count,
                 ("%s" % disk.iv_name))
      config.set(constants.INISECT_INS, "disk%d_dump" % disk_count,
                 ("%s" % disk.physical_id[1]))
      config.set(constants.INISECT_INS, "disk%d_size" % disk_count,
                 ("%d" % disk.size))

  config.set(constants.INISECT_INS, "disk_count", "%d" % disk_total)

  # New-style hypervisor/backend parameters

  config.add_section(constants.INISECT_HYP)
  for name, value in instance.hvparams.items():
    if name not in constants.HVC_GLOBALS:
      config.set(constants.INISECT_HYP, name, str(value))

  config.add_section(constants.INISECT_BEP)
  for name, value in instance.beparams.items():
    config.set(constants.INISECT_BEP, name, str(value))

  config.add_section(constants.INISECT_OSP)
  for name, value in instance.osparams.items():
    config.set(constants.INISECT_OSP, name, str(value))

  utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, ignore_errors=True)
  shutil.move(destdir, finaldestdir)
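
# The written file is a plain INI document; assuming the usual section names
# behind constants.INISECT_EXP/INISECT_INS, a minimal excerpt could look like:
#   [export]
#   version = 0
#   timestamp = 1234567890
#   [instance]
#   name = inst1.example.com
#   disk_count = 1
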
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: a serialized form of the config file containing the
      export info

  """
  cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(pathutils.EXPORT_DIR):
    return sorted(utils.ListVisibleFiles(pathutils.EXPORT_DIR))
  else:
    _Fail("No exports directory")


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: None

  """
  target = utils.PathJoin(pathutils.EXPORT_DIR, export)

  try:
    shutil.rmtree(target)
  except EnvironmentError, err:
    _Fail("Error while removing the export: %s", err, exc=True)


def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the new logical_id/physical_id we rename
      it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))


def _TransformFileStorageDir(fs_dir):
  """Checks whether a given file_storage_dir is valid.

  Checks whether the given fs_dir is within the cluster-wide default
  file_storage_dir or the shared_file_storage_dir, which are stored in
  SimpleStore. Only paths under those directories are allowed.

  @type fs_dir: str
  @param fs_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  if not (constants.ENABLE_FILE_STORAGE or
          constants.ENABLE_SHARED_FILE_STORAGE):
    _Fail("File storage disabled at configure time")

  bdev.CheckFileStoragePath(fs_dir)

  return os.path.normpath(fs_dir)
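
# For example, with a cluster-wide file storage dir of
# "/srv/ganeti/file-storage" (illustrative), the path
# "/srv/ganeti/file-storage/inst1.example.com/" normalizes to
# "/srv/ganeti/file-storage/inst1.example.com", while paths outside the
# allowed directories are rejected by bdev.CheckFileStoragePath.
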
def CreateFileStorageDir(file_storage_dir):
2907
  """Create file storage directory.
2908

2909
  @type file_storage_dir: str
2910
  @param file_storage_dir: directory to create
2911

2912
  @rtype: tuple
2913
  @return: tuple with first element a boolean indicating wheter dir
2914
      creation was successful or not
2915

2916
  """
2917
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2918
  if os.path.exists(file_storage_dir):
2919
    if not os.path.isdir(file_storage_dir):
2920
      _Fail("Specified storage dir '%s' is not a directory",
2921
            file_storage_dir)
2922
  else:
2923
    try:
2924
      os.makedirs(file_storage_dir, 0750)
2925
    except OSError, err:
2926
      _Fail("Cannot create file storage directory '%s': %s",
2927
            file_storage_dir, err, exc=True)
2928

    
2929

    
2930
def RemoveFileStorageDir(file_storage_dir):
2931
  """Remove file storage directory.
2932

2933
  Remove it only if it's empty. If not log an error and return.
2934

2935
  @type file_storage_dir: str
2936
  @param file_storage_dir: the directory we should cleanup
2937
  @rtype: tuple (success,)
2938
  @return: tuple of one element, C{success}, denoting
2939
      whether the operation was successful
2940

2941
  """
2942
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2943
  if os.path.exists(file_storage_dir):
2944
    if not os.path.isdir(file_storage_dir):
2945
      _Fail("Specified Storage directory '%s' is not a directory",
2946
            file_storage_dir)
2947
    # deletes dir only if empty, otherwise we want to fail the rpc call
2948
    try:
2949
      os.rmdir(file_storage_dir)
2950
    except OSError, err:
2951
      _Fail("Cannot remove file storage directory '%s': %s",
2952
            file_storage_dir, err)
2953

    
2954

    
2955
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2956
  """Rename the file storage directory.
2957

2958
  @type old_file_storage_dir: str
2959
  @param old_file_storage_dir: the current path
2960
  @type new_file_storage_dir: str
2961
  @param new_file_storage_dir: the name we should rename to
2962
  @rtype: tuple (success,)
2963
  @return: tuple of one element, C{success}, denoting
2964
      whether the operation was successful
2965

2966
  """
2967
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2968
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2969
  if not os.path.exists(new_file_storage_dir):
2970
    if os.path.isdir(old_file_storage_dir):
2971
      try:
2972
        os.rename(old_file_storage_dir, new_file_storage_dir)
2973
      except OSError, err:
2974
        _Fail("Cannot rename '%s' to '%s': %s",
2975
              old_file_storage_dir, new_file_storage_dir, err)
2976
    else:
2977
      _Fail("Specified storage dir '%s' is not a directory",
2978
            old_file_storage_dir)
2979
  else:
2980
    if os.path.exists(old_file_storage_dir):
2981
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2982
            old_file_storage_dir, new_file_storage_dir)
2983

    
2984

    
2985
def _EnsureJobQueueFile(file_name):
2986
  """Checks whether the given filename is in the queue directory.
2987

2988
  @type file_name: str
2989
  @param file_name: the file name we should check
2990
  @rtype: None
2991
  @raises RPCFail: if the file is not valid
2992

2993
  """
2994
  if not utils.IsBelowDir(pathutils.QUEUE_DIR, file_name):
2995
    _Fail("Passed job queue file '%s' does not belong to"
2996
          " the queue directory '%s'", file_name, pathutils.QUEUE_DIR)
2997

    
2998

    
2999
def JobQueueUpdate(file_name, content):
3000
  """Updates a file in the queue directory.
3001

3002
  This is just a wrapper over L{utils.io.WriteFile}, with proper
3003
  checking.
3004

3005
  @type file_name: str
3006
  @param file_name: the job file name
3007
  @type content: str
3008
  @param content: the new job contents
3009
  @rtype: boolean
3010
  @return: the success of the operation
3011

3012
  """
3013
  file_name = vcluster.LocalizeVirtualPath(file_name)
3014

    
3015
  _EnsureJobQueueFile(file_name)
3016
  getents = runtime.GetEnts()
3017

    
3018
  # Write and replace the file atomically
3019
  utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
3020
                  gid=getents.daemons_gid, mode=constants.JOB_QUEUE_FILES_PERMS)
3021

    
3022

    
3023
def JobQueueRename(old, new):
3024
  """Renames a job queue file.
3025

3026
  This is just a wrapper over os.rename with proper checking.
3027

3028
  @type old: str
3029
  @param old: the old (actual) file name
3030
  @type new: str
3031
  @param new: the desired file name
3032
  @rtype: tuple
3033
  @return: the success of the operation and payload
3034

3035
  """
3036
  old = vcluster.LocalizeVirtualPath(old)
3037
  new = vcluster.LocalizeVirtualPath(new)
3038

    
3039
  _EnsureJobQueueFile(old)
3040
  _EnsureJobQueueFile(new)
3041

    
3042
  getents = runtime.GetEnts()
3043

    
3044
  utils.RenameFile(old, new, mkdir=True, mkdir_mode=0750,
3045
                   dir_uid=getents.masterd_uid, dir_gid=getents.daemons_gid)
3046

    
3047

    
3048
def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      contains the error details in case we failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      _RemoveBlockDevLinks(instance_name, disks)


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: None

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
  except errors.HypervisorError, err:
    _Fail(str(err), log=False)


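# Example: a sketch of validating a parameter dict for one hypervisor; the
# hypervisor name and parameter key are illustrative and must match what the
# installed hypervisor abstraction actually supports:
#
#   ValidateHVParams("kvm", {"kernel_path": "/boot/vmlinuz"})

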
def _CheckOSPList(os_obj, parameters):
  """Check whether a list of parameters is supported by the OS.

  @type os_obj: L{objects.OS}
  @param os_obj: OS object to check
  @type parameters: list
  @param parameters: the list of parameters to check

  """
  supported = [v[0] for v in os_obj.supported_parameters]
  delta = frozenset(parameters).difference(supported)
  if delta:
    _Fail("The following parameters are not supported"
          " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))


def ValidateOS(required, osname, checks, osparams):
  """Validate the given OS' parameters.

  @type required: boolean
  @param required: whether absence of the OS should translate into
      failure or not
  @type osname: string
  @param osname: the OS to be validated
  @type checks: list
  @param checks: list of the checks to run (currently only 'parameters')
  @type osparams: dict
  @param osparams: dictionary with OS parameters
  @rtype: boolean
  @return: True if the validation passed, or False if the OS was not
      found and L{required} was false

  """
  if not constants.OS_VALIDATE_CALLS.issuperset(checks):
    _Fail("Unknown checks required for OS %s: %s", osname,
          set(checks).difference(constants.OS_VALIDATE_CALLS))

  name_only = objects.OS.GetName(osname)
  status, tbv = _TryOSFromDisk(name_only, None)

  if not status:
    if required:
      _Fail(tbv)
    else:
      return False

  if max(tbv.api_versions) < constants.OS_API_V20:
    return True

  if constants.OS_VALIDATE_PARAMETERS in checks:
    _CheckOSPList(tbv, osparams.keys())

  validate_env = OSCoreEnv(osname, tbv, osparams)
  result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
                        cwd=tbv.path, reset_env=True)
  if result.failed:
    logging.error("os validate command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    _Fail("OS validation script failed (%s), output: %s",
          result.fail_reason, result.output, log=False)

  return True


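# Example: a sketch of validating the parameters of an OS definition; the OS
# name and parameter are illustrative and depend on the OS definitions
# actually installed on the node:
#
#   ValidateOS(True, "debootstrap+default",
#              [constants.OS_VALIDATE_PARAMETERS], {"dhcp": "yes"})

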
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "check", constants.MASTERD])
  if not result.failed:
    _Fail("The master daemon is running, will not demote")

  try:
    if os.path.isfile(pathutils.CLUSTER_CONF_FILE):
      utils.CreateBackup(pathutils.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)

  utils.RemoveFile(pathutils.CLUSTER_CONF_FILE)


def _GetX509Filenames(cryptodir, name):
  """Returns the full paths for the private key and certificate.

  """
  return (utils.PathJoin(cryptodir, name),
          utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
          utils.PathJoin(cryptodir, name, _X509_CERT_FILE))


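# Example: given the module-level _X509_KEY_FILE/_X509_CERT_FILE names, a call
# such as the following (directory and certificate name are illustrative)
# returns the certificate directory plus the two files inside it:
#
#   _GetX509Filenames("/var/run/ganeti/crypto", "x509-1234-abcd")
#   # -> ("/var/run/ganeti/crypto/x509-1234-abcd",
#   #     "/var/run/ganeti/crypto/x509-1234-abcd/key",
#   #     "/var/run/ganeti/crypto/x509-1234-abcd/cert")

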
def CreateX509Certificate(validity, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Creates a new X509 certificate for SSL/TLS.

  @type validity: int
  @param validity: Validity in seconds
  @rtype: tuple; (string, string)
  @return: Certificate name and public part

  """
  (key_pem, cert_pem) = \
    utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
                                     min(validity, _MAX_SSL_CERT_VALIDITY))

  cert_dir = tempfile.mkdtemp(dir=cryptodir,
                              prefix="x509-%s-" % utils.TimestampForFilename())
  try:
    name = os.path.basename(cert_dir)
    assert len(name) > 5

    (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

    utils.WriteFile(key_file, mode=0400, data=key_pem)
    utils.WriteFile(cert_file, mode=0400, data=cert_pem)

    # Never return private key as it shouldn't leave the node
    return (name, cert_pem)
  except Exception:
    shutil.rmtree(cert_dir, ignore_errors=True)
    raise


def RemoveX509Certificate(name, cryptodir=pathutils.CRYPTO_KEYS_DIR):
  """Removes an X509 certificate.

  @type name: string
  @param name: Certificate name

  """
  (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)

  utils.RemoveFile(key_file)
  utils.RemoveFile(cert_file)

  try:
    os.rmdir(cert_dir)
  except EnvironmentError, err:
    _Fail("Cannot remove certificate directory '%s': %s",
          cert_dir, err)


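# Example: a sketch of the create/remove life cycle of a transient
# certificate; one week of validity is the maximum anyway, as it is capped by
# _MAX_SSL_CERT_VALIDITY:
#
#   (name, cert_pem) = CreateX509Certificate(7 * 24 * 60 * 60)
#   # ... hand cert_pem to the peer and run the transfer ...
#   RemoveX509Certificate(name)

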
def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
  """Returns the command for the requested input/output.

  @type instance: L{objects.Instance}
  @param instance: The instance object
  @param mode: Import/export mode
  @param ieio: Input/output type
  @param ieargs: Input/output arguments

  """
  assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)

  env = None
  prefix = None
  suffix = None
  exp_size = None

  if ieio == constants.IEIO_FILE:
    (filename, ) = ieargs

    if not utils.IsNormAbsPath(filename):
      _Fail("Path '%s' is not normalized or absolute", filename)

    real_filename = os.path.realpath(filename)
    directory = os.path.dirname(real_filename)

    if not utils.IsBelowDir(pathutils.EXPORT_DIR, real_filename):
      _Fail("File '%s' is not under exports directory '%s': %s",
            filename, pathutils.EXPORT_DIR, real_filename)

    # Create directory
    utils.Makedirs(directory, mode=0750)

    quoted_filename = utils.ShellQuote(filename)

    if mode == constants.IEM_IMPORT:
      suffix = "> %s" % quoted_filename
    elif mode == constants.IEM_EXPORT:
      suffix = "< %s" % quoted_filename

      # Retrieve file size
      try:
        st = os.stat(filename)
      except EnvironmentError, err:
        logging.error("Can't stat(2) %s: %s", filename, err)
      else:
        exp_size = utils.BytesToMebibyte(st.st_size)

  elif ieio == constants.IEIO_RAW_DISK:
    (disk, ) = ieargs

    real_disk = _OpenRealBD(disk)

    if mode == constants.IEM_IMPORT:
      # we set a smaller block size here as, due to transport buffering, more
      # than 64-128k will be mostly ignored; we use nocreat to fail if the
      # device is not already there or we pass a wrong path; we use notrunc to
      # not attempt truncation on an LV device; we use oflag=dsync to not
      # buffer too much memory; this means that at best, we flush every 64k,
      # which will not be very fast
      suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
                                    " bs=%s oflag=dsync"),
                                    real_disk.dev_path,
                                    str(64 * 1024))

    elif mode == constants.IEM_EXPORT:
      # the block size on the read dd is 1MiB to match our units
      prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
                                   real_disk.dev_path,
                                   str(1024 * 1024), # 1 MB
                                   str(disk.size))
      exp_size = disk.size

  elif ieio == constants.IEIO_SCRIPT:
    (disk, disk_index, ) = ieargs

    assert isinstance(disk_index, (int, long))

    real_disk = _OpenRealBD(disk)

    inst_os = OSFromDisk(instance.os)
    env = OSEnvironment(instance, inst_os)

    if mode == constants.IEM_IMPORT:
      env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
      env["IMPORT_INDEX"] = str(disk_index)
      script = inst_os.import_script

    elif mode == constants.IEM_EXPORT:
      env["EXPORT_DEVICE"] = real_disk.dev_path
      env["EXPORT_INDEX"] = str(disk_index)
      script = inst_os.export_script

    # TODO: Pass special environment only to script
    script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)

    if mode == constants.IEM_IMPORT:
      suffix = "| %s" % script_cmd

    elif mode == constants.IEM_EXPORT:
      prefix = "%s |" % script_cmd

    # Let script predict size
    exp_size = constants.IE_CUSTOM_SIZE

  else:
    _Fail("Invalid %s I/O mode %r", mode, ieio)

  return (env, prefix, suffix, exp_size)


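# Example: for a raw-disk export of a 1024 MiB disk whose device path is
# /dev/xenvg/disk0 (illustrative values), the helper above would return
# roughly:
#
#   (None,                                              # env
#    "dd if=/dev/xenvg/disk0 bs=1048576 count=1024 |",  # prefix
#    None,                                              # suffix
#    1024)                                              # exp_size in MiB
#
# The prefix/suffix strings are later spliced around the import/export daemon
# invocation via --cmd-prefix/--cmd-suffix; see StartImportExportDaemon below.

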
def _CreateImportExportStatusDir(prefix):
  """Creates status directory for import/export.

  """
  return tempfile.mkdtemp(dir=pathutils.IMPORT_EXPORT_DIR,
                          prefix=("%s-%s-" %
                                  (prefix, utils.TimestampForFilename())))


def StartImportExportDaemon(mode, opts, host, port, instance, component,
                            ieio, ieioargs):
  """Starts an import or export daemon.

  @param mode: Import/export mode
  @type opts: L{objects.ImportExportOptions}
  @param opts: Daemon options
  @type host: string
  @param host: Remote host for export (None for import)
  @type port: int
  @param port: Remote port for export (None for import)
  @type instance: L{objects.Instance}
  @param instance: Instance object
  @type component: string
  @param component: which part of the instance is transferred now,
      e.g. 'disk/0'
  @param ieio: Input/output type
  @param ieioargs: Input/output arguments

  """
  if mode == constants.IEM_IMPORT:
    prefix = "import"

    if not (host is None and port is None):
      _Fail("Cannot specify host or port on import")

  elif mode == constants.IEM_EXPORT:
    prefix = "export"

    if host is None or port is None:
      _Fail("Host and port must be specified for an export")

  else:
    _Fail("Invalid mode %r", mode)

  if (opts.key_name is None) ^ (opts.ca_pem is None):
    _Fail("Cluster certificate can only be used for both key and CA")

  (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
    _GetImportExportIoCommand(instance, mode, ieio, ieioargs)

  if opts.key_name is None:
    # Use server.pem
    key_path = pathutils.NODED_CERT_FILE
    cert_path = pathutils.NODED_CERT_FILE
    assert opts.ca_pem is None
  else:
    (_, key_path, cert_path) = _GetX509Filenames(pathutils.CRYPTO_KEYS_DIR,
                                                 opts.key_name)
    assert opts.ca_pem is not None

  for i in [key_path, cert_path]:
    if not os.path.exists(i):
      _Fail("File '%s' does not exist" % i)

  status_dir = _CreateImportExportStatusDir("%s-%s" % (prefix, component))
  try:
    status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
    pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
    ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)

    if opts.ca_pem is None:
      # Use server.pem
      ca = utils.ReadFile(pathutils.NODED_CERT_FILE)
    else:
      ca = opts.ca_pem

    # Write CA file
    utils.WriteFile(ca_file, data=ca, mode=0400)

    cmd = [
      pathutils.IMPORT_EXPORT_DAEMON,
      status_file, mode,
      "--key=%s" % key_path,
      "--cert=%s" % cert_path,
      "--ca=%s" % ca_file,
      ]

    if host:
      cmd.append("--host=%s" % host)

    if port:
      cmd.append("--port=%s" % port)

    if opts.ipv6:
      cmd.append("--ipv6")
    else:
      cmd.append("--ipv4")

    if opts.compress:
      cmd.append("--compress=%s" % opts.compress)

    if opts.magic:
      cmd.append("--magic=%s" % opts.magic)

    if exp_size is not None:
      cmd.append("--expected-size=%s" % exp_size)

    if cmd_prefix:
      cmd.append("--cmd-prefix=%s" % cmd_prefix)

    if cmd_suffix:
      cmd.append("--cmd-suffix=%s" % cmd_suffix)

    if mode == constants.IEM_EXPORT:
      # Retry connection a few times when connecting to remote peer
      cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
      cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
    elif opts.connect_timeout is not None:
      assert mode == constants.IEM_IMPORT
      # Overall timeout for establishing connection while listening
      cmd.append("--connect-timeout=%s" % opts.connect_timeout)

    logfile = _InstanceLogName(prefix, instance.os, instance.name, component)

    # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
    # support for receiving a file descriptor for output
    utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
                      output=logfile)

    # The import/export name is simply the status directory name
    return os.path.basename(status_dir)

  except Exception:
    shutil.rmtree(status_dir, ignore_errors=True)
    raise


def GetImportExportStatus(names):
  """Returns import/export daemon status.

  @type names: sequence
  @param names: List of names
  @rtype: List of dicts
  @return: Returns a list of the state of each named import/export or None if a
           status couldn't be read

  """
  result = []

  for name in names:
    status_file = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name,
                                 _IES_STATUS_FILE)

    try:
      data = utils.ReadFile(status_file)
    except EnvironmentError, err:
      if err.errno != errno.ENOENT:
        raise
      data = None

    if not data:
      result.append(None)
      continue

    result.append(serializer.LoadJson(data))

  return result


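# Example: a sketch of polling daemons started above; the names are the status
# directory basenames returned by StartImportExportDaemon, and each entry in
# the result is either None (no readable status yet) or the parsed JSON
# status:
#
#   names = [StartImportExportDaemon(...)]
#   for status in GetImportExportStatus(names):
#     if status is None:
#       continue  # status file not written (or not readable) yet

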
def AbortImportExport(name):
  """Sends SIGTERM to a running import/export daemon.

  """
  logging.info("Abort import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)
  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is running with PID %s, sending SIGTERM",
                 name, pid)
    utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)


def CleanupImportExport(name):
  """Cleanup after an import or export.

  If the import/export daemon is still running it's killed. Afterwards the
  whole status directory is removed.

  """
  logging.info("Finalizing import/export %s", name)

  status_dir = utils.PathJoin(pathutils.IMPORT_EXPORT_DIR, name)

  pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))

  if pid:
    logging.info("Import/export %s is still running with PID %s",
                 name, pid)
    utils.KillProcess(pid, waitpid=False)

  shutil.rmtree(status_dir, ignore_errors=True)


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  """
  # set the correct physical ID
  my_name = netutils.Hostname.GetSysName()
  for cf in disks:
    cf.SetPhysicalID(my_name, nodes_ip)

  bdevs = []

  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
  """Disconnects the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  # disconnect disks
  for rd in bdevs:
    try:
      rd.DisconnectNet()
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration to standalone mode: %s",
            err, exc=True)


def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to the new master configuration and, if
  # needed, primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if one node
  # disconnects and reconnects in a different mode before the other
  # node reconnects; in this case, one or both of the nodes will
  # decide it has the wrong configuration and switch to standalone

  def _Attach():
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 milliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  """
  def _helper(rd):
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


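# Example: an illustrative sketch of how a caller might interpret the return
# value; with one device fully synced and another still resyncing at 42%, the
# function returns (False, 42), i.e. (all devices done, lowest sync
# percentage seen):
#
#   (alldone, min_resync) = DrbdWaitSync(nodes_ip, disks)
#   if not alldone:
#     logging.info("Sync in progress, slowest device at %s%%", min_resync)

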
def GetDrbdUsermodeHelper():
  """Returns DRBD usermode helper currently configured.

  """
  try:
    return drbd.DRBD8.GetUsermodeHelper()
  except errors.BlockDeviceError, err:
    _Fail(str(err))


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    return "Reboot scheduled in 5 seconds"
  # ensure the child is running in RAM
  try:
    utils.Mlockall()
  except Exception: # pylint: disable=W0703
    pass
  time.sleep(5)
  hyper.PowercycleNode()


def _VerifyRestrictedCmdName(cmd):
  """Verifies a restricted command name.

  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  if not cmd.strip():
    return (False, "Missing command name")

  if os.path.basename(cmd) != cmd:
    return (False, "Invalid command name")

  if not constants.EXT_PLUGIN_MASK.match(cmd):
    return (False, "Command name contains forbidden characters")

  return (True, None)


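# Example: illustrative outcomes of the name check above; a plain basename
# passes, while anything containing a path separator is rejected before the
# character whitelist (constants.EXT_PLUGIN_MASK) is even consulted:
#
#   _VerifyRestrictedCmdName("list-disks")  # -> (True, None)
#   _VerifyRestrictedCmdName("../bin/sh")   # -> (False, "Invalid command name")

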
def _CommonRestrictedCmdCheck(path, owner):
  """Common checks for restricted command file system directories and files.

  @type path: string
  @param path: Path to check
  @param owner: C{None} or tuple containing UID and GID
  @rtype: tuple; (boolean, string or C{os.stat} result)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's the result of C{os.stat}

  """
  if owner is None:
    # Default to root as owner
    owner = (0, 0)

  try:
    st = os.stat(path)
  except EnvironmentError, err:
    return (False, "Can't stat(2) '%s': %s" % (path, err))

  if stat.S_IMODE(st.st_mode) & (~_RCMD_MAX_MODE):
    return (False, "Permissions on '%s' are too permissive" % path)

  if (st.st_uid, st.st_gid) != owner:
    (owner_uid, owner_gid) = owner
    return (False, "'%s' is not owned by %s:%s" % (path, owner_uid, owner_gid))

  return (True, st)


def _VerifyRestrictedCmdDirectory(path, _owner=None):
  """Verifies restricted command directory.

  @type path: string
  @param path: Path to check
  @rtype: tuple; (boolean, string or None)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise it's C{None}

  """
  (status, value) = _CommonRestrictedCmdCheck(path, _owner)

  if not status:
    return (False, value)

  if not stat.S_ISDIR(value.st_mode):
    return (False, "Path '%s' is not a directory" % path)

  return (True, None)


def _VerifyRestrictedCmd(path, cmd, _owner=None):
  """Verifies a whole restricted command and returns its executable filename.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @rtype: tuple; (boolean, string)
  @return: The tuple's first element is the status; if C{False}, the second
    element is an error message string, otherwise the second element is the
    absolute path to the executable

  """
  executable = utils.PathJoin(path, cmd)

  (status, msg) = _CommonRestrictedCmdCheck(executable, _owner)

  if not status:
    return (False, msg)

  if not utils.IsExecutable(executable):
    return (False, "access(2) thinks '%s' can't be executed" % executable)

  return (True, executable)


def _PrepareRestrictedCmd(path, cmd,
                          _verify_dir=_VerifyRestrictedCmdDirectory,
                          _verify_name=_VerifyRestrictedCmdName,
                          _verify_cmd=_VerifyRestrictedCmd):
  """Performs a number of tests on a restricted command.

  @type path: string
  @param path: Directory containing restricted commands
  @type cmd: string
  @param cmd: Command name
  @return: Same as L{_VerifyRestrictedCmd}

  """
  # Verify the directory first
  (status, msg) = _verify_dir(path)
  if status:
    # Check command if everything was alright
    (status, msg) = _verify_name(cmd)

  if not status:
    return (False, msg)

  # Check actual executable
  return _verify_cmd(path, cmd)


def RunRestrictedCmd(cmd,
                     _lock_timeout=_RCMD_LOCK_TIMEOUT,
                     _lock_file=pathutils.RESTRICTED_COMMANDS_LOCK_FILE,
                     _path=pathutils.RESTRICTED_COMMANDS_DIR,
                     _sleep_fn=time.sleep,
                     _prepare_fn=_PrepareRestrictedCmd,
                     _runcmd_fn=utils.RunCmd,
                     _enabled=constants.ENABLE_RESTRICTED_COMMANDS):
  """Executes a restricted command after performing strict tests.

  @type cmd: string
  @param cmd: Command name
  @rtype: string
  @return: Command output
  @raise RPCFail: In case of an error

  """
  logging.info("Preparing to run restricted command '%s'", cmd)

  if not _enabled:
    _Fail("Restricted commands disabled at configure time")

  lock = None
  try:
    cmdresult = None
    try:
      lock = utils.FileLock.Open(_lock_file)
      lock.Exclusive(blocking=True, timeout=_lock_timeout)

      (status, value) = _prepare_fn(_path, cmd)

      if status:
        cmdresult = _runcmd_fn([value], env={}, reset_env=True,
                               postfork_fn=lambda _: lock.Unlock())
      else:
        logging.error(value)
    except Exception: # pylint: disable=W0703
      # Keep original error in log
      logging.exception("Caught exception")

    if cmdresult is None:
      logging.info("Sleeping for %0.1f seconds before returning",
                   _RCMD_INVALID_DELAY)
      _sleep_fn(_RCMD_INVALID_DELAY)

      # Do not include original error message in returned error
      _Fail("Executing command '%s' failed" % cmd)
    elif cmdresult.failed or cmdresult.fail_reason:
      _Fail("Restricted command '%s' failed: %s; output: %s",
            cmd, cmdresult.fail_reason, cmdresult.output)
    else:
      return cmdresult.output
  finally:
    if lock is not None:
      # Release lock at last
      lock.Close()
      lock = None


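# Example: a sketch of invoking a restricted command; the command name is
# illustrative and must exist as a root-owned, suitably restricted executable
# under pathutils.RESTRICTED_COMMANDS_DIR, otherwise the call fails only after
# the deliberate _RCMD_INVALID_DELAY pause:
#
#   output = RunRestrictedCmd("list-disks")

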
def SetWatcherPause(until, _filename=pathutils.WATCHER_PAUSEFILE):
  """Creates or removes the watcher pause file.

  @type until: None or number
  @param until: Unix timestamp saying until when the watcher shouldn't run

  """
  if until is None:
    logging.info("Received request to no longer pause watcher")
    utils.RemoveFile(_filename)
  else:
    logging.info("Received request to pause watcher until %s", until)

    if not ht.TNumber(until):
      _Fail("Duration must be numeric")

    utils.WriteFile(_filename, data="%d\n" % (until, ), mode=0644)


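# Example: a sketch of pausing the watcher for one hour and resuming it; note
# that the argument is an absolute Unix timestamp, not a duration:
#
#   SetWatcherPause(time.time() + 3600)
#   # ...
#   SetWatcherPause(None)  # remove the pause file again

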
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{pathutils.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = pathutils.HOOKS_BASE_DIR
    # yeah, _BASE_DIR is not valid for attributes, we use it like a
    # constant
    self._BASE_DIR = hooks_base_dir # pylint: disable=C0103

  def RunLocalHooks(self, node_list, hpath, phase, env):
    """Check that the hooks will be run only locally and then run them.

    """
    assert len(node_list) == 1
    node = node_list[0]
    _, myself = ssconf.GetMasterAndMyself()
    assert node == myself

    results = self.RunHooks(hpath, phase, env)

    # Return values in the form expected by HooksMaster
    return {node: (None, False, results)}

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = utils.PathJoin(self._BASE_DIR, subdir)

    results = []

    if not os.path.isdir(dir_name):
      # for non-existing/non-dirs, we simply exit instead of logging a
      # warning at every operation
      return results

    runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)

    for (relname, relstatus, runresult) in runparts_results:
      if relstatus == constants.RUNPARTS_SKIP:
        rrval = constants.HKR_SKIP
        output = ""
      elif relstatus == constants.RUNPARTS_ERR:
        rrval = constants.HKR_FAIL
        output = "Hook script execution error: %s" % runresult
      elif relstatus == constants.RUNPARTS_RUN:
        if runresult.failed:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
        output = utils.SafeEncode(runresult.output.strip())
      results.append(("%s/%s" % (subdir, relname), rrval, output))

    return results


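# Example: an illustrative result of HooksRunner().RunHooks() for a
# hypothetical "instance-start" pre-hook directory containing two scripts;
# the script names and outputs are made up:
#
#   [("instance-start-pre.d/00-check", constants.HKR_SUCCESS, "ok"),
#    ("instance-start-pre.d/99-broken", constants.HKR_FAIL, "disk missing")]

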
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  @staticmethod
  def Run(name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script; failures are reported
       through L{_Fail} rather than through the return value

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = pathutils.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
    return fpath

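  # Example: an illustrative conversion, assuming pathutils.BDEV_CACHE_DIR
  # points at /var/run/ganeti/bdev-cache:
  #
  #   _ConvertPath("/dev/xenvg/disk0")
  #   # -> "/var/run/ganeti/bdev-cache/bdev_xenvg_disk0"
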
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.io.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s: %s", dev_path, err)