Revision d5d76ab2 test/ganeti.utils.process_unittest.py

b/test/ganeti.utils.process_unittest.py
274 274
    (out, err, status, ta) = \
275 275
      utils.process._RunCmdPipe(cmd, {}, False, "/", False,
276 276
                                timeout, [self.proc_ready_helper.write_fd],
277
                                None,
277 278
                                _linger_timeout=0.2,
278 279
                                _postfork_fn=self.proc_ready_helper.Ready)
279 280
    self.assert_(status < 0)
......
377 378
    finally:
378 379
      temp.close()
379 380

  
381
  def testNoInputRead(self):
382
    testfile = self._TestDataFilename("cert1.pem")
383

  
384
    result = utils.RunCmd(["cat"], timeout=10.0)
385
    self.assertFalse(result.failed)
386
    self.assertEqual(result.stderr, "")
387
    self.assertEqual(result.stdout, "")
388

  
389
  def testInputFileHandle(self):
390
    testfile = self._TestDataFilename("cert1.pem")
391

  
392
    result = utils.RunCmd(["cat"], input_fd=open(testfile, "r"))
393
    self.assertFalse(result.failed)
394
    self.assertEqual(result.stdout, utils.ReadFile(testfile))
395
    self.assertEqual(result.stderr, "")
396

  
397
  def testInputNumericFileDescriptor(self):
398
    testfile = self._TestDataFilename("cert2.pem")
399

  
400
    fh = open(testfile, "r")
401
    try:
402
      result = utils.RunCmd(["cat"], input_fd=fh.fileno())
403
    finally:
404
      fh.close()
405

  
406
    self.assertFalse(result.failed)
407
    self.assertEqual(result.stdout, utils.ReadFile(testfile))
408
    self.assertEqual(result.stderr, "")
409

  
410
  def testInputWithCloseFds(self):
411
    testfile = self._TestDataFilename("cert1.pem")
412

  
413
    temp = open(self.fname, "r+")
414
    try:
415
      temp.write("test283523367")
416
      temp.seek(0)
417

  
418
      result = utils.RunCmd(["/bin/bash", "-c",
419
                             ("cat && read -u %s; echo $REPLY" %
420
                              temp.fileno())],
421
                            input_fd=open(testfile, "r"),
422
                            noclose_fds=[temp.fileno()])
423
      self.assertFalse(result.failed)
424
      self.assertEqual(result.stdout.strip(),
425
                       utils.ReadFile(testfile) + "test283523367")
426
      self.assertEqual(result.stderr, "")
427
    finally:
428
      temp.close()
429

  
430
  def testOutputAndInteractive(self):
431
    self.assertRaises(errors.ProgrammerError, utils.RunCmd,
432
                      [], output=self.fname, interactive=True)
433

  
434
  def testOutputAndInput(self):
435
    self.assertRaises(errors.ProgrammerError, utils.RunCmd,
436
                      [], output=self.fname, input_fd=open(self.fname))
437

  
380 438

  
381 439
class TestRunParts(testutils.GanetiTestCase):
382 440
  """Testing case for the RunParts function"""

Also available in: Unified diff