TLA+ and PHP 03

Previously

In the first post of the series, I introduced the project I'm trying to complete using TLA+ and its specification verification power.
In the second post, I started working on the specification first and the PHP implementation second.

Those posts offer insight into my thinking process and flow, and provide a longer description of what I'm trying to build.

Pipes

My last iteration on the specification and implementation nailed the part where the main PHP thread, the Loop, dispatches the processing of each test block (whatever that will end up being) to separate PHP processes.
The issues with the last implementation were CPU-locking, with all its possible mitigations (see the first post), and the lack of communication between the Loop and the worker processes.

Both issues can be solved using PHP's stream_select function: when provided a set of streams to observe, it blocks the execution of the PHP script until at least one of them is updated or an optional timeout expires.

Why is this important? Because few PHP functions can translate PlusCal's await keyword into code that actually waits instead of polling in a CPU-locking cycle. Furthermore, the stream_select function takes care of the Inter-Process Communication (IPC from now on), allowing the Loop to get "push updates" from the workers.
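A minimal sketch of that blocking wait, assuming PHP 7.4 or later (for the array form of the proc_open command) on a Unix-like system. The worker command is a hypothetical one-liner made up for illustration; it is not the worker this project will end up using.

```php
<?php
// Hypothetical worker: sleeps for a moment, prints a single "*" and exits.
$command = [PHP_BINARY, '-r', 'usleep(100000); echo "*";'];
$descriptors = [1 => ['pipe', 'w']]; // Capture only the worker STDOUT.
$process = proc_open($command, $descriptors, $pipes);

$output = '';
while (!feof($pipes[1])) {
    $read = [$pipes[1]];
    $write = null;
    $except = null;
    // Block, without polling, until the pipe has something to read or is closed.
    // A null timeout means "wait for as long as it takes".
    $updated = stream_select($read, $write, $except, null);
    if ($updated === false) {
        break; // The wait was interrupted or failed: stop reading.
    }
    foreach ($read as $stream) {
        $output .= fread($stream, 8192);
    }
}
fclose($pipes[1]);
$exitCode = proc_close($process);
```

The Loop sleeps inside stream_select while the worker is busy, which is the behavior await describes in the specification.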

Pipes specification

I've modeled this new approach in the specification below:

----------------------------- MODULE spec_loop -----------------------------
EXTENDS TLC, Integers, FiniteSets, Sequences
CONSTANTS Loop, JobSet, Parallelism, NULL

(*--algorithm loop
variables
	jobsCount = Cardinality(JobSet),
	_startedRegister = [x \in JobSet |-> FALSE],
	_processStatusRegister = [x \in JobSet |-> NULL],
	_processPipesRegister = [x \in JobSet |-> <<>>];

define
	StartedCount == Cardinality({x \in DOMAIN _startedRegister: _startedRegister[x] = TRUE})
	NextNotStarted == CHOOSE x \in DOMAIN _startedRegister: _startedRegister[x] = FALSE
	CompletedCount == Cardinality({x \in JobSet: _processStatusRegister[x] /= NULL})
	Running == StartedCount - CompletedCount
	ParallelismRespected == Running <= Parallelism
	StreamSelectUpdates == {x \in JobSet: _processPipesRegister[x] /= <<>>}
end define;

fair process loop = Loop
variables
	streamSelectUpdates = {},
	updatedProcess = NULL,
	processStatus = NULL,
	processToOutputMap = [x \in JobSet |-> <<>>],
	processToExitStatusMap = [x \in JobSet |-> NULL];

	begin
		StartInitialBatch:
			while StartedCount < Parallelism do
				with p = NextNotStarted do
					_startedRegister[p] := TRUE;
				end with;
			end while;

		WaitForStreamUpdates:
			\* Collect the live value into a copy.
			streamSelectUpdates := StreamSelectUpdates;
			await Cardinality(StreamSelectUpdates) > 0;

		HandleStreamUpdates:
			while Cardinality(streamSelectUpdates) > 0 do
				\* A hack to pop the Jobs from the updates one by one.
				updatedProcess := CHOOSE x \in streamSelectUpdates: 1 > 0;
				streamSelectUpdates := streamSelectUpdates \ {updatedProcess};

				GetProcessOutput:
					with processOutput = _processPipesRegister[updatedProcess] do
						processToOutputMap[updatedProcess] := Append(processToOutputMap[updatedProcess], processOutput);
					end with;
					_processPipesRegister[updatedProcess] := <<>>;

				GetProcessStatus:
					processStatus := _processStatusRegister[updatedProcess];

				UpdateTrackedProcessPipes:
					if processStatus /= NULL then
						processToExitStatusMap[updatedProcess] := processStatus;
					end if;

				CheckLoopStatus:

					if processToExitStatusMap[updatedProcess] /= NULL then

						MaybeStartOneMore:
							if StartedCount < jobsCount then
								with p = NextNotStarted do
								_startedRegister[p] := TRUE;
								end with;
								goto WaitForStreamUpdates;
							end if;

					end if;

				CheckAllDone:
					if Cardinality({x \in JobSet: processToExitStatusMap[x] /= NULL}) = jobsCount then
						goto Done;
					else
						goto WaitForStreamUpdates;
					end if;
			end while;
end process

fair process worker \in JobSet
begin
	WaitToStart:
		await _startedRegister[self] = TRUE;

	Work:
		either
			_processPipesRegister[self] := Append(_processPipesRegister[self], "*");
		or
			skip;
		end either;

	ExitStatus: \* Either exit 0 or 1, a process MUST produce output.
		either
			_processPipesRegister[self] := Append(_processPipesRegister[self], "*");
			_processStatusRegister[self] := 0;
			goto Done;
		or
			_processPipesRegister[self] := Append(_processPipesRegister[self], "*");
			_processStatusRegister[self] := 1;
			goto Done;
		end either;
end process;

end algorithm;*)
\* BEGIN TRANSLATION
\* [...]
\* END TRANSLATION 

OneWorkerPerJobStarted == <>[](StartedCount = Cardinality(JobSet))
AllWorkersCompleted == <>[](CompletedCount = Cardinality(JobSet))
=============================================================================

Before unpacking the code, here is a successful run with the following settings:

  • What to check? - Temporal formula
  • Deadlock - checked
  • Properties - something that should eventually be true.
      • Termination - all processes will always eventually terminate.
      • OneWorkerPerJobStarted - each job will, eventually, have a worker assigned to it.
      • AllWorkersCompleted - all workers should eventually complete.
  • Invariants - citing the IDE "Formulas true in every reachable state".
      • ParallelismRespected - There should never be a state with more workers than parallelism would allow.

The constants are set up as follows:

  • JobSet <- [ model value ] {J1, J2, J3}
  • Parallelism <- 2

Model checking passing with parallelism 2 and 3 jobs

Registers

The registers, prefixed with a _, are artifacts of the specification that will not make it into code: they represent, by means of global variables, the inter-process communication between the Loop and the workers.

To "start" the processes, I'm using the same approach as in the previous post: a _startedRegister variable that maps each Job to a flag telling whether a Worker was started for it.
This global variable of the specification is the equivalent of the proc_open PHP function.
Where the proc_open function returns a process handle of the resource type, the specification sets the flag for that job in the _startedRegister variable to TRUE.

The _processPipesRegister variable keeps track of the output emitted by the workers on the write end of the IPC pipe assigned to them.
The proc_open function creates, along with starting the worker in a separate process, "pipes" for inter-process communication: typically one to send input to the process, STDIN, and two to get output from it, STDOUT and STDERR. For the sake of the specification, I conflated the two output pipes into one; the IPC is represented by the worker process writing to the _processPipesRegister global variable and the Loop process reading from it.

The last register is _processStatusRegister; it represents what the Loop would learn from a call to the proc_get_status function in PHP, whose return value includes the exit status of the worker process in its exitcode entry.
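To make the mapping concrete, here is a rough sketch of the PHP call each register stands for. It assumes PHP 7.4 or later and a hypothetical worker command made up for this example; the short usleep pause is a simplification for the sketch, not the waiting strategy the Loop should use.

```php
<?php
// Hypothetical worker: prints "*" and exits with status 1.
$command = [PHP_BINARY, '-r', 'echo "*"; exit(1);'];

// _startedRegister[job] := TRUE ~ proc_open returns a process resource.
$process = proc_open($command, [1 => ['pipe', 'w']], $pipes);
$started = is_resource($process);

// _processPipesRegister[job] ~ what the worker wrote to its STDOUT pipe.
$output = stream_get_contents($pipes[1]); // Blocks until the worker closes the pipe.
fclose($pipes[1]);

// _processStatusRegister[job] ~ the 'exitcode' entry of proc_get_status.
$status = proc_get_status($process);
while ($status['running']) {
    usleep(1000); // The exit can lag slightly behind the pipe closing.
    $status = proc_get_status($process);
}
$exitStatus = $status['exitcode'];
proc_close($process);
```

The exit status is read from the first proc_get_status call that reports the process as no longer running, and is stored right away rather than asked for again.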

More cases mean more models

The specification passes when I use 3 jobs and parallelism of 2, but what about other cases?
I found out that a "data-provider-like" approach is not how the TLA+ tools are meant to be used. To test a different case, I should create a new model or, better, clone the one I have used so far and change the values of the constants.

Creating a parallelism 1 model

The first alternate model is to test if the Loop specification works in "serial" mode: jobs are executed one after the other, with a parallelism of 1.

Nothing changes from the previous checks except the constants:

  • JobSet <- [ model value ] {J1, J2, J3}
  • Parallelism <- 1

Serial model checking fails

And it fails: the ParallelismRespected invariant is violated at some point.

To debug the specification failure, I find it extremely useful to first get an overview of the run. Using the UI control to collapse all the states, I can get an idea of where the violation happened (the MaybeStartOneMore phase of the Loop process) and what sequence of states led the run there.

Collapse all UI

The first guiding light for finding the cause of the model checking failure is that the values changed in a state are highlighted in red; the second is that invariants and (temporal) properties are checked after the state has changed the values.
After the MaybeStartOneMore state ran, the ParallelismRespected invariant is checked and finds 3 started processes in the _startedRegister variable, only 1 of which had its exit status collected and thus counts as completed. So, at this stage, there are 3 - 1 = 2 processes running, which is more than the allowed parallelism of 1.

Details of the MaybeStartOneMore phase failure

The fix is easy enough: in the MaybeStartOneMore phase, I have to check two conditions:

  • There are still jobs that do not have workers assigned; the specification was already checking this.
  • And the number of running workers is strictly less than the parallelism, so that starting one more worker will not exceed it; this check is the missing one.

Translated into PlusCal, the MaybeStartOneMore phase changes as follows:

MaybeStartOneMore:
-	if StartedCount < jobsCount then
+	if StartedCount < jobsCount /\ Running < Parallelism then
		with p = NextNotStarted do
		_startedRegister[p] := TRUE;
		end with;
		goto WaitForStreamUpdates;
	end if;

Now both models will pass.
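The fixed guard can be sketched in PHP as a small pure function. The function name and its parameters are hypothetical, chosen here only to mirror StartedCount, CompletedCount, jobsCount and Parallelism from the specification.

```php
<?php
/**
 * Hypothetical helper mirroring the fixed MaybeStartOneMore guard.
 */
function shouldStartOneMore(int $started, int $completed, int $jobsCount, int $parallelism): bool
{
    $running = $started - $completed; // Same as Running in the specification.

    return $started < $jobsCount && $running < $parallelism;
}

// The state that broke the serial model: 2 started, 1 completed, 3 jobs, parallelism 1.
$startInFailingState = shouldStartOneMore(2, 1, 3, 1); // false: one worker is still running.
// Once the only running worker has completed, the next one can start.
$startAfterCompletion = shouldStartOneMore(1, 1, 3, 1); // true
```

With the original guard, which only checked the first condition, the first call would have started a third worker while the second one was still running.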

More parallelism than jobs

The last model I want to check against the specification is the one where the parallelism is greater than the number of jobs.

Clone another model and set:

  • JobSet <- [ model value ] {J1, J2}
  • Parallelism <- 3

And this one fails again; this time, it is not a violated invariant or property: TLC throws an exception while evaluating an expression of the specification.

TLC evaluation error

The message:

TLC threw an unexpected exception.
This was probably caused by an error in the spec or model.
See the User Output or TLC Console for clues to what happened.
The exception was a java.lang.RuntimeException
: Attempted to compute the value of an expression of form
CHOOSE x \in S: P, but no element of S satisfied P.
line 102, col 19 to line 102, col 83 of module spec_loop
The error occurred when TLC was evaluating the nested
expressions at the following positions:
0. Line 131, column 22 to line 142, column 62 in spec_loop
1. Line 131, column 25 to line 131, column 54 in spec_loop
2. Line 132, column 25 to line 137, column 61 in spec_loop
3. Line 133, column 33 to line 135, column 82 in spec_loop
4. Line 133, column 36 to line 134, column 94 in spec_loop
5. Line 134, column 38 to line 134, column 94 in spec_loop
6. Line 134, column 58 to line 134, column 94 in spec_loop
7. Line 134, column 85 to line 134, column 85 in spec_loop
8. Line 133, column 45 to line 133, column 58 in spec_loop
9. Line 102, column 19 to line 102, column 83 in spec_loop

Three things to note:

  • The line and column numbers refer to the translation, not the PlusCal code. So, better learn to read that.
  • The message is pretty clear about the failure, but it uses placeholder symbols.
  • TLA+ does not handle the concept of returning anything.

When I say that the error report uses symbols, I mean that S and P are placeholders: I did not write CHOOSE x \in S: P anywhere in my PlusCal code, and it is nowhere to be found in the translation either.
The error is about this definition in the PlusCal code:

NextNotStarted == CHOOSE x \in DOMAIN _startedRegister: _startedRegister[x] = FALSE

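And when I say that TLA+ does not handle returning anything, I mean that it does not work as PHP would.
If I had to write the same code in PHP, I would translate the line above into the following:

$_startedRegister = ['J1' => true, 'J2' => true];
// Every job has been started: the filter yields an empty array and PHP carries on.
$notStarted = array_filter($_startedRegister, function ($started) { return !$started; });
$nextNotStarted = array_key_first($notStarted); // null when there is nothing left to start

PHP returns an empty value when nothing matches; CHOOSE has no such fallback, and TLC stops with an error when no element of the set satisfies the predicate.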

Next

In the next post, I will make the specification more robust by adding more invariants and properties and then, finally, move on to translating it into PHP.