Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 920aae98

History | View | Annotate | Download (76.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36
import zlib
37
import base64
38

    
39
from ganeti import errors
40
from ganeti import utils
41
from ganeti import ssh
42
from ganeti import hypervisor
43
from ganeti import constants
44
from ganeti import bdev
45
from ganeti import objects
46
from ganeti import ssconf
47

    
48

    
49
def _GetConfig():
  """Build and return a simple-store configuration accessor.

  @rtype: L{ssconf.SimpleStore}
  @return: a newly created SimpleStore instance

  """
  store = ssconf.SimpleStore()
  return store
57

    
58

    
59
def _GetSshRunner(cluster_name):
  """Construct an SshRunner bound to the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
70

    
71

    
72
def _Decompress(data):
  """Expand a payload that was packed by the RPC client.

  @type data: list or tuple
  @param data: a two-element (encoding, content) pair sent by the
      RPC client
  @rtype: str
  @return: the decompressed content

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # content is base64-wrapped zlib data; undo both layers
    return zlib.decompress(base64.b64decode(content))
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  raise AssertionError("Unknown data encoding")
90

    
91

    
92
def _CleanDirectory(path, exclude=None):
  """Delete every visible regular file directly inside a directory.

  Symbolic links and subdirectories are left untouched.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: optional list of file paths to keep; defaults
      to keeping nothing

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    keep = []
  else:
    # compare normalized paths so different spellings still match
    keep = [os.path.normpath(entry) for entry in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    candidate = os.path.normpath(os.path.join(path, rel_name))
    if candidate in keep:
      continue
    if os.path.isfile(candidate) and not os.path.islink(candidate):
      utils.RemoveFile(candidate)
116

    
117

    
118
def JobQueuePurge():
  """Remove all job queue files and archived jobs.

  @rtype: None

  """
  # the queue lock file must survive the purge of the queue directory
  for directory, keep in ((constants.QUEUE_DIR,
                           [constants.JOB_QUEUE_LOCK_FILE]),
                          (constants.JOB_QUEUE_ARCHIVE_DIR, None)):
    _CleanDirectory(directory, exclude=keep)
126

    
127

    
128
def GetMasterInfo():
  """Return the master's network device, IP and node name.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_name) if the cluster
      configuration can be read, otherwise (None, None, None)

  """
  try:
    cfg = _GetConfig()
    retval = (cfg.GetMasterNetdev(), cfg.GetMasterIP(), cfg.GetMasterNode())
  except errors.ConfigurationError:
    logging.exception("Cluster configuration incomplete")
    retval = (None, None, None)
  return retval
148

    
149

    
150
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @rtype: boolean
  @return: True on full success; False if the master IP could not be
      claimed, a daemon failed to start, or the cluster configuration
      could not be read

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    # configuration unreadable; nothing we can do
    return False

  # check whether the master IP already answers on the network
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      # another host holds the address; do not fight over it
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    # nobody answers: bring the IP up on our master netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    # best-effort unsolicited ARP so neighbors refresh their caches
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
195

    
196

    
197
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  Always attempts to remove the master IP from the master network
  device; when stop_daemons is set, also kills the master daemons.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: boolean
  @return: False if the cluster configuration could not be read,
      True otherwise

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  del_cmd = ["ip", "address", "del", "%s/32" % master_ip,
             "dev", master_netdev]
  result = utils.RunCmd(del_cmd)
  if result.failed:
    # log it, but a failure here is not considered fatal
    logging.error("Can't remove the master IP, error: %s", result.output)

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for pid_name in (constants.RAPI_PID, constants.MASTERD_PID):
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(pid_name)))

  return True
226

    
227

    
228
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229
  """Joins this node to the cluster.
230

231
  This does the following:
232
      - updates the hostkeys of the machine (rsa and dsa)
233
      - adds the ssh private key to the user
234
      - adds the ssh public key to the users' authorized_keys file
235

236
  @type dsa: str
237
  @param dsa: the DSA private key to write
238
  @type dsapub: str
239
  @param dsapub: the DSA public key to write
240
  @type rsa: str
241
  @param rsa: the RSA private key to write
242
  @type rsapub: str
243
  @param rsapub: the RSA public key to write
244
  @type sshkey: str
245
  @param sshkey: the SSH private key to write
246
  @type sshpub: str
247
  @param sshpub: the SSH public key to write
248
  @rtype: boolean
249
  @return: the success of the operation
250

251
  """
252
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256
  for name, content, mode in sshd_keys:
257
    utils.WriteFile(name, data=content, mode=mode)
258

    
259
  try:
260
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261
                                                    mkdir=True)
262
  except errors.OpExecError, err:
263
    logging.exception("Error while processing user ssh files")
264
    return False
265

    
266
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
267
    utils.WriteFile(name, data=content, mode=0600)
268

    
269
  utils.AddAuthorizedKey(auth_keys, sshpub)
270

    
271
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
272

    
273
  return True
274

    
275

    
276
def LeaveCluster():
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster: it purges the cluster data and job queue, removes
  the ganeti user's ssh key pair and its authorized_keys entry.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  pub_file = open(pub_key, 'r')
  try:
    # drop our own public key from authorized_keys
    utils.RemoveAuthorizedKey(auth_keys, pub_file.read(8192))
  finally:
    pub_file.close()

  for key_path in (priv_key, pub_key):
    utils.RemoveFile(key_path)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
307

    
308

    
309
def GetNodeInfo(vgname, hypervisor_type):
  """Collect storage, memory and boot-id information about this node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  vginfo = _GetVGInfo(vgname)
  node_info = {
    'vg_size': vginfo['vg_size'],
    'vg_free': vginfo['vg_free'],
    }

  # merge in whatever the hypervisor reports (memory data, etc.)
  hyp_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hyp_info is not None:
    node_info.update(hyp_info)

  boot_id_file = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    node_info["bootid"] = boot_id_file.read(128).rstrip("\n")
  finally:
    boot_id_file.close()

  return node_info
343

    
344

    
345
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: str
  @param cluster_name: the cluster's name, used to build the SshRunner
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  # per-hypervisor self-verification
  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    # only the nodes whose ssh check failed end up in the result
    result[constants.NV_NODELIST] = tmp = {}
    # NOTE(review): this shuffles the caller-supplied list in place;
    # presumably to spread the ssh load across nodes — confirm
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own primary/secondary IPs in the submitted node list
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # a separate secondary check only makes sense if it differs
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = constants.PROTOCOL_VERSION

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceErrors:
      # best-effort: report an empty minor list rather than failing
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = []
    result[constants.NV_DRBDLIST] = used_minors

  return result
444

    
445

    
446
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  # raw string: "\|" in a non-raw string is an invalid escape sequence
  # and only matched by accident
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # assumes lvs(8) lv_attr layout: 5th char is state, 6th is
    # device-open flag — confirm against the installed LVM version
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
485

    
486

    
487
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  # the actual work is delegated to the utility implementation
  groups = utils.ListVolumeGroups()
  return groups
496

    
497

    
498
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return []

  volumes = []
  for line in result.stdout.splitlines():
    # skip lines that do not carry all four fields
    if line.count('|') < 3:
      continue
    fields = line.split('|')
    device = fields[2].strip()
    if '(' in device:
      # the "devices" field carries an extent suffix in parentheses;
      # keep only the device path before it
      device = device.split('(')[0]
    volumes.append({
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': device,
      'vg': fields[3].strip(),
      })

  return volumes
541

    
542

    
543
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  # bail out at the first missing bridge
  for bridge in bridges_list:
    if utils.BridgeExists(bridge):
      continue
    return False

  return True
555

    
556

    
557
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError:
      # fixed misspelling ("hypevisor") in the log message
      logging.exception("Error enumerating instances for hypervisor %s", hname)
      # FIXME: should we somehow not propagate this to the master?
      raise

  return results
580

    
581

    
582
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    # pick the fields of interest out of the hypervisor info tuple
    for key, pos in (('memory', 2), ('state', 4), ('time', 5)):
      output[key] = iinfo[pos]

  return output
606

    
607

    
608
def GetInstanceMigratable(instance):
  """Gives whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: tuple
  @return: tuple of (result, description) where:
      - result: whether the instance can be migrated or not
      - description: a description of the issue, if relevant

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if instance.name not in hyper.ListInstances():
    return (False, 'not running')

  # all disk symlinks must be present for migration to work
  for idx, _ in enumerate(instance.disks):
    link_name = _GetBlockDevSymlinkPath(instance.name, idx)
    if not os.path.islink(link_name):
      return (False, 'not restarted since ganeti 1.2.5')

  return (True, '')
630

    
631

    
632
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  all_info = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if not iinfo:
      continue
    for name, _, memory, vcpus, state, times in iinfo:
      data = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
      # the same instance may be reported by several hypervisors;
      # that is only acceptable when the data agrees
      if name in all_info and all_info[name] != data:
        raise errors.HypervisorError("Instance %s running duplicate"
                                     " with different parameters" % name)
      all_info[name] = data

  return all_info
668

    
669

    
670
def AddOSToInstance(instance):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @rtype: tuple
  @return: (success, message) pair; on failure the message contains
      the reason and the tail of the OS create log

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance)

  # per-run log file, named after the OS, instance and timestamp
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", result.cmd, result.fail_reason, logfile,
                  result.output)
    # include the last log lines in the message returned to the master
    lines = [val.encode("string_escape")
             for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS create script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Successfully installed")
698

    
699

    
700
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: tuple
  @return: (success, message) pair; on failure the message contains
      the reason and the tail of the OS rename log

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  # per-run log file, named after the OS, both names and a timestamp
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # fixed the log message: this path runs the rename script, not the
    # create script
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [val.encode("string_escape")
             for val in utils.TailFile(logfile, lines=20)]
    return (False, "OS rename script failed (%s), last lines in the"
            " log file:\n%s" % (result.fail_reason, "\n".join(lines)))

  return (True, "Rename successful")
732

    
733

    
734
def _GetVGInfo(vg_name):
  """Get informations about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  # all keys default to None; this is also the error return value
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  # drop a possible trailing separator before splitting the fields
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      # sizes are reported as floats in MiB; round them to integers
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      logging.exception("Fail to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
772

    
773

    
774
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the symlink path for one of an instance's disks.

  @type instance_name: str
  @param instance_name: the name of the instance owning the disk
  @type idx: int
  @param idx: the index of the disk
  @rtype: str
  @return: path under DISK_LINKS_DIR, named "<instance_name>:<idx>"

  """
  return os.path.join(constants.DISK_LINKS_DIR,
                      "%s:%d" % (instance_name, idx))
777

    
778

    
779
def _SymlinkBlockDev(instance_name, device_path, idx):
780
  """Set up symlinks to a instance's block device.
781

782
  This is an auxiliary function run when an instance is start (on the primary
783
  node) or when an instance is migrated (on the target node).
784

785

786
  @param instance_name: the name of the target instance
787
  @param device_path: path of the physical block device, on the node
788
  @param idx: the disk index
789
  @return: absolute path to the disk's symlink
790

791
  """
792
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
793
  try:
794
    os.symlink(device_path, link_name)
795
  except OSError, err:
796
    if err.errno == errno.EEXIST:
797
      if (not os.path.islink(link_name) or
798
          os.readlink(link_name) != device_path):
799
        os.remove(link_name)
800
        os.symlink(device_path, link_name)
801
    else:
802
      raise
803

    
804
  return link_name
805

    
806

    
807
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Failures to remove individual links are logged but otherwise
  ignored.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      logging.exception("Can't remove symlink '%s'", link_name)
818

    
819

    
820
def _GatherAndLinkBlockDevs(instance):
821
  """Set up an instance's block device(s).
822

823
  This is run on the primary node at instance startup. The block
824
  devices must be already assembled.
825

826
  @type instance: L{objects.Instance}
827
  @param instance: the instance whose disks we shoul assemble
828
  @rtype: list
829
  @return: list of (disk_object, device_path)
830

831
  """
832
  block_devices = []
833
  for idx, disk in enumerate(instance.disks):
834
    device = _RecursiveFindBD(disk)
835
    if device is None:
836
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
837
                                    str(disk))
838
    device.Open()
839
    try:
840
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
841
    except OSError, e:
842
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
843
                                    e.strerror)
844

    
845
    block_devices.append((disk, link_name))
846

    
847
  return block_devices
848

    
849

    
850
def StartInstance(instance, extra_args):
851
  """Start an instance.
852

853
  @type instance: L{objects.Instance}
854
  @param instance: the instance object
855
  @rtype: boolean
856
  @return: whether the startup was successful or not
857

858
  """
859
  running_instances = GetInstanceList([instance.hypervisor])
860

    
861
  if instance.name in running_instances:
862
    return (True, "Already running")
863

    
864
  try:
865
    block_devices = _GatherAndLinkBlockDevs(instance)
866
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
867
    hyper.StartInstance(instance, block_devices, extra_args)
868
  except errors.BlockDeviceError, err:
869
    logging.exception("Failed to start instance")
870
    return (False, "Block device error: %s" % str(err))
871
  except errors.HypervisorError, err:
872
    logging.exception("Failed to start instance")
873
    _RemoveBlockDevLinks(instance.name, instance.disks)
874
    return (False, "Hypervisor error: %s" % str(err))
875

    
876
  return (True, "Instance started successfully")
877

    
878

    
879
def ShutdownInstance(instance):
880
  """Shut an instance down.
881

882
  @note: this functions uses polling with a hardcoded timeout.
883

884
  @type instance: L{objects.Instance}
885
  @param instance: the instance object
886
  @rtype: boolean
887
  @return: whether the startup was successful or not
888

889
  """
890
  hv_name = instance.hypervisor
891
  running_instances = GetInstanceList([hv_name])
892

    
893
  if instance.name not in running_instances:
894
    return True
895

    
896
  hyper = hypervisor.GetHypervisor(hv_name)
897
  try:
898
    hyper.StopInstance(instance)
899
  except errors.HypervisorError, err:
900
    logging.error("Failed to stop instance: %s" % err)
901
    return False
902

    
903
  # test every 10secs for 2min
904

    
905
  time.sleep(1)
906
  for dummy in range(11):
907
    if instance.name not in GetInstanceList([hv_name]):
908
      break
909
    time.sleep(10)
910
  else:
911
    # the shutdown did not succeed
912
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
913

    
914
    try:
915
      hyper.StopInstance(instance, force=True)
916
    except errors.HypervisorError, err:
917
      logging.exception("Failed to stop instance: %s" % err)
918
      return False
919

    
920
    time.sleep(1)
921
    if instance.name in GetInstanceList([hv_name]):
922
      logging.error("could not shutdown instance '%s' even by destroy",
923
                    instance.name)
924
      return False
925

    
926
  _RemoveBlockDevLinks(instance.name, instance.disks)
927

    
928
  return True
929

    
930

    
931
def RebootInstance(instance, reboot_type, extra_args):
932
  """Reboot an instance.
933

934
  @type instance: L{objects.Instance}
935
  @param instance: the instance object to reboot
936
  @type reboot_type: str
937
  @param reboot_type: the type of reboot, one the following
938
    constants:
939
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
940
        instance OS, do not recreate the VM
941
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
942
        restart the VM (at the hypervisor level)
943
      - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
944
        is not accepted here, since that mode is handled
945
        differently
946
  @rtype: boolean
947
  @return: the success of the operation
948

949
  """
950
  running_instances = GetInstanceList([instance.hypervisor])
951

    
952
  if instance.name not in running_instances:
953
    logging.error("Cannot reboot instance that is not running")
954
    return False
955

    
956
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
957
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
958
    try:
959
      hyper.RebootInstance(instance)
960
    except errors.HypervisorError, err:
961
      logging.exception("Failed to soft reboot instance")
962
      return False
963
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
964
    try:
965
      ShutdownInstance(instance)
966
      StartInstance(instance, extra_args)
967
    except errors.HypervisorError, err:
968
      logging.exception("Failed to hard reboot instance")
969
      return False
970
  else:
971
    raise errors.ParameterError("reboot_type invalid")
972

    
973
  return True
974

    
975

    
976
def MigrateInstance(instance, target, live):
977
  """Migrates an instance to another node.
978

979
  @type instance: L{objects.Instance}
980
  @param instance: the instance definition
981
  @type target: string
982
  @param target: the target node name
983
  @type live: boolean
984
  @param live: whether the migration should be done live or not (the
985
      interpretation of this parameter is left to the hypervisor)
986
  @rtype: tuple
987
  @return: a tuple of (success, msg) where:
988
      - succes is a boolean denoting the success/failure of the operation
989
      - msg is a string with details in case of failure
990

991
  """
992
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
993

    
994
  try:
995
    hyper.MigrateInstance(instance.name, target, live)
996
  except errors.HypervisorError, err:
997
    msg = "Failed to migrate instance"
998
    logging.exception(msg)
999
    return (False, "%s: %s" % (msg, err))
1000
  return (True, "Migration successfull")
1001

    
1002

    
1003
def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # assemble all child devices first, so the new device can be
  # created on top of them
  clist = []
  if disk.children:
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
  except errors.GenericError, err:
    return False, "Can't create block device: %s" % str(err)

  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation, very unusual event"
      logging.error(errorstring)
      return False, errorstring
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      # force-open so the device is immediately usable
      device.Open(force=True)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  # the unique_id may only be known after actual creation (e.g. for
  # dynamically-numbered devices), so read it back from the device
  physical_id = device.unique_id
  return True, physical_id
1054

    
1055

    
1056
def RemoveBlockDevice(disk):
1057
  """Remove a block device.
1058

1059
  @note: This is intended to be called recursively.
1060

1061
  @type disk: L{objects.Disk}
1062
  @param disk: the disk object we should remove
1063
  @rtype: boolean
1064
  @return: the success of the operation
1065

1066
  """
1067
  try:
1068
    rdev = _RecursiveFindBD(disk)
1069
  except errors.BlockDeviceError, err:
1070
    # probably can't attach
1071
    logging.info("Can't attach to device %s in remove", disk)
1072
    rdev = None
1073
  if rdev is not None:
1074
    r_path = rdev.dev_path
1075
    result = rdev.Remove()
1076
    if result:
1077
      DevCacheManager.RemoveCache(r_path)
1078
  else:
1079
    result = True
1080
  if disk.children:
1081
    for child in disk.children:
1082
      result = result and RemoveBlockDevice(child)
1083
  return result
1084

    
1085

    
1086
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # compute how many children are allowed to fail assembly (shown
    # as None in the children list) before we must give up
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # too many failed children already: propagate the error
        if children.count(None) >= mcn:
          raise
        # otherwise tolerate this child failure (degraded assembly)
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    # on secondary nodes with no assembly needed, just report success
    result = True
  return result
1137

    
1138

    
1139
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  assembled = _RecursiveAssembleBD(disk, owner, as_primary)
  # the recursion returns a device object on the primary; callers only
  # need the device path, so unwrap it
  if isinstance(assembled, bdev.BlockDev):
    return assembled.dev_path
  return assembled
1153

    
1154

    
1155
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successfull), then
  the device is shutdown. Then the children of the device are
  shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: boolean
  @return: the success of the operation

  """
  found_dev = _RecursiveFindBD(disk)
  if found_dev is None:
    # not assembled at all, so shutdown trivially succeeds here
    result = True
  else:
    dev_path = found_dev.dev_path
    result = found_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(dev_path)

  # shut down the children regardless of the parent's prior state
  for child in (disk.children or []):
    result = result and ShutdownBlockDevice(child)

  return result
1185

    
1186

    
1187
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False

  # resolve every new child; all of them must be found before we
  # touch the parent
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False

  parent_bdev.AddChildren(new_bdevs)
  return True
1209

    
1210

    
1211
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False

  # collect the device paths of all children to remove; prefer the
  # static path when the disk type provides one, otherwise resolve
  # the dynamically-assembled device
  devs = []
  for disk in new_cdevs:
    static_path = disk.StaticDevPath()
    if static_path is not None:
      devs.append(static_path)
      continue
    bd = _RecursiveFindBD(disk)
    if bd is None:
      logging.error("Can't find dynamic device %s while removing children",
                    disk)
      return False
    devs.append(bd.dev_path)

  parent_bdev.RemoveChildren(devs)
  return True
1241

    
1242

    
1243
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  status_list = []
  for current_disk in disks:
    found = _RecursiveFindBD(current_disk)
    if found is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(current_disk))
    status_list.append(found.CombinedSyncStatus())
  return status_list
1263

    
1264

    
1265
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  # resolve all children first, then look up the device itself
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1283

    
1284

    
1285
def FindBlockDevice(disk):
  """Check if a device is activated.

  If it is, return informations about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or tuple
  @return: None if the disk cannot be found, otherwise a
      tuple (device_path, major, minor, sync_percent,
      estimated_time, is_degraded)

  """
  found = _RecursiveFindBD(disk)
  if found is None:
    return None
  # combine the static identification with the live sync status
  return (found.dev_path, found.major, found.minor) + found.GetSyncStatus()
1302

    
1303

    
1304
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: boolean
  @return: the success of the operation; errors are logged
      in the node daemon log

  """
  # refuse anything that is not an absolute path
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # whitelist of files the master is allowed to overwrite on this node
  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]
  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                 " upload targets: '%s'", file_name)
    return False

  # contents are shipped compressed over the wire
  file_contents = _Decompress(data)
  utils.WriteFile(file_name, data=file_contents, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)

  return True
1351

    
1352

    
1353
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  sstore = ssconf.SimpleStore()
  sstore.WriteFiles(values)
1360

    
1361

    
1362
def _ErrnoOrStr(err):
1363
  """Format an EnvironmentError exception.
1364

1365
  If the L{err} argument has an errno attribute, it will be looked up
1366
  and converted into a textual C{E...} description. Otherwise the
1367
  string representation of the error will be returned.
1368

1369
  @type err: L{EnvironmentError}
1370
  @param err: the exception to format
1371

1372
  """
1373
  if hasattr(err, 'errno'):
1374
    detail = errno.errorcode[err.errno]
1375
  else:
1376
    detail = str(err)
1377
  return detail
1378

    
1379

    
1380
def _OSOndiskVersion(name, os_dir):
  """Compute and return the API versions of a given OS.

  This function will try to read the API versions of the OS given by
  the 'name' parameter and residing in the 'os_dir' directory.

  @type name: str
  @param name: the OS name we should look for
  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: list of int
  @return: the list of API versions declared by the OS (one
      integer per line of its 'ganeti_api_version' file)
  @raise errors.InvalidOS: if the OS cannot be found, the version
      file is missing/not regular, or its contents are not integers

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  # the version file must exist...
  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  # ...and be a regular file (not a directory, fifo, etc.)
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      api_versions = f.readlines()
    finally:
      f.close()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  # one integer version per line
  api_versions = [version.strip() for version in api_versions]
  try:
    api_versions = [int(version) for version in api_versions]
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))

  return api_versions
1427

    
1428

    
1429
def DiagnoseOS(top_dirs=None):
1430
  """Compute the validity for all OSes.
1431

1432
  @type top_dirs: list
1433
  @param top_dirs: the list of directories in which to
1434
      search (if not given defaults to
1435
      L{constants.OS_SEARCH_PATH})
1436
  @rtype: list of L{objects.OS}
1437
  @return: an OS object for each name in all the given
1438
      directories
1439

1440
  """
1441
  if top_dirs is None:
1442
    top_dirs = constants.OS_SEARCH_PATH
1443

    
1444
  result = []
1445
  for dir_name in top_dirs:
1446
    if os.path.isdir(dir_name):
1447
      try:
1448
        f_names = utils.ListVisibleFiles(dir_name)
1449
      except EnvironmentError, err:
1450
        logging.exception("Can't list the OS directory %s", dir_name)
1451
        break
1452
      for name in f_names:
1453
        try:
1454
          os_inst = OSFromDisk(name, base_dir=dir_name)
1455
          result.append(os_inst)
1456
        except errors.InvalidOS, err:
1457
          result.append(objects.OS.FromInvalidOS(err))
1458

    
1459
  return result
1460

    
1461

    
1462
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise errors.InvalidOS: if we don't find a valid OS

  """
  # locate the OS directory, either via search path or explicitly
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_versions = _OSOndiskVersion(name, os_dir)

  # the OS must support the API version this node speaks
  if constants.OS_API_VERSION not in api_versions:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_versions, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    # every script must exist...
    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    # ...be executable by the owner...
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    # ...and be a regular file
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)


  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
                    api_versions=api_versions)
1518

    
1519
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # global, per-instance variables
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
  result['INSTANCE_NAME'] = instance.name
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  # per-disk variables (DISK_<n>_*); all disks must be assembled
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    # FIXME: When disks will have read-only mode, populate this
    result['DISK_%d_ACCESS' % idx] = 'W'
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  # per-NIC variables (NIC_<n>_*); IP is only set when defined
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  return result
1566

    
1567
def GrowBlockDevice(disk, amount):
1568
  """Grow a stack of block devices.
1569

1570
  This function is called recursively, with the childrens being the
1571
  first ones to resize.
1572

1573
  @type disk: L{objects.Disk}
1574
  @param disk: the disk to be grown
1575
  @rtype: (status, result)
1576
  @return: a tuple with the status of the operation
1577
      (True/False), and the errors message if status
1578
      is False
1579

1580
  """
1581
  r_dev = _RecursiveFindBD(disk)
1582
  if r_dev is None:
1583
    return False, "Cannot find block device %s" % (disk,)
1584

    
1585
  try:
1586
    r_dev.Grow(amount)
1587
  except errors.BlockDeviceError, err:
1588
    return False, str(err)
1589

    
1590
  return True, None
1591

    
1592

    
1593
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
      # NOTE(review): if no child matches the parent's size, control
      # falls through and the function implicitly returns None
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
1626

    
1627

    
1628
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: boolean
  @return: the success of the operation

  """
  export_env = OSEnvironment(instance)

  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  # per-export log file for the OS export script's stderr
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_disk.Open()

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  # the export lands in a ".new" staging directory on the destination
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
                               export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command, env=export_env)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
1692

    
1693

    
1694
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: boolean
  @return: the success of the operation

  """
  # the export was staged in a ".new" directory; after writing the
  # config we atomically promote it to the final name
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # instance-level metadata
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  # per-disk entries; entries in snap_disks may be None (skipped disks)
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # replace any previous export with the freshly staged one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
1758

    
1759

    
1760
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  config_path = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(config_path)

  # both mandatory sections must be present, otherwise the export is
  # considered invalid
  has_sections = (config.has_section(constants.INISECT_EXP) and
                  config.has_section(constants.INISECT_INS))
  if not has_sections:
    return None

  return config
1781

    
1782

    
1783
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1784
  """Import an os image into an instance.
1785

1786
  @type instance: L{objects.Instance}
1787
  @param instance: instance to import the disks into
1788
  @type src_node: string
1789
  @param src_node: source node for the disk images
1790
  @type src_images: list of string
1791
  @param src_images: absolute paths of the disk images
1792
  @rtype: list of boolean
1793
  @return: each boolean represent the success of importing the n-th disk
1794

1795
  """
1796
  import_env = OSEnvironment(instance)
1797
  inst_os = OSFromDisk(instance.os)
1798
  import_script = inst_os.import_script
1799

    
1800
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1801
                                        instance.name, int(time.time()))
1802
  if not os.path.exists(constants.LOG_OS_DIR):
1803
    os.mkdir(constants.LOG_OS_DIR, 0750)
1804

    
1805
  comprcmd = "gunzip"
1806
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1807
                               import_script, logfile)
1808

    
1809
  final_result = []
1810
  for idx, image in enumerate(src_images):
1811
    if image:
1812
      destcmd = utils.BuildShellCmd('cat %s', image)
1813
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1814
                                                       constants.GANETI_RUNAS,
1815
                                                       destcmd)
1816
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1817
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1818
      import_env['IMPORT_INDEX'] = str(idx)
1819
      result = utils.RunCmd(command, env=import_env)
1820
      if result.failed:
1821
        logging.error("Disk import command '%s' returned error: %s"
1822
                      " output: %s", command, result.fail_reason,
1823
                      result.output)
1824
        final_result.append(False)
1825
      else:
1826
        final_result.append(True)
1827
    else:
1828
      final_result.append(True)
1829

    
1830
  return final_result
1831

    
1832

    
1833
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  # if the export directory was never created, there are no exports
  if not os.path.isdir(constants.EXPORT_DIR):
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
1844

    
1845

    
1846
def RemoveExport(export):
1847
  """Remove an existing export from the node.
1848

1849
  @type export: str
1850
  @param export: the name of the export to remove
1851
  @rtype: boolean
1852
  @return: the success of the operation
1853

1854
  """
1855
  target = os.path.join(constants.EXPORT_DIR, export)
1856

    
1857
  shutil.rmtree(target)
1858
  # TODO: catch some of the relevant exceptions and provide a pretty
1859
  # error message if rmtree fails.
1860

    
1861
  return True
1862

    
1863

    
1864
def RenameBlockDevices(devlist):
1865
  """Rename a list of block devices.
1866

1867
  @type devlist: list of tuples
1868
  @param devlist: list of tuples of the form  (disk,
1869
      new_logical_id, new_physical_id); disk is an
1870
      L{objects.Disk} object describing the current disk,
1871
      and new logical_id/physical_id is the name we
1872
      rename it to
1873
  @rtype: boolean
1874
  @return: True if all renames succeeded, False otherwise
1875

1876
  """
1877
  result = True
1878
  for disk, unique_id in devlist:
1879
    dev = _RecursiveFindBD(disk)
1880
    if dev is None:
1881
      result = False
1882
      continue
1883
    try:
1884
      old_rpath = dev.dev_path
1885
      dev.Rename(unique_id)
1886
      new_rpath = dev.dev_path
1887
      if old_rpath != new_rpath:
1888
        DevCacheManager.RemoveCache(old_rpath)
1889
        # FIXME: we should add the new cache information here, like:
1890
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1891
        # but we don't have the owner here - maybe parse from existing
1892
        # cache? for now, we only lose lvm data when we rename, which
1893
        # is less critical than DRBD or MD
1894
    except errors.BlockDeviceError, err:
1895
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1896
      result = False
1897
  return result
1898

    
1899

    
1900
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # bug fix: os.path.commonprefix compares character-wise, so e.g.
  # "/srv/ganeti-evil" used to pass a check against base "/srv/ganeti";
  # require the path to be the base itself or to lie under it with a
  # proper path-separator boundary
  if file_storage_dir != base_file_storage_dir:
    allowed_prefix = base_file_storage_dir.rstrip(os.sep) + os.sep
    if not file_storage_dir.startswith(allowed_prefix):
      logging.error("file storage directory '%s' is not under base file"
                    " storage directory '%s'",
                    file_storage_dir, base_file_storage_dir)
      return None
  return file_storage_dir
1923

    
1924

    
1925
def CreateFileStorageDir(file_storage_dir):
1926
  """Create file storage directory.
1927

1928
  @type file_storage_dir: str
1929
  @param file_storage_dir: directory to create
1930

1931
  @rtype: tuple
1932
  @return: tuple with first element a boolean indicating wheter dir
1933
      creation was successful or not
1934

1935
  """
1936
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1937
  result = True,
1938
  if not file_storage_dir:
1939
    result = False,
1940
  else:
1941
    if os.path.exists(file_storage_dir):
1942
      if not os.path.isdir(file_storage_dir):
1943
        logging.error("'%s' is not a directory", file_storage_dir)
1944
        result = False,
1945
    else:
1946
      try:
1947
        os.makedirs(file_storage_dir, 0750)
1948
      except OSError, err:
1949
        logging.error("Cannot create file storage directory '%s': %s",
1950
                      file_storage_dir, err)
1951
        result = False,
1952
  return result
1953

    
1954

    
1955
def RemoveFileStorageDir(file_storage_dir):
1956
  """Remove file storage directory.
1957

1958
  Remove it only if it's empty. If not log an error and return.
1959

1960
  @type file_storage_dir: str
1961
  @param file_storage_dir: the directory we should cleanup
1962
  @rtype: tuple (success,)
1963
  @return: tuple of one element, C{success}, denoting
1964
      whether the operation was successfull
1965

1966
  """
1967
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1968
  result = True,
1969
  if not file_storage_dir:
1970
    result = False,
1971
  else:
1972
    if os.path.exists(file_storage_dir):
1973
      if not os.path.isdir(file_storage_dir):
1974
        logging.error("'%s' is not a directory", file_storage_dir)
1975
        result = False,
1976
      # deletes dir only if empty, otherwise we want to return False
1977
      try:
1978
        os.rmdir(file_storage_dir)
1979
      except OSError, err:
1980
        logging.exception("Cannot remove file storage directory '%s'",
1981
                          file_storage_dir)
1982
        result = False,
1983
  return result
1984

    
1985

    
1986
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1987
  """Rename the file storage directory.
1988

1989
  @type old_file_storage_dir: str
1990
  @param old_file_storage_dir: the current path
1991
  @type new_file_storage_dir: str
1992
  @param new_file_storage_dir: the name we should rename to
1993
  @rtype: tuple (success,)
1994
  @return: tuple of one element, C{success}, denoting
1995
      whether the operation was successful
1996

1997
  """
1998
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1999
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2000
  result = True,
2001
  if not old_file_storage_dir or not new_file_storage_dir:
2002
    result = False,
2003
  else:
2004
    if not os.path.exists(new_file_storage_dir):
2005
      if os.path.isdir(old_file_storage_dir):
2006
        try:
2007
          os.rename(old_file_storage_dir, new_file_storage_dir)
2008
        except OSError, err:
2009
          logging.exception("Cannot rename '%s' to '%s'",
2010
                            old_file_storage_dir, new_file_storage_dir)
2011
          result =  False,
2012
      else:
2013
        logging.error("'%s' is not a directory", old_file_storage_dir)
2014
        result = False,
2015
    else:
2016
      if os.path.exists(old_file_storage_dir):
2017
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2018
                      old_file_storage_dir, new_file_storage_dir)
2019
        result = False,
2020
  return result
2021

    
2022

    
2023
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: boolean
  @return: whether the file is under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  in_queue = os.path.commonprefix([queue_dir, file_name]) == queue_dir

  if not in_queue:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return in_queue
2040

    
2041

    
2042
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  # refuse to touch anything outside the queue directory
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))
  return True
2063

    
2064

    
2065
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: boolean
  @return: the success of the operation

  """
  # both endpoints must live inside the queue directory
  if _IsJobQueueFile(old) and _IsJobQueueFile(new):
    utils.RenameFile(old, new, mkdir=True)
    return True
  return False
2084

    
2085

    
2086
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: boolean
  @return: always True
  @warning: the function always returns True

  """
  if drain_flag:
    # the mere existence of the (empty) drain file marks the queue as
    # drained; its contents are irrelevant
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

  return True
2104

    
2105

    
2106
def CloseBlockDevices(instance_name, disks):
2107
  """Closes the given block devices.
2108

2109
  This means they will be switched to secondary mode (in case of
2110
  DRBD).
2111

2112
  @param instance_name: if the argument is not empty, the symlinks
2113
      of this instance will be removed
2114
  @type disks: list of L{objects.Disk}
2115
  @param disks: the list of disks to be closed
2116
  @rtype: tuple (success, message)
2117
  @return: a tuple of success and message, where success
2118
      indicates the succes of the operation, and message
2119
      which will contain the error details in case we
2120
      failed
2121

2122
  """
2123
  bdevs = []
2124
  for cf in disks:
2125
    rd = _RecursiveFindBD(cf)
2126
    if rd is None:
2127
      return (False, "Can't find device %s" % cf)
2128
    bdevs.append(rd)
2129

    
2130
  msg = []
2131
  for rd in bdevs:
2132
    try:
2133
      rd.Close()
2134
    except errors.BlockDeviceError, err:
2135
      msg.append(str(err))
2136
  if msg:
2137
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
2138
  else:
2139
    if instance_name:
2140
      _RemoveBlockDevLinks(instance_name, disks)
2141
    return (True, "All devices secondary")
2142

    
2143

    
2144
def ValidateHVParams(hvname, hvparams):
2145
  """Validates the given hypervisor parameters.
2146

2147
  @type hvname: string
2148
  @param hvname: the hypervisor name
2149
  @type hvparams: dict
2150
  @param hvparams: the hypervisor parameters to be validated
2151
  @rtype: tuple (success, message)
2152
  @return: a tuple of success and message, where success
2153
      indicates the succes of the operation, and message
2154
      which will contain the error details in case we
2155
      failed
2156

2157
  """
2158
  try:
2159
    hv_type = hypervisor.GetHypervisor(hvname)
2160
    hv_type.ValidateParameters(hvparams)
2161
    return (True, "Validation passed")
2162
  except errors.HypervisorError, err:
2163
    return (False, str(err))
2164

    
2165

    
2166
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  Backs up and removes the local cluster configuration file, after
  checking that this node is neither the recorded master nor running
  the master daemon.

  @rtype: tuple (success, message)
  @return: success flag plus a human-readable explanation

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    return (False, "ssconf status shows I'm the master node, will not demote")
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
    return (False, "The master daemon is running, will not demote")
  try:
    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    # a missing config file is fine (nothing to back up); any other
    # error aborts the demotion
    if err.errno != errno.ENOENT:
      return (False, "Error while backing up cluster file: %s" % str(err))
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
  return (True, "Done")
2184

    
2185

    
2186
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @rtype: tuple
  @return: (True, list of block devices) on success,
      (False, error message) otherwise

  """
  # set the correct physical ID on every disk first (deliberately a
  # separate pass, so all disks are updated even if a later lookup fails)
  node_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(node_name, nodes_ip)

  found = []
  for disk in disks:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      return (False, "Can't find device %s" % disk)
    found.append(dev)
  return (True, found)
2203

    
2204

    
2205
def DrbdDisconnectNet(nodes_ip, disks):
2206
  """Disconnects the network on a list of drbd devices.
2207

2208
  """
2209
  status, bdevs = _FindDisks(nodes_ip, disks)
2210
  if not status:
2211
    return status, bdevs
2212

    
2213
  # disconnect disks
2214
  for rd in bdevs:
2215
    try:
2216
      rd.DisconnectNet()
2217
    except errors.BlockDeviceError, err:
2218
      logging.exception("Failed to go into standalone mode")
2219
      return (False, "Can't change network configuration: %s" % str(err))
2220
  return (True, "All disks are now disconnected")
2221

    
2222

    
2223
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2224
  """Attaches the network on a list of drbd devices.
2225

2226
  """
2227
  status, bdevs = _FindDisks(nodes_ip, disks)
2228
  if not status:
2229
    return status, bdevs
2230

    
2231
  if multimaster:
2232
    for idx, rd in enumerate(bdevs):
2233
      try:
2234
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2235
      except EnvironmentError, err:
2236
        return (False, "Can't create symlink: %s" % str(err))
2237
  # reconnect disks, switch to new master configuration and if
2238
  # needed primary mode
2239
  for rd in bdevs:
2240
    try:
2241
      rd.AttachNet(multimaster)
2242
    except errors.BlockDeviceError, err:
2243
      return (False, "Can't change network configuration: %s" % str(err))
2244
  # wait until the disks are connected; we need to retry the re-attach
2245
  # if the device becomes standalone, as this might happen if the one
2246
  # node disconnects and reconnects in a different mode before the
2247
  # other node reconnects; in this case, one or both of the nodes will
2248
  # decide it has wrong configuration and switch to standalone
2249
  RECONNECT_TIMEOUT = 2 * 60
2250
  sleep_time = 0.100 # start with 100 miliseconds
2251
  timeout_limit = time.time() + RECONNECT_TIMEOUT
2252
  while time.time() < timeout_limit:
2253
    all_connected = True
2254
    for rd in bdevs:
2255
      stats = rd.GetProcStatus()
2256
      if not (stats.is_connected or stats.is_in_resync):
2257
        all_connected = False
2258
      if stats.is_standalone:
2259
        # peer had different config info and this node became
2260
        # standalone, even though this should not happen with the
2261
        # new staged way of changing disk configs
2262
        try:
2263
          rd.ReAttachNet(multimaster)
2264
        except errors.BlockDeviceError, err:
2265
          return (False, "Can't change network configuration: %s" % str(err))
2266
    if all_connected:
2267
      break
2268
    time.sleep(sleep_time)
2269
    sleep_time = min(5, sleep_time * 1.5)
2270
  if not all_connected:
2271
    return (False, "Timeout in disk reconnecting")
2272
  if multimaster:
2273
    # change to primary mode
2274
    for rd in bdevs:
2275
      rd.Open()
2276
  if multimaster:
2277
    msg = "multi-master and primary"
2278
  else:
2279
    msg = "single-master"
2280
  return (True, "Disks are now configured as %s" % msg)
2281

    
2282

    
2283
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple
  @return: (success, (alldone, min_resync)); success is False when a
      device is neither connected nor resyncing

  """
  status, bdevs = _FindDisks(nodes_ip, disks)
  if not status:
    return status, bdevs

  min_resync = 100
  alldone = True
  for rd in bdevs:
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      # a disconnected device is a hard failure; report the partial
      # state accumulated so far
      return (False, (alldone, min_resync))
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)
  return (True, (alldone, min_resync))
2303

    
2304

    
2305
class HooksRunner(object):
2306
  """Hook runner.
2307

2308
  This class is instantiated on the node side (ganeti-noded) and not
2309
  on the master side.
2310

2311
  """
2312
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2313

    
2314
  def __init__(self, hooks_base_dir=None):
2315
    """Constructor for hooks runner.
2316

2317
    @type hooks_base_dir: str or None
2318
    @param hooks_base_dir: if not None, this overrides the
2319
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
2320

2321
    """
2322
    if hooks_base_dir is None:
2323
      hooks_base_dir = constants.HOOKS_BASE_DIR
2324
    self._BASE_DIR = hooks_base_dir
2325

    
2326
  @staticmethod
2327
  def ExecHook(script, env):
2328
    """Exec one hook script.
2329

2330
    @type script: str
2331
    @param script: the full path to the script
2332
    @type env: dict
2333
    @param env: the environment with which to exec the script
2334
    @rtype: tuple (success, message)
2335
    @return: a tuple of success and message, where success
2336
        indicates the succes of the operation, and message
2337
        which will contain the error details in case we
2338
        failed
2339

2340
    """
2341
    # exec the process using subprocess and log the output
2342
    fdstdin = None
2343
    try:
2344
      fdstdin = open("/dev/null", "r")
2345
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2346
                               stderr=subprocess.STDOUT, close_fds=True,
2347
                               shell=False, cwd="/", env=env)
2348
      output = ""
2349
      try:
2350
        output = child.stdout.read(4096)
2351
        child.stdout.close()
2352
      except EnvironmentError, err:
2353
        output += "Hook script error: %s" % str(err)
2354

    
2355
      while True:
2356
        try:
2357
          result = child.wait()
2358
          break
2359
        except EnvironmentError, err:
2360
          if err.errno == errno.EINTR:
2361
            continue
2362
          raise
2363
    finally:
2364
      # try not to leak fds
2365
      for fd in (fdstdin, ):
2366
        if fd is not None:
2367
          try:
2368
            fd.close()
2369
          except EnvironmentError, err:
2370
            # just log the error
2371
            #logging.exception("Error while closing fd %s", fd)
2372
            pass
2373

    
2374
    return result == 0, output
2375

    
2376
  def RunHooks(self, hpath, phase, env):
2377
    """Run the scripts in the hooks directory.
2378

2379
    @type hpath: str
2380
    @param hpath: the path to the hooks directory which
2381
        holds the scripts
2382
    @type phase: str
2383
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
2384
        L{constants.HOOKS_PHASE_POST}
2385
    @type env: dict
2386
    @param env: dictionary with the environment for the hook
2387
    @rtype: list
2388
    @return: list of 3-element tuples:
2389
      - script path
2390
      - script result, either L{constants.HKR_SUCCESS} or
2391
        L{constants.HKR_FAIL}
2392
      - output of the script
2393

2394
    @raise errors.ProgrammerError: for invalid input
2395
        parameters
2396

2397
    """
2398
    if phase == constants.HOOKS_PHASE_PRE:
2399
      suffix = "pre"
2400
    elif phase == constants.HOOKS_PHASE_POST:
2401
      suffix = "post"
2402
    else:
2403
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2404
    rr = []
2405

    
2406
    subdir = "%s-%s.d" % (hpath, suffix)
2407
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2408
    try:
2409
      dir_contents = utils.ListVisibleFiles(dir_name)
2410
    except OSError, err:
2411
      # FIXME: must log output in case of failures
2412
      return rr
2413

    
2414
    # we use the standard python sort order,
2415
    # so 00name is the recommended naming scheme
2416
    dir_contents.sort()
2417
    for relname in dir_contents:
2418
      fname = os.path.join(dir_name, relname)
2419
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2420
          self.RE_MASK.match(relname) is not None):
2421
        rrval = constants.HKR_SKIP
2422
        output = ""
2423
      else:
2424
        result, output = self.ExecHook(fname, env)
2425
        if not result:
2426
          rrval = constants.HKR_FAIL
2427
        else:
2428
          rrval = constants.HKR_SUCCESS
2429
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
2430

    
2431
    return rr
2432

    
2433

    
2434
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: four element tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from L{utils.RunResult})

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # the input data is passed to the script via a temporary file
    input_fd, input_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(input_fd, idata)
      os.close(input_fd)
      result = utils.RunCmd([alloc_script, input_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      # always remove the temporary input file
      os.unlink(input_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2474

    
2475

    
2476
class DevCacheManager(object):
2477
  """Simple class for managing a cache of block device information.
2478

2479
  """
2480
  _DEV_PREFIX = "/dev/"
2481
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2482

    
2483
  @classmethod
2484
  def _ConvertPath(cls, dev_path):
2485
    """Converts a /dev/name path to the cache file name.
2486

2487
    This replaces slashes with underscores and strips the /dev
2488
    prefix. It then returns the full path to the cache file.
2489

2490
    @type dev_path: str
2491
    @param dev_path: the C{/dev/} path name
2492
    @rtype: str
2493
    @return: the converted path name
2494

2495
    """
2496
    if dev_path.startswith(cls._DEV_PREFIX):
2497
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2498
    dev_path = dev_path.replace("/", "_")
2499
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2500
    return fpath
2501

    
2502
  @classmethod
2503
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2504
    """Updates the cache information for a given device.
2505

2506
    @type dev_path: str
2507
    @param dev_path: the pathname of the device
2508
    @type owner: str
2509
    @param owner: the owner (instance name) of the device
2510
    @type on_primary: bool
2511
    @param on_primary: whether this is the primary
2512
        node nor not
2513
    @type iv_name: str
2514
    @param iv_name: the instance-visible name of the
2515
        device, as in objects.Disk.iv_name
2516

2517
    @rtype: None
2518

2519
    """
2520
    if dev_path is None:
2521
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2522
      return
2523
    fpath = cls._ConvertPath(dev_path)
2524
    if on_primary:
2525
      state = "primary"
2526
    else:
2527
      state = "secondary"
2528
    if iv_name is None:
2529
      iv_name = "not_visible"
2530
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2531
    try:
2532
      utils.WriteFile(fpath, data=fdata)
2533
    except EnvironmentError, err:
2534
      logging.exception("Can't update bdev cache for %s", dev_path)
2535

    
2536
  @classmethod
2537
  def RemoveCache(cls, dev_path):
2538
    """Remove data for a dev_path.
2539

2540
    This is just a wrapper over L{utils.RemoveFile} with a converted
2541
    path name and logging.
2542

2543
    @type dev_path: str
2544
    @param dev_path: the pathname of the device
2545

2546
    @rtype: None
2547

2548
    """
2549
    if dev_path is None:
2550
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2551
      return
2552
    fpath = cls._ConvertPath(dev_path)
2553
    try:
2554
      utils.RemoveFile(fpath)
2555
    except EnvironmentError, err:
2556
      logging.exception("Can't update bdev cache for %s", dev_path)