Multiple Connections Server
Before building a server that handles multiple connections, it helps to know that socket methods are blocking by default: while the program is reading from a socket, it can't do anything else. One way around this "problem" is to use multiple threads, one per client, but that is not cheap in terms of processing power.
To address this, sockets can be used asynchronously: the operating system keeps track of each socket's state and lets the program know when a socket has something to read or is ready to be written to.
That can be done in a couple of ways, depending on the OS that is used:
- poll and epoll (Linux)
- kqueue and kevent (BSD)
- select (cross-platform)
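To see which of these mechanisms the current platform exposes, a small probe of Python's standard select module is enough. This is only a sketch: the helper name available_mechanisms is made up for illustration, and it relies on the select module defining an attribute only when the OS supports the corresponding call.

```python
import select

def available_mechanisms():
    """Return the names of the I/O multiplexing calls this platform exposes."""
    candidates = ("select", "poll", "epoll", "kqueue")
    # The select module only defines the attributes the OS actually supports.
    return [name for name in candidates if hasattr(select, name)]

print(available_mechanisms())
```

The printed list differs between systems, which is why the rest of this page sticks to select.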
This time we will go through the code in parts to make it easier to follow. The full code is at the end of this file.
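import select, socket, sys
try:
    import Queue            # Python 2
except ImportError:
    import queue as Queue   # Python 3 renamed the module

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)                # never block on accept/recv/send
server.bind(('localhost', 9999))
server.listen(5)
inputs = [server]                    # sockets to watch for incoming data
outputs = []                         # sockets with data waiting to be sent
message_queues = {}                  # socket -> Queue of pending outgoing messages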
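At first, the code is pretty similar to the simple socket server that we built before. The main difference is server.setblocking(0), which makes the socket non-blocking. We also create two lists, inputs and outputs: inputs holds the sockets we want to read from (starting with the server socket itself) and outputs holds the sockets that have data waiting to be sent. The message_queues dictionary keeps the pending outgoing messages for each client.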
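To make the effect of non-blocking mode concrete, the sketch below calls accept() on a listener that nobody has connected to. It assumes Python 3, where the call raises BlockingIOError instead of waiting (Python 2 raises socket.error). The helper name accept_would_block and the use of port 0 are choices made for this illustration only.

```python
import socket

def accept_would_block():
    """Return True if accept() on an idle non-blocking listener raises instead of waiting."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        listener.setblocking(False)
        listener.bind(("localhost", 0))   # port 0: let the OS pick a free port
        listener.listen(1)
        try:
            listener.accept()             # nobody has connected yet
        except BlockingIOError:
            return True                   # the call returned immediately
        return False
    finally:
        listener.close()
```

A blocking socket would hang on that accept() call until a client connected, which is exactly what the server needs to avoid.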
Here is where the differences start to appear:
while inputs:
    readable, writable, exceptional = select.select(
        inputs, outputs, inputs)
    for s in readable:
        if s is server:
            # A new client is waiting to be accepted
            connection, client_address = s.accept()
            connection.setblocking(0)
            inputs.append(connection)
            message_queues[connection] = Queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                # Queue the data so it can be echoed back
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                # Empty read: the client closed the connection
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]
    #...
First, the loop keeps running for as long as there are sockets in inputs. On each pass we call select.select to ask the OS which of the given sockets are ready to read, ready to write, or in an exceptional condition. That is why we pass three lists, inputs, outputs and inputs again: the first is checked for readability, the second for writability and the third for errors.
After that, for each socket s in the list of readable ones:
- If it is the server socket, a new client is waiting, so we accept the connection, set it as non-blocking, append it to inputs and add a Queue for the messages that will be sent back to it.
- Otherwise, there is data in the socket's buffer, so we recv it. If there is data, we put it on that socket's message queue and add the socket to outputs so the data can be sent back. If recv returns no data, the connection was closed, so we remove the socket from outputs and inputs, close it and delete its message queue.
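The readiness check can be tried in isolation with a pair of connected sockets. This is a minimal sketch, not part of the server: socket.socketpair() stands in for a client and a server connection, and the helper name readable_now and its timeout parameter are made up for illustration.

```python
import select
import socket

def readable_now(sockets, timeout=0):
    """Ask select which of the given sockets have data to read."""
    readable, _, _ = select.select(sockets, [], [], timeout)
    return readable

a, b = socket.socketpair()          # two connected sockets
before = readable_now([b])          # nothing sent yet, so b is not readable
a.sendall(b"ping")
after = readable_now([b], 1.0)      # data is buffered, so select reports b
received = b.recv(1024)
a.close()
b.close()
```

Passing a timeout makes select return even when nothing is ready. The server loop omits it, so it sleeps until at least one socket needs attention.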
The output part is pretty straightforward:
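    #...
    for s in writable:
        if s not in message_queues:
            continue  # already closed while handling the readable sockets
        try:
            next_msg = message_queues[s].get_nowait()
        except Queue.Empty:
            # Nothing left to send: stop watching for writability
            outputs.remove(s)
        else:
            s.send(next_msg)
    #...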
For each socket that was selected as writable, if there is a pending message in its queue, we send it to the socket. If the queue is empty, we remove the socket from outputs so that select stops reporting it until there is something new to send.
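One detail worth keeping in mind about the writable loop: send() returns the number of bytes it actually wrote, which can be less than the length of the message when the socket's buffer is nearly full. The sketch below shows one way to keep track of the unsent part. The helper name send_some is hypothetical, and feeding the remainder back into the server's queues is left out.

```python
import socket

def send_some(sock, data):
    """Send as much of data as the socket accepts and return the unsent remainder."""
    sent = sock.send(data)      # send() returns the number of bytes actually written
    return data[sent:]

a, b = socket.socketpair()
leftover = send_some(a, b"hello")   # a short message normally fits in one call
echoed = b.recv(1024)
a.close()
b.close()
```

For the short messages used in this tutorial the remainder is normally empty, so the simple s.send(next_msg) call is good enough here.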
    #...
    for s in exceptional:
        if s not in inputs:
            continue  # already closed earlier in this pass
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        del message_queues[s]
Following the same logic as for the writable sockets, for each socket that select reported with an exceptional condition, we remove it from inputs and outputs, close it and delete its message queue.
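The same cleanup steps appear twice, once when a client disconnects and once for exceptional sockets, so they could be gathered into a single helper. This is only a sketch of that idea. The function name close_connection is made up here, and the code assumes Python 3, where a closed socket reports a file descriptor of -1.

```python
import socket

def close_connection(s, inputs, outputs, message_queues):
    """Forget a socket everywhere it is tracked, then close it."""
    if s in inputs:
        inputs.remove(s)
    if s in outputs:
        outputs.remove(s)
    message_queues.pop(s, None)   # no KeyError if the queue is already gone
    s.close()

a, b = socket.socketpair()
inputs, outputs, message_queues = [a], [a], {a: None}
close_connection(a, inputs, outputs, message_queues)
b.close()
```

Because every step checks before removing, calling the helper twice on the same socket does no harm.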
Here is the full code:
import select, socket, sys
try:
    import Queue            # Python 2
except ImportError:
    import queue as Queue   # Python 3 renamed the module

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
server.bind(('localhost', 9999))
server.listen(5)
inputs = [server]
outputs = []
message_queues = {}

while inputs:
    readable, writable, exceptional = select.select(
        inputs, outputs, inputs)
    for s in readable:
        if s is server:
            connection, client_address = s.accept()
            connection.setblocking(0)
            inputs.append(connection)
            message_queues[connection] = Queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]

    for s in writable:
        if s not in message_queues:
            continue  # already closed while handling the readable sockets
        try:
            next_msg = message_queues[s].get_nowait()
        except Queue.Empty:
            outputs.remove(s)
        else:
            s.send(next_msg)

    for s in exceptional:
        if s not in inputs:
            continue  # already closed earlier in this pass
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        del message_queues[s]
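To try the server out, a small client along these lines can be run from a second terminal. It is a minimal sketch that assumes Python 3 and a server listening on localhost:9999 as above. The function name echo_once and the 5 second timeout are choices made for this illustration.

```python
import socket

def echo_once(message, host="localhost", port=9999):
    """Connect, send one message, and return what the server sends back."""
    with socket.create_connection((host, port), timeout=5) as client:
        client.sendall(message)
        return client.recv(1024)
```

With the server running, echo_once(b"hello") should return the same bytes, since the server queues whatever it receives and sends it back. Starting several clients at once shows that a single select loop serves them all.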