Multiple Connections Server

Before building a server that handles multiple connections, it helps to know that socket methods block by default: while the program is waiting in a call such as recv, it can't do anything else. One way around this is to use multiple threads, one per client, but that is not a cheap operation in terms of computer processing power.
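
For comparison, a thread-per-client server could be sketched roughly as follows. This is only an illustration of the approach described above, not code from this guide; the names handle_client and serve_with_threads are hypothetical, and the sketch assumes Python 3.

```python
import socket
import threading


def handle_client(connection):
    """Echo everything one client sends until it disconnects."""
    with connection:
        while True:
            data = connection.recv(1024)  # blocks, but only this thread
            if not data:  # empty bytes: the client closed the connection
                break
            connection.sendall(data)


def serve_with_threads(server):
    """Accept clients on a listening socket, one thread per connection."""
    while True:
        try:
            connection, _address = server.accept()  # blocks until a client connects
        except OSError:  # the listening socket was closed
            break
        worker = threading.Thread(
            target=handle_client, args=(connection,), daemon=True)
        worker.start()
```

Every connected client costs one thread here, which is the overhead the select-based approach avoids.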

To address this, there is a way to work with sockets in an asynchronous way, delegating the maintenance of the socket's state to the operating system and letting the program know when there is something to read or write from/to the socket.

That can be done in a couple of ways, depending on the OS that is used:

  • poll and epoll (Linux)

  • kqueue and kevent (BSD)

  • select (Cross platform)
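
To check which of these mechanisms a given machine offers, a small sketch like the one below can help. It assumes Python 3.4 or later, where the standard selectors module (not used elsewhere in this guide) picks the most efficient mechanism available on the platform.

```python
import select
import selectors

# Which readiness APIs does this platform's select module expose?
available = [name for name in ("select", "poll", "epoll", "kqueue")
             if hasattr(select, name)]
print("available:", available)

# DefaultSelector is an alias for the most efficient implementation
# available on the current platform.
selector = selectors.DefaultSelector()
chosen = type(selector).__name__
print("selectors picked:", chosen)
selector.close()
```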

Building a Multiple Listener Using select

This time we will build the server in parts to make it easier to follow. The full code is at the end of this file.

Creating the Socket

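import select, socket
try:
    import queue as Queue  # Python 3 renamed the module to "queue"
except ImportError:
    import Queue  # Python 2
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)  # calls on this socket no longer block
server.bind(('localhost', 9999))
server.listen(5)
inputs = [server]  # sockets we want to read from
outputs = []  # sockets we have data to write to
message_queues = {}  # pending outgoing messages, one queue per client socket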

At first, the code is pretty similar to the simple socket that we've built before.

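The main difference is the call to server.setblocking(0), which makes the socket non-blocking.

We also create two lists, inputs and outputs. The first holds the sockets we want to read from (starting with the server socket itself), and the second holds the client sockets we have data to send to. The message_queues dictionary maps each client socket to the queue of messages waiting to be sent to it.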
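
To see what setblocking(0) changes, the sketch below calls accept on a non-blocking listening socket while no client is waiting. It assumes Python 3, where the call fails immediately with BlockingIOError instead of waiting. The socket name demo and the use of port 0 (let the OS pick a free port) are choices made for this illustration only.

```python
import socket

demo = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
demo.setblocking(0)
demo.bind(("localhost", 0))  # port 0: let the OS pick a free port
demo.listen(5)

try:
    demo.accept()  # no client is waiting, so there is nothing to accept
    would_block = False
except BlockingIOError:
    # A blocking socket would have waited here; a non-blocking one raises
    would_block = True
finally:
    demo.close()

print("accept would have blocked:", would_block)
```

This is why the server asks select which sockets are ready before calling accept or recv on them.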

Taking input from the client

Here is where the differences start to appear:

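while inputs:
    # Wait until at least one socket is ready for reading or writing,
    # or is in an exceptional condition
    readable, writable, exceptional = select.select(
        inputs, outputs, inputs)
    for s in readable:
        if s is server:
            # The listening socket is readable: a new client is connecting
            connection, client_address = s.accept()
            connection.setblocking(0)
            inputs.append(connection)
            message_queues[connection] = Queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                # Queue the data so it can be sent back to this client
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                # No data means the client closed the connection
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]
    #...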

The loop keeps running as long as there is at least one socket in inputs to watch.

On each iteration we call select.select to ask the OS which of the given sockets are ready for reading, ready for writing, or in an exceptional condition. That is why we pass three lists: inputs, outputs, and inputs again. Each list is checked for the corresponding condition, and the call returns three lists containing only the sockets that are ready.
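
The behaviour of select.select can be tried in isolation with a connected pair of sockets. The sketch below assumes Python 3 and uses socket.socketpair as a stand-in for a real client connection. It also passes a fourth argument, a timeout in seconds, which the server code in this guide does not use.

```python
import select
import socket

# A connected pair of sockets stands in for a client and a server connection
left, right = socket.socketpair()

# Nothing has been sent yet: right has nothing to read, left has room to write.
# A timeout of 0 makes select check once and return immediately.
readable_before, writable_before, _ = select.select([right], [left], [], 0)

left.send(b"hello")

# Now the OS reports right as ready for reading
readable_after, _, _ = select.select([right], [], [], 1.0)

print(readable_before == [], writable_before == [left],
      readable_after == [right])

left.close()
right.close()
```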

After that, for each socket s that is in the list of the readable ones:

  • If it is the server socket, a new client is waiting to connect: we accept the connection, set it to non-blocking, append it to inputs, and create a Queue for the messages that will be sent back to that client.

  • Otherwise, the socket belongs to a client and has something to read, so we recv it. If recv returns no data, the client closed the connection: we remove the socket from outputs and inputs, close it, and delete its entry in message_queues. If there is data, we put it on that client's queue and add the socket to outputs so that select also checks whether it is writable.
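
The "no data means the connection was closed" rule can be observed directly. In this sketch (Python 3, again using socket.socketpair in place of a real client), one end sends a message and closes; the other end first receives the buffered message and then an empty bytes object.

```python
import socket

client, server_side = socket.socketpair()
client.sendall(b"last words")
client.close()

first = server_side.recv(1024)   # the data that was still buffered
second = server_side.recv(1024)  # empty bytes: the peer has closed
server_side.close()

print(first, second)
```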

Sending outputs to the clients

The output part is pretty straightforward:

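    #...
    for s in writable:
        # The socket may have been closed in the readable loop above
        if s not in message_queues:
            continue
        try:
            next_msg = message_queues[s].get_nowait()
        except Queue.Empty:
            # Nothing left to send: stop checking this socket for writability
            outputs.remove(s)
        else:
            s.send(next_msg)
    #...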

For each socket that select reported as writable, we try to take the next pending message from its queue and send it. If the queue is empty, there is nothing left to write, so the socket is removed from outputs until new data arrives for it.
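
One detail worth knowing is that send returns the number of bytes it actually transmitted, which can be less than the length of the message, especially on a non-blocking socket. The helper below is a hypothetical sketch (the name send_some is not part of this guide's server) showing how the unsent remainder could be kept for a later attempt.

```python
import socket


def send_some(sock, pending):
    """Send as much of ``pending`` as the socket accepts; return the rest."""
    sent = sock.send(pending)  # number of bytes actually transmitted
    return pending[sent:]      # empty bytes when everything went out


left, right = socket.socketpair()
leftover = send_some(left, b"abc")
echoed = right.recv(1024)
left.close()
right.close()
```

A server using this idea would have to store the returned remainder and send it before taking the next message from the queue.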

Treating exceptions

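    #...
    for s in exceptional:
        # The socket may already have been removed in the readable loop
        if s in inputs:
            inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        # pop() also tolerates the server socket, which has no queue
        message_queues.pop(s, None)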

Following the same logic as for the writable sockets, for each socket on which select reported an exceptional condition, we remove it from inputs and outputs, close it, and delete its message queue.
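
The same cleanup steps appear both when a client disconnects and when a socket is in an exceptional condition, so they could be gathered in one place. The helper below is a hypothetical refactoring sketch, not part of the code in this guide; it takes the lists and the dictionary as parameters so that it can be called safely more than once for the same socket.

```python
import socket


def close_connection(s, inputs, outputs, message_queues):
    """Stop watching ``s``, close it, and discard its message queue."""
    if s in inputs:
        inputs.remove(s)
    if s in outputs:
        outputs.remove(s)
    s.close()
    # pop() with a default does nothing if the socket has no queue
    message_queues.pop(s, None)
```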

Full Code

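import select, socket
try:
    import queue as Queue  # Python 3 renamed the module to "queue"
except ImportError:
    import Queue  # Python 2
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)  # calls on this socket no longer block
server.bind(('localhost', 9999))
server.listen(5)
inputs = [server]  # sockets we want to read from
outputs = []  # sockets we have data to write to
message_queues = {}  # pending outgoing messages, one queue per client socket

while inputs:
    # Wait until at least one socket is ready for reading or writing,
    # or is in an exceptional condition
    readable, writable, exceptional = select.select(
        inputs, outputs, inputs)
    for s in readable:
        if s is server:
            # The listening socket is readable: a new client is connecting
            connection, client_address = s.accept()
            connection.setblocking(0)
            inputs.append(connection)
            message_queues[connection] = Queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                # Queue the data so it can be sent back to this client
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                # No data means the client closed the connection
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]

    for s in writable:
        # The socket may have been closed in the readable loop above
        if s not in message_queues:
            continue
        try:
            next_msg = message_queues[s].get_nowait()
        except Queue.Empty:
            # Nothing left to send: stop checking this socket for writability
            outputs.remove(s)
        else:
            s.send(next_msg)

    for s in exceptional:
        # The socket may already have been removed in the readable loop
        if s in inputs:
            inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        # pop() also tolerates the server socket, which has no queue
        message_queues.pop(s, None)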
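
To try the server out, a small client along these lines could be used. This is a sketch assuming Python 3; the function name echo_roundtrip and the five-second timeout are choices made for this illustration.

```python
import socket


def echo_roundtrip(address, payload, timeout=5.0):
    """Send ``payload`` to an echo server and return what comes back."""
    with socket.create_connection(address, timeout=timeout) as client:
        client.sendall(payload)
        received = b""
        # The reply may arrive in several pieces, so read until complete
        while len(received) < len(payload):
            chunk = client.recv(1024)
            if not chunk:  # the server closed the connection early
                break
            received += chunk
    return received
```

With the server above running, echo_roundtrip(('localhost', 9999), b'hello') should return the same bytes that were sent. Starting several clients at once shows the single-threaded server handling them together.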
