Back to Sockets?
The Robot World Repo on GitHub
The Forth Repo on GitHub
I think the bowling nerd sniping virus has worn off, so I suppose I have to look at the Robot World socket stuff again. Here’s how I learn code I’m not familiar with. Resist fascism!
I really was not enjoying the server work. I enjoy the various games I program, including the Robot World. I enjoyed making a Forth, although that joy was recently dampened somewhat when I learned that Bill Wake is doing a Forth in Mac assembler, which I surely do not want to do but I bow to the master on that one.
Let’s see if I can get back into the server thing and find some reason to like it. Where were we?
We have but four tests:
def prefix(msg):
num = len(msg)
prefix = struct.pack('>H', num)
return prefix + msg
class TestServer:
def test_pack_unpack(self):
value = 1234
packed = struct.pack('>H', value)
seq = struct.unpack('>H', packed)
assert seq[0] == value
def test_one_go_round(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(("127.0.0.1", 34567))
msg = b"Hello, world"
prefixed = prefix(msg)
s.sendall(prefixed)
data = s.recv(1024)
assert data == "I saw: Hello, world".encode('utf-8')
def test_two_messages(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(("127.0.0.1", 34567))
msg = prefix(b"Hello, world")
s.sendall(msg[:6])
s.sendall(msg[6:])
data = s.recv(1024)
assert data == "I saw: Hello, world".encode('utf-8')
how_ya = prefix(b'How ya doing?')
s.sendall(how_ya[:1])
s.sendall(how_ya[1:4])
s.sendall(how_ya[4:])
data = s.recv(1024)
assert data == "I saw: How ya doing?".encode('utf-8')
def test_incomplete_message(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(("127.0.0.1", 34567))
msg = prefix(b"Hello, world")
s.sendall(msg[:6])
s.setblocking(False)
with pytest.raises(BlockingIOError) as e:
data = s.recv(1024)
The first one was just a test to discover and document how to use struct.pack
and unpack
, and it runs. The other two need me to start up the server code, which we’ll get to in a moment. For now, I’ll start it up and see if the tests will be kind enough to run.
I am delighted to say that they run.
I last worked on this code back on March 14th, so all my caches have long since been flushed, and I have no useful recollection of what the tests test, how they manage to run, or what might be next. In the prior article, my reflection included:
I am pleased with the changes to WorldServer, breaking down its main loop into small methods. The
accept_wrapper
is improved. It’s six lines long but they’re all cohesive, setting up the new connection.
I remain unsatisfied with
service_connection
and theprocess_read
andprocess_write
methods, but we’ve only taken one whack at them. Possibly … I’m not at all sure of this … possibly we’d prefer that MessageHandler accept bytes in on a read request and provide bytes out as a return to a write request.
I think the general flow is to process input bytes until a message is complete, then submit that message to the application, and get a response back, which only then produces bytes for output. The MessageHandler removes the prefix on input, so it might add the prefix upon output, and then return the resulting bytes to the server.
That sounds almost sensible, doesn’t it? Maybe we’ll try that next time.
I doubt that we’ll do too much of whatever that meant this time. Our mission, first, is just to try to bring our internal thinking engines back up to speed. Let’s see how the first actual test works:
def test_one_go_round(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(("127.0.0.1", 34567))
msg = b"Hello, world"
prefixed = prefix(msg)
s.sendall(prefixed)
data = s.recv(1024)
assert data == "I saw: Hello, world".encode('utf-8')
OK, I think I’ve got this. We connect to our server on some port (34567) and prepare the message by prefixing it with its length. (The example from RealPython was much more complicated, supporting binary messages. We’re not gonna need that.) We send the message and hang to receive what comes back, which, one gathers, will be “I saw: “ followed by the message that was sent.
With that in mind, let’s check out the server code. We’ll display it incrementally, as I focus on things.
There are two classes, MessageHandler and WorldServer. I expect that the latter uses the former somehow yet to be read about. Since the server seems to be the root, I look there and first find the main code that runs it:
ws = WorldServer()
ws.go()
I think that this should really be wrapped in that standard if this is main code. Perhaps soon. For now, we explore WorldServer creation and its go
method:
class WorldServer:
HOST = "127.0.0.1"
PORT = 34567
def __init__(self):
self.sel = selectors.DefaultSelector()
def go(self):
self.start_listener()
self.run_events_until_interrupted()
OK, clearly we start listening for messages and then loop nearly forever dealing with them. How do we start listening?
def start_listener(self):
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind((self.HOST, self.PORT))
listener.listen()
print(f"Listening on {(self.HOST, self.PORT)}")
listener.setblocking(False)
self.sel.register(listener, selectors.EVENT_READ, data=None)
It begins to come back to me. The selector, very informally, is an object where we register all our sockets and when they want to be processed, the selector deals them out to us in some fashion. In this method we create what will be the main listener, that triggers when someone connects. We’ll see that in a moment, I reckon.
What about the main loop? It encompasses four small methods:
def run_events_until_interrupted(self):
try:
self.event_loop_forever()
except KeyboardInterrupt:
print("Caught keyboard interrupt, exiting")
finally:
self.sel.close()
def event_loop_forever(self):
while True:
self.handle_socket_events()
def handle_socket_events(self):
for key, mask in self.sel.select(timeout=None):
self.handle_one_event(key, mask)
def handle_one_event(self, key, mask):
if key.data is None:
self.accept_wrapper(key.fileobj)
else:
self.service_connection(key, mask)
OK, makes sense in a fashion. We’ll be in event_loop_forever
unless something goes wrong. If it’s a keyboard interrupt, we print, and in any case we close our selector before exiting.
Apparently the selector will have one or more events, which provide a key and mask. At this moment I have no useful recollection of what that key is, but in handle_one_event
we get a clue: it might contain data. And I do recall that the ones that come in with no data are attempts to open new connections, which we’ll handle in accept_wrapper
. Otherwise we do service_connection
, which seems more interesting:
def service_connection(self, key, mask):
sock = key.fileobj
handler = key.data
if mask & selectors.EVENT_READ:
handler.process_read(self.sel, sock)
if mask & selectors.EVENT_WRITE:
handler.process_write(sock)
Ah. I’m willing to bet that the handler
is a MessageHandler. We can glance at accept_wrapper
to be sure:
def accept_wrapper(self, listener: socket.socket):
connecting_socket, addr = listener.accept() # Should be ready to read
connecting_socket.setblocking(False)
handler = MessageHandler(addr=addr)
events = selectors.EVENT_READ | selectors.EVENT_WRITE
self.sel.register(connecting_socket, events, data=handler)
print(f"Accepted connection from {addr}")
Yes, we associate a MessageHandler instance with each new connection. I don’t really know what addr
is, but we’ll see what MessageHandler does with it in a moment. Looking back at service_connection
, we see that we call either process_read
or process_write
on the MessageHandler. First read:
class MessageHandler:
def __init__(self, addr):
self.addr = addr
self.inb = b""
self.outb = b""
self.size = None
def process_read(self, sel, sock):
bytes = sock.recv(1024) # Should be ready to read
if not bytes:
print(f"Closing connection {self}")
sel.unregister(sock)
sock.close()
return
self.inb += bytes
if self.size is None:
if len(self.inb) >= 2:
self.size = struct.unpack('>H', self.inb[:2])[0]
self.inb = self.inb[2:]
if self.size is not None and len(self.inb) >= self.size:
print(f'{self.size=} {self.inb=} {len(self.inb)=}')
self.outb = b"I saw: " + self.inb[:self.size]
self.inb = self.inb[self.size:]
self.size = None
Wow, that’s a double black diamond ski run isn’t it? (Oh, you’ve seen worse? Implemented worse? OK, so have I: this is a short double black diamond run.)
Curiously, there seems to be no point to addr
. We do print it but otherwise it seems to have no purpose. I’ll have to look it up, I guess. Anyway …
if we get no bytes back, that’s a sign that the client has disconnected. Let’s refactor a bit as we go, maybe we can make this more clear for next time.
def process_read(self, sel, sock):
bytes = sock.recv(1024) # Should be ready to read
if not bytes:
self.client_closed_connection(sel, sock)
return
self.inb += bytes
if self.size is None:
if len(self.inb) >= 2:
self.size = struct.unpack('>H', self.inb[:2])[0]
self.inb = self.inb[2:]
if self.size is not None and len(self.inb) >= self.size:
print(f'{self.size=} {self.inb=} {len(self.inb)=}')
self.outb = b"I saw: " + self.inb[:self.size]
self.inb = self.inb[self.size:]
self.size = None
I think I’m going to want to make that guard clause / early out into an if-else
, but we’ll see.
Now we add the bytes to our inb
, which presumably is short for input_bytes
. I hope I copied that from someone and didn’t create that name all on my own.
If size
is None, that tells us that we have not as yet read the message length from the front of the message. We can only do that if we have at least two bytes, since it is two bytes long. If it is, we unpack self.size
and remove those two bytes from the input bytes.
Then, if size
is not None, it is the expected length of the message and we have already buffered however many have come in. In principle there could be fewer or more than size
. If there are at least size
bytes, we compute outb
, another very clever name by prepending “I saw: “, and we consume the bytes that we used up. If we’ve done that, we set size
back to None, signifying that future bytes are a new message.
I’d like the code to express that story better. Let’s try some more refactoring.
def process_read(self, sel, sock):
bytes = sock.recv(1024) # Should be ready to read
if not bytes:
self.client_closed_connection(sel, sock)
return
self.inb += bytes
if self.size is None:
self.get_size_if_available()
if self.size is not None and len(self.inb) >= self.size:
self.process_complete_message()
def process_complete_message(self):
print(f'{self.size=} {self.inb=} {len(self.inb)=}')
self.outb = b"I saw: " + self.inb[:self.size]
self.inb = self.inb[self.size:]
self.size = None
def get_size_if_available(self):
if len(self.inb) >= 2:
self.size = struct.unpack('>H', self.inb[:2])[0]
self.inb = self.inb[2:]
def client_closed_connection(self, sel, sock):
print(f"Closing connection {self}")
sel.unregister(sock)
sock.close()
In the spirit of Smalltalk, I think we would like to give those last two ifs better names. Clearly the last one is something like “message is ready to be processed” or “input message is complete”. Let’s try this:
def process_read(self, sel, sock):
bytes = sock.recv(1024) # Should be ready to read
if not bytes:
self.client_closed_connection(sel, sock)
return
self.inb += bytes
if self.size is None:
self.get_size_if_available()
if self.input_message_complete():
self.process_complete_message()
def input_message_complete(self):
return self.size is not None and len(self.inb) >= self.size
self.size
is None when the message is brand new: we know nothing of it. Let’s call it that. And we’ll probably rename what is inside the if, but not yet.
def process_read(self, sel, sock):
bytes = sock.recv(1024) # Should be ready to read
if not bytes:
self.client_closed_connection(sel, sock)
return
self.inb += bytes
if self.message_is_new():
self.get_size_if_available()
if self.input_message_complete():
self.process_complete_message()
I think I like that version of process_read
. And there is something particularly good about it which we’ll look at in a moment. First let’s make sure that it works. I stop the server and restart it. I type a space and the tests run green. Cool. Commit: refactoring
What is the particularly good thing? It’s this:
def process_complete_message(self):
print(f'{self.size=} {self.inb=} {len(self.inb)=}')
self.outb = b"I saw: " + self.inb[:self.size]
self.inb = self.inb[self.size:]
self.size = None
This method centralizes the semantic processing of the input message. Here we just prepend the “I saw”, but this is where in a real application we’d dispatch the message off to the Robot World or whatever. Sure, we could have identified that place in the original process_read
method, it was only 18 lines long and in there is the one line that should be replaced with a call to the application. Now it’s down to four lines, counting the print. And it’s the first operational line that gets replaced.
We could do more refactoring, separating the manipulation of inb
and outb
from the actual message processing (the +
in this case), but that will come.
Now most of the code that I’ve seen for processing network connections, or dealing with files or database connections, looks more like what we started with than where we are now. And a lot of teams, I find, think that they prefer things written out in that open not very cohesive form. It’s what they’re used to.
I come from a community that is more concerned with what is going on than with how it is done, deferring the how down into small objects and methods that one rarely looks at. So I prefer today’s form
But enough.
Reflection
What you’ve seen here is how I become familiar with code that I have never seen before or at least haven’t seen since a few days ago. (I really do not try to remember code. I do often have today’s code in mind tonight when I start going to sleep, or tomorrow morning as I wake up, but in general I don’t try to remember how something works. I try to write it so that I can quickly relearn how it works when I need to.)
And I try to take code that is too much about “how” and refactor it into methods with names that are about “what” and whose code inside is about “how”. This tends to give me code like this, that mostly talks about what it’s doing, not how it’s doing it:
if self.message_is_new():
self.get_size_if_available()
if self.input_message_complete():
self.process_complete_message()
I find that with code like this, I come up to speed faster, and generally I don’t have to drill down into any code except for the particular bit that I need to work on.
Assuming that I continue with this socket thing, I think we’ll find that beginning to happen.
It’s worth remembering that this project, WorldServer, is basically an experiment to learn about sockets. I do not know how we’ll evolve toward our actual intended use, a Robot World server that can be driven by anyone with a connection.
I have had a suggestion that might “change everything”, namely that using WebSockets would be better, or more likely to attract people to play with the thing. At this writing I don’t even know what WebSockets are. I guess I’d better find out.
Summary
I feel as if I’m nearly up to full speed, a pretty good use of an hour and a half of reading, writing and a bit of refactoring.
I hope your days go well, that your situation improves, and that you’ll fight fascism in every way you can find. If you find any good ones, let me know.
See you next time!