Skip to content

Commit

Permalink
bug #37048 [HttpClient] fix monitoring timeouts when other streams ar…
Browse files Browse the repository at this point in the history
…e active (nicolas-grekas)

This PR was merged into the 4.4 branch.

Discussion
----------

[HttpClient] fix monitoring timeouts when other streams are active

| Q             | A
| ------------- | ---
| Branch?       | 4.4
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | no
| Tickets       | -
| License       | MIT
| Doc PR        | -

Commits
-------

d2a53f0 [HttpClient] fix monitoring timeouts when other streams are active
  • Loading branch information
nicolas-grekas committed Jun 7, 2020
2 parents 1e7f3e2 + d2a53f0 commit 3755efd
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 31 deletions.
Expand Up @@ -28,8 +28,6 @@ final class NativeClientState extends ClientState
public $responseCount = 0;
/** @var string[] */
public $dnsCache = [];
/** @var resource[] */
public $handles = [];
/** @var bool */
public $sleep = false;

Expand Down
16 changes: 3 additions & 13 deletions src/Symfony/Component/HttpClient/Response/NativeResponse.php
Expand Up @@ -220,11 +220,6 @@ private static function schedule(self $response, array &$runningResponses): void
*/
private static function perform(ClientState $multi, array &$responses = null): void
{
// List of native handles for stream_select()
if (null !== $responses) {
$multi->handles = [];
}

foreach ($multi->openHandles as $i => [$h, $buffer, $onProgress]) {
$hasActivity = false;
$remaining = &$multi->openHandles[$i][3];
Expand Down Expand Up @@ -291,8 +286,6 @@ private static function perform(ClientState $multi, array &$responses = null): v
$multi->handlesActivity[$i][] = $e;
unset($multi->openHandles[$i]);
$multi->sleep = false;
} elseif (null !== $responses) {
$multi->handles[] = $h;
}
}

Expand All @@ -307,7 +300,7 @@ private static function perform(ClientState $multi, array &$responses = null): v
}
}

if (\count($multi->handles) >= $multi->maxHostConnections) {
if (\count($multi->openHandles) >= $multi->maxHostConnections) {
return;
}

Expand All @@ -318,10 +311,6 @@ private static function perform(ClientState $multi, array &$responses = null): v
$multi->sleep = false;
self::perform($multi);

if (null !== $response->handle) {
$multi->handles[] = $response->handle;
}

break;
}
}
Expand All @@ -335,7 +324,8 @@ private static function perform(ClientState $multi, array &$responses = null): v
private static function select(ClientState $multi, float $timeout): int
{
$_ = [];
$handles = array_column($multi->openHandles, 0);

return (!$multi->sleep = !$multi->sleep) ? -1 : stream_select($multi->handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout)));
return (!$multi->sleep = !$multi->sleep) ? -1 : stream_select($handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout)));
}
}
15 changes: 8 additions & 7 deletions src/Symfony/Component/HttpClient/Response/ResponseTrait.php
Expand Up @@ -316,7 +316,7 @@ public static function stream(iterable $responses, float $timeout = null): \Gene
}

$lastActivity = microtime(true);
$isTimeout = false;
$enlapsedTimeout = 0;

while (true) {
$hasActivity = false;
Expand All @@ -338,15 +338,15 @@ public static function stream(iterable $responses, float $timeout = null): \Gene
} elseif (!isset($multi->openHandles[$j])) {
unset($responses[$j]);
continue;
} elseif ($isTimeout) {
} elseif ($enlapsedTimeout >= $timeoutMax) {
$multi->handlesActivity[$j] = [new ErrorChunk($response->offset, sprintf('Idle timeout reached for "%s".', $response->getInfo('url')))];
} else {
continue;
}

while ($multi->handlesActivity[$j] ?? false) {
$hasActivity = true;
$isTimeout = false;
$enlapsedTimeout = 0;

if (\is_string($chunk = array_shift($multi->handlesActivity[$j]))) {
if (null !== $response->inflate && false === $chunk = @inflate_add($response->inflate, $chunk)) {
Expand Down Expand Up @@ -379,7 +379,7 @@ public static function stream(iterable $responses, float $timeout = null): \Gene
}
} elseif ($chunk instanceof ErrorChunk) {
unset($responses[$j]);
$isTimeout = true;
$enlapsedTimeout = $timeoutMax;
} elseif ($chunk instanceof FirstChunk) {
if ($response->logger) {
$info = $response->getInfo();
Expand Down Expand Up @@ -447,10 +447,11 @@ public static function stream(iterable $responses, float $timeout = null): \Gene
continue;
}

switch (self::select($multi, $timeoutMin)) {
case -1: usleep(min(500, 1E6 * $timeoutMin)); break;
case 0: $isTimeout = microtime(true) - $lastActivity > $timeoutMax; break;
if (-1 === self::select($multi, min($timeoutMin, $timeoutMax - $enlapsedTimeout))) {
usleep(min(500, 1E6 * $timeoutMin));
}

$enlapsedTimeout = microtime(true) - $lastActivity;
}
}
}
9 changes: 9 additions & 0 deletions src/Symfony/Component/HttpClient/Tests/CurlHttpClientTest.php
Expand Up @@ -112,6 +112,15 @@ public function testHttp2PushVulcainWithUnusedResponse()
$this->assertSame($expected, $logger->logs);
}

public function testTimeoutIsNotAFatalError()
{
if ('\\' === \DIRECTORY_SEPARATOR) {
$this->markTestSkipped('Too transient on Windows');
}

parent::testTimeoutIsNotAFatalError();
}

private function getVulcainClient(): CurlHttpClient
{
if (\PHP_VERSION_ID >= 70300 && \PHP_VERSION_ID < 70304) {
Expand Down
6 changes: 4 additions & 2 deletions src/Symfony/Component/HttpClient/Tests/HttpClientTestCase.php
Expand Up @@ -72,16 +72,17 @@ public function testToStream404()
$this->assertSame($response, stream_get_meta_data($stream)['wrapper_data']->getResponse());
$this->assertSame(404, $response->getStatusCode());

$this->expectException(ClientException::class);
$response = $client->request('GET', 'http://localhost:8057/404');
$stream = $response->toStream();
$this->expectException(ClientException::class);
$response->toStream();
}

public function testNonBlockingStream()
{
$client = $this->getHttpClient(__FUNCTION__);
$response = $client->request('GET', 'http://localhost:8057/timeout-body');
$stream = $response->toStream();
usleep(10000);

$this->assertTrue(stream_set_blocking($stream, false));
$this->assertSame('<1>', fread($stream, 8192));
Expand All @@ -99,6 +100,7 @@ public function testTimeoutIsNotAFatalError()
$response = $client->request('GET', 'http://localhost:8057/timeout-body', [
'timeout' => 0.25,
]);
$this->assertSame(200, $response->getStatusCode());

try {
$response->getContent();
Expand Down
4 changes: 4 additions & 0 deletions src/Symfony/Component/HttpClient/Tests/MockHttpClientTest.php
Expand Up @@ -69,6 +69,10 @@ protected function getHttpClient(string $testCase): HttpClientInterface
$this->markTestSkipped("MockHttpClient doesn't unzip");
break;

case 'testTimeoutWithActiveConcurrentStream':
$this->markTestSkipped('Real transport required');
break;

case 'testDestruct':
$this->markTestSkipped("MockHttpClient doesn't timeout on destruct");
break;
Expand Down
24 changes: 24 additions & 0 deletions src/Symfony/Contracts/HttpClient/Test/HttpClientTestCase.php
Expand Up @@ -786,6 +786,30 @@ public function testUncheckedTimeoutThrows()
}
}

public function testTimeoutWithActiveConcurrentStream()
{
$p1 = TestHttpServer::start(8067);
$p2 = TestHttpServer::start(8077);

$client = $this->getHttpClient(__FUNCTION__);
$streamingResponse = $client->request('GET', 'http://localhost:8067/max-duration');
$blockingResponse = $client->request('GET', 'http://localhost:8077/timeout-body', [
'timeout' => 0.25,
]);

$this->assertSame(200, $streamingResponse->getStatusCode());
$this->assertSame(200, $blockingResponse->getStatusCode());

$this->expectException(TransportExceptionInterface::class);

try {
$blockingResponse->getContent();
} finally {
$p1->stop();
$p2->stop();
}
}

public function testDestruct()
{
$client = $this->getHttpClient(__FUNCTION__);
Expand Down
19 changes: 12 additions & 7 deletions src/Symfony/Contracts/HttpClient/Test/TestHttpServer.php
Expand Up @@ -19,23 +19,28 @@
*/
class TestHttpServer
{
private static $process;
private static $process = [];

public static function start()
public static function start(int $port = 8057)
{
if (self::$process) {
self::$process->stop();
if (isset(self::$process[$port])) {
self::$process[$port]->stop();
} else {
register_shutdown_function(static function () use ($port) {
self::$process[$port]->stop();
});
}

$finder = new PhpExecutableFinder();
$process = new Process(array_merge([$finder->find(false)], $finder->findArguments(), ['-dopcache.enable=0', '-dvariables_order=EGPCS', '-S', '127.0.0.1:8057']));
$process = new Process(array_merge([$finder->find(false)], $finder->findArguments(), ['-dopcache.enable=0', '-dvariables_order=EGPCS', '-S', '127.0.0.1:'.$port]));
$process->setWorkingDirectory(__DIR__.'/Fixtures/web');
$process->start();
self::$process[$port] = $process;

do {
usleep(50000);
} while (!@fopen('http://127.0.0.1:8057/', 'r'));
} while (!@fopen('http://127.0.0.1:'.$port, 'r'));

self::$process = $process;
return $process;
}
}

0 comments on commit 3755efd

Please sign in to comment.