import socket
import shutil
import re
+import select
import ganeti
import testutils
from ganeti.errors import LockError, UnitParseError, GenericError, \
ProgrammerError
-def _ChildHandler(signal, stack):
- global _ChildFlag
- _ChildFlag = True
-
class TestIsProcessAlive(unittest.TestCase):
"""Testing case for IsProcessAlive"""
- def setUp(self):
- global _ChildFlag
- # create a (most probably) non-existing process-id
- self.pid_non_existing = os.fork()
- if self.pid_non_existing == 0:
- os._exit(0)
- elif self.pid_non_existing > 0:
- os.waitpid(self.pid_non_existing, 0)
- else:
- raise SystemError("can't fork")
- _ChildFlag = False
- # Use _ChildHandler for SIGCHLD
- self.chldOrig = signal.signal(signal.SIGCHLD, _ChildHandler)
+  def _CreateZombie(self):
+    """Fork a child that exits at once and return its PID, unreaped.
+
+    A pipe is used to detect the child's exit: the parent closes its own
+    copy of the write end, so the read end reaches end-of-file once the
+    child's copy is closed at exit. The child is not reaped here.
+
+    Fails the test if the exit is not observed within 60 seconds.
+
+    """
     # create a zombie
-    self.pid_zombie = os.fork()
-    if self.pid_zombie == 0:
+    r_fd, w_fd = os.pipe()
+    pid_zombie = os.fork()
+    if pid_zombie == 0:
+      # explicit close of read, write end will be closed only due to exit
+      os.close(r_fd)
       os._exit(0)
-    elif self.pid_zombie < 0:
+    elif pid_zombie < 0:
       raise SystemError("can't fork")
-
-  def tearDown(self):
-    signal.signal(signal.SIGCHLD, self.chldOrig)
+    # parent: we close our copy of w_fd, so that r_fd reports end-of-file
+    # (and therefore becomes readable) as soon as the OS closes the
+    # child's file descriptors on its exit
+    os.close(w_fd)
+    try:
+      # wait for 60 seconds at max for the exit (pathological case, just
+      # so that the test doesn't hang indefinitely)
+      r_set, _, _ = select.select([r_fd], [], [], 60)
+    finally:
+      # the read end has served its purpose; without this close every
+      # call leaks one file descriptor into the test process
+      os.close(r_fd)
+    # no descriptors are passed in select's exceptional set, so an empty
+    # readable list is the only possible sign of a timeout
+    if not r_set:
+      self.fail("Timeout exceeded in zombie creation")
+    return pid_zombie
def testExists(self):
mypid = os.getpid()
"can't find myself running")
def testZombie(self):
-    global _ChildFlag
-    timeout = 10
-
-    while not _ChildFlag:
-      if timeout >= 0:
-        time.sleep(0.2)
-        timeout -= 0.2
-      else:
-        self.fail("timed out waiting for child's signal")
-        break # not executed...
-
-    self.assert_(not IsProcessAlive(self.pid_zombie),
-                 "zombie not detected as zombie")
+    """Check that an exited but unreaped child is not reported as alive."""
+    pid_zombie = self._CreateZombie()
+    try:
+      is_zombie = not IsProcessAlive(pid_zombie)
+      self.assert_(is_zombie, "zombie not detected as zombie")
+    finally:
+      # reap the child even when the assertion above fails, so that a
+      # failing run does not leave the zombie behind in the test process
+      os.waitpid(pid_zombie, os.WNOHANG)
def testNotExisting(self):
- self.assert_(not IsProcessAlive(self.pid_non_existing),
- "noexisting process detected")
+ # Fork a child that exits immediately and reap it with waitpid, so that
+ # its PID (most probably) no longer refers to any process.
+ pid_non_existing = os.fork()
+ if pid_non_existing == 0:
+ os._exit(0)
+ elif pid_non_existing < 0:
+ raise SystemError("can't fork")
+ os.waitpid(pid_non_existing, 0)
+ # A PID that has already been reaped must not be reported as alive.
+ self.assert_(not IsProcessAlive(pid_non_existing),
+ "nonexisting process detected")
class TestPidFileFunctions(unittest.TestCase):