Python Asteroids+Invaders on GitHub

Let’s try a spike that may lead to an improved version of our invader fleet logic. I promise to throw it away.

When we look at this code:

class InvaderFleet:
    def process_result(self, result):
        if result == CycleStatus.CONTINUE:
            pass
        elif result == CycleStatus.NEW_CYCLE:
            if self.reverse:
                self.reverse_travel()
            else:
                self.step_origin()

We see that it does one of three things: just carry on, or step the origin, or reverse direction. We currently call this method on every update, each update moving at most one invader:

class InvaderFleet:
    def update(self, delta_time, _fleets):
        result = self.invader_group.update_next(self.origin)
        self.process_result(result)

The return values come from InvaderGroup:

class InvaderGroup:
    def update_next(self, origin):
        if self._next_invader >= len(self.invaders):
            self._next_invader = 0
            return CycleStatus.NEW_CYCLE
        invader = self.next_invader()
        invader.position = origin
        self._next_invader += 1
        return CycleStatus.CONTINUE

The determination of whether to step the origin or reverse direction is made in a rather complex fashion. As invaders move into a side bumper, each one in the bumper currently sends at_edge to the InvaderFleet, including the bumper’s “incoming direction”:

class Invader:
    def interact_with_bumper(self, bumper, invader_fleet):
        if bumper.intersecting(self.rect):
            invader_fleet.at_edge(bumper.incoming_direction)

This is used to determine whether the colliding invaders are moving into the bumper or away from it. If they are moving into it, it is time to reverse. If not, we should just carry on.

But because up to five invaders can hit the bumper before reversing, and because we move invaders one at a time, that call back can be made over a hundred times.

What would be ideal would be for the InvaderGroup, at the end of a cycle, to return one of two values, either NEW_CYCLE, or REVERSE. Then InvaderFleet would know exactly what to do, and could be coded like this:

    def process_result(self, result):
        if result == CycleStatus.CONTINUE:
            pass
        elif result == CycleStatus.NEW_CYCLE:
            self.step_origin()
        elif result == CycleStatus.REVERSE:
            self.reverse_travel()

I’ll add REVERSE to CycleStatus and leave the code that way, and spike to make it work.

class CycleStatus(Enum):
    CONTINUE = "continue"
    NEW_CYCLE = "new cycle"
    REVERSE = "reverse"

I am starting to think that this might go smoothly. It is tempting to keep what I get but really, working without new tests and such, I should throw this away. We’ll see. My willingness to toss it is fading.

At this point, I have at least one broken test.

    def test_end_at_edge_steps_down_and_left(self):
        fleet = InvaderFleet()
        origin = fleet.origin
        direction = fleet.direction
        fleet.at_edge(+1)
        fleet.process_result(CycleStatus.NEW_CYCLE)
        assert fleet.direction == -direction
        assert fleet.origin == origin - fleet.step + fleet.down_step

This should call with REVERSE. That’s actually a useful test. Well, semi-useful anyway. With NEW_CYCLE replaced with REVERSE, it passes. So far so good.

But the game will not work and we do not have a test showing that. As things stand, the invaders just march off the screen to the right, because InvaderGroup is never returning REVERSE.

We update each invader just by changing its origin: they all know their personal offset from origin and we update each one’s origin separately:

    def update_next(self, origin):
        if self._next_invader >= len(self.invaders):
            self._next_invader = 0
            return CycleStatus.NEW_CYCLE
        invader = self.next_invader()
        invader.position = origin
        self._next_invader += 1
        return CycleStatus.CONTINUE

When are these guys calling back?

class InvaderGroup:
    def interact_with_bumper(self, bumper, fleet):
        for invader in self.invaders:
            invader.interact_with_bumper(bumper, fleet)

class Invader:
    def interact_with_bumper(self, bumper, invader_fleet):
        if bumper.intersecting(self.rect):
            invader_fleet.at_edge(bumper.incoming_direction)

We’re trying to get InvaderGroup to handle a greater share of the reversing decision. So let’s pass in the InvaderGroup, rather than the fleet, and have the invaders call back to the group, which can then buffer things a bit.

class InvaderGroup:
    def interact_with_bumper(self, bumper, fleet):
        for invader in self.invaders:
            invader.interact_with_bumper(bumper, self)

class Invader:
    def interact_with_bumper(self, bumper, invader_group):
        if bumper.intersecting(self.rect):
            invader_group.at_edge(bumper.incoming_direction)

That really should be breaking some tests, but it is not. I think my tests are too specific. Let’s beef them up a bit: we can save them even if we don’t save the code. Here’s one.

    def test_fleet_motion(self):
        fleet = InvaderFleet()
        # fleet.step = Vector2(30, 0)
        pos = fleet.testing_only_invaders[0].position
        fleet.update(1.0, None)
        new_pos = fleet.testing_only_invaders[0].position
        assert fleet.direction == +1
        assert new_pos - pos == fleet.step
        fleet.at_edge(+1)
        assert fleet.reverse

I don’t see a quick fix for that one.

    def test_ok_leaves_step_alone(self):
        fleet = InvaderFleet()
        origin = fleet.origin
        fleet.process_result("ok")
        assert fleet.origin == origin

That one is interesting because now our if only checks for values of CycleStatus and does nothing. This one needs to be changed for sure.

    def test_ok_leaves_step_alone(self):
        fleet = InvaderFleet()
        origin = fleet.origin
        fleet.process_result(CycleStatus.CONTINUE)
        assert fleet.origin == origin

That still passes, of course. We now have these additional two, also passing:

    def test_end_increments_step(self):
        fleet = InvaderFleet()
        origin = fleet.origin
        fleet.process_result(CycleStatus.NEW_CYCLE)
        assert fleet.origin == origin + fleet.step

    def test_end_at_edge_steps_down_and_left(self):
        fleet = InvaderFleet()
        origin = fleet.origin
        direction = fleet.direction
        fleet.process_result(CycleStatus.REVERSE)
        assert fleet.direction == -direction
        assert fleet.origin == origin - fleet.step + fleet.down_step

So the fleet does the right thing, though the first test is a bit weak.

Now, what we need is for the group to field the at_edge and save up enough information so that at the end of the cycle if knows whether to continue. But the group does not currently know which direction we are going. Let’s imagine that it does:

    def at_edge(self, bumper_incoming_direction):
        if bumper_incoming_direction == self.current_direction:
            self.should_reverse = True

And of course we immediately rewrite that:

    def at_edge(self, bumper_incoming_direction):
        self.should_reverse = bumper_incoming_direction == self.current_direction

But we don’t know our current_direction. Let’s provide it:

class InvaderGroup:
    def update_next(self, origin, current_direction):
        self.current_direction = current_direction
        if self._next_invader >= len(self.invaders):
            self._next_invader = 0
            return CycleStatus.NEW_CYCLE
        invader = self.next_invader()
        invader.position = origin
        self._next_invader += 1

class InvaderFleet:
    def update(self, delta_time, _fleets):
        result = self.invader_group.update_next(self.origin, self.direction)
        self.process_result(result)

Tests are failing. This one’s fix is just to add a direction:

    def test_update_next(self):
        group = InvaderGroup()
        origin = Vector2(100, 100)
        for i in range(55):
            result = group.update_next(origin, 1)
            assert result == CycleStatus.CONTINUE
        result = group.update_next(origin, 1)
        assert result == CycleStatus.NEW_CYCLE

Probably the other is similar. Yes:

    def test_remove_last_invader(self):
        group = InvaderGroup()
        for count in range(55):
            group.kill(group.invaders[0])
        result = group.update_next(Vector2(0, 0), 1)
        assert result == CycleStatus.NEW_CYCLE

Now we are still never sending the REVERSE.

    def update_next(self, origin, current_direction):
        self.current_direction = current_direction
        if self._next_invader >= len(self.invaders):
            self._next_invader = 0
            if self.should_reverse:
                self.should_reverse = False
                return CycleStatus.REVERSE
            else:
                return CycleStatus.NEW_CYCLE
        invader = self.next_invader()
        invader.position = origin
        self._next_invader += 1
        return CycleStatus.CONTINUE

And it works.

I promised not to commit this, and I won’t. But I am going to stop here, because I’m tired, and pick up next time.

I think this is righteous. Will decide tomorrow. See you then!