blob: 5e3df0ae2ccaab41a04da3032429d964791ee290 [file] [log] [blame]
"""
This is an select module for use on JVMs > 1.5.
It is documented, along with known issues and workarounds, on the jython wiki.
http://wiki.python.org/jython/SelectModule
"""
import java.nio.channels.SelectableChannel
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
from java.nio.channels.SelectionKey import OP_ACCEPT, OP_CONNECT, OP_WRITE, OP_READ
import errno
import os
import Queue
import socket
class error(Exception): pass
ALL = None
_exception_map = {
# (<javaexception>, <circumstance>) : lambda: <code that raises the python equivalent>
(java.nio.channels.IllegalBlockingModeException, ALL) : error(errno.ESOCKISBLOCKING, 'socket must be in non-blocking mode'),
}
def _map_exception(exc, circumstance=ALL):
try:
mapped_exception = _exception_map[(exc.__class__, circumstance)]
mapped_exception.java_exception = exc
return mapped_exception
except KeyError:
return error(-1, 'Unmapped java exception: <%s:%s>' % (exc.toString(), circumstance))
POLLIN = 1
POLLOUT = 2
# The following event types are completely ignored on jython
# Java does not support them, AFAICT
# They are declared only to support code compatibility with cpython
POLLPRI = 4
POLLERR = 8
POLLHUP = 16
POLLNVAL = 32
def _getselectable(selectable_object):
try:
channel = selectable_object.getchannel()
except:
try:
channel = selectable_object.fileno().getChannel()
except:
raise TypeError("Object '%s' is not watchable" % selectable_object,
errno.ENOTSOCK)
if channel and not isinstance(channel, java.nio.channels.SelectableChannel):
raise TypeError("Object '%s' is not watchable" % selectable_object,
errno.ENOTSOCK)
return channel
class poll:
def __init__(self):
self.selector = java.nio.channels.Selector.open()
self.chanmap = {}
self.unconnected_sockets = []
def _register_channel(self, socket_object, channel, mask):
jmask = 0
if mask & POLLIN:
# Note that OP_READ is NOT a valid event on server socket channels.
if channel.validOps() & OP_ACCEPT:
jmask = OP_ACCEPT
else:
jmask = OP_READ
if mask & POLLOUT:
if channel.validOps() & OP_WRITE:
jmask |= OP_WRITE
if channel.validOps() & OP_CONNECT:
jmask |= OP_CONNECT
selectionkey = channel.register(self.selector, jmask)
self.chanmap[channel] = (socket_object, selectionkey)
def _check_unconnected_sockets(self):
temp_list = []
for socket_object, mask in self.unconnected_sockets:
channel = _getselectable(socket_object)
if channel is not None:
self._register_channel(socket_object, channel, mask)
else:
temp_list.append( (socket_object, mask) )
self.unconnected_sockets = temp_list
def register(self, socket_object, mask = POLLIN|POLLOUT|POLLPRI):
try:
channel = _getselectable(socket_object)
if channel is None:
# The socket is not yet connected, and thus has no channel
# Add it to a pending list, and return
self.unconnected_sockets.append( (socket_object, mask) )
return
self._register_channel(socket_object, channel, mask)
except java.lang.Exception, jlx:
raise _map_exception(jlx)
def unregister(self, socket_object):
try:
channel = _getselectable(socket_object)
self.chanmap[channel][1].cancel()
del self.chanmap[channel]
except java.lang.Exception, jlx:
raise _map_exception(jlx)
def _dopoll(self, timeout):
if timeout is None or timeout < 0:
self.selector.select()
else:
try:
timeout = int(timeout)
if not timeout:
self.selector.selectNow()
else:
# No multiplication required: both cpython and java use millisecond timeouts
self.selector.select(timeout)
except ValueError, vx:
raise error("poll timeout must be a number of milliseconds or None", errno.EINVAL)
# The returned selectedKeys cannot be used from multiple threads!
return self.selector.selectedKeys()
def poll(self, timeout=None):
try:
self._check_unconnected_sockets()
selectedkeys = self._dopoll(timeout)
results = []
for k in selectedkeys.iterator():
jmask = k.readyOps()
pymask = 0
if jmask & OP_READ: pymask |= POLLIN
if jmask & OP_WRITE: pymask |= POLLOUT
if jmask & OP_ACCEPT: pymask |= POLLIN
if jmask & OP_CONNECT: pymask |= POLLOUT
# Now return the original userobject, and the return event mask
results.append( (self.chanmap[k.channel()][0], pymask) )
return results
except java.lang.Exception, jlx:
raise _map_exception(jlx)
def _deregister_all(self):
try:
for k in self.selector.keys():
k.cancel()
# Keys are not actually removed from the selector until the next select operation.
self.selector.selectNow()
except java.lang.Exception, jlx:
raise _map_exception(jlx)
def close(self):
try:
self._deregister_all()
self.selector.close()
except java.lang.Exception, jlx:
raise _map_exception(jlx)
def _calcselecttimeoutvalue(value):
if value is None:
return None
try:
floatvalue = float(value)
except Exception, x:
raise TypeError("Select timeout value must be a number or None")
if value < 0:
raise error("Select timeout value cannot be negative", errno.EINVAL)
if floatvalue < 0.000001:
return 0
return int(floatvalue * 1000) # Convert to milliseconds
# This cache for poll objects is required because of a bug in java on MS Windows
# http://bugs.jython.org/issue1291
class poll_object_cache:
def __init__(self):
self.is_windows = os._name == 'nt'
if self.is_windows:
self.poll_object_queue = Queue.Queue()
import atexit
atexit.register(self.finalize)
def get_poll_object(self):
if not self.is_windows:
return poll()
try:
return self.poll_object_queue.get(False)
except Queue.Empty:
return poll()
def release_poll_object(self, pobj):
if self.is_windows:
pobj._deregister_all()
self.poll_object_queue.put(pobj)
else:
pobj.close()
def finalize(self):
if self.is_windows:
while True:
try:
p = self.poll_object_queue.get(False)
p.close()
except Queue.Empty:
return
_poll_object_cache = poll_object_cache()
def native_select(read_fd_list, write_fd_list, outofband_fd_list, timeout=None):
timeout = _calcselecttimeoutvalue(timeout)
# First create a poll object to do the actual watching.
pobj = _poll_object_cache.get_poll_object()
try:
registered_for_read = {}
# Check the read list
for fd in read_fd_list:
pobj.register(fd, POLLIN)
registered_for_read[fd] = 1
# And now the write list
for fd in write_fd_list:
if fd in registered_for_read:
# registering a second time overwrites the first
pobj.register(fd, POLLIN|POLLOUT)
else:
pobj.register(fd, POLLOUT)
results = pobj.poll(timeout)
# Now start preparing the results
read_ready_list, write_ready_list, oob_ready_list = [], [], []
for fd, mask in results:
if mask & POLLIN:
read_ready_list.append(fd)
if mask & POLLOUT:
write_ready_list.append(fd)
return read_ready_list, write_ready_list, oob_ready_list
finally:
_poll_object_cache.release_poll_object(pobj)
select = native_select
def cpython_compatible_select(read_fd_list, write_fd_list, outofband_fd_list, timeout=None):
# First turn all sockets to non-blocking
# keeping track of which ones have changed
modified_channels = []
try:
for socket_list in [read_fd_list, write_fd_list, outofband_fd_list]:
for s in socket_list:
channel = _getselectable(s)
if channel.isBlocking():
modified_channels.append(channel)
channel.configureBlocking(0)
return native_select(read_fd_list, write_fd_list, outofband_fd_list, timeout)
finally:
for channel in modified_channels:
channel.configureBlocking(1)