FPM tester FastCGI client transport (#11764)

This improves FastCGI client by separating transport functions and adding support for more low level socket transport that is useful for debugging in some cases.

In addition to that it introduces an option for delaying of fcgi request writing.
This commit is contained in:
Jakub Zelenka 2023-08-26 19:03:59 +01:00 committed by GitHub
parent b365fbd014
commit 99a222cd80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 424 additions and 135 deletions

View File

@ -22,17 +22,346 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE. * SOFTWARE.
*/ */
namespace Adoy\FastCGI; namespace FPM\FastCGI;
class TimedOutException extends \Exception {} class TimedOutException extends \Exception {}
class ForbiddenException extends \Exception {} class ForbiddenException extends \Exception {}
class ReadLimitExceeded extends \Exception {} class ReadLimitExceeded extends \Exception {}
class TransportException extends \Exception {}
interface Transport
{
/**
* Connect to the application.
*
* @param string $host Host address.
* @param int $port Port number.
* @throws TransportException
*/
public function connect(string $host, int $port, ?int $connectionTimeout): void;
/**
* Set keep alive.
*
* @param bool $keepAlive Whether to enable keep alive.
*/
public function setKeepAlive(bool $keepAlive): void;
/**
* Set data reading and writing timeout.
*
* @param int $timeoutMs
* @return bool
*/
public function setDataTimeout(int $timeoutMs): bool;
/**
* Read data.
*
* @param int $numBytes Number of bytes to read.
* @throws TransportException
* @return string
*/
public function read(int $numBytes): string;
/**
* Write data.
*
* @param string $bytes Bytes to write.
* @throws TransportException
* @return int Number of bytes written.
*/
public function write(string $bytes): int;
public function getMetaData(): array;
/**
* Flush data.
*
* @return bool
*/
public function flush(): bool;
/**
* Close connection.
*
* @return bool
*/
public function close(): bool;
}
/**
* Stream transport.
*
* Iis based on PHP streams and should be more reliable as it automatically handles timeouts and
* other features.
*/
class StreamTransport implements Transport
{
/**
* @var resource|null|false
*/
private $stream = null;
/**
* @var bool
*/
private bool $keepAlive = false;
/**
* @inheritDoc
*/
public function connect(string $host, int $port, ?int $connectionTimeout = 5000): void
{
if ($this->stream) {
return;
}
$this->stream = fsockopen(
$host,
$port,
$errno,
$errstr,
$connectionTimeout / 1000
);
if (!$this->stream) {
throw new TransportException('Unable to connect to FastCGI application: ' . $errstr);
}
if ($this->keepAlive) {
$this->setKeepAlive(true);
}
}
/**
* @inheritDoc
*/
public function setDataTimeout(int $timeoutMs): bool
{
if (!$this->stream) {
return false;
}
return stream_set_timeout(
$this->stream,
floor($timeoutMs / 1000),
($timeoutMs % 1000) * 1000
);
}
/**
* @inheritDoc
*/
public function setKeepAlive(bool $keepAlive): void
{
$this->keepAlive = $keepAlive;
if (!$this->stream) {
return;
}
if ($keepAlive) {
$socket = socket_import_stream($this->stream);
if ($socket) {
socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
}
} else {
$this->close();
}
}
/**
* @inheritDoc
*/
public function read(int $numBytes): string
{
$result = fread($this->stream, $numBytes);
if ($result === false) {
throw new TransportException('Reading from the stream failed');
}
return $result;
}
/**
* @inheritDoc
*/
public function write(string $bytes): int
{
$result = fwrite($this->stream, $bytes);
if ($result === false) {
throw new TransportException('Writing to the stream failed');
}
return $result;
}
public function getMetaData(): array
{
return stream_get_meta_data($this->stream);
}
/**
* @inheritDoc
*/
public function flush(): bool
{
return fflush($this->stream);
}
public function close(): bool
{
if ($this->stream) {
$result = fclose($this->stream);
$this->stream = null;
return $result;
}
return false;
}
}
/**
* Socket transport.
*
* This transport is more low level than stream and supports some extra socket options like
* SO_KEEPALIVE. However, it is currently less robust and missing some stream features like
* connection timeout. It should be used only for specific use cases.
*/
class SocketTransport implements Transport
{
/**
* @var \Socket
*/
private ?\Socket $socket = null;
/**
* @var int
*/
protected int $dataTimeoutMs = 5000;
/**
* @var bool
*/
private bool $keepAlive = false;
/**
* @inheritDoc
*/
public function connect(string $host, int $port, ?int $connectionTimeout = 5000): void
{
if ($this->socket) {
return;
}
$this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if (!$this->socket) {
throw new TransportException('Unable to create socket: ' . socket_strerror(socket_last_error()));
}
$ip = filter_var($host, FILTER_VALIDATE_IP) ? $host : gethostbyname($host);
if (!socket_connect($this->socket, $ip, $port)) {
$error = socket_strerror(socket_last_error($this->socket));
throw new TransportException('Unable to connect to FastCGI application: ' . $error);
}
if ($this->keepAlive) {
$this->setKeepAlive(true);
}
}
/**
* @inheritDoc
*/
public function setDataTimeout(int $timeoutMs): bool
{
$this->dataTimeoutMs = $timeoutMs;
return true;
}
/**
* @inheritDoc
*/
public function setKeepAlive(bool $keepAlive): void
{
$this->keepAlive = $keepAlive;
if (!$this->socket) {
return;
}
if ($keepAlive) {
socket_set_option($this->socket, SOL_SOCKET, SO_KEEPALIVE, 1);
} else {
$this->close();
}
}
private function select(array $read, array $write = [], array $except = []): bool
{
return socket_select(
$read,
$write,
$except,
floor($this->dataTimeoutMs / 1000),
($this->dataTimeoutMs % 1000) * 1000
);
}
/**
* @inheritDoc
*/
public function read(int $numBytes): string
{
if ($this->select([$this->socket]) === false) {
throw new TimedOutException('Reading timeout');
}
$result = socket_read($this->socket, $numBytes);
if ($result === false) {
throw new TransportException('Reading from the stream failed');
}
return $result;
}
/**
* @inheritDoc
*/
public function write(string $bytes): int
{
if ($this->select([], [$this->socket]) === false) {
throw new TimedOutException('Writing timeout');
}
$result = socket_write($this->socket, $bytes);
if ($result === false) {
throw new TransportException('Writing to the stream failed');
}
return $result;
}
public function getMetaData(): array
{
return [];
}
/**
* @inheritDoc
*/
public function flush(): bool
{
return true;
}
public function close(): bool
{
if ($this->socket) {
socket_close($this->socket);
$this->socket = null;
return true;
}
return false;
}
}
/** /**
* Handles communication with a FastCGI application * Handles communication with a FastCGI application
* *
* @author Pierrick Charron <pierrick@adoy.net> * @author Pierrick Charron <pierrick@adoy.net>, Jakub Zelenka <bukka@php.net>
* @version 1.0 * @version 2.0
*/ */
class Client class Client
{ {
@ -71,12 +400,6 @@ class Client
const REQ_STATE_ERR = 3; const REQ_STATE_ERR = 3;
const REQ_STATE_TIMED_OUT = 4; const REQ_STATE_TIMED_OUT = 4;
/**
* Socket
* @var resource
*/
private $_sock = null;
/** /**
* Host * Host
* @var string * @var string
@ -109,12 +432,6 @@ class Client
*/ */
private $_requests = array(); private $_requests = array();
/**
* Use persistent sockets to connect to backend
* @var bool
*/
private $_persistentSocket = false;
/** /**
* Connect timeout in milliseconds * Connect timeout in milliseconds
* @var int * @var int
@ -127,16 +444,25 @@ class Client
*/ */
private $_readWriteTimeout = 5000; private $_readWriteTimeout = 5000;
/**
* Data transport instance
* @var Transport
*/
private Transport $transport;
/** /**
* Constructor * Constructor
* *
* @param string $host Host of the FastCGI application * @param string $host Host of the FastCGI application
* @param int $port Port of the FastCGI application * @param int $port Port of the FastCGI application
* @param Transport $transport Transport
*/ */
public function __construct($host, $port) public function __construct($host, $port, Transport $transport)
{ {
$this->_host = $host; $this->_host = $host;
$this->_port = $port; $this->_port = $port;
$this->transport = $transport;
} }
/** /**
@ -157,10 +483,9 @@ class Client
*/ */
public function setKeepAlive($b) public function setKeepAlive($b)
{ {
$this->_keepAlive = (bool)$b; $value = (bool) $b;
if (!$this->_keepAlive && $this->_sock) { $this->_keepAlive = $value;
fclose($this->_sock); $this->transport->setKeepAlive($value);
}
} }
/** /**
@ -173,32 +498,6 @@ class Client
return $this->_keepAlive; return $this->_keepAlive;
} }
/**
* Define whether or not PHP should attempt to re-use sockets opened by previous
* request for efficiency
*
* @param bool $b true if persistent socket should be used, false otherwise
*/
public function setPersistentSocket($b)
{
$was_persistent = ($this->_sock && $this->_persistentSocket);
$this->_persistentSocket = (bool)$b;
if (!$this->_persistentSocket && $was_persistent) {
fclose($this->_sock);
}
}
/**
* Get the pesistent socket status
*
* @return bool true if the socket should be persistent, false otherwise
*/
public function getPersistentSocket()
{
return $this->_persistentSocket;
}
/** /**
* Set the connect timeout * Set the connect timeout
* *
@ -227,7 +526,7 @@ class Client
public function setReadWriteTimeout($timeoutMs) public function setReadWriteTimeout($timeoutMs)
{ {
$this->_readWriteTimeout = $timeoutMs; $this->_readWriteTimeout = $timeoutMs;
$this->set_ms_timeout($this->_readWriteTimeout); $this->transport->setDataTimeout($this->_readWriteTimeout);
} }
/** /**
@ -240,56 +539,13 @@ class Client
return $this->_readWriteTimeout; return $this->_readWriteTimeout;
} }
/**
* Helper to avoid duplicating milliseconds to secs/usecs in a few places
*
* @param int millisecond timeout
* @return bool
*/
private function set_ms_timeout($timeoutMs) {
if (!$this->_sock) {
return false;
}
return stream_set_timeout(
$this->_sock,
floor($timeoutMs / 1000),
($timeoutMs % 1000) * 1000
);
}
/** /**
* Create a connection to the FastCGI application * Create a connection to the FastCGI application
*/ */
private function connect() private function connect()
{ {
if (!$this->_sock) { $this->transport->connect($this->_host, $this->_port, $this->_connectTimeout);
if ($this->_persistentSocket) { $this->transport->setDataTimeout($this->_readWriteTimeout);
$this->_sock = pfsockopen(
$this->_host,
$this->_port,
$errno,
$errstr,
$this->_connectTimeout/1000
);
} else {
$this->_sock = fsockopen(
$this->_host,
$this->_port,
$errno,
$errstr,
$this->_connectTimeout/1000
);
}
if (!$this->_sock) {
throw new \Exception('Unable to connect to FastCGI application: ' . $errstr);
}
if (!$this->set_ms_timeout($this->_readWriteTimeout)) {
throw new \Exception('Unable to set timeout on socket');
}
}
} }
/** /**
@ -408,27 +664,27 @@ class Client
* @param int $readLimit max content size * @param int $readLimit max content size
* @return array * @return array
* @throws ReadLimitExceeded * @throws ReadLimitExceeded
* @throws TransportException
*/ */
private function readPacket($readLimit = -1) private function readPacket($readLimit = -1)
{ {
if ($packet = fread($this->_sock, self::HEADER_LEN)) { if ($packet = $this->transport->read(self::HEADER_LEN)) {
$resp = $this->decodePacketHeader($packet); $resp = $this->decodePacketHeader($packet);
$resp['content'] = ''; $resp['content'] = '';
if ($resp['contentLength']) { if ($resp['contentLength']) {
$len = $resp['contentLength']; $len = $resp['contentLength'];
if ($readLimit >= 0 && $len > $readLimit) { if ($readLimit >= 0 && $len > $readLimit) {
// close connection so it can be re-set reset and throw an error // close connection so it can be re-set reset and throw an error
fclose($this->_sock); $this->transport->close();
$this->_sock = null;
throw new ReadLimitExceeded("Content has $len bytes but the limit is $readLimit bytes"); throw new ReadLimitExceeded("Content has $len bytes but the limit is $readLimit bytes");
} }
while ($len && $buf = fread($this->_sock, $len)) { while ($len && $buf = $this->transport->read($len)) {
$len -= strlen($buf); $len -= strlen($buf);
$resp['content'] .= $buf; $resp['content'] .= $buf;
} }
} }
if ($resp['paddingLength']) { if ($resp['paddingLength']) {
$buf = fread($this->_sock, $resp['paddingLength']); $this->transport->read($resp['paddingLength']);
} }
return $resp; return $resp;
} else { } else {
@ -451,7 +707,7 @@ class Client
foreach ($requestedInfo as $info) { foreach ($requestedInfo as $info) {
$request .= $this->buildNvpair($info, ''); $request .= $this->buildNvpair($info, '');
} }
fwrite($this->_sock, $this->buildPacket(self::GET_VALUES, $request, 0)); $this->transport->write($this->buildPacket(self::GET_VALUES, $request, 0));
$resp = $this->readPacket(); $resp = $this->readPacket();
if ($resp['type'] == self::GET_VALUES_RESULT) { if ($resp['type'] == self::GET_VALUES_RESULT) {
@ -483,15 +739,16 @@ class Client
* @param array $params Array of parameters * @param array $params Array of parameters
* @param string $stdin Content * @param string $stdin Content
* @param int $readLimit [optional] the number of bytes to accept in a single packet or -1 if unlimited * @param int $readLimit [optional] the number of bytes to accept in a single packet or -1 if unlimited
* @param int $writeDelayMs Number of milliseconds to wait before write
* @return array * @return array
* @throws ForbiddenException * @throws ForbiddenException
* @throws TimedOutException * @throws TimedOutException
* @throws \Exception * @throws \Exception
*/ */
public function request_data(array $params, $stdin, $readLimit = -1) public function request_data(array $params, $stdin, int $readLimit = -1, int $timoutMs = 0, int $writeDelayMs = 0)
{ {
$id = $this->async_request($params, $stdin); $id = $this->async_request($params, $stdin, $writeDelayMs);
return $this->wait_for_response_data($id, 0, $readLimit); return $this->wait_for_response_data($id, $timoutMs, $readLimit);
} }
/** /**
@ -507,11 +764,12 @@ class Client
* *
* @param array $params Array of parameters * @param array $params Array of parameters
* @param string $stdin Content * @param string $stdin Content
* @param int $writeDelayMs Number of milliseconds to wait before write
* @return int * @return int
* @throws TimedOutException * @throws TimedOutException
* @throws \Exception * @throws \Exception
*/ */
public function async_request(array $params, $stdin) public function async_request(array $params, $stdin, int $writeDelayMs = 0)
{ {
$this->connect(); $this->connect();
@ -519,7 +777,7 @@ class Client
$id = mt_rand(1, (1 << 16) - 1); $id = mt_rand(1, (1 << 16) - 1);
// Using persistent sockets implies you want them kept alive by server! // Using persistent sockets implies you want them kept alive by server!
$keepAlive = intval($this->_keepAlive || $this->_persistentSocket); $keepAlive = intval($this->_keepAlive);
$request = $this->buildPacket( $request = $this->buildPacket(
self::BEGIN_REQUEST, self::BEGIN_REQUEST,
@ -542,16 +800,20 @@ class Client
} }
$request .= $this->buildPacket(self::STDIN, '', $id); $request .= $this->buildPacket(self::STDIN, '', $id);
if (fwrite($this->_sock, $request) === false || fflush($this->_sock) === false) { if ($writeDelayMs > 0) {
usleep($writeDelayMs * 1000);
}
$info = stream_get_meta_data($this->_sock); if ($this->transport->write($request) === false || $this->transport->flush() === false) {
if ($info['timed_out']) { $info = $this->transport->getMetaData();
if (!empty($info) && $info['timed_out']) {
throw new TimedOutException('Write timed out'); throw new TimedOutException('Write timed out');
} }
// Broken pipe, tear down so future requests might succeed // Broken pipe, tear down so future requests might succeed
fclose($this->_sock); $this->transport->close();
throw new \Exception('Failed to write request to socket'); throw new \Exception('Failed to write request to socket');
} }
@ -610,7 +872,7 @@ class Client
if ($timeoutMs > 0) { if ($timeoutMs > 0) {
// Reset timeout on socket for now // Reset timeout on socket for now
$this->set_ms_timeout($timeoutMs); $this->transport->setDataTimeout($timeoutMs);
} else { } else {
$timeoutMs = $this->_readWriteTimeout; $timeoutMs = $this->_readWriteTimeout;
} }
@ -636,32 +898,32 @@ class Client
} }
if (microtime(true) - $startTime >= ($timeoutMs * 1000)) { if (microtime(true) - $startTime >= ($timeoutMs * 1000)) {
// Reset // Reset
$this->set_ms_timeout($this->_readWriteTimeout); $this->transport->setDataTimeout($this->_readWriteTimeout);
throw new \Exception('Timed out'); throw new \Exception('Timed out');
} }
} }
if (!is_array($resp)) { if (!is_array($resp)) {
$info = stream_get_meta_data($this->_sock); $info = $this->transport->getMetaData();
// We must reset timeout but it must be AFTER we get info // We must reset timeout but it must be AFTER we get info
$this->set_ms_timeout($this->_readWriteTimeout); $this->transport->setDataTimeout($this->_readWriteTimeout);
if ($info['timed_out']) { if (!empty($info)) {
throw new TimedOutException('Read timed out'); if ($info['timed_out']) {
} throw new TimedOutException( 'Read timed out' );
}
if ($info['unread_bytes'] == 0 if ($info['unread_bytes'] == 0 && $info['blocked'] && $info['eof']) {
&& $info['blocked'] throw new ForbiddenException( 'Not in white list. Check listen.allowed_clients.' );
&& $info['eof']) { }
throw new ForbiddenException('Not in white list. Check listen.allowed_clients.');
} }
throw new \Exception('Read failed'); throw new \Exception('Read failed');
} }
// Reset timeout // Reset timeout
$this->set_ms_timeout($this->_readWriteTimeout); $this->transport->setDataTimeout($this->_readWriteTimeout);
switch (ord($resp['content'][4])) { switch (ord($resp['content'][4])) {
case self::CANT_MPX_CONN: case self::CANT_MPX_CONN:

View File

@ -2,7 +2,10 @@
namespace FPM; namespace FPM;
use Adoy\FastCGI\Client; use FPM\FastCGI\Client;
use FPM\FastCGI\SocketTransport;
use FPM\FastCGI\StreamTransport;
use FPM\FastCGI\Transport;
require_once 'fcgi.inc'; require_once 'fcgi.inc';
require_once 'logreader.inc'; require_once 'logreader.inc';
@ -65,6 +68,11 @@ class Tester
*/ */
private array $clients = []; private array $clients = [];
/**
* @var string
*/
private string $clientTransport;
/** /**
* @var LogReader * @var LogReader
*/ */
@ -368,15 +376,30 @@ class Tester
string $code = '', string $code = '',
array $options = [], array $options = [],
string $fileName = null, string $fileName = null,
bool $debug = null bool $debug = null,
string $clientTransport = 'stream'
) { ) {
$this->configTemplate = $configTemplate; $this->configTemplate = $configTemplate;
$this->code = $code; $this->code = $code;
$this->options = $options; $this->options = $options;
$this->fileName = $fileName ?: self::getCallerFileName(); $this->fileName = $fileName ?: self::getCallerFileName();
$this->debug = $debug !== null ? $debug : (bool)getenv('TEST_FPM_DEBUG'); $this->debug = $debug !== null ? $debug : (bool)getenv('TEST_FPM_DEBUG');
$this->logReader = new LogReader($this->debug); $this->logReader = new LogReader($this->debug);
$this->logTool = new LogTool($this->logReader, $this->debug); $this->logTool = new LogTool($this->logReader, $this->debug);
$this->clientTransport = $clientTransport;
}
/**
* Creates new client transport.
*
* @return Transport
*/
private function createTransport()
{
return match ($this->clientTransport) {
'stream' => new StreamTransport(),
'socket' => new SocketTransport(),
};
} }
/** /**
@ -770,6 +793,7 @@ class Tester
* @param string|array|null $stdin = null * @param string|array|null $stdin = null
* @param bool $expectError * @param bool $expectError
* @param int $readLimit * @param int $readLimit
* @param int $writeDelay
* *
* @return Response * @return Response
* @throws \Exception * @throws \Exception
@ -787,6 +811,7 @@ class Tester
string|array $stdin = null, string|array $stdin = null,
bool $expectError = false, bool $expectError = false,
int $readLimit = -1, int $readLimit = -1,
int $writeDelay = 0,
): Response { ): Response {
if ($this->hasError()) { if ($this->hasError()) {
return new Response(null, true); return new Response(null, true);
@ -801,7 +826,7 @@ class Tester
try { try {
$this->response = new Response( $this->response = new Response(
$this->getClient($address, $connKeepAlive)->request_data($params, $stdin, $readLimit) $this->getClient($address, $connKeepAlive)->request_data($params, $stdin, $readLimit, $writeDelay)
); );
if ($expectError) { if ($expectError) {
$this->error('Expected request error but the request was successful'); $this->error('Expected request error but the request was successful');
@ -834,6 +859,7 @@ class Tester
* @param string|null $errorMessage * @param string|null $errorMessage
* @param bool $connKeepAlive * @param bool $connKeepAlive
* @param int $readTimeout * @param int $readTimeout
* @param int $writeDelay
* *
* @return Response[] * @return Response[]
* @throws \Exception * @throws \Exception
@ -844,7 +870,8 @@ class Tester
string $successMessage = null, string $successMessage = null,
string $errorMessage = null, string $errorMessage = null,
bool $connKeepAlive = false, bool $connKeepAlive = false,
int $readTimeout = 0 int $readTimeout = 0,
int $writeDelay = 0,
) { ) {
if (is_numeric($requests)) { if (is_numeric($requests)) {
$requests = array_fill(0, $requests, []); $requests = array_fill(0, $requests, []);
@ -855,7 +882,7 @@ class Tester
} }
try { try {
$connections = array_map(function ($requestData) use ($address, $connKeepAlive) { $connections = array_map(function ($requestData) use ($address, $connKeepAlive, $writeDelay) {
$client = $this->getClient($address, $connKeepAlive); $client = $this->getClient($address, $connKeepAlive);
$params = $this->getRequestParams( $params = $this->getRequestParams(
$requestData['query'] ?? '', $requestData['query'] ?? '',
@ -869,7 +896,7 @@ class Tester
return [ return [
'client' => $client, 'client' => $client,
'requestId' => $client->async_request($params, false), 'requestId' => $client->async_request($params, false, $writeDelay),
]; ];
}, $requests); }, $requests);
@ -903,7 +930,7 @@ class Tester
* *
* @return Client * @return Client
*/ */
private function getClient(string $address = null, $keepAlive = false): Client private function getClient(string $address = null, bool $keepAlive = false): Client
{ {
$address = $address ? $this->processTemplate($address) : $this->getAddr(); $address = $address ? $this->processTemplate($address) : $this->getAddr();
if ($address[0] === '/') { // uds if ($address[0] === '/') { // uds
@ -925,11 +952,11 @@ class Tester
} }
if ( ! $keepAlive) { if ( ! $keepAlive) {
return new Client($host, $port); return new Client($host, $port, $this->createTransport());
} }
if ( ! isset($this->clients[$host][$port])) { if ( ! isset($this->clients[$host][$port])) {
$client = new Client($host, $port); $client = new Client($host, $port, $this->createTransport());
$client->setKeepAlive(true); $client->setKeepAlive(true);
$this->clients[$host][$port] = $client; $this->clients[$host][$port] = $client;
} }