TLA+ and PHP 04

Previously

This post is part of a series of posts where I'm exploring how to use TLA+ to specify, check and then implement a PHP project that deals with concurrent processes; here are the links to the first post, the second, and the third one.

Checking for output

In the previous post, I have completed a specification of the Loop that would pass model checking in different scenarios with more jobs than parallelism, more parallelism than jobs, and, finally, more of a "serial" case with a parallelism of 1.
I've added two more models to check:

  • 3 jobs, parallelism 3
  • 1 job, parallelism 1

The specification modified as outlined in the third post passes all model checks without any change.

There is something fundamental to the Loop correct execution that I'm currently not checking: the Loop should collect all the output emitted by the Workers.

In the specification, I've represented the output emitted by the workers as a * char appended to a sequence. I'm keeping that over-simplification in place and leveraging that by ensuring the amount of * chars collected by the Loop is the same emitted by the workers.
Since the workers use an Inter-process Communication pipe to share their output with the Loop, I want to make another assertion that all worker pipes are drained by the end of the Loop.

I've done the first update to the specification to support this new check:

----------------------------- MODULE spec_loop -----------------------------
EXTENDS TLC, Integers, FiniteSets, Sequences
CONSTANTS Loop, JobSet, Parallelism, NULL

(*--algorithm loop
variables
	jobsCount = Cardinality(JobSet),
	_startedRegister = [x \in JobSet |-> FALSE],
	_processStatusRegister = [x \in JobSet |-> NULL],
	_processPipesRegister = [x \in JobSet |-> <<>>],
	_emittedOutputLen = 0,
	_collectedOutputLen = 0;

define
	StartedCount == Cardinality({x \in DOMAIN _startedRegister: _startedRegister[x] = TRUE})
	NextNotStarted == CHOOSE x \in DOMAIN _startedRegister: _startedRegister[x] = FALSE
	CompletedCount == Cardinality({x \in JobSet: _processStatusRegister[x] /= NULL})
	Running == StartedCount - CompletedCount
	ParallelismRespected == Running <= Parallelism
	StreamSelectUpdates == {x \in JobSet: _processPipesRegister[x] /= <<>>}
end define;

fair process loop = Loop
variables
	streamSelectUpdates = {},
	updatedProcess = NULL,
	processStatus = NULL,
	processToOutputMap = [x \in JobSet |-> <<>>],
	processToExitStatusMap = [x \in JobSet |-> NULL];

	begin
		StartInitialBatch:
			while StartedCount < Parallelism /\ StartedCount < jobsCount do
				with p = NextNotStarted do
					_startedRegister[p] := TRUE;
				end with;
			end while;
		WaitForStreamUpdates:
			streamSelectUpdates := StreamSelectUpdates;
			await Cardinality(StreamSelectUpdates) > 0;
		HandleStreamUpdates:
			while Cardinality(streamSelectUpdates) > 0 do
				updatedProcess := CHOOSE x \in streamSelectUpdates: TRUE;
				streamSelectUpdates := streamSelectUpdates \ {updatedProcess};
				GetProcessOutput:
					with processOutput = _processPipesRegister[updatedProcess] do
						processToOutputMap[updatedProcess] := Append(processToOutputMap[updatedProcess], processOutput);
						_collectedOutputLen := _collectedOutputLen + Len(processOutput);
					end with;
					_processPipesRegister[updatedProcess] := <<>>;
				GetProcessStatus:
					processStatus := _processStatusRegister[updatedProcess];
				UpdateTrackedProcessPipes:
					if processStatus /= NULL then
						processToExitStatusMap[updatedProcess] := processStatus;
					end if;
				CheckLoopStatus:
					if processToExitStatusMap[updatedProcess] /= NULL then
						MaybeStartOneMore:
							if StartedCount < jobsCount /\ Running < Parallelism then
								with p = NextNotStarted do
								_startedRegister[p] := TRUE;
								end with;
								goto WaitForStreamUpdates;
							end if;
					end if;
					CheckAllDone:
						if Cardinality({x \in JobSet: processToExitStatusMap[x] /= NULL}) = jobsCount then
							assert(_collectedOutputLen = _emittedOutputLen);
							goto Done;
						else
							goto WaitForStreamUpdates;
						end if;
			end while;
end process

fair process worker \in JobSet
begin
	WaitToStart:
		await _startedRegister[self] = TRUE;
	Work:
		either
			_processPipesRegister[self] := Append(_processPipesRegister[self], "*");
			_emittedOutputLen := _emittedOutputLen + 1
		or
			skip;
		end either;
	ExitStatus:
		either
			_processPipesRegister[self] := Append(_processPipesRegister[self], "*");
			_emittedOutputLen := _emittedOutputLen + 1;
			_processStatusRegister[self] := 0;
			goto Done;
		or
			_processPipesRegister[self] := Append(_processPipesRegister[self], "*");
			_emittedOutputLen := _emittedOutputLen + 1;
			_processStatusRegister[self] := 1;
			goto Done;
		end either;
end process;

end algorithm;*)
\* BEGIN TRANSLATION
\* [...]
\* END TRANSLATION 

OneWorkerPerJobStarted == <>[](StartedCount = Cardinality(JobSet))
AllWorkersCompleted == <>[](CompletedCount = Cardinality(JobSet))
=============================================================================

To note in the specification:

  • I'm using the _emittedOutputLen to store the length of the output generated by the workers; only the worker processes will update it.
  • The _collectedOutputLen global variable stores the length of the output collected by the Loop process.
  • I've not added any Invariant Property to my model

The specification is failing when checked against a model:

Failing collected output assertion

The error message reads as follows (I'm omitting the lines):

The first argument of Assert evaluated to FALSE; the second argument was:
"Failure of assertion at line 69, column 57."
The error occurred when TLC was evaluating the nested
expressions at the following positions: ...

The failing assertion is the one I've added in the Loop CheckAllDone phase; before moving to the Done phase of the Loop, I want to make sure the loop collected all the emitted output.
Instead of using a temporal property, I'm using an assertion. The reason for this approach is that a temporal property would be an "eventually, then always" one.
But the length of the emitted and collected output would be 0 at the start of the specification, and it would be invalidated now and then as the Loop and the workers alternate emitting and collecting it. Changing the temporal condition to just "eventually" would not guarantee me that is true at the end, when the Loop is completed.
And "when the loop is done" is precisely the only moment I want to make this check, so an assertion makes sense.

That assertion fails, telling me the current specification is not correctly collecting all the output emitted by the workers.

Another pass on the specification adds a step in the CheckLoopStatus phase to get the output the process might have emitted before exiting. This update will make the specification pass across all models:

"`tla ----------------------------- MODULE spec_loop ----------------------------- EXTENDS TLC, Integers, FiniteSets, Sequences CONSTANTS Loop, JobSet, Parallelism, NULL

(*--algorithm loop variables jobsCount = Cardinality(JobSet), _startedRegister = [x \in JobSet |-> FALSE], _processStatusRegister = [x \in JobSet |-> NULL], _processPipesRegister = [x \in JobSet |-> <<>>], _emittedOutputLen = 0, _collectedOutputLen = 0;

define StartedCount == Cardinality({x \in DOMAIN _startedRegister: _startedRegister[x] = TRUE}) NextNotStarted == CHOOSE x \in DOMAIN _startedRegister: _startedRegister[x] = FALSE CompletedCount == Cardinality({x \in JobSet: _processStatusRegister[x] /= NULL}) Running == StartedCount - CompletedCount ParallelismRespected == Running <= Parallelism StreamSelectUpdates == {x \in JobSet: _processPipesRegister[x] /= <<>>} end define;

macro startProcess() begin with p = NextNotStarted do _startedRegister[NextNotStarted] := TRUE; end with; end macro;

macro collectProcessOutput(p, processToOutputMap) begin with processOutput = _processPipesRegister[updatedProcess] do processToOutputMap[updatedProcess] := Append(processToOutputMap[updatedProcess], processOutput); _collectedOutputLen := _collectedOutputLen + Len(processOutput); end with; _processPipesRegister[updatedProcess] := <<>>; end macro;

macro emitOutput(p, output) begin _processPipesRegister[p] := Append(_processPipesRegister[p], output); _emittedOutputLen := _emittedOutputLen + 1; end macro;

macro exitWithStatus(p, status) begin _processStatusRegister[p] := status; end macro;

fair process loop = Loop variables streamSelectUpdates = {}, updatedProcess = NULL, processStatus = NULL, processToOutputMap = [x \in JobSet |-> <<>>], processToExitStatusMap = [x \in JobSet |-> NULL];

begin
	StartInitialBatch:
		while StartedCount < Parallelism /\ StartedCount < jobsCount do
		    startProcess();
		end while;
	WaitForStreamUpdates:
		streamSelectUpdates := StreamSelectUpdates;
		await Cardinality(StreamSelectUpdates) > 0;
	HandleStreamUpdates:
		while Cardinality(streamSelectUpdates) > 0 do
			updatedProcess := CHOOSE x \in streamSelectUpdates: TRUE;
			streamSelectUpdates := streamSelectUpdates \ {updatedProcess};
			GetProcessOutput:
			    collectProcessOutput(updatedProcess, processToOutputMap);
			GetProcessStatus:
				processStatus := _processStatusRegister[updatedProcess];
			UpdateTrackedProcessPipes:
				if processStatus /= NULL then
					processToExitStatusMap[updatedProcess] := processStatus;
				end if;
			CheckLoopStatus:
				if processToExitStatusMap[updatedProcess] /= NULL then
					GetProcessExitOutput:	
					    collectProcessOutput(updatedProcess, processToOutputMap);
					MaybeStartOneMore:
						if StartedCount < jobsCount /\ Running < Parallelism then
		                    startProcess();
							goto WaitForStreamUpdates;
						end if;
				end if;
				CheckAllDone:
					if Cardinality({x \in JobSet: processToExitStatusMap[x] /= NULL}) = jobsCount then
						assert(_collectedOutputLen = _emittedOutputLen);
						goto Done;
					else
						goto WaitForStreamUpdates;
					end if;
		end while;

end process

fair process worker \in JobSet begin WaitToStart: await _startedRegister[self] = TRUE; Work: either emitOutput(self, ""); or skip; end either; ExitStatus: either emitOutput(self, ""); exitWithStatus(self, 0); goto Done; or _processPipesRegister[self] := Append(_processPipesRegister[self], "*"); _emittedOutputLen := _emittedOutputLen + 1; _processStatusRegister[self] := 1; goto Done; end either; end process;

end algorithm;*) * BEGIN TRANSLATION * [...] * END TRANSLATION

OneWorkerPerJobStarted == <>[](StartedCount = Cardinality(JobSet))

AllWorkersCompleted == <>[](CompletedCount = Cardinality(JobSet))


Besides putting in place the fix to make sure the specification will pass all models check, I've also refactored the code to extract duplicated code into `macro's: `startProcess`, `collectProcessOutput`, `emitOutput`, and `exitWithStatus`.  
What each of them does is pretty easy to understand, and macros work like functions. For the most: the one exception is macros will only be able to change the value of variables that are either local to the macro (but this is pretty common in any programming language) or that are passed to the macros. Once this is taken care of, macros help reduce the verbosity and duplication of the code a bit.

## PHP translation
There are more features I want my Loop-based code to support, but it's worth trying to translate that into PHP code before I move on and find myself having to write too complicated code.

```php
<?php

class Loop
{
    private $jobs;
    private $jobsCount;
    protected $parallelism;
    private $procs = [];
    private $startedCount = 0;
    private $runningCount = 0;
    private $readStdoutStreams = [];
    private $readStderrStreams = [];
    private $jobToExitStatusMap = [];
    private $jobToStdoutContents = [];
    private $jobToStderrContents = [];

    public function __construct($jobs, $parallelism)
    {
        $this->jobs = $jobs;
        $this->jobsCount = count($jobs);
        $this->parallelism = $parallelism;
        reset($this->jobs);
    }

    private function collectProcessOutput(stdClass $proc)
    {
        if (!isset($this->jobToStdoutContents[$proc->job])) {
            $this->jobToStdoutContents[$proc->job] = '';
        }
        if (!isset($this->jobToStderrContents[$proc->job])) {
            $this->jobToStderrContents[$proc->job] = '';
        }
        $this->jobToStdoutContents[$proc->job] .= stream_get_contents($proc->stdoutStream);
        $this->jobToStderrContents[$proc->job] .= stream_get_contents($proc->stderrStream);
    }

    private function startWorker()
    {
        $job = current($this->jobs);
        next($this->jobs);
        $command = sprintf("%s %s %s", PHP_BINARY, escapeshellarg(__FILE__), $job);
        $desc = [
            0 => ['pipe', 'r'], // Proc STDIN.
            1 => ['pipe', 'w'], // Proc STDOUT.
            2 => ['pipe', 'w'], // Proc STDERR.
        ];
        $procHandle = proc_open($command, $desc, $pipes, null, null, ['bypass_shell']);
        $procStdout = $pipes[1];
        $procStderr = $pipes[2];
        $this->procs[] = (object) [
            'procHandle' => $procHandle, 'stdoutStream' => $procStdout, 'stderrStream' => $procStderr, 'job' => $job
        ];
        $this->startedCount++;
        $this->runningCount++;
        $this->readStdoutStreams[] = $procStdout;
        $this->readStderrStreams[] = $procStderr;
    }

    private function getProcFromStream($stream)
    {
        foreach ($this->procs as $key => $proc) {
            if ($proc->stdoutStream === $stream || $proc->stderrStream === $stream) {
                return $proc;
            }
        }

        return null;
    }

    public function run()
    {
        // StartInitialBatch
        while ($this->startedCount < $this->parallelism && $this->startedCount < $this->jobsCount) {
            $this->startWorker();
        }

        while (true) {
            //WaitForStreamUpdates

            // Reset this on each run to make sure we only wait for updates from active process streams.
            $readStreams = array_merge($this->readStdoutStreams, $this->readStderrStreams);
            $read = $readStreams;
            $write = [];
            $except = [];
            $streamUpdates = stream_select($read, $write, $except, 10);

            if (!$streamUpdates) {
                continue;
            }

            // HandleStreamUpdates
            $skipRead = [];
            foreach ($read as $index => $stream) {
                if (isset($skipRead[$index])) {
                    // This stream was coupled with an already read one, it's been read already.
                    continue;
                }

                // GetProcessOutput
                $proc = $this->getProcFromStream($stream);
                $this->collectProcessOutput($proc);
                // GetProcessStatus
                $procStatus = proc_get_status($proc->procHandle);
                if (!$procStatus['running']) {
                    $this->jobToExitStatusMap[$proc->job] = $procStatus['exitcode'];

                    // UpdateTrackedProcessPipes
                    // We'll empty the other stream now, skip that.
                    $otherStreamIndex = in_array($proc->stdoutStream, $this->readStdoutStreams, true) ?
                        array_search($proc->stderrStream, $readStreams, true)
                        : array_search($proc->stdoutStream, $readStreams, true);
                    $skipRead[$otherStreamIndex] = true;
                    --$this->runningCount;
                    $this->readStdoutStreams = array_diff($this->readStdoutStreams, [$proc->stdoutStream]);
                    $this->readStderrStreams = array_diff($this->readStderrStreams, [$proc->stderrStream]);

                    // GetProcessExitOutput
                    $this->collectProcessOutput($proc);
                    // MaybeStartOneMore
                    if ($this->startedCount < $this->jobsCount && $this->runningCount < $this->parallelism) {
                        $this->startWorker();
                    }
                }
                //CheckAllDone
                if (count($this->jobToExitStatusMap) === $this->jobsCount) {
                    break 2;
                }
            }
        }

        return array_map(function ($job) {
            return [
                'job' => $job,
                'stdout' => $this->jobToStdoutContents[$job],
                'sterr' => $this->jobToStderrContents[$job],
                'exitStatus' => $this->jobToExitStatusMap[$job]
            ];
        }, $this->jobs);
    }
}

function worker()
{
    do {
        $stdoutOrStderr = mt_rand(0, 1);
        if ($stdoutOrStderr === 0) {
            fwrite(STDOUT, "*", 1);
        } else {
            fwrite(STDERR, "*", 1);
        }
        $action = mt_rand(0, 3);
    } while ($action > 0);

    $exitStatus = mt_rand(0, 1);
    exit($exitStatus);
}

if (!isset($argv[1])) {
    // Start the loop.
    $loop = new Loop(range(1, 5), 2);
    echo json_encode($loop->run(), JSON_PRETTY_PRINT);
} else {
    // Handle a worker request.
    worker();
}

I've tried to port over, as comment blocks, the labels that would be in the specification.
Running the script will print output similar to this on the screen:

» php test-loop-02.php
[
    {
        "job": 1,
        "stdout": "**",
        "sterr": "******",
        "exitStatus": 1
    },
    {
        "job": 2,
        "stdout": "",
        "sterr": "*",
        "exitStatus": 1
    },
    {
        "job": 3,
        "stdout": "***",
        "sterr": "*****",
        "exitStatus": 1
    },
    {
        "job": 4,
        "stdout": "**",
        "sterr": "",
        "exitStatus": 0
    },
    {
        "job": 5,
        "stdout": "*",
        "sterr": "***",
        "exitStatus": 0
    }

Nothing to be too excited about, but enough to demonstrate the Loop component works as intended.
I've gone for clarity and verbosity over cleverness; it could be polished further; I'm not doing that now as there are more features I want to add to the specification that will need at least a further iteration over the code.
The most significant differences between the PHP code and the specification are:

  • In PHP, `resource's cannot be used as keys to arrays, so I've added some indirect maps to deal with that.
  • Again, in PHP, processes will emit output on the STDOUT and the STDERR streams. To account for that, the Loop will read from both streams. I've taken some additional care to avoid calling proc_get_status twice on a terminated process, as the second call will return a -1 and not the actual exit code.

Besides the implementation differences, the structure is the same.
I've not gone as far as to use goto in the PHP code, though, and relied on regular loops.
The code lacks several security features like error handling and closing of the streams; it conveys the idea good enough as it is.

Next

In the next post, I will be adding fast-failure support to the Loop and start on a shared resources lock sharing system.