diff --git a/sapi/fpm/tests/fcgi.inc b/sapi/fpm/tests/fcgi.inc index 7d236c1b03a..26edb8d88a8 100644 --- a/sapi/fpm/tests/fcgi.inc +++ b/sapi/fpm/tests/fcgi.inc @@ -22,17 +22,346 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ -namespace Adoy\FastCGI; +namespace FPM\FastCGI; class TimedOutException extends \Exception {} class ForbiddenException extends \Exception {} class ReadLimitExceeded extends \Exception {} +class TransportException extends \Exception {} + +interface Transport +{ + /** + * Connect to the application. + * + * @param string $host Host address. + * @param int $port Port number. + * @throws TransportException + */ + public function connect(string $host, int $port, ?int $connectionTimeout): void; + + /** + * Set keep alive. + * + * @param bool $keepAlive Whether to enable keep alive. + */ + public function setKeepAlive(bool $keepAlive): void; + + /** + * Set data reading and writing timeout. + * + * @param int $timeoutMs + * @return bool + */ + public function setDataTimeout(int $timeoutMs): bool; + + /** + * Read data. + * + * @param int $numBytes Number of bytes to read. + * @throws TransportException + * @return string + */ + public function read(int $numBytes): string; + + /** + * Write data. + * + * @param string $bytes Bytes to write. + * @throws TransportException + * @return int Number of bytes written. + */ + public function write(string $bytes): int; + + public function getMetaData(): array; + + /** + * Flush data. + * + * @return bool + */ + public function flush(): bool; + + /** + * Close connection. + * + * @return bool + */ + public function close(): bool; +} + +/** + * Stream transport. + * + * Iis based on PHP streams and should be more reliable as it automatically handles timeouts and + * other features. + */ +class StreamTransport implements Transport +{ + /** + * @var resource|null|false + */ + private $stream = null; + + /** + * @var bool + */ + private bool $keepAlive = false; + + /** + * @inheritDoc + */ + public function connect(string $host, int $port, ?int $connectionTimeout = 5000): void + { + if ($this->stream) { + return; + } + $this->stream = fsockopen( + $host, + $port, + $errno, + $errstr, + $connectionTimeout / 1000 + ); + + if (!$this->stream) { + throw new TransportException('Unable to connect to FastCGI application: ' . $errstr); + } + + if ($this->keepAlive) { + $this->setKeepAlive(true); + } + } + + /** + * @inheritDoc + */ + public function setDataTimeout(int $timeoutMs): bool + { + if (!$this->stream) { + return false; + } + return stream_set_timeout( + $this->stream, + floor($timeoutMs / 1000), + ($timeoutMs % 1000) * 1000 + ); + } + + /** + * @inheritDoc + */ + public function setKeepAlive(bool $keepAlive): void + { + $this->keepAlive = $keepAlive; + if (!$this->stream) { + return; + } + if ($keepAlive) { + $socket = socket_import_stream($this->stream); + if ($socket) { + socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1); + } + } else { + $this->close(); + } + } + + /** + * @inheritDoc + */ + public function read(int $numBytes): string + { + $result = fread($this->stream, $numBytes); + if ($result === false) { + throw new TransportException('Reading from the stream failed'); + } + return $result; + } + + /** + * @inheritDoc + */ + public function write(string $bytes): int + { + $result = fwrite($this->stream, $bytes); + if ($result === false) { + throw new TransportException('Writing to the stream failed'); + } + return $result; + } + + public function getMetaData(): array + { + return stream_get_meta_data($this->stream); + } + + /** + * @inheritDoc + */ + public function flush(): bool + { + return fflush($this->stream); + } + + public function close(): bool + { + if ($this->stream) { + $result = fclose($this->stream); + $this->stream = null; + return $result; + } + + return false; + } +} + +/** + * Socket transport. + * + * This transport is more low level than stream and supports some extra socket options like + * SO_KEEPALIVE. However, it is currently less robust and missing some stream features like + * connection timeout. It should be used only for specific use cases. + */ +class SocketTransport implements Transport +{ + /** + * @var \Socket + */ + private ?\Socket $socket = null; + + /** + * @var int + */ + protected int $dataTimeoutMs = 5000; + + /** + * @var bool + */ + private bool $keepAlive = false; + + /** + * @inheritDoc + */ + public function connect(string $host, int $port, ?int $connectionTimeout = 5000): void + { + if ($this->socket) { + return; + } + $this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); + if (!$this->socket) { + throw new TransportException('Unable to create socket: ' . socket_strerror(socket_last_error())); + } + + $ip = filter_var($host, FILTER_VALIDATE_IP) ? $host : gethostbyname($host); + + if (!socket_connect($this->socket, $ip, $port)) { + $error = socket_strerror(socket_last_error($this->socket)); + throw new TransportException('Unable to connect to FastCGI application: ' . $error); + } + + if ($this->keepAlive) { + $this->setKeepAlive(true); + } + } + + /** + * @inheritDoc + */ + public function setDataTimeout(int $timeoutMs): bool + { + $this->dataTimeoutMs = $timeoutMs; + return true; + } + + /** + * @inheritDoc + */ + public function setKeepAlive(bool $keepAlive): void + { + $this->keepAlive = $keepAlive; + if (!$this->socket) { + return; + } + if ($keepAlive) { + socket_set_option($this->socket, SOL_SOCKET, SO_KEEPALIVE, 1); + } else { + $this->close(); + } + } + + private function select(array $read, array $write = [], array $except = []): bool + { + return socket_select( + $read, + $write, + $except, + floor($this->dataTimeoutMs / 1000), + ($this->dataTimeoutMs % 1000) * 1000 + ); + } + + /** + * @inheritDoc + */ + public function read(int $numBytes): string + { + if ($this->select([$this->socket]) === false) { + throw new TimedOutException('Reading timeout'); + } + $result = socket_read($this->socket, $numBytes); + if ($result === false) { + throw new TransportException('Reading from the stream failed'); + } + return $result; + } + + /** + * @inheritDoc + */ + public function write(string $bytes): int + { + if ($this->select([], [$this->socket]) === false) { + throw new TimedOutException('Writing timeout'); + } + $result = socket_write($this->socket, $bytes); + if ($result === false) { + throw new TransportException('Writing to the stream failed'); + } + return $result; + } + + public function getMetaData(): array + { + return []; + } + + /** + * @inheritDoc + */ + public function flush(): bool + { + return true; + } + + public function close(): bool + { + if ($this->socket) { + socket_close($this->socket); + $this->socket = null; + return true; + } + + return false; + } +} + /** * Handles communication with a FastCGI application * - * @author Pierrick Charron - * @version 1.0 + * @author Pierrick Charron , Jakub Zelenka + * @version 2.0 */ class Client { @@ -71,12 +400,6 @@ class Client const REQ_STATE_ERR = 3; const REQ_STATE_TIMED_OUT = 4; - /** - * Socket - * @var resource - */ - private $_sock = null; - /** * Host * @var string @@ -109,12 +432,6 @@ class Client */ private $_requests = array(); - /** - * Use persistent sockets to connect to backend - * @var bool - */ - private $_persistentSocket = false; - /** * Connect timeout in milliseconds * @var int @@ -127,16 +444,25 @@ class Client */ private $_readWriteTimeout = 5000; + /** + * Data transport instance + * @var Transport + */ + private Transport $transport; + /** * Constructor * * @param string $host Host of the FastCGI application * @param int $port Port of the FastCGI application + * @param Transport $transport Transport */ - public function __construct($host, $port) + public function __construct($host, $port, Transport $transport) { $this->_host = $host; $this->_port = $port; + + $this->transport = $transport; } /** @@ -157,10 +483,9 @@ class Client */ public function setKeepAlive($b) { - $this->_keepAlive = (bool)$b; - if (!$this->_keepAlive && $this->_sock) { - fclose($this->_sock); - } + $value = (bool) $b; + $this->_keepAlive = $value; + $this->transport->setKeepAlive($value); } /** @@ -173,32 +498,6 @@ class Client return $this->_keepAlive; } - /** - * Define whether or not PHP should attempt to re-use sockets opened by previous - * request for efficiency - * - * @param bool $b true if persistent socket should be used, false otherwise - */ - public function setPersistentSocket($b) - { - $was_persistent = ($this->_sock && $this->_persistentSocket); - $this->_persistentSocket = (bool)$b; - if (!$this->_persistentSocket && $was_persistent) { - fclose($this->_sock); - } - } - - /** - * Get the pesistent socket status - * - * @return bool true if the socket should be persistent, false otherwise - */ - public function getPersistentSocket() - { - return $this->_persistentSocket; - } - - /** * Set the connect timeout * @@ -227,7 +526,7 @@ class Client public function setReadWriteTimeout($timeoutMs) { $this->_readWriteTimeout = $timeoutMs; - $this->set_ms_timeout($this->_readWriteTimeout); + $this->transport->setDataTimeout($this->_readWriteTimeout); } /** @@ -240,56 +539,13 @@ class Client return $this->_readWriteTimeout; } - /** - * Helper to avoid duplicating milliseconds to secs/usecs in a few places - * - * @param int millisecond timeout - * @return bool - */ - private function set_ms_timeout($timeoutMs) { - if (!$this->_sock) { - return false; - } - return stream_set_timeout( - $this->_sock, - floor($timeoutMs / 1000), - ($timeoutMs % 1000) * 1000 - ); - } - - /** * Create a connection to the FastCGI application */ private function connect() { - if (!$this->_sock) { - if ($this->_persistentSocket) { - $this->_sock = pfsockopen( - $this->_host, - $this->_port, - $errno, - $errstr, - $this->_connectTimeout/1000 - ); - } else { - $this->_sock = fsockopen( - $this->_host, - $this->_port, - $errno, - $errstr, - $this->_connectTimeout/1000 - ); - } - - if (!$this->_sock) { - throw new \Exception('Unable to connect to FastCGI application: ' . $errstr); - } - - if (!$this->set_ms_timeout($this->_readWriteTimeout)) { - throw new \Exception('Unable to set timeout on socket'); - } - } + $this->transport->connect($this->_host, $this->_port, $this->_connectTimeout); + $this->transport->setDataTimeout($this->_readWriteTimeout); } /** @@ -408,27 +664,27 @@ class Client * @param int $readLimit max content size * @return array * @throws ReadLimitExceeded + * @throws TransportException */ private function readPacket($readLimit = -1) { - if ($packet = fread($this->_sock, self::HEADER_LEN)) { + if ($packet = $this->transport->read(self::HEADER_LEN)) { $resp = $this->decodePacketHeader($packet); $resp['content'] = ''; if ($resp['contentLength']) { $len = $resp['contentLength']; if ($readLimit >= 0 && $len > $readLimit) { // close connection so it can be re-set reset and throw an error - fclose($this->_sock); - $this->_sock = null; + $this->transport->close(); throw new ReadLimitExceeded("Content has $len bytes but the limit is $readLimit bytes"); } - while ($len && $buf = fread($this->_sock, $len)) { + while ($len && $buf = $this->transport->read($len)) { $len -= strlen($buf); $resp['content'] .= $buf; } } if ($resp['paddingLength']) { - $buf = fread($this->_sock, $resp['paddingLength']); + $this->transport->read($resp['paddingLength']); } return $resp; } else { @@ -451,7 +707,7 @@ class Client foreach ($requestedInfo as $info) { $request .= $this->buildNvpair($info, ''); } - fwrite($this->_sock, $this->buildPacket(self::GET_VALUES, $request, 0)); + $this->transport->write($this->buildPacket(self::GET_VALUES, $request, 0)); $resp = $this->readPacket(); if ($resp['type'] == self::GET_VALUES_RESULT) { @@ -483,15 +739,16 @@ class Client * @param array $params Array of parameters * @param string $stdin Content * @param int $readLimit [optional] the number of bytes to accept in a single packet or -1 if unlimited + * @param int $writeDelayMs Number of milliseconds to wait before write * @return array * @throws ForbiddenException * @throws TimedOutException * @throws \Exception */ - public function request_data(array $params, $stdin, $readLimit = -1) + public function request_data(array $params, $stdin, int $readLimit = -1, int $timoutMs = 0, int $writeDelayMs = 0) { - $id = $this->async_request($params, $stdin); - return $this->wait_for_response_data($id, 0, $readLimit); + $id = $this->async_request($params, $stdin, $writeDelayMs); + return $this->wait_for_response_data($id, $timoutMs, $readLimit); } /** @@ -507,11 +764,12 @@ class Client * * @param array $params Array of parameters * @param string $stdin Content + * @param int $writeDelayMs Number of milliseconds to wait before write * @return int * @throws TimedOutException * @throws \Exception */ - public function async_request(array $params, $stdin) + public function async_request(array $params, $stdin, int $writeDelayMs = 0) { $this->connect(); @@ -519,7 +777,7 @@ class Client $id = mt_rand(1, (1 << 16) - 1); // Using persistent sockets implies you want them kept alive by server! - $keepAlive = intval($this->_keepAlive || $this->_persistentSocket); + $keepAlive = intval($this->_keepAlive); $request = $this->buildPacket( self::BEGIN_REQUEST, @@ -542,16 +800,20 @@ class Client } $request .= $this->buildPacket(self::STDIN, '', $id); - if (fwrite($this->_sock, $request) === false || fflush($this->_sock) === false) { + if ($writeDelayMs > 0) { + usleep($writeDelayMs * 1000); + } - $info = stream_get_meta_data($this->_sock); + if ($this->transport->write($request) === false || $this->transport->flush() === false) { - if ($info['timed_out']) { + $info = $this->transport->getMetaData(); + + if (!empty($info) && $info['timed_out']) { throw new TimedOutException('Write timed out'); } // Broken pipe, tear down so future requests might succeed - fclose($this->_sock); + $this->transport->close(); throw new \Exception('Failed to write request to socket'); } @@ -610,7 +872,7 @@ class Client if ($timeoutMs > 0) { // Reset timeout on socket for now - $this->set_ms_timeout($timeoutMs); + $this->transport->setDataTimeout($timeoutMs); } else { $timeoutMs = $this->_readWriteTimeout; } @@ -636,32 +898,32 @@ class Client } if (microtime(true) - $startTime >= ($timeoutMs * 1000)) { // Reset - $this->set_ms_timeout($this->_readWriteTimeout); + $this->transport->setDataTimeout($this->_readWriteTimeout); throw new \Exception('Timed out'); } } if (!is_array($resp)) { - $info = stream_get_meta_data($this->_sock); + $info = $this->transport->getMetaData(); // We must reset timeout but it must be AFTER we get info - $this->set_ms_timeout($this->_readWriteTimeout); + $this->transport->setDataTimeout($this->_readWriteTimeout); - if ($info['timed_out']) { - throw new TimedOutException('Read timed out'); - } + if (!empty($info)) { + if ($info['timed_out']) { + throw new TimedOutException( 'Read timed out' ); + } - if ($info['unread_bytes'] == 0 - && $info['blocked'] - && $info['eof']) { - throw new ForbiddenException('Not in white list. Check listen.allowed_clients.'); + if ($info['unread_bytes'] == 0 && $info['blocked'] && $info['eof']) { + throw new ForbiddenException( 'Not in white list. Check listen.allowed_clients.' ); + } } throw new \Exception('Read failed'); } // Reset timeout - $this->set_ms_timeout($this->_readWriteTimeout); + $this->transport->setDataTimeout($this->_readWriteTimeout); switch (ord($resp['content'][4])) { case self::CANT_MPX_CONN: diff --git a/sapi/fpm/tests/tester.inc b/sapi/fpm/tests/tester.inc index 533bebea899..fd536e83024 100644 --- a/sapi/fpm/tests/tester.inc +++ b/sapi/fpm/tests/tester.inc @@ -2,7 +2,10 @@ namespace FPM; -use Adoy\FastCGI\Client; +use FPM\FastCGI\Client; +use FPM\FastCGI\SocketTransport; +use FPM\FastCGI\StreamTransport; +use FPM\FastCGI\Transport; require_once 'fcgi.inc'; require_once 'logreader.inc'; @@ -65,6 +68,11 @@ class Tester */ private array $clients = []; + /** + * @var string + */ + private string $clientTransport; + /** * @var LogReader */ @@ -368,15 +376,30 @@ class Tester string $code = '', array $options = [], string $fileName = null, - bool $debug = null + bool $debug = null, + string $clientTransport = 'stream' ) { - $this->configTemplate = $configTemplate; - $this->code = $code; - $this->options = $options; - $this->fileName = $fileName ?: self::getCallerFileName(); - $this->debug = $debug !== null ? $debug : (bool)getenv('TEST_FPM_DEBUG'); - $this->logReader = new LogReader($this->debug); - $this->logTool = new LogTool($this->logReader, $this->debug); + $this->configTemplate = $configTemplate; + $this->code = $code; + $this->options = $options; + $this->fileName = $fileName ?: self::getCallerFileName(); + $this->debug = $debug !== null ? $debug : (bool)getenv('TEST_FPM_DEBUG'); + $this->logReader = new LogReader($this->debug); + $this->logTool = new LogTool($this->logReader, $this->debug); + $this->clientTransport = $clientTransport; + } + + /** + * Creates new client transport. + * + * @return Transport + */ + private function createTransport() + { + return match ($this->clientTransport) { + 'stream' => new StreamTransport(), + 'socket' => new SocketTransport(), + }; } /** @@ -770,6 +793,7 @@ class Tester * @param string|array|null $stdin = null * @param bool $expectError * @param int $readLimit + * @param int $writeDelay * * @return Response * @throws \Exception @@ -787,6 +811,7 @@ class Tester string|array $stdin = null, bool $expectError = false, int $readLimit = -1, + int $writeDelay = 0, ): Response { if ($this->hasError()) { return new Response(null, true); @@ -801,7 +826,7 @@ class Tester try { $this->response = new Response( - $this->getClient($address, $connKeepAlive)->request_data($params, $stdin, $readLimit) + $this->getClient($address, $connKeepAlive)->request_data($params, $stdin, $readLimit, $writeDelay) ); if ($expectError) { $this->error('Expected request error but the request was successful'); @@ -834,6 +859,7 @@ class Tester * @param string|null $errorMessage * @param bool $connKeepAlive * @param int $readTimeout + * @param int $writeDelay * * @return Response[] * @throws \Exception @@ -844,7 +870,8 @@ class Tester string $successMessage = null, string $errorMessage = null, bool $connKeepAlive = false, - int $readTimeout = 0 + int $readTimeout = 0, + int $writeDelay = 0, ) { if (is_numeric($requests)) { $requests = array_fill(0, $requests, []); @@ -855,7 +882,7 @@ class Tester } try { - $connections = array_map(function ($requestData) use ($address, $connKeepAlive) { + $connections = array_map(function ($requestData) use ($address, $connKeepAlive, $writeDelay) { $client = $this->getClient($address, $connKeepAlive); $params = $this->getRequestParams( $requestData['query'] ?? '', @@ -869,7 +896,7 @@ class Tester return [ 'client' => $client, - 'requestId' => $client->async_request($params, false), + 'requestId' => $client->async_request($params, false, $writeDelay), ]; }, $requests); @@ -903,7 +930,7 @@ class Tester * * @return Client */ - private function getClient(string $address = null, $keepAlive = false): Client + private function getClient(string $address = null, bool $keepAlive = false): Client { $address = $address ? $this->processTemplate($address) : $this->getAddr(); if ($address[0] === '/') { // uds @@ -925,11 +952,11 @@ class Tester } if ( ! $keepAlive) { - return new Client($host, $port); + return new Client($host, $port, $this->createTransport()); } if ( ! isset($this->clients[$host][$port])) { - $client = new Client($host, $port); + $client = new Client($host, $port, $this->createTransport()); $client->setKeepAlive(true); $this->clients[$host][$port] = $client; }