Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 7c0aa8e9

History | View | Annotate | Download (85.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26

27
"""
28

    
29

    
30
import os
31
import os.path
32
import shutil
33
import time
34
import stat
35
import errno
36
import re
37
import subprocess
38
import random
39
import logging
40
import tempfile
41
import zlib
42
import base64
43

    
44
from ganeti import errors
45
from ganeti import utils
46
from ganeti import ssh
47
from ganeti import hypervisor
48
from ganeti import constants
49
from ganeti import bdev
50
from ganeti import objects
51
from ganeti import ssconf
52

    
53

    
54
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55

    
56

    
57
class RPCFail(Exception):
  """Exception class used to signal failures over RPC.

  The single argument carries the error message, which is reported
  back to the caller (the master daemon).

  """
63

    
64

    
65
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception; %-formatted with C{args}
      when positional arguments are given
  @keyword log: if False, skip logging the message (default: True)
  @keyword exc: if True, log with the active exception's traceback
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  # kwargs.get avoids the double "present and true" membership checks
  if kwargs.get("log", True): # if we should log this error
    if kwargs.get("exc"):
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
86

    
87

    
88
def _GetConfig():
  """Return a fresh wrapper around the cluster configuration.

  @rtype: L{ssconf.SimpleStore}
  @return: a newly created SimpleStore instance

  """
  return ssconf.SimpleStore()
96

    
97

    
98
def _GetSshRunner(cluster_name):
  """Build an SshRunner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, required by the
      SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: a newly created SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
109

    
110

    
111
def _Decompress(data):
  """Unpack data compressed by the RPC client.

  @type data: list or tuple
  @param data: the (encoding, payload) pair sent by the RPC client
  @rtype: str
  @return: the decompressed payload

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # payload is base64-wrapped zlib-compressed data
    return zlib.decompress(base64.b64decode(content))
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  raise AssertionError("Unknown data encoding")
129

    
130

    
131
def _CleanDirectory(path, exclude=None):
  """Remove all regular files from a directory.

  Symlinks and subdirectories are left in place.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    excluded = []
  else:
    # normalize so the comparison against the joined paths below works
    excluded = [os.path.normpath(entry) for entry in exclude]

  for entry in utils.ListVisibleFiles(path):
    abs_name = os.path.normpath(os.path.join(path, entry))
    if abs_name in excluded:
      continue
    if os.path.isfile(abs_name) and not os.path.islink(abs_name):
      utils.RemoveFile(abs_name)
155

    
156

    
157
def _BuildUploadFileList():
  """Compute the set of files accepted by L{UploadFile}.

  Abstracted into a helper so the set is built only once, at module
  import time.

  """
  files = set()
  files.update([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.HMAC_CLUSTER_KEY,
    ])

  # each hypervisor may declare additional ancillary files
  for hv_name in constants.HYPER_TYPES:
    files.update(hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles())

  return frozenset(files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181

    
182

    
183
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: tuple
  @return: True, None

  """
  # keep the lock file so the queue stays lockable afterwards
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192

    
193

    
194
def GetMasterInfo():
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError, err:
    # _Fail always raises, so the return below is only reached on success
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node)
213

    
214

    
215
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None
  @raise RPCFail: if any of the steps below fail (errors are collected
      and reported together at the end)

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  err_msgs = []
  # if the master IP already answers, check whether it is ours
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      err_msgs.append(msg)
  else:
    # nobody answers: configure the master IP as a label on the netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

    # gratuitous ARP so neighbours update their caches for the moved IP
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    daemons_params = {
        'ganeti-masterd': [],
        'ganeti-rapi': [],
        }
    if no_voting:
      # --yes-do-it is required to confirm the dangerous no-voting start
      daemons_params['ganeti-masterd'].append('--no-voting')
      daemons_params['ganeti-masterd'].append('--yes-do-it')
    for daemon in daemons_params:
      cmd = [daemon]
      cmd.extend(daemons_params[daemon])
      result = utils.RunCmd(cmd)
      if result.failed:
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
        logging.error(msg)
        err_msgs.append(msg)

  if err_msgs:
    _Fail("; ".join(err_msgs))
276

    
277

    
278
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The master IP address is always removed; the master daemons are
  stopped only when requested via the stop_daemons parameter.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if not stop_daemons:
    return

  # stop/kill the rapi and the master daemon
  for daemon in (constants.RAPI, constants.MASTERD):
    utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307

    
308

    
309
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: boolean
  @return: the success of the operation

  """
  # private keys are written mode 0600, public keys 0644
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    _Fail("Error while processing user ssh files: %s", err, exc=True)

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so the new host keys take effect
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352

    
353

    
354
def LeaveCluster():
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

    utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

    utils.RemoveFile(priv_key)
    utils.RemoveFile(pub_key)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SSL_CERT_FILE)
  except Exception:
    # best-effort cleanup; a bare "except:" would also have swallowed
    # SystemExit/KeyboardInterrupt, which must propagate
    logging.exception("Error while removing cluster secrets")

  confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))

  if confd_pid:
    utils.KillProcess(confd_pid, timeout=2)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
392

    
393

    
394
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  node_info = {}

  vg_info = _GetVGInfo(vgname)
  node_info['vg_size'] = vg_info['vg_size']
  node_info['vg_free'] = vg_info['vg_free']

  hv_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hv_info is not None:
    node_info.update(hv_info)

  # the kernel boot id changes on every reboot; size=128 caps the read
  node_info["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return node_info
424

    
425

    
426
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name, passed to the SshRunner
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # NOTE(review): the node list is shuffled in place, presumably so the
    # ssh checks don't always hit peers in the same order — confirm
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        # only failing nodes are recorded
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own entry, to learn which source IPs to ping from
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetDaemonPort(constants.NODED)
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # only test the secondary IP when it differs from the primary
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      # report the error text in place of the minor list
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")
  return result
537

    
538

    
539
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.
  @raise RPCFail: if the lvs command fails

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  # raw string: the pattern contains backslash-escaped pipe characters
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr is a six-character flag string; see lvs(8)
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs
581

    
582

    
583
def ListVolumeGroups():
  """List the volume groups and their size.

  Thin RPC-level wrapper around the utils implementation.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
592

    
593

    
594
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def extract_device(dev_field):
    # device entries can carry extent info in parentheses; drop it
    if '(' in dev_field:
      return dev_field.split('(')[0]
    return dev_field

  def build_entry(fields):
    # fields come from one lvs output line split on the separator
    return {
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': extract_device(fields[2].strip()),
      'vg': fields[3].strip(),
    }

  return [build_entry(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
636

    
637

    
638
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise
  @raise RPCFail: if any of the bridges is missing

  """
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", ", ".join(missing))
652

    
653

    
654
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com
  @raise RPCFail: if any hypervisor fails to enumerate its instances

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      _Fail("Error enumerating instances (hypervisor %s): %s",
            hname, err, exc=True)

  return results
676

    
677

    
678
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  instance_data = {}

  info = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if info is not None:
    # fields 2, 4 and 5 of the hypervisor tuple: memory, state, cpu time
    instance_data['memory'] = info[2]
    instance_data['state'] = info[4]
    instance_data['time'] = info[5]

  return instance_data
702

    
703

    
704
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant
  @raise RPCFail: if the instance is not running or lacks disk symlinks

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # enumerate over the disks (consistent with _RemoveBlockDevLinks); a
  # missing per-disk symlink means the instance was last (re)started by a
  # ganeti version older than 1.2.5 and cannot be migrated safely
  for idx, _ in enumerate(instance.disks):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
725

    
726

    
727
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus
  @raise RPCFail: if the same instance is reported by two hypervisors
      with conflicting static parameters

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, _, memory, vcpus, state, times in iinfo:
        value = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }
        if name in output:
          # we only check static parameters, like memory and vcpus,
          # and not state and time which can change between the
          # invocations of the different hypervisors
          for key in 'memory', 'vcpus':
            if value[key] != output[name][key]:
              _Fail("Instance %s is running twice"
                    " with different parameters", name)
        # the last hypervisor's dynamic data (state/time) wins
        output[name] = value

  return output
768

    
769

    
770
def InstanceOsAdd(instance, reinstall):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @rtype: None
  @raise RPCFail: if the OS create script fails

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os)
  if reinstall:
    # let the OS script know it may find existing data on the disks
    create_env['INSTANCE_REINSTALL'] = "1"

  # timestamped logfile name, so reinstalls don't overwrite older logs
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # include the script log's tail in the error reported to the master;
    # log=False because the failure was already logged above
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS create script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
799

    
800

    
801
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: boolean
  @return: the success of the operation
  @raise RPCFail: if the OS rename script fails

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os)
  # the script reads the previous name from the environment
  rename_env['OLD_INSTANCE_NAME'] = old_name

  # logfile name carries both the old and the new instance name
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    logging.error("os create command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    # include the script log's tail in the error reported to the master;
    # log=False because the failure was already logged above
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
831

    
832

    
833
def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  # start with all values as None; returned as-is on any error below
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  # strip the trailing separator vgs emits before splitting the fields
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      logging.exception("Fail to parse vgs output: %s", err)
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
871

    
872

    
873
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the symlink path for disk C{idx} of the given instance."""
  return os.path.join(constants.DISK_LINKS_DIR,
                      "%s:%d" % (instance_name, idx))
876

    
877

    
878
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).


  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink
  @raise OSError: on symlink creation failures other than EEXIST

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      # a link already exists; recreate it only if it points elsewhere
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name
904

    
905

    
906
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Missing links are ignored; removal errors are logged but do not
  propagate.

  """
  link_names = [_GetBlockDevSymlinkPath(instance_name, idx)
                for idx, _ in enumerate(disks)]
  for link_name in link_names:
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      logging.exception("Can't remove symlink '%s'", link_name)
917

    
918

    
919
def _GatherAndLinkBlockDevs(instance):
920
  """Set up an instance's block device(s).
921

922
  This is run on the primary node at instance startup. The block
923
  devices must be already assembled.
924

925
  @type instance: L{objects.Instance}
926
  @param instance: the instance whose disks we shoul assemble
927
  @rtype: list
928
  @return: list of (disk_object, device_path)
929

930
  """
931
  block_devices = []
932
  for idx, disk in enumerate(instance.disks):
933
    device = _RecursiveFindBD(disk)
934
    if device is None:
935
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
936
                                    str(disk))
937
    device.Open()
938
    try:
939
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
940
    except OSError, e:
941
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
942
                                    e.strerror)
943

    
944
    block_devices.append((disk, link_name))
945

    
946
  return block_devices
947

    
948

    
949
def StartInstance(instance):
950
  """Start an instance.
951

952
  @type instance: L{objects.Instance}
953
  @param instance: the instance object
954
  @rtype: None
955

956
  """
957
  running_instances = GetInstanceList([instance.hypervisor])
958

    
959
  if instance.name in running_instances:
960
    logging.info("Instance %s already running, not starting", instance.name)
961
    return
962

    
963
  try:
964
    block_devices = _GatherAndLinkBlockDevs(instance)
965
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
966
    hyper.StartInstance(instance, block_devices)
967
  except errors.BlockDeviceError, err:
968
    _Fail("Block device error: %s", err, exc=True)
969
  except errors.HypervisorError, err:
970
    _RemoveBlockDevLinks(instance.name, instance.disks)
971
    _Fail("Hypervisor error: %s", err, exc=True)
972

    
973

    
974
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None
  @raise RPCFail: if the instance cannot be stopped even forcefully

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  running_instances = hyper.ListInstances()
  iname = instance.name

  if iname not in running_instances:
    logging.info("Instance %s not running, doing nothing", iname)
    return

  start = time.time()
  end = start + timeout
  sleep_time = 1

  tried_once = False
  # NOTE(review): the "not tried_once" condition means this loop body runs
  # at most once (tried_once becomes True after the first pass), so the
  # sleep_time backoff below is effectively dead and StopInstance is never
  # called with retry=True — confirm whether repeated soft-shutdown attempts
  # up to the deadline were intended.
  while not tried_once and time.time() < end:
    try:
      hyper.StopInstance(instance, retry=tried_once)
    except errors.HypervisorError, err:
      _Fail("Failed to stop instance %s: %s", iname, err)
    tried_once = True
    time.sleep(sleep_time)
    # breaking out here skips the "else" (force stop) branch below
    if instance.name not in hyper.ListInstances():
      break
    if sleep_time < 5:
      # 1.2 behaves particularly good for our case:
      # it gives us 10 increasing steps and caps just slightly above 5 seconds
      sleep_time *= 1.2
  else:
    # while/else: no break occurred, i.e. the soft shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      _Fail("Failed to force stop instance %s: %s", iname, err)

    # give the hypervisor a moment before re-checking the instance list
    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  _RemoveBlockDevLinks(iname, instance.disks)
1027

    
1028

    
1029
def InstanceReboot(instance, reboot_type, shutdown_timeout):
  """Reboot an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object to reboot
  @type reboot_type: str
  @param reboot_type: the type of reboot, one the following
    constants:
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
        instance OS, do not recreate the VM
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
        restart the VM (at the hypervisor level)
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
        not accepted here, since that mode is handled differently, in
        cmdlib, and translates into full stop and start of the
        instance (instead of a call_instance_reboot RPC)
  @type shutdown_timeout: integer
  @param shutdown_timeout: maximum timeout for the soft shutdown phase of
      a hard reboot
  @rtype: None
  @raise RPCFail: if the instance is not running or the reboot fails

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    _Fail("Cannot reboot instance %s that is not running", instance.name)

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      # a hard reboot is implemented as a full stop followed by a start;
      # StartInstance returns None, so this just ends the branch early
      InstanceShutdown(instance, shutdown_timeout)
      return StartInstance(instance)
    except errors.HypervisorError, err:
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
  else:
    _Fail("Invalid reboot_type received: %s", reboot_type)
1069

    
1070

    
1071
def MigrationInfo(instance):
1072
  """Gather information about an instance to be migrated.
1073

1074
  @type instance: L{objects.Instance}
1075
  @param instance: the instance definition
1076

1077
  """
1078
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1079
  try:
1080
    info = hyper.MigrationInfo(instance)
1081
  except errors.HypervisorError, err:
1082
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1083
  return info
1084

    
1085

    
1086
def AcceptInstance(instance, info, target):
1087
  """Prepare the node to accept an instance.
1088

1089
  @type instance: L{objects.Instance}
1090
  @param instance: the instance definition
1091
  @type info: string/data (opaque)
1092
  @param info: migration information, from the source node
1093
  @type target: string
1094
  @param target: target host (usually ip), on this node
1095

1096
  """
1097
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1098
  try:
1099
    hyper.AcceptInstance(instance, info, target)
1100
  except errors.HypervisorError, err:
1101
    _Fail("Failed to accept instance: %s", err, exc=True)
1102

    
1103

    
1104
def FinalizeMigration(instance, info, success):
1105
  """Finalize any preparation to accept an instance.
1106

1107
  @type instance: L{objects.Instance}
1108
  @param instance: the instance definition
1109
  @type info: string/data (opaque)
1110
  @param info: migration information, from the source node
1111
  @type success: boolean
1112
  @param success: whether the migration was a success or a failure
1113

1114
  """
1115
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1116
  try:
1117
    hyper.FinalizeMigration(instance, info, success)
1118
  except errors.HypervisorError, err:
1119
    _Fail("Failed to finalize migration: %s", err, exc=True)
1120

    
1121

    
1122
def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  @type instance: L{objects.Instance}
  @param instance: the instance definition
  @type target: string
  @param target: the target node name
  @type live: boolean
  @param live: whether the migration should be done live or not (the
      interpretation of this parameter is left to the hypervisor)
  @rtype: None
  @raise RPCFail: if the hypervisor reports a migration failure

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.MigrateInstance(instance.name, target, live)
  except errors.HypervisorError, err:
    _Fail("Failed to migrate instance: %s", err, exc=True)
1144

    
1145

    
1146
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # first assemble (and open, where needed) all children, so that the
  # top-level device can be created on top of them
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    # record the new device in the node-local device cache
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id
1205

    
1206

    
1207
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  Errors are collected over the whole tree and reported at the end, so
  that as much as possible is removed even when some step fails.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation
  @raise RPCFail: if any removal step failed

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach; NOTE(review): 'err' is bound but not included
    # in the log message below
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      # only drop the cache entry if the removal actually succeeded
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))
1243

    
1244

    
1245
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # mcn is re-purposed below as the maximum number of children that may
    # fail to assemble (tracked as None entries) before we give up
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # tolerate the failure while no more than mcn children are
        # missing; beyond that re-raise, the device cannot work
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    # nothing to assemble on this node; True signals overall success
    result = True
  return result
1297

    
1298

    
1299
def BlockdevAssemble(disk, owner, as_primary):
1300
  """Activate a block device for an instance.
1301

1302
  This is a wrapper over _RecursiveAssembleBD.
1303

1304
  @rtype: str or boolean
1305
  @return: a C{/dev/...} path for primary nodes, and
1306
      C{True} for secondary nodes
1307

1308
  """
1309
  try:
1310
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1311
    if isinstance(result, bdev.BlockDev):
1312
      result = result.dev_path
1313
  except errors.BlockDeviceError, err:
1314
    _Fail("Error while assembling disk: %s", err, exc=True)
1315

    
1316
  return result
1317

    
1318

    
1319
def BlockdevShutdown(disk):
1320
  """Shut down a block device.
1321

1322
  First, if the device is assembled (Attach() is successful), then
1323
  the device is shutdown. Then the children of the device are
1324
  shutdown.
1325

1326
  This function is called recursively. Note that we don't cache the
1327
  children or such, as oppossed to assemble, shutdown of different
1328
  devices doesn't require that the upper device was active.
1329

1330
  @type disk: L{objects.Disk}
1331
  @param disk: the description of the disk we should
1332
      shutdown
1333
  @rtype: None
1334

1335
  """
1336
  msgs = []
1337
  r_dev = _RecursiveFindBD(disk)
1338
  if r_dev is not None:
1339
    r_path = r_dev.dev_path
1340
    try:
1341
      r_dev.Shutdown()
1342
      DevCacheManager.RemoveCache(r_path)
1343
    except errors.BlockDeviceError, err:
1344
      msgs.append(str(err))
1345

    
1346
  if disk.children:
1347
    for child in disk.children:
1348
      try:
1349
        BlockdevShutdown(child)
1350
      except RPCFail, err:
1351
        msgs.append(str(err))
1352

    
1353
  if msgs:
1354
    _Fail("; ".join(msgs))
1355

    
1356

    
1357
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(cdev) for cdev in new_cdevs]
  if None in new_bdevs:
    # at least one child could not be located on this node
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
1374

    
1375

    
1376
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for rdisk in new_cdevs:
    # prefer the static path when the disk type provides one; otherwise
    # the device has to be located on this node first
    static_path = rdisk.StaticDevPath()
    if static_path is not None:
      devs.append(static_path)
      continue
    found = _RecursiveFindBD(rdisk)
    if found is None:
      _Fail("Can't find device %s while removing children", rdisk)
    devs.append(found.dev_path)
  parent_bdev.RemoveChildren(devs)
1401

    
1402

    
1403
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  status = []
  for disk in disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      _Fail("Can't find device %s", disk)

    status.append(found.CombinedSyncStatus())

  return status
1425

    
1426

    
1427
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # resolve the whole child tree first, then look up the top device
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1445

    
1446

    
1447
def BlockdevFind(disk):
1448
  """Check if a device is activated.
1449

1450
  If it is, return information about the real device.
1451

1452
  @type disk: L{objects.Disk}
1453
  @param disk: the disk to find
1454
  @rtype: None or objects.BlockDevStatus
1455
  @return: None if the disk cannot be found, otherwise a the current
1456
           information
1457

1458
  """
1459
  try:
1460
    rbd = _RecursiveFindBD(disk)
1461
  except errors.BlockDeviceError, err:
1462
    _Fail("Failed to find device: %s", err, exc=True)
1463

    
1464
  if rbd is None:
1465
    return None
1466

    
1467
  return rbd.GetSyncStatus()
1468

    
1469

    
1470
def BlockdevGetsize(disks):
1471
  """Computes the size of the given disks.
1472

1473
  If a disk is not found, returns None instead.
1474

1475
  @type disks: list of L{objects.Disk}
1476
  @param disks: the list of disk to compute the size for
1477
  @rtype: list
1478
  @return: list with elements None if the disk cannot be found,
1479
      otherwise the size
1480

1481
  """
1482
  result = []
1483
  for cf in disks:
1484
    try:
1485
      rbd = _RecursiveFindBD(cf)
1486
    except errors.BlockDeviceError, err:
1487
      result.append(None)
1488
      continue
1489
    if rbd is None:
1490
      result.append(None)
1491
    else:
1492
      result.append(rbd.GetActualSize())
1493
  return result
1494

    
1495

    
1496
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  The device contents are piped through dd and ssh to a dd process on
  the destination node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None
  @raise RPCFail: if the device is not set up or the copy fails

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  # the block size on the read dd is 1MiB to match our units
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will be mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to not attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  # bash is needed for the pipefail option used in expcmd above
  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
1542

    
1543

    
1544
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  # both checks guard against the master overwriting arbitrary node files
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  utils.WriteFile(file_name, data=_Decompress(data), mode=mode, uid=uid,
                  gid=gid, atime=atime, mtime=mtime)
1578

    
1579

    
1580
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
1587

    
1588

    
1589
def _ErrnoOrStr(err):
1590
  """Format an EnvironmentError exception.
1591

1592
  If the L{err} argument has an errno attribute, it will be looked up
1593
  and converted into a textual C{E...} description. Otherwise the
1594
  string representation of the error will be returned.
1595

1596
  @type err: L{EnvironmentError}
1597
  @param err: the exception to format
1598

1599
  """
1600
  if hasattr(err, 'errno'):
1601
    detail = errno.errorcode[err.errno]
1602
  else:
1603
    detail = str(err)
1604
  return detail
1605

    
1606

    
1607
def _OSOndiskAPIVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS given by
  the 'name' parameter and residing in the 'os_dir' directory.

  @type name: str
  @param name: the OS name we should look for
  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))

  # a symlink to a regular file passes, but directories/devices do not
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, _ErrnoOrStr(err)))

  # the file carries one integer version per line
  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
1647

    
1648

    
1649
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants)
      for all (potential) OSes under all search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        # NOTE(review): 'break' aborts scanning the remaining search
        # directories as well, not just this one — confirm 'continue'
        # was not intended here
        break
      for name in f_names:
        os_path = os.path.sep.join([dir_name, name])
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
        else:
          # on failure, os_inst is the error message, not an OS object
          diagnose = os_inst
          variants = []
        result.append((name, os_path, status, diagnose, variants))

  return result
1689

    
1690

    
1691
def _TryOSFromDisk(name, base_dir=None):
1692
  """Create an OS instance from disk.
1693

1694
  This function will return an OS instance if the given name is a
1695
  valid OS name.
1696

1697
  @type base_dir: string
1698
  @keyword base_dir: Base directory containing OS installations.
1699
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1700
  @rtype: tuple
1701
  @return: success and either the OS instance if we find a valid one,
1702
      or error message
1703

1704
  """
1705
  if base_dir is None:
1706
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1707
    if os_dir is None:
1708
      return False, "Directory for OS %s not found in search path" % name
1709
  else:
1710
    os_dir = os.path.sep.join([base_dir, name])
1711

    
1712
  status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1713
  if not status:
1714
    # push the error up
1715
    return status, api_versions
1716

    
1717
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1718
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1719
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1720

    
1721
  # OS Files dictionary, we will populate it with the absolute path names
1722
  os_files = dict.fromkeys(constants.OS_SCRIPTS)
1723

    
1724
  if max(api_versions) >= constants.OS_API_V15:
1725
    os_files[constants.OS_VARIANTS_FILE] = ''
1726

    
1727
  for name in os_files:
1728
    os_files[name] = os.path.sep.join([os_dir, name])
1729

    
1730
    try:
1731
      st = os.stat(os_files[name])
1732
    except EnvironmentError, err:
1733
      return False, ("File '%s' under path '%s' is missing (%s)" %
1734
                     (name, os_dir, _ErrnoOrStr(err)))
1735

    
1736
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1737
      return False, ("File '%s' under path '%s' is not a regular file" %
1738
                     (name, os_dir))
1739

    
1740
    if name in constants.OS_SCRIPTS:
1741
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1742
        return False, ("File '%s' under path '%s' is not executable" %
1743
                       (name, os_dir))
1744

    
1745
  variants = None
1746
  if constants.OS_VARIANTS_FILE in os_files:
1747
    variants_file = os_files[constants.OS_VARIANTS_FILE]
1748
    try:
1749
      variants = utils.ReadFile(variants_file).splitlines()
1750
    except EnvironmentError, err:
1751
      return False, ("Error while reading the OS variants file at %s: %s" %
1752
                     (variants_file, _ErrnoOrStr(err)))
1753
    if not variants:
1754
      return False, ("No supported os variant found")
1755

    
1756
  os_obj = objects.OS(name=name, path=os_dir,
1757
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
1758
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
1759
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
1760
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
1761
                      supported_variants=variants,
1762
                      api_versions=api_versions)
1763
  return True, os_obj
1764

    
1765

    
1766
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type name: string
  @param name: the OS name, possibly including a "+variant" suffix
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip a possible "+variant" suffix before the on-disk lookup
  base_name = name.split("+", 1)[0]
  ok, payload = _TryOSFromDisk(base_name, base_dir)
  if not ok:
    # payload holds the error message in the failure case
    _Fail(payload)
  return payload
1791

    
1792

    
1793
def OSEnvironment(instance, os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type os: L{objects.OS}
  @param os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  env = {}
  # highest API version understood by both us and the OS definition
  api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
  env['OS_API_VERSION'] = '%d' % api_version
  env['INSTANCE_NAME'] = instance.name
  env['INSTANCE_OS'] = instance.os
  env['HYPERVISOR'] = instance.hypervisor
  env['DISK_COUNT'] = '%d' % len(instance.disks)
  env['NIC_COUNT'] = '%d' % len(instance.nics)
  env['DEBUG_LEVEL'] = '%d' % debug

  if api_version >= constants.OS_API_V15:
    # the variant is either encoded in the instance OS name as
    # "os+variant", or we default to the first supported one
    try:
      variant = instance.os.split('+', 1)[1]
    except IndexError:
      variant = os.supported_variants[0]
    env['OS_VARIANT'] = variant

  # per-disk variables; the disk must be set up so we can ask for its path
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    env['DISK_%d_PATH' % idx] = real_disk.dev_path
    env['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      env['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      env['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      env['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]

  # per-NIC variables
  for idx, nic in enumerate(instance.nics):
    env['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      env['NIC_%d_IP' % idx] = nic.ip
    env['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      env['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      env['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      env['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # export all backend and hypervisor parameters as INSTANCE_<kind>_<key>
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      env["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return env
1857

    
1858
def BlockdevGrow(disk, amount):
1859
  """Grow a stack of block devices.
1860

1861
  This function is called recursively, with the childrens being the
1862
  first ones to resize.
1863

1864
  @type disk: L{objects.Disk}
1865
  @param disk: the disk to be grown
1866
  @rtype: (status, result)
1867
  @return: a tuple with the status of the operation
1868
      (True/False), and the errors message if status
1869
      is False
1870

1871
  """
1872
  r_dev = _RecursiveFindBD(disk)
1873
  if r_dev is None:
1874
    _Fail("Cannot find block device %s", disk)
1875

    
1876
  try:
1877
    r_dev.Grow(amount)
1878
  except errors.BlockDeviceError, err:
1879
    _Fail("Failed to grow block device: %s", err, exc=True)
1880

    
1881

    
1882
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return BlockdevSnapshot(disk.children[0])
    # more than one child: recurse on the first one whose size matches;
    # NOTE(review): if no child matches we fall through and implicitly
    # return None, same as the original control flow
    for child in disk.children:
      if child.size == disk.size:
        # return implies breaking the loop
        return BlockdevSnapshot(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is None:
      _Fail("Cannot find block device %s", disk)
    # let's stay on the safe side and ask for the full size, for now
    return r_dev.Snapshot(disk.size)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
1914

    
1915

    
1916
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  The snapshot is read via the OS export script, gzip-compressed
  locally and streamed over SSH to a "<instance>.new" staging
  directory under the export dir on the destination node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: None
  @raise RPCFail: if the device is not set up or the export
      pipeline fails

  """
  inst_os = OSFromDisk(instance.os)
  export_env = OSEnvironment(instance, inst_os)

  export_script = inst_os.export_script

  # per-run log file for the OS export script's stderr
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  # staging directory on the remote node; FinalizeExport renames it later
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
                               inst_os.path, export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  # bash is needed for the "pipefail" option used in expcmd above
  result = utils.RunCmd(["bash", "-c", command], env=export_env)

  if result.failed:
    _Fail("OS snapshot export command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
1976

    
1977

    
1978
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  # the export is staged in "<name>.new" and atomically moved in place
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_total)

  # only disks that were actually snapshotted (truthy entries) are recorded
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # replace any previous export of the same instance
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)
2039

    
2040

    
2041
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: str
  @return: the serialized contents of the config file holding the
      export info

  """
  conf_path = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(conf_path)

  # both the export and the instance sections must be present
  for section in (constants.INISECT_EXP, constants.INISECT_INS):
    if not config.has_section(section):
      _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
2062

    
2063

    
2064
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2065
  """Import an os image into an instance.
2066

2067
  @type instance: L{objects.Instance}
2068
  @param instance: instance to import the disks into
2069
  @type src_node: string
2070
  @param src_node: source node for the disk images
2071
  @type src_images: list of string
2072
  @param src_images: absolute paths of the disk images
2073
  @rtype: list of boolean
2074
  @return: each boolean represent the success of importing the n-th disk
2075

2076
  """
2077
  inst_os = OSFromDisk(instance.os)
2078
  import_env = OSEnvironment(instance, inst_os)
2079
  import_script = inst_os.import_script
2080

    
2081
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2082
                                        instance.name, int(time.time()))
2083
  if not os.path.exists(constants.LOG_OS_DIR):
2084
    os.mkdir(constants.LOG_OS_DIR, 0750)
2085

    
2086
  comprcmd = "gunzip"
2087
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2088
                               import_script, logfile)
2089

    
2090
  final_result = []
2091
  for idx, image in enumerate(src_images):
2092
    if image:
2093
      destcmd = utils.BuildShellCmd('cat %s', image)
2094
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2095
                                                       constants.GANETI_RUNAS,
2096
                                                       destcmd)
2097
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2098
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2099
      import_env['IMPORT_INDEX'] = str(idx)
2100
      result = utils.RunCmd(command, env=import_env)
2101
      if result.failed:
2102
        logging.error("Disk import command '%s' returned error: %s"
2103
                      " output: %s", command, result.fail_reason,
2104
                      result.output)
2105
        final_result.append("error importing disk %d: %s, %s" %
2106
                            (idx, result.fail_reason, result.output[-100]))
2107

    
2108
  if final_result:
2109
    _Fail("; ".join(final_result), log=False)
2110

    
2111

    
2112
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  # guard clause: no export dir means no exports at all
  if not os.path.isdir(constants.EXPORT_DIR):
    _Fail("No exports directory")
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
2123

    
2124

    
2125
def RemoveExport(export):
2126
  """Remove an existing export from the node.
2127

2128
  @type export: str
2129
  @param export: the name of the export to remove
2130
  @rtype: None
2131

2132
  """
2133
  target = os.path.join(constants.EXPORT_DIR, export)
2134

    
2135
  try:
2136
    shutil.rmtree(target)
2137
  except EnvironmentError, err:
2138
    _Fail("Error while removing the export: %s", err, exc=True)
2139

    
2140

    
2141
def BlockdevRename(devlist):
2142
  """Rename a list of block devices.
2143

2144
  @type devlist: list of tuples
2145
  @param devlist: list of tuples of the form  (disk,
2146
      new_logical_id, new_physical_id); disk is an
2147
      L{objects.Disk} object describing the current disk,
2148
      and new logical_id/physical_id is the name we
2149
      rename it to
2150
  @rtype: boolean
2151
  @return: True if all renames succeeded, False otherwise
2152

2153
  """
2154
  msgs = []
2155
  result = True
2156
  for disk, unique_id in devlist:
2157
    dev = _RecursiveFindBD(disk)
2158
    if dev is None:
2159
      msgs.append("Can't find device %s in rename" % str(disk))
2160
      result = False
2161
      continue
2162
    try:
2163
      old_rpath = dev.dev_path
2164
      dev.Rename(unique_id)
2165
      new_rpath = dev.dev_path
2166
      if old_rpath != new_rpath:
2167
        DevCacheManager.RemoveCache(old_rpath)
2168
        # FIXME: we should add the new cache information here, like:
2169
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2170
        # but we don't have the owner here - maybe parse from existing
2171
        # cache? for now, we only lose lvm data when we rename, which
2172
        # is less critical than DRBD or MD
2173
    except errors.BlockDeviceError, err:
2174
      msgs.append("Can't rename device '%s' to '%s': %s" %
2175
                  (dev, unique_id, err))
2176
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2177
      result = False
2178
  if not result:
2179
    _Fail("; ".join(msgs))
2180

    
2181

    
2182
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise
  @raise RPCFail: if the path is not under the base storage directory

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = os.path.normpath(cfg.GetFileStorageDir())
  # FIX: the previous os.path.commonprefix() check was character-wise,
  # so e.g. "/srv/ganeti-evil" was accepted for base "/srv/ganeti"; we
  # now require equality or a path-component-wise prefix instead
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.path.sep)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2203

    
2204

    
2205
def CreateFileStorageDir(file_storage_dir):
2206
  """Create file storage directory.
2207

2208
  @type file_storage_dir: str
2209
  @param file_storage_dir: directory to create
2210

2211
  @rtype: tuple
2212
  @return: tuple with first element a boolean indicating wheter dir
2213
      creation was successful or not
2214

2215
  """
2216
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2217
  if os.path.exists(file_storage_dir):
2218
    if not os.path.isdir(file_storage_dir):
2219
      _Fail("Specified storage dir '%s' is not a directory",
2220
            file_storage_dir)
2221
  else:
2222
    try:
2223
      os.makedirs(file_storage_dir, 0750)
2224
    except OSError, err:
2225
      _Fail("Cannot create file storage directory '%s': %s",
2226
            file_storage_dir, err, exc=True)
2227

    
2228

    
2229
def RemoveFileStorageDir(file_storage_dir):
2230
  """Remove file storage directory.
2231

2232
  Remove it only if it's empty. If not log an error and return.
2233

2234
  @type file_storage_dir: str
2235
  @param file_storage_dir: the directory we should cleanup
2236
  @rtype: tuple (success,)
2237
  @return: tuple of one element, C{success}, denoting
2238
      whether the operation was successful
2239

2240
  """
2241
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2242
  if os.path.exists(file_storage_dir):
2243
    if not os.path.isdir(file_storage_dir):
2244
      _Fail("Specified Storage directory '%s' is not a directory",
2245
            file_storage_dir)
2246
    # deletes dir only if empty, otherwise we want to fail the rpc call
2247
    try:
2248
      os.rmdir(file_storage_dir)
2249
    except OSError, err:
2250
      _Fail("Cannot remove file storage directory '%s': %s",
2251
            file_storage_dir, err)
2252

    
2253

    
2254
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2255
  """Rename the file storage directory.
2256

2257
  @type old_file_storage_dir: str
2258
  @param old_file_storage_dir: the current path
2259
  @type new_file_storage_dir: str
2260
  @param new_file_storage_dir: the name we should rename to
2261
  @rtype: tuple (success,)
2262
  @return: tuple of one element, C{success}, denoting
2263
      whether the operation was successful
2264

2265
  """
2266
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2267
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2268
  if not os.path.exists(new_file_storage_dir):
2269
    if os.path.isdir(old_file_storage_dir):
2270
      try:
2271
        os.rename(old_file_storage_dir, new_file_storage_dir)
2272
      except OSError, err:
2273
        _Fail("Cannot rename '%s' to '%s': %s",
2274
              old_file_storage_dir, new_file_storage_dir, err)
2275
    else:
2276
      _Fail("Specified storage dir '%s' is not a directory",
2277
            old_file_storage_dir)
2278
  else:
2279
    if os.path.exists(old_file_storage_dir):
2280
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2281
            old_file_storage_dir, new_file_storage_dir)
2282

    
2283

    
2284
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # FIX: the previous os.path.commonprefix() check was character-wise,
  # so a sibling path sharing a string prefix (e.g. ".../queue-other"
  # for ".../queue") was wrongly accepted; require the queue dir itself
  # or a path-component-wise prefix
  result = (file_name == queue_dir or
            file_name.startswith(queue_dir + os.path.sep))

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
2299

    
2300

    
2301
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents, possibly compressed
      (see L{_Decompress})
  @rtype: None
  @raise RPCFail: if the file is not inside the queue directory

  """
  _EnsureJobQueueFile(file_name)

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))
2319

    
2320

    
2321
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None
  @raise RPCFail: if either path is outside the queue directory

  """
  # both source and destination must live inside the queue directory
  for path in (old, new):
    _EnsureJobQueueFile(path)

  utils.RenameFile(old, new, mkdir=True)
2338

    
2339

    
2340
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag, which is represented
  by the presence of the drain file on disk.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: None

  """
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2356

    
2357

    
2358
def BlockdevClose(instance_name, disks):
2359
  """Closes the given block devices.
2360

2361
  This means they will be switched to secondary mode (in case of
2362
  DRBD).
2363

2364
  @param instance_name: if the argument is not empty, the symlinks
2365
      of this instance will be removed
2366
  @type disks: list of L{objects.Disk}
2367
  @param disks: the list of disks to be closed
2368
  @rtype: tuple (success, message)
2369
  @return: a tuple of success and message, where success
2370
      indicates the succes of the operation, and message
2371
      which will contain the error details in case we
2372
      failed
2373

2374
  """
2375
  bdevs = []
2376
  for cf in disks:
2377
    rd = _RecursiveFindBD(cf)
2378
    if rd is None:
2379
      _Fail("Can't find device %s", cf)
2380
    bdevs.append(rd)
2381

    
2382
  msg = []
2383
  for rd in bdevs:
2384
    try:
2385
      rd.Close()
2386
    except errors.BlockDeviceError, err:
2387
      msg.append(str(err))
2388
  if msg:
2389
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2390
  else:
2391
    if instance_name:
2392
      _RemoveBlockDevLinks(instance_name, disks)
2393

    
2394

    
2395
def ValidateHVParams(hvname, hvparams):
2396
  """Validates the given hypervisor parameters.
2397

2398
  @type hvname: string
2399
  @param hvname: the hypervisor name
2400
  @type hvparams: dict
2401
  @param hvparams: the hypervisor parameters to be validated
2402
  @rtype: None
2403

2404
  """
2405
  try:
2406
    hv_type = hypervisor.GetHypervisor(hvname)
2407
    hv_type.ValidateParameters(hvparams)
2408
  except errors.HypervisorError, err:
2409
    _Fail(str(err), log=False)
2410

    
2411

    
2412
def DemoteFromMC():
2413
  """Demotes the current node from master candidate role.
2414

2415
  """
2416
  # try to ensure we're not the master by mistake
2417
  master, myself = ssconf.GetMasterAndMyself()
2418
  if master == myself:
2419
    _Fail("ssconf status shows I'm the master node, will not demote")
2420
  pid_file = utils.DaemonPidFileName(constants.MASTERD)
2421
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2422
    _Fail("The master daemon is running, will not demote")
2423
  try:
2424
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2425
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2426
  except EnvironmentError, err:
2427
    if err.errno != errno.ENOENT:
2428
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2429
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2430

    
2431

    
2432
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: mapping used to resolve node names to IPs
  @type disks: list of L{objects.Disk}
  @param disks: the disks to look up
  @return: list of block device objects, one per input disk
  @raise RPCFail: if any of the devices cannot be found

  """
  # set the correct physical ID on every disk first
  my_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(my_name, nodes_ip)

  bdevs = []
  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(rd)
  return bdevs
2449

    
2450

    
2451
def DrbdDisconnectNet(nodes_ip, disks):
2452
  """Disconnects the network on a list of drbd devices.
2453

2454
  """
2455
  bdevs = _FindDisks(nodes_ip, disks)
2456

    
2457
  # disconnect disks
2458
  for rd in bdevs:
2459
    try:
2460
      rd.DisconnectNet()
2461
    except errors.BlockDeviceError, err:
2462
      _Fail("Can't change network configuration to standalone mode: %s",
2463
            err, exc=True)
2464

    
2465

    
2466
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  In multimaster mode the instance's block device symlinks are
  (re)created first and, after a successful reconnect, all devices
  are switched to primary mode.

  @rtype: None
  @raise RPCFail: if symlink creation, the network reconfiguration,
      the reconnection wait, or the switch to primary mode fails

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)
  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  RECONNECT_TIMEOUT = 2 * 60
  sleep_time = 0.100 # start with 100 miliseconds
  timeout_limit = time.time() + RECONNECT_TIMEOUT
  while time.time() < timeout_limit:
    all_connected = True
    for rd in bdevs:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        all_connected = False
      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)
    if all_connected:
      break
    time.sleep(sleep_time)
    # exponential backoff between polls, capped at five seconds
    sleep_time = min(5, sleep_time * 1.5)
  if not all_connected:
    _Fail("Timeout in disk reconnecting")
  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)
2520

    
2521

    
2522
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple (alldone, min_resync)
  @return: whether all devices have finished resyncing, and the
      minimum sync percentage seen across the devices
  @raise RPCFail: if a device is neither connected nor resyncing

  """
  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    stats = rd.GetProcStatus()
    # a device that is neither connected nor resyncing will never catch up
    if not (stats.is_connected or stats.is_in_resync):
      _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    if stats.is_in_resync:
      alldone = False
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
2539

    
2540

    
2541
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: string
  @param hypervisor_type: the hypervisor whose powercycle mechanism
      should be used
  @return: a confirmation message (only reached in the parent process)

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: answer the RPC and let the child do the actual work
    return "Reboot scheduled in 5 seconds"
  # child (or fork failure): give the RPC answer time to go out
  time.sleep(5)
  hyper.PowercycleNode()
2558

    
2559

    
2560
class HooksRunner(object):
2561
  """Hook runner.
2562

2563
  This class is instantiated on the node side (ganeti-noded) and not
2564
  on the master side.
2565

2566
  """
2567
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2568

    
2569
  def __init__(self, hooks_base_dir=None):
2570
    """Constructor for hooks runner.
2571

2572
    @type hooks_base_dir: str or None
2573
    @param hooks_base_dir: if not None, this overrides the
2574
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
2575

2576
    """
2577
    if hooks_base_dir is None:
2578
      hooks_base_dir = constants.HOOKS_BASE_DIR
2579
    self._BASE_DIR = hooks_base_dir
2580

    
2581
  @staticmethod
2582
  def ExecHook(script, env):
2583
    """Exec one hook script.
2584

2585
    @type script: str
2586
    @param script: the full path to the script
2587
    @type env: dict
2588
    @param env: the environment with which to exec the script
2589
    @rtype: tuple (success, message)
2590
    @return: a tuple of success and message, where success
2591
        indicates the succes of the operation, and message
2592
        which will contain the error details in case we
2593
        failed
2594

2595
    """
2596
    # exec the process using subprocess and log the output
2597
    fdstdin = None
2598
    try:
2599
      fdstdin = open("/dev/null", "r")
2600
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2601
                               stderr=subprocess.STDOUT, close_fds=True,
2602
                               shell=False, cwd="/", env=env)
2603
      output = ""
2604
      try:
2605
        output = child.stdout.read(4096)
2606
        child.stdout.close()
2607
      except EnvironmentError, err:
2608
        output += "Hook script error: %s" % str(err)
2609

    
2610
      while True:
2611
        try:
2612
          result = child.wait()
2613
          break
2614
        except EnvironmentError, err:
2615
          if err.errno == errno.EINTR:
2616
            continue
2617
          raise
2618
    finally:
2619
      # try not to leak fds
2620
      for fd in (fdstdin, ):
2621
        if fd is not None:
2622
          try:
2623
            fd.close()
2624
          except EnvironmentError, err:
2625
            # just log the error
2626
            #logging.exception("Error while closing fd %s", fd)
2627
            pass
2628

    
2629
    return result == 0, utils.SafeEncode(output.strip())
2630

    
2631
  def RunHooks(self, hpath, phase, env):
2632
    """Run the scripts in the hooks directory.
2633

2634
    @type hpath: str
2635
    @param hpath: the path to the hooks directory which
2636
        holds the scripts
2637
    @type phase: str
2638
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
2639
        L{constants.HOOKS_PHASE_POST}
2640
    @type env: dict
2641
    @param env: dictionary with the environment for the hook
2642
    @rtype: list
2643
    @return: list of 3-element tuples:
2644
      - script path
2645
      - script result, either L{constants.HKR_SUCCESS} or
2646
        L{constants.HKR_FAIL}
2647
      - output of the script
2648

2649
    @raise errors.ProgrammerError: for invalid input
2650
        parameters
2651

2652
    """
2653
    if phase == constants.HOOKS_PHASE_PRE:
2654
      suffix = "pre"
2655
    elif phase == constants.HOOKS_PHASE_POST:
2656
      suffix = "post"
2657
    else:
2658
      _Fail("Unknown hooks phase '%s'", phase)
2659

    
2660
    rr = []
2661

    
2662
    subdir = "%s-%s.d" % (hpath, suffix)
2663
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2664
    try:
2665
      dir_contents = utils.ListVisibleFiles(dir_name)
2666
    except OSError:
2667
      # FIXME: must log output in case of failures
2668
      return rr
2669

    
2670
    # we use the standard python sort order,
2671
    # so 00name is the recommended naming scheme
2672
    dir_contents.sort()
2673
    for relname in dir_contents:
2674
      fname = os.path.join(dir_name, relname)
2675
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2676
          self.RE_MASK.match(relname) is not None):
2677
        rrval = constants.HKR_SKIP
2678
        output = ""
2679
      else:
2680
        result, output = self.ExecHook(fname, env)
2681
        if not result:
2682
          rrval = constants.HKR_FAIL
2683
        else:
2684
          rrval = constants.HKR_SUCCESS
2685
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
2686

    
2687
    return rr
2688

    
2689

    
2690
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout produced by the allocator script
    @raise RPCFail: if the script is not found in the search path or
        exits with a non-zero status

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # write the input data to a temporary file the script can read
    tmpfd, tmpname = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(tmpfd, idata)
      os.close(tmpfd)
      result = utils.RunCmd([alloc_script, tmpname])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      # always clean up the temporary input file
      os.unlink(tmpname)

    return result.stdout
class DevCacheManager(object):
2731
  """Simple class for managing a cache of block device information.
2732

2733
  """
2734
  _DEV_PREFIX = "/dev/"
2735
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2736

    
2737
  @classmethod
2738
  def _ConvertPath(cls, dev_path):
2739
    """Converts a /dev/name path to the cache file name.
2740

2741
    This replaces slashes with underscores and strips the /dev
2742
    prefix. It then returns the full path to the cache file.
2743

2744
    @type dev_path: str
2745
    @param dev_path: the C{/dev/} path name
2746
    @rtype: str
2747
    @return: the converted path name
2748

2749
    """
2750
    if dev_path.startswith(cls._DEV_PREFIX):
2751
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2752
    dev_path = dev_path.replace("/", "_")
2753
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2754
    return fpath
2755

    
2756
  @classmethod
2757
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2758
    """Updates the cache information for a given device.
2759

2760
    @type dev_path: str
2761
    @param dev_path: the pathname of the device
2762
    @type owner: str
2763
    @param owner: the owner (instance name) of the device
2764
    @type on_primary: bool
2765
    @param on_primary: whether this is the primary
2766
        node nor not
2767
    @type iv_name: str
2768
    @param iv_name: the instance-visible name of the
2769
        device, as in objects.Disk.iv_name
2770

2771
    @rtype: None
2772

2773
    """
2774
    if dev_path is None:
2775
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2776
      return
2777
    fpath = cls._ConvertPath(dev_path)
2778
    if on_primary:
2779
      state = "primary"
2780
    else:
2781
      state = "secondary"
2782
    if iv_name is None:
2783
      iv_name = "not_visible"
2784
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2785
    try:
2786
      utils.WriteFile(fpath, data=fdata)
2787
    except EnvironmentError, err:
2788
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2789

    
2790
  @classmethod
2791
  def RemoveCache(cls, dev_path):
2792
    """Remove data for a dev_path.
2793

2794
    This is just a wrapper over L{utils.RemoveFile} with a converted
2795
    path name and logging.
2796

2797
    @type dev_path: str
2798
    @param dev_path: the pathname of the device
2799

2800
    @rtype: None
2801

2802
    """
2803
    if dev_path is None:
2804
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2805
      return
2806
    fpath = cls._ConvertPath(dev_path)
2807
    try:
2808
      utils.RemoveFile(fpath)
2809
    except EnvironmentError, err:
2810
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)