The Robot World Repo on GitHub
The Forth Repo on GitHub

Let’s look at what we have, see how it can be improved, and decide what’s next. Some nice refactoring. Also: Love one another.

I think—before reviewing the code—that our WorldServer experiment is in pretty good shape. It expects and deals with a half-word length field ahead of the message; it accumulates bytes until it has the complete message, and then (in a very trivial way) it computes a result and writes it back to the socket as a reply. We have tests that include sending the half-word in two pieces, and sending messages split into pieces, and they all pass.
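The half-word length field is two bytes. The MessageHandler code further down reads it with struct.unpack('>H', ...), which means a big-endian unsigned 16-bit integer. Here is a minimal sketch of matching framing helpers, assuming that format. The names frame and unframe are hypothetical and do not come from the repo:

```python
from __future__ import annotations

import struct


def frame(message: bytes) -> bytes:
    """Prefix a message with its length as a two-byte, big-endian unsigned int."""
    if len(message) > 0xFFFF:
        raise ValueError("message too long for a half-word length field")
    return struct.pack(">H", len(message)) + message


def unframe(buffer: bytes) -> tuple[bytes | None, bytes]:
    """Return (message, rest) if buffer holds a whole message, else (None, buffer)."""
    if len(buffer) < 2:
        return None, buffer  # not even the whole length field yet
    size = struct.unpack(">H", buffer[:2])[0]
    if len(buffer) < 2 + size:
        return None, buffer  # length known, body still incomplete
    return buffer[2:2 + size], buffer[2 + size:]


wire = frame(b"hello")
print(wire)               # b'\x00\x05hello'
print(unframe(wire[:1]))  # (None, b'\x00'): only half of the half-word
print(unframe(wire))      # (b'hello', b'')
```

Splitting wire at any point and feeding the pieces back in order is exactly the kind of case the existing tests exercise.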

I think we could plug this into a version of World and drive the world with it. I believe—that’s slightly less strong than “think”—that we could pretty easily draw the game while this is going on. Certainly we could draw on each message cycle, but I think we could keep drawing while waiting for input … for example if we had NPCs or something in the game that moved around even if players provided no input.
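The go loop below calls select with timeout=None, so it blocks until some socket is ready. One way to keep drawing while no input arrives might be to pass a finite timeout instead. When the time runs out, select returns an empty list, and the loop can draw a frame either way. A minimal sketch, assuming a hypothetical draw_frame callback, a hypothetical frame_seconds interval, and a max_frames limit that exists only so the example terminates:

```python
import selectors
import socket


def run_with_frames(sel, handle_event, draw_frame, frame_seconds=1 / 30, max_frames=3):
    """Service any ready sockets, but draw at least once every frame_seconds.

    max_frames exists only so this sketch terminates; a game loop would run forever.
    """
    for _ in range(max_frames):
        # With a finite timeout, select returns [] when nothing is ready in time.
        for key, mask in sel.select(timeout=frame_seconds):
            handle_event(key, mask)
        draw_frame()  # runs whether or not any input arrived


sel = selectors.DefaultSelector()
idle, peer = socket.socketpair()  # a connection that never sends anything
sel.register(idle, selectors.EVENT_READ)

frames = []
run_with_frames(sel, lambda key, mask: None, lambda: frames.append("frame"), frame_seconds=0.01)
print(frames)  # ['frame', 'frame', 'frame']

sel.close()
idle.close()
peer.close()
```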

I also suspect—like think but more negative—that our code could use a bit of improvement. Let’s review what we have and decide what to do next. We’ll probably improve some things as we go.

Our server is started like this:

ws = WorldServer()
ws.go()

That runs our simple init and takes us to go:

import selectors
import socket


class WorldServer:
    HOST = "127.0.0.1"
    PORT = 34567

    def __init__(self):
        self.sel = selectors.DefaultSelector()

    def go(self):
        lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        lsock.bind((self.HOST, self.PORT))
        lsock.listen()
        print(f"Listening on {(self.HOST, self.PORT)}")
        lsock.setblocking(False)
        self.sel.register(lsock, selectors.EVENT_READ, data=None)

        try:
            while True:
                events = self.sel.select(timeout=None)
                for key, mask in events:
                    if key.data is None:
                        self.accept_wrapper(key.fileobj)
                    else:
                        self.service_connection(key, mask)
        except KeyboardInterrupt:
            print("Caught keyboard interrupt, exiting")
        finally:
            self.sel.close()

Let’s see what I can “explain” about this. Some of what I say may be wrong: this is my understanding and I am still no expert in sockets.

The sel selector is an object with which we can register sockets. When an event occurs on one of the registered sockets, sel.select returns an event for that socket, consisting of a key and a mask. The mask, based on how we use it, includes flags for read and write. As it turns out, the selectors module defines only those two, EVENT_READ and EVENT_WRITE. The key has an attribute fileobj, which gives PyCharm some concern, but which we’ll see used in accept_wrapper in a moment.
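A small experiment makes the key and mask concrete. This sketch uses socket.socketpair() in place of a real client connection and registers one end with an arbitrary data value. select then hands back a SelectorKey whose fileobj is the object we registered and whose data is the value we passed, along with a mask we can test against the two flags:

```python
import selectors
import socket

sel = selectors.DefaultSelector()
ours, theirs = socket.socketpair()  # stands in for an accepted client connection
ours.setblocking(False)
sel.register(ours, selectors.EVENT_READ | selectors.EVENT_WRITE, data="my handler")

theirs.sendall(b"ping")  # give our end something to read

ready = sel.select(timeout=1)
key = ready[0][0]
mask = 0
for _, events in ready:  # combine, in case a platform reports read and write separately
    mask |= events

print(key.fileobj is ours)                 # True: exactly what we registered
print(key.data)                            # my handler
print(bool(mask & selectors.EVENT_READ))   # True: bytes are waiting
print(bool(mask & selectors.EVENT_WRITE))  # True: there is room to send

sel.close()
ours.close()
theirs.close()
```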

The first paragraph of go sets up a non-blocking listener socket on the provided host and port. Let’s refactor a bit. We’ll extract a method, and I’d like to have a better name than lsock:

    def go(self):
        self.start_listener()
        try:
            while True:
                events = self.sel.select(timeout=None)
                for key, mask in events:
                    if key.data is None:
                        self.accept_wrapper(key.fileobj)
                    else:
                        self.service_connection(key, mask)
        except KeyboardInterrupt:
            print("Caught keyboard interrupt, exiting")
        finally:
            self.sel.close()

    def start_listener(self):
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.bind((self.HOST, self.PORT))
        listener.listen()
        print(f"Listening on {(self.HOST, self.PORT)}")
        listener.setblocking(False)
        self.sel.register(listener, selectors.EVENT_READ, data=None)

That seems better to me. The go method now has two parts, a very simple method call, and a slalom run that is at least one black diamond. Let’s refactor a bit more. I’ll work inside out:

    def go(self):
        self.start_listener()
        try:
            while True:
                events = self.sel.select(timeout=None)
                for key, mask in events:
                    self.handle_one_event(key, mask)
        except KeyboardInterrupt:
            print("Caught keyboard interrupt, exiting")
        finally:
            self.sel.close()

    def handle_one_event(self, key, mask):
        if key.data is None:
            self.accept_wrapper(key.fileobj)
        else:
            self.service_connection(key, mask)

I think I’ll extract the whole while loop next:

    def go(self):
        self.start_listener()
        try:
            self.event_loop_forever()
        except KeyboardInterrupt:
            print("Caught keyboard interrupt, exiting")
        finally:
            self.sel.close()

    def event_loop_forever(self):
        while True:
            events = self.sel.select(timeout=None)
            for key, mask in events:
                self.handle_one_event(key, mask)

Inline events:

    def event_loop_forever(self):
        while True:
            for key, mask in self.sel.select(timeout=None):
                self.handle_one_event(key, mask)

Would you believe one more extract? Let’s try it:

    def go(self):
        self.start_listener()
        try:
            self.event_loop_forever()
        except KeyboardInterrupt:
            print("Caught keyboard interrupt, exiting")
        finally:
            self.sel.close()

    def event_loop_forever(self):
        while True:
            self.handle_socket_events()

    def handle_socket_events(self):
        for key, mask in self.sel.select(timeout=None):
            self.handle_one_event(key, mask)

    def handle_one_event(self, key, mask):
        if key.data is None:
            self.accept_wrapper(key.fileobj)
        else:
            self.service_connection(key, mask)

    def start_listener(self):
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.bind((self.HOST, self.PORT))
        listener.listen()
        print(f"Listening on {(self.HOST, self.PORT)}")
        listener.setblocking(False)
        self.sel.register(listener, selectors.EVENT_READ, data=None)

I think that’s a lot easier to explain and therefore a lot easier to understand. Tests are green. Commit: refactoring.
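One side benefit of having handle_one_event as its own method is that the dispatch can be checked without any sockets at all. A sketch, assuming a hypothetical cut-down stand-in class, DispatchOnly, that copies the dispatch and records calls instead of doing real work. The fake keys are types.SimpleNamespace objects carrying just the two attributes the method reads:

```python
import selectors
from types import SimpleNamespace


class DispatchOnly:
    """Hypothetical stand-in: same handle_one_event as WorldServer, recording calls."""

    def __init__(self):
        self.calls = []

    def handle_one_event(self, key, mask):
        if key.data is None:
            self.accept_wrapper(key.fileobj)
        else:
            self.service_connection(key, mask)

    def accept_wrapper(self, listener):
        self.calls.append(("accept", listener))

    def service_connection(self, key, mask):
        self.calls.append(("service", key.data, mask))


server = DispatchOnly()
server.handle_one_event(SimpleNamespace(fileobj="listener", data=None), selectors.EVENT_READ)
server.handle_one_event(SimpleNamespace(fileobj="conn", data="handler"), selectors.EVENT_WRITE)
print(server.calls)
```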

It remains to look at accept_wrapper and service_connection. The latter is where the joy happens. We’ll look at accept_wrapper first:

    def accept_wrapper(self, sock):
        conn, addr = sock.accept()  # Should be ready to read
        print(f"Accepted connection from {addr}")
        conn.setblocking(False)
        handler = MessageHandler(addr=addr)
        events = selectors.EVENT_READ | selectors.EVENT_WRITE
        self.sel.register(conn, events, data=handler)

This is called when there is no key.data, which is the signal that someone is knocking on the door. We have been passed the fileobj from the key, which, as we can see here, must be a socket. It’s a socket because of what we passed when we registered with the selector, as we did here:

    def start_listener(self):
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.bind((self.HOST, self.PORT))
        listener.listen()
        print(f"Listening on {(self.HOST, self.PORT)}")
        listener.setblocking(False)
        self.sel.register(listener, selectors.EVENT_READ, data=None)

The first parameter is the one that will be returned as key.fileobj, and we gave it our listener socket.
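To see the data=None convention in action, here is a sketch that registers a listener the same way start_listener does, connects a client to it, and checks what select returns. It binds to port 0 so the operating system picks a free port. That is an assumption made only so the example can't collide with a WorldServer already running on 34567:

```python
import selectors
import socket

sel = selectors.DefaultSelector()
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port for the demo
listener.listen()
listener.setblocking(False)
sel.register(listener, selectors.EVENT_READ, data=None)

client = socket.create_connection(listener.getsockname())  # someone knocks

key, mask = sel.select(timeout=1)[0]
print(key.fileobj is listener, key.data)  # True None

connecting_socket, addr = key.fileobj.accept()  # ready, so this does not block
print(addr == client.getsockname())              # True: the client's own address

sel.close()
for s in (connecting_socket, client, listener):
    s.close()
```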

I am inclined to rename the parameter on accept_wrapper to listener:

    def accept_wrapper(self, listener):
        conn, addr = listener.accept()  # Should be ready to read
        print(f"Accepted connection from {addr}")
        conn.setblocking(False)
        handler = MessageHandler(addr=addr)
        events = selectors.EVENT_READ | selectors.EVENT_WRITE
        self.sel.register(conn, events, data=handler)

What is conn? Isn’t that likely a socket? Yes. I add a type hint, rename, and do the print last because the working lines are more important and the print kind of stops one’s scan:

    def accept_wrapper(self, listener: socket.socket):
        connecting_socket, addr = listener.accept()  # Should be ready to read
        connecting_socket.setblocking(False)
        handler = MessageHandler(addr=addr)
        events = selectors.EVENT_READ | selectors.EVENT_WRITE
        self.sel.register(connecting_socket, events, data=handler)
        print(f"Accepted connection from {addr}")

All that boilerplate just accepts the new connecting socket and registers it with the selector for both read and write events. The only fancy part is that the data field that will come back will be the new instance of MessageHandler that we have created for this connection. Each connection gets its own MessageHandler. We’ll see in a moment what happens when we get a message from our new friend:

    def handle_one_event(self, key, mask):
        if key.data is None:
            self.accept_wrapper(key.fileobj)
        else:
            self.service_connection(key, mask)

If key.data isn’t None, it’ll be a MessageHandler, and we will go to service_connection:

    def service_connection(self, key, mask):
        sock = key.fileobj
        handler = key.data
        if mask & selectors.EVENT_READ:
            handler.process_read(self.sel, sock)
        if mask & selectors.EVENT_WRITE:
            handler.process_write(sock)
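Because each registered socket carries its own handler in key.data, bytes from one client can’t end up in another client’s buffer. A sketch of that routing, using two socketpairs in place of two accepted connections and a hypothetical Handler class that only collects bytes. selectors’ get_map() is used at the end to find the handlers again:

```python
import selectors
import socket


class Handler:
    """Hypothetical stand-in for MessageHandler: it only collects bytes."""

    def __init__(self, name):
        self.name = name
        self.received = b""


sel = selectors.DefaultSelector()
client_ends = {}
for name in ("alice", "bob"):
    server_end, client_end = socket.socketpair()  # stands in for one accepted connection
    server_end.setblocking(False)
    sel.register(server_end, selectors.EVENT_READ, data=Handler(name))
    client_ends[name] = client_end

client_ends["alice"].sendall(b"from alice")
client_ends["bob"].sendall(b"from bob")

for key, mask in sel.select(timeout=1):
    # key.data is the handler registered with this particular socket
    key.data.received += key.fileobj.recv(1024)

handlers = {key.data.name: key.data for key in sel.get_map().values()}
print(handlers["alice"].received)  # b'from alice'
print(handlers["bob"].received)    # b'from bob'

server_ends = [key.fileobj for key in sel.get_map().values()]
sel.close()
for sock in server_ends + list(client_ends.values()):
    sock.close()
```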

What we see in service_connection may need a bit of improvement. We retrieve the socket from key.fileobj and the handler from key.data. Then we decode the mask events … and decide on behalf of the handler what method it wants us to call.

I think it might be better if the handler decided what it wants to do, based on the mask. In favor of that idea: the handler is the object that knows what it needs to do. Against it: keeping the flag checks here keeps knowledge of the specific selector flags in WorldServer and lets MessageHandler be a bit more abstract.
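For comparison, here is roughly what the first option might look like, with WorldServer handing the mask over and the handler choosing which of its own methods to run. This is a sketch only. The class name MaskAwareHandler, the method name handle_events, and the recording stubs are all hypothetical, and the sketch still passes the selector and socket through:

```python
import selectors


class MaskAwareHandler:
    """Hypothetical variant of MessageHandler that interprets the mask itself."""

    def __init__(self):
        self.calls = []

    def handle_events(self, sel, sock, mask):
        if mask & selectors.EVENT_READ:
            self.process_read(sel, sock)
        if mask & selectors.EVENT_WRITE:
            self.process_write(sock)

    def process_read(self, sel, sock):
        self.calls.append("read")  # stub: the real method would recv and accumulate

    def process_write(self, sock):
        self.calls.append("write")  # stub: the real method would send outb


# WorldServer.service_connection would then shrink to roughly:
#     key.data.handle_events(self.sel, key.fileobj, mask)
handler = MaskAwareHandler()
handler.handle_events(None, None, selectors.EVENT_READ | selectors.EVENT_WRITE)
print(handler.calls)  # ['read', 'write']
```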

However, if we’re going to protect MessageHandler from details, it’s awkward that we pass in the selector. Why do we do that?

class MessageHandler:
    def process_read(self, sel, sock):
        bytes = sock.recv(1024)  # Should be ready to read
        if not bytes:
            print(f"Closing connection {self}")
            sel.unregister(sock)
            sock.close()
            return
        ...

If recv returns no bytes even though the selector reported the socket as readable, the other end has closed the connection. We have no choice but to close our socket, and the polite thing to do is to unregister it from the selector first.
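The “readable but empty” rule is easy to check with a connected pair of sockets. Once the other end closes, our socket is reported readable and recv returns an empty bytes object rather than raising. A short sketch, with socket.socketpair() standing in for a real client connection. The selectors documentation asks that a file object be unregistered before it is closed, which is the order used here:

```python
import selectors
import socket

sel = selectors.DefaultSelector()
ours, theirs = socket.socketpair()  # stands in for a client connection
ours.setblocking(False)
sel.register(ours, selectors.EVENT_READ)

theirs.close()  # the client hangs up

closed = False
for key, mask in sel.select(timeout=1):
    data = key.fileobj.recv(1024)  # readable, yet no bytes: end of stream
    if not data:
        sel.unregister(key.fileobj)  # unregister first, while the socket is still open
        key.fileobj.close()
        closed = True

print(closed, len(sel.get_map()))  # True 0
sel.close()
```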

In process_read, sock appears only in those few lines: the recv, the unregister, and the close. So what if we were to pass in the bytes, if we have any, instead?

I don’t see any PyCharm tools that will make this move for me. I’ll have to do it by hand.

class WorldServer:
    def service_connection(self, key, mask):
        sock = key.fileobj
        handler = key.data
        if mask & selectors.EVENT_READ:
            bytes = sock.recv(1024)
            if not bytes:
                print(f"Closing connection {handler}")
                self.sel.unregister(sock)
                sock.close()
            else:
                handler.process_read(bytes)
        if mask & selectors.EVENT_WRITE:
            handler.process_write(sock)

class MessageHandler:
    def process_read(self, bytes):
        self.inb += bytes
        if self.size is None:
            if len(self.inb) >= 2:
                self.size = struct.unpack('>H', self.inb[:2])[0]
                self.inb = self.inb[2:]
        if self.size is not None and len(self.inb) >= self.size:
            print(f'{self.size=} {self.inb=} {len(self.inb)=}')
            self.outb = b"I saw: " + self.inb[:self.size]
            self.inb = self.inb[self.size:]
            self.size = None

That’s better in MessageHandler, I think, but now we see that process_read does not use the socket, but process_write does:

    def process_write(self, sock):
        if self.outb:
            print(f"Echoing {self.outb!r} to {self.addr}")
            sent = sock.send(self.outb)  # Should be ready to write
            self.outb = self.outb[sent:]
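send on a non-blocking socket may accept only part of the buffer. It returns the number of bytes it took, which is why process_write slices outb by sent and leaves the rest for a later write event. A sketch of the same slicing in a plain loop, with a socketpair standing in for a client. The payload size is only assumed to be larger than the socket’s send buffer:

```python
import socket

ours, theirs = socket.socketpair()
ours.setblocking(False)

outb = b"x" * 1_000_000  # assumed to exceed the socket's send buffer
received = bytearray()
sends = 0

while outb:
    try:
        sent = ours.send(outb)  # may accept only part of outb
        outb = outb[sent:]      # keep the unsent remainder, as process_write does
        sends += 1
    except BlockingIOError:
        pass  # buffer full; the server would wait for the next EVENT_WRITE instead
    received += theirs.recv(65536)  # the "client" drains some bytes

while len(received) < 1_000_000:  # collect whatever is still in flight
    received += theirs.recv(65536)

print(sends, len(received))
ours.close()
theirs.close()
```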

I don’t think I like those most recent changes, and I think I’ve skipped a couple of commits that would be nice to fall back to.

I manage to undo the changes from the diff and, before I forget, commit: improve accept_wrapper.

OK, time for a break after that interesting but unsatisfactory attempt. Let’s reflect and sum up.

Reflection

I am pleased with the changes to WorldServer, breaking down its main loop into small methods. The accept_wrapper is improved. It’s six lines long but they’re all cohesive, setting up the new connection.

I remain unsatisfied with service_connection and the process_read and process_write methods, but we’ve only taken one whack at them. Possibly … I’m not at all sure of this … possibly we’d prefer that MessageHandler accept bytes in on a read request and provide bytes out as a return to a write request.

I think the general flow is to process input bytes until a message is complete, then submit that message to the application, and get a response back, which only then produces bytes for output. The MessageHandler removes the prefix on input, so it might add the prefix upon output, and then return the resulting bytes to the server.
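One way that flow might look, as a sketch only. This handler has no socket and no selector. Its receive method takes bytes in, strips the length prefix, and passes each complete message to a respond callback that stands in for the application. Its bytes_to_send method hands back replies with the prefix added. The class name BytesInBytesOut and the names receive, bytes_to_send, and respond are all hypothetical:

```python
import struct
from typing import Callable


class BytesInBytesOut:
    """Hypothetical handler: strips the length prefix on input, adds it on output."""

    def __init__(self, respond: Callable[[bytes], bytes]):
        self.respond = respond  # stands in for the application / World
        self.inb = b""
        self.outb = b""

    def receive(self, data: bytes) -> None:
        self.inb += data
        while len(self.inb) >= 2:
            size = struct.unpack(">H", self.inb[:2])[0]
            if len(self.inb) < 2 + size:
                return  # wait for the rest of the message
            message = self.inb[2:2 + size]
            self.inb = self.inb[2 + size:]
            reply = self.respond(message)
            self.outb += struct.pack(">H", len(reply)) + reply  # append, never overwrite

    def bytes_to_send(self) -> bytes:
        out, self.outb = self.outb, b""
        return out


handler = BytesInBytesOut(respond=lambda message: b"I saw: " + message)
handler.receive(b"\x00")        # half of the length field
handler.receive(b"\x02hi")      # rest of the length field plus the message
print(handler.bytes_to_send())  # b'\x00\tI saw: hi' (\t is the length, 9)
```

Unlike the current process_read, which completes at most one message per call and assigns to outb, this sketch loops until no complete message remains and appends each reply. Two messages arriving in one recv would both get answered. In this shape the server, not the handler, would hold any remainder left over after a partial send.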

That sounds almost sensible, doesn’t it? Maybe we’ll try that next time.

Summary

By my lights, the only lights we have here, breaking WorldServer into small methods makes the code tell the story better and certainly provides a much better shape if we were doing a live code review. I’m not sure whether one more extract would improve this:

    def go(self):
        self.start_listener()
        try:
            self.event_loop_forever()
        except KeyboardInterrupt:
            print("Caught keyboard interrupt, exiting")
        finally:
            self.sel.close()

The Composed Method pattern suggests that we might do something like this:

    def go(self):
        self.start_listener()
        self.run_events_until_interrupted()

    def run_events_until_interrupted(self):
        try:
            self.event_loop_forever()
        except KeyboardInterrupt:
            print("Caught keyboard interrupt, exiting")
        finally:
            self.sel.close()

We’ll commit that and live with it a bit.

Overall, refactoring WorldServer has improved it (by my aforementioned lights), and we have a concern and an idea about the relationship with MessageHandler to work on next time.

A nice session. See you next time. And love each other, even the ones who are not quite like you.