Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HttpClient] fix monitoring timeouts when other streams are active #37048

Merged
merged 1 commit into from Jun 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,8 +28,6 @@ final class NativeClientState extends ClientState
public $responseCount = 0;
/** @var string[] */
public $dnsCache = [];
/** @var resource[] */
public $handles = [];
/** @var bool */
public $sleep = false;

Expand Down
16 changes: 3 additions & 13 deletions src/Symfony/Component/HttpClient/Response/NativeResponse.php
Expand Up @@ -220,11 +220,6 @@ private static function schedule(self $response, array &$runningResponses): void
*/
private static function perform(ClientState $multi, array &$responses = null): void
{
// List of native handles for stream_select()
if (null !== $responses) {
$multi->handles = [];
}

foreach ($multi->openHandles as $i => [$h, $buffer, $onProgress]) {
$hasActivity = false;
$remaining = &$multi->openHandles[$i][3];
Expand Down Expand Up @@ -291,8 +286,6 @@ private static function perform(ClientState $multi, array &$responses = null): v
$multi->handlesActivity[$i][] = $e;
unset($multi->openHandles[$i]);
$multi->sleep = false;
} elseif (null !== $responses) {
$multi->handles[] = $h;
}
}

Expand All @@ -307,7 +300,7 @@ private static function perform(ClientState $multi, array &$responses = null): v
}
}

if (\count($multi->handles) >= $multi->maxHostConnections) {
if (\count($multi->openHandles) >= $multi->maxHostConnections) {
return;
}

Expand All @@ -318,10 +311,6 @@ private static function perform(ClientState $multi, array &$responses = null): v
$multi->sleep = false;
self::perform($multi);

if (null !== $response->handle) {
$multi->handles[] = $response->handle;
}

break;
}
}
Expand All @@ -335,7 +324,8 @@ private static function perform(ClientState $multi, array &$responses = null): v
private static function select(ClientState $multi, float $timeout): int
{
$_ = [];
$handles = array_column($multi->openHandles, 0);

return (!$multi->sleep = !$multi->sleep) ? -1 : stream_select($multi->handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout)));
return (!$multi->sleep = !$multi->sleep) ? -1 : stream_select($handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout)));
}
}
15 changes: 8 additions & 7 deletions src/Symfony/Component/HttpClient/Response/ResponseTrait.php
Expand Up @@ -316,7 +316,7 @@ public static function stream(iterable $responses, float $timeout = null): \Gene
}

$lastActivity = microtime(true);
$isTimeout = false;
$enlapsedTimeout = 0;

while (true) {
$hasActivity = false;
Expand All @@ -338,15 +338,15 @@ public static function stream(iterable $responses, float $timeout = null): \Gene
} elseif (!isset($multi->openHandles[$j])) {
unset($responses[$j]);
continue;
} elseif ($isTimeout) {
} elseif ($enlapsedTimeout >= $timeoutMax) {
$multi->handlesActivity[$j] = [new ErrorChunk($response->offset, sprintf('Idle timeout reached for "%s".', $response->getInfo('url')))];
} else {
continue;
}

while ($multi->handlesActivity[$j] ?? false) {
$hasActivity = true;
$isTimeout = false;
$enlapsedTimeout = 0;

if (\is_string($chunk = array_shift($multi->handlesActivity[$j]))) {
if (null !== $response->inflate && false === $chunk = @inflate_add($response->inflate, $chunk)) {
Expand Down Expand Up @@ -379,7 +379,7 @@ public static function stream(iterable $responses, float $timeout = null): \Gene
}
} elseif ($chunk instanceof ErrorChunk) {
unset($responses[$j]);
$isTimeout = true;
$enlapsedTimeout = $timeoutMax;
} elseif ($chunk instanceof FirstChunk) {
if ($response->logger) {
$info = $response->getInfo();
Expand Down Expand Up @@ -447,10 +447,11 @@ public static function stream(iterable $responses, float $timeout = null): \Gene
continue;
}

switch (self::select($multi, $timeoutMin)) {
case -1: usleep(min(500, 1E6 * $timeoutMin)); break;
case 0: $isTimeout = microtime(true) - $lastActivity > $timeoutMax; break;
if (-1 === self::select($multi, min($timeoutMin, $timeoutMax - $enlapsedTimeout))) {
usleep(min(500, 1E6 * $timeoutMin));
}

$enlapsedTimeout = microtime(true) - $lastActivity;
}
}
}
9 changes: 9 additions & 0 deletions src/Symfony/Component/HttpClient/Tests/CurlHttpClientTest.php
Expand Up @@ -112,6 +112,15 @@ public function testHttp2PushVulcainWithUnusedResponse()
$this->assertSame($expected, $logger->logs);
}

public function testTimeoutIsNotAFatalError()
{
if ('\\' === \DIRECTORY_SEPARATOR) {
$this->markTestSkipped('Too transient on Windows');
}

parent::testTimeoutIsNotAFatalError();
}

private function getVulcainClient(): CurlHttpClient
{
if (\PHP_VERSION_ID >= 70300 && \PHP_VERSION_ID < 70304) {
Expand Down
6 changes: 4 additions & 2 deletions src/Symfony/Component/HttpClient/Tests/HttpClientTestCase.php
Expand Up @@ -72,16 +72,17 @@ public function testToStream404()
$this->assertSame($response, stream_get_meta_data($stream)['wrapper_data']->getResponse());
$this->assertSame(404, $response->getStatusCode());

$this->expectException(ClientException::class);
$response = $client->request('GET', 'http://localhost:8057/404');
$stream = $response->toStream();
$this->expectException(ClientException::class);
$response->toStream();
}

public function testNonBlockingStream()
{
$client = $this->getHttpClient(__FUNCTION__);
$response = $client->request('GET', 'http://localhost:8057/timeout-body');
$stream = $response->toStream();
usleep(10000);

$this->assertTrue(stream_set_blocking($stream, false));
$this->assertSame('<1>', fread($stream, 8192));
Expand All @@ -99,6 +100,7 @@ public function testTimeoutIsNotAFatalError()
$response = $client->request('GET', 'http://localhost:8057/timeout-body', [
'timeout' => 0.25,
]);
$this->assertSame(200, $response->getStatusCode());

try {
$response->getContent();
Expand Down
4 changes: 4 additions & 0 deletions src/Symfony/Component/HttpClient/Tests/MockHttpClientTest.php
Expand Up @@ -69,6 +69,10 @@ protected function getHttpClient(string $testCase): HttpClientInterface
$this->markTestSkipped("MockHttpClient doesn't unzip");
break;

case 'testTimeoutWithActiveConcurrentStream':
$this->markTestSkipped('Real transport required');
break;

case 'testDestruct':
$this->markTestSkipped("MockHttpClient doesn't timeout on destruct");
break;
Expand Down
24 changes: 24 additions & 0 deletions src/Symfony/Contracts/HttpClient/Test/HttpClientTestCase.php
Expand Up @@ -786,6 +786,30 @@ public function testUncheckedTimeoutThrows()
}
}

public function testTimeoutWithActiveConcurrentStream()
{
$p1 = TestHttpServer::start(8067);
$p2 = TestHttpServer::start(8077);

$client = $this->getHttpClient(__FUNCTION__);
$streamingResponse = $client->request('GET', 'http://localhost:8067/max-duration');
$blockingResponse = $client->request('GET', 'http://localhost:8077/timeout-body', [
'timeout' => 0.25,
]);

$this->assertSame(200, $streamingResponse->getStatusCode());
$this->assertSame(200, $blockingResponse->getStatusCode());

$this->expectException(TransportExceptionInterface::class);

try {
$blockingResponse->getContent();
} finally {
$p1->stop();
$p2->stop();
}
}

public function testDestruct()
{
$client = $this->getHttpClient(__FUNCTION__);
Expand Down
19 changes: 12 additions & 7 deletions src/Symfony/Contracts/HttpClient/Test/TestHttpServer.php
Expand Up @@ -19,23 +19,28 @@
*/
class TestHttpServer
{
private static $process;
private static $process = [];

public static function start()
public static function start(int $port = 8057)
{
if (self::$process) {
self::$process->stop();
if (isset(self::$process[$port])) {
self::$process[$port]->stop();
} else {
register_shutdown_function(static function () use ($port) {
self::$process[$port]->stop();
});
}

$finder = new PhpExecutableFinder();
$process = new Process(array_merge([$finder->find(false)], $finder->findArguments(), ['-dopcache.enable=0', '-dvariables_order=EGPCS', '-S', '127.0.0.1:8057']));
$process = new Process(array_merge([$finder->find(false)], $finder->findArguments(), ['-dopcache.enable=0', '-dvariables_order=EGPCS', '-S', '127.0.0.1:'.$port]));
$process->setWorkingDirectory(__DIR__.'/Fixtures/web');
$process->start();
self::$process[$port] = $process;

do {
usleep(50000);
} while (!@fopen('http://127.0.0.1:8057/', 'r'));
} while (!@fopen('http://127.0.0.1:'.$port, 'r'));

self::$process = $process;
return $process;
}
}