# blob: 18421980ae825da2d98df9024c9a494a9ae2c594 [file] [log] [blame]
import threading
import unittest
import sys
import pydevconsole
from pydev_imports import xmlrpclib, SimpleXMLRPCServer, StringIO
# Name of the builtin that reads a line from stdin on the running interpreter:
# 'raw_input' where that builtin exists, 'input' otherwise.  The tests splice
# this name into the code fragments they send to the console.
try:
    raw_input
except NameError:
    raw_input_name = 'input'
else:
    raw_input_name = 'raw_input'
#=======================================================================================================================
# Test
#=======================================================================================================================
class Test(unittest.TestCase):
    """Tests that drive ``pydevconsole`` through its interpreter and XML-RPC APIs.

    Every test swaps ``sys.stdout`` for a ``StringIO`` buffer so the text
    printed by the console can be inspected, and restores the real stream in
    a ``finally`` clause.
    """

    def testConsoleHello(self):
        """``InterpreterInterface.hello`` is expected to answer 'Hello eclipse'."""
        self.original_stdout = sys.stdout
        sys.stdout = StringIO()

        try:
            client_port, _server_port = self.getFreeAddresses()
            self.startClientThread(client_port)
            import time
            time.sleep(.3) #let's give it some time to start the threads

            import pydev_localhost
            interpreter = pydevconsole.InterpreterInterface(
                pydev_localhost.get_localhost(), client_port, threading.currentThread())

            (result,) = interpreter.hello("Hello pydevconsole")
            self.assertEqual(result, "Hello eclipse")
        finally:
            sys.stdout = self.original_stdout

    def testConsoleRequests(self):
        """Exercise code execution, completions and descriptions on an interpreter."""
        self.original_stdout = sys.stdout
        sys.stdout = StringIO()

        try:
            client_port, _server_port = self.getFreeAddresses()
            self.startClientThread(client_port)
            import time
            time.sleep(.3) #let's give it some time to start the threads

            import pydev_localhost
            from pydev_console_utils import CodeFragment

            interpreter = pydevconsole.InterpreterInterface(
                pydev_localhost.get_localhost(), client_port, threading.currentThread())

            # Fresh buffer: only what is printed from here on is checked below.
            sys.stdout = StringIO()
            interpreter.addExec(CodeFragment('class Foo:'))
            interpreter.addExec(CodeFragment(' CONSTANT=1'))
            interpreter.addExec(CodeFragment(''))
            interpreter.addExec(CodeFragment('foo=Foo()'))
            interpreter.addExec(CodeFragment('foo.__doc__=None'))
            interpreter.addExec(CodeFragment('val = %s()' % (raw_input_name,)))
            interpreter.addExec(CodeFragment('50'))
            interpreter.addExec(CodeFragment('print (val)'))
            found = sys.stdout.getvalue().split()
            try:
                self.assertEqual(['50', 'input_request'], found)
            except self.failureException:
                # Only a failed comparison may fall back to the alternative
                # output; any other error must propagate.
                self.assertEqual(['input_request'], found) #IPython

            comps = interpreter.getCompletions('foo.', 'foo.')
            self.assertTrue(
                ('CONSTANT', '', '', '3') in comps or ('CONSTANT', '', '', '4') in comps,
                'Found: %s' % (comps,)
            )

            comps = interpreter.getCompletions('"".', '"".')
            # Every alternative must be tested with ``in comps``: a bare
            # non-empty tuple is always true and would make this check vacuous.
            self.assertTrue(
                ('__add__', 'x.__add__(y) <==> x+y', '', '3') in comps or
                ('__add__', '', '', '4') in comps or
                ('__add__', 'x.__add__(y) <==> x+y\r\nx.__add__(y) <==> x+y', '()', '2') in comps or
                ('__add__', 'x.\n__add__(y) <==> x+yx.\n__add__(y) <==> x+y', '()', '2') in comps,
                'Did not find __add__ in : %s' % (comps,)
            )

            # An empty prefix is expected to list the builtins.
            names = [c[0] for c in interpreter.getCompletions('', '')]
            if 'AssertionError' not in names:
                self.fail('Could not find AssertionError')

            # A prefix must filter out names that do not start with it.
            names = [c[0] for c in interpreter.getCompletions('Assert', 'Assert')]
            if 'RuntimeError' in names:
                self.fail('Did not expect to find RuntimeError there')

            self.assertTrue(
                ('__doc__', None, '', '3') not in interpreter.getCompletions('foo.CO', 'foo.'))

            comps = interpreter.getCompletions('va', 'va')
            self.assertTrue(('val', '', '', '3') in comps or ('val', '', '', '4') in comps)

            interpreter.addExec(CodeFragment('s = "mystring"'))

            # The wording of the description differs between interpreters, so
            # any one of the known variants is accepted.
            desc = interpreter.getDescription('val')
            str_variants = (
                'str(object) -> string',
                'str(string[, encoding[, errors]]) -> str',
                'str(Char* value)',
                "str(object='') -> string",
                'str(value: Char*)',
                "str(object='') -> str",
            )
            self.assertTrue(
                desc == "'input_request'" or any(v in desc for v in str_variants),
                'Could not find what was needed in %s' % desc)

            desc = interpreter.getDescription('val.join')
            join_exact = (
                "<builtin method 'join'>",
                "<built-in method join of str object>",
            )
            join_variants = (
                'S.join(sequence) -> string',
                'S.join(sequence) -> str',
                'S.join(iterable) -> string',
                'str join(str self, list sequence)',
                'S.join(iterable) -> str',
                'join(self: str, sequence: list) -> str',
            )
            self.assertTrue(
                desc in join_exact or any(v in desc for v in join_variants),
                "Could not recognize: %s" % (desc,))
        finally:
            sys.stdout = self.original_stdout

    def startClientThread(self, client_port):
        """Start a daemon thread serving the client-side XML-RPC callbacks.

        The server listens on ``client_port`` and exposes ``RequestInput``
        (always answers 'input_request') and ``NotifyFinished``.

        Returns the started thread; its ``requested_input`` flag and
        ``notified_finished`` counter record the calls received.
        """
        class ClientThread(threading.Thread):
            def __init__(self, client_port):
                threading.Thread.__init__(self)
                self.client_port = client_port

            def run(self):
                # The handlers record their calls on this thread object.
                owner = self

                class HandleRequestInput:
                    def RequestInput(self):
                        owner.requested_input = True
                        return 'input_request'

                    def NotifyFinished(self, *args, **kwargs):
                        owner.notified_finished += 1
                        return 1

                handle_request_input = HandleRequestInput()

                import pydev_localhost
                client_server = SimpleXMLRPCServer(
                    (pydev_localhost.get_localhost(), self.client_port), logRequests=False)
                client_server.register_function(handle_request_input.RequestInput)
                client_server.register_function(handle_request_input.NotifyFinished)
                client_server.serve_forever()

        client_thread = ClientThread(client_port)
        client_thread.requested_input = False
        client_thread.notified_finished = 0
        client_thread.setDaemon(True)
        client_thread.start()
        return client_thread

    def startDebuggerServerThread(self, debugger_port, socket_code):
        """Start a daemon thread that accepts one connection on ``debugger_port``.

        ``socket_code`` is called with the accepted connection socket.
        Returns the started thread.
        """
        class DebuggerServerThread(threading.Thread):
            def __init__(self, debugger_port, socket_code):
                threading.Thread.__init__(self)
                self.debugger_port = debugger_port
                self.socket_code = socket_code

            def run(self):
                import socket
                listener = socket.socket()
                try:
                    listener.bind(('', self.debugger_port))
                    listener.listen(1)
                    connection, unused_addr = listener.accept()
                finally:
                    # Only one connection is served, so the listening socket
                    # is not needed once accept() has returned (or failed).
                    listener.close()
                self.socket_code(connection)

        debugger_thread = DebuggerServerThread(debugger_port, socket_code)
        debugger_thread.setDaemon(True)
        debugger_thread.start()
        return debugger_thread

    def getFreeAddresses(self):
        """Return two distinct port numbers that were free when probed.

        Both probe sockets are bound (to port 0, letting the OS choose) before
        either is closed, so the two ports cannot be the same.  The ports are
        released again before returning.

        Raises AssertionError if the ports are equal or not positive.
        """
        import socket
        probes = [socket.socket(), socket.socket()]
        try:
            ports = []
            for probe in probes:
                probe.bind(('', 0))
                ports.append(probe.getsockname()[1])
        finally:
            for probe in probes:
                probe.close()
        port0, port1 = ports

        if port0 <= 0 or port1 <= 0:
            #This happens in Jython...
            from java.net import ServerSocket
            s0 = ServerSocket(0)
            try:
                port0 = s0.getLocalPort()
                s1 = ServerSocket(0)
                try:
                    port1 = s1.getLocalPort()
                finally:
                    s1.close()
            finally:
                s0.close()

        assert port0 != port1
        assert port0 > 0
        assert port1 > 0

        return port0, port1

    def testServer(self):
        """Run code through a console started with ``pydevconsole.StartServer``.

        The code asks for input; the client thread must see the request and
        the value it answers with must end up printed on stdout.
        """
        self.original_stdout = sys.stdout
        sys.stdout = StringIO()
        try:
            client_port, server_port = self.getFreeAddresses()

            class ServerThread(threading.Thread):
                def __init__(self, client_port, server_port):
                    threading.Thread.__init__(self)
                    self.client_port = client_port
                    self.server_port = server_port

                def run(self):
                    import pydev_localhost
                    pydevconsole.StartServer(
                        pydev_localhost.get_localhost(), self.server_port, self.client_port)

            server_thread = ServerThread(client_port, server_port)
            server_thread.setDaemon(True)
            server_thread.start()

            client_thread = self.startClientThread(client_port)

            import time
            time.sleep(.3) #let's give it some time to start the threads

            # Fresh buffer: only what is printed from here on is checked below.
            sys.stdout = StringIO()

            import pydev_localhost
            server = xmlrpclib.Server(
                'http://%s:%s' % (pydev_localhost.get_localhost(), server_port))
            server.execLine('class Foo:')
            server.execLine(' pass')
            server.execLine('')
            server.execLine('foo = Foo()')
            server.execLine('a = %s()' % (raw_input_name,))
            server.execLine('print (a)')

            # Both waits below share a single 2 second budget.
            initial = time.time()
            while not client_thread.requested_input:
                if time.time() - initial > 2:
                    raise AssertionError('Did not get the return asked before the timeout.')
                time.sleep(.1)

            while ['input_request'] != sys.stdout.getvalue().split():
                if time.time() - initial > 2:
                    # Stop polling; the assertion below reports the mismatch.
                    break
                time.sleep(.1)
            self.assertEqual(['input_request'], sys.stdout.getvalue().split())
        finally:
            sys.stdout = self.original_stdout
#=======================================================================================================================
# main
#=======================================================================================================================
if __name__ == '__main__':
    # Executed as a script: let unittest discover and run the tests above.
    unittest.main()