Back | Home
الـ Path الحالي: /home/picotech/domains/instantly.picotech.app/public_html/public/csv/../.././vendor/maennchen/../dragonmantank/../phpoffice/../dflydev/../symfony/event-dispatcher/../http-client/Response
الملفات الموجودة في هذا الـ Path:
.
..
AmpResponse.php
AsyncContext.php
AsyncResponse.php
CommonResponseTrait.php
CurlResponse.php
HttplugPromise.php
JsonMockResponse.php
MockResponse.php
NativeResponse.php
ResponseStream.php
StreamWrapper.php
StreamableInterface.php
TraceableResponse.php
TransportResponseTrait.php

مشاهدة ملف: AsyncResponse.php

<?php

/*
 * This file is part of the Symfony package.
 *
 * (c) Fabien Potencier <fabien@symfony.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */

namespace Symfony\Component\HttpClient\Response;

use Symfony\Component\HttpClient\Chunk\ErrorChunk;
use Symfony\Component\HttpClient\Chunk\FirstChunk;
use Symfony\Component\HttpClient\Chunk\LastChunk;
use Symfony\Component\HttpClient\Exception\TransportException;
use Symfony\Contracts\HttpClient\ChunkInterface;
use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;

/**
 * Provides a single extension point to process a response's content stream.
 *
 * @author Nicolas Grekas <p@tchwork.com>
 */
class AsyncResponse implements ResponseInterface, StreamableInterface
{
    use CommonResponseTrait;

    private const FIRST_CHUNK_YIELDED = 1;
    private const LAST_CHUNK_YIELDED = 2;

    private ?HttpClientInterface $client;
    private ResponseInterface $response;
    private array $info = ['canceled' => false];
    private $passthru;
    private $stream;
    private $yieldedState;

    /**
     * @param ?callable(ChunkInterface, AsyncContext): ?\Iterator $passthru
     */
    public function __construct(HttpClientInterface $client, string $method, string $url, array $options, callable $passthru = null)
    {
        $this->client = $client;
        $this->shouldBuffer = $options['buffer'] ?? true;

        if (null !== $onProgress = $options['on_progress'] ?? null) {
            $thisInfo = &$this->info;
            $options['on_progress'] = static function (int $dlNow, int $dlSize, array $info) use (&$thisInfo, $onProgress) {
                $onProgress($dlNow, $dlSize, $thisInfo + $info);
            };
        }
        $this->response = $client->request($method, $url, ['buffer' => false] + $options);
        $this->passthru = $passthru;
        $this->initializer = static function (self $response, float $timeout = null) {
            if (null === $response->shouldBuffer) {
                return false;
            }

            while (true) {
                foreach (self::stream([$response], $timeout) as $chunk) {
                    if ($chunk->isTimeout() && $response->passthru) {
                        foreach (self::passthru($response->client, $response, new ErrorChunk($response->offset, new TransportException($chunk->getError()))) as $chunk) {
                            if ($chunk->isFirst()) {
                                return false;
                            }
                        }

                        continue 2;
                    }

                    if ($chunk->isFirst()) {
                        return false;
                    }
                }

                return false;
            }
        };
        if (\array_key_exists('user_data', $options)) {
            $this->info['user_data'] = $options['user_data'];
        }
        if (\array_key_exists('max_duration', $options)) {
            $this->info['max_duration'] = $options['max_duration'];
        }
    }

    public function getStatusCode(): int
    {
        if ($this->initializer) {
            self::initialize($this);
        }

        return $this->response->getStatusCode();
    }

    public function getHeaders(bool $throw = true): array
    {
        if ($this->initializer) {
            self::initialize($this);
        }

        $headers = $this->response->getHeaders(false);

        if ($throw) {
            $this->checkStatusCode();
        }

        return $headers;
    }

    public function getInfo(string $type = null): mixed
    {
        if (null !== $type) {
            return $this->info[$type] ?? $this->response->getInfo($type);
        }

        return $this->info + $this->response->getInfo();
    }

    /**
     * @return resource
     */
    public function toStream(bool $throw = true)
    {
        if ($throw) {
            // Ensure headers arrived
            $this->getHeaders(true);
        }

        $handle = function () {
            $stream = $this->response instanceof StreamableInterface ? $this->response->toStream(false) : StreamWrapper::createResource($this->response);

            return stream_get_meta_data($stream)['wrapper_data']->stream_cast(\STREAM_CAST_FOR_SELECT);
        };

        $stream = StreamWrapper::createResource($this);
        stream_get_meta_data($stream)['wrapper_data']
            ->bindHandles($handle, $this->content);

        return $stream;
    }

    public function cancel(): void
    {
        if ($this->info['canceled']) {
            return;
        }

        $this->info['canceled'] = true;
        $this->info['error'] = 'Response has been canceled.';
        $this->close();
        $client = $this->client;
        $this->client = null;

        if (!$this->passthru) {
            return;
        }

        try {
            foreach (self::passthru($client, $this, new LastChunk()) as $chunk) {
                // no-op
            }

            $this->passthru = null;
        } catch (ExceptionInterface) {
            // ignore any errors when canceling
        }
    }

    public function __destruct()
    {
        $httpException = null;

        if ($this->initializer && null === $this->getInfo('error')) {
            try {
                self::initialize($this, -0.0);
                $this->getHeaders(true);
            } catch (HttpExceptionInterface $httpException) {
                // no-op
            }
        }

        if ($this->passthru && null === $this->getInfo('error')) {
            $this->info['canceled'] = true;

            try {
                foreach (self::passthru($this->client, $this, new LastChunk()) as $chunk) {
                    // no-op
                }
            } catch (ExceptionInterface) {
                // ignore any errors when destructing
            }
        }

        if (null !== $httpException) {
            throw $httpException;
        }
    }

    /**
     * @internal
     */
    public static function stream(iterable $responses, float $timeout = null, string $class = null): \Generator
    {
        while ($responses) {
            $wrappedResponses = [];
            $asyncMap = new \SplObjectStorage();
            $client = null;

            foreach ($responses as $r) {
                if (!$r instanceof self) {
                    throw new \TypeError(sprintf('"%s::stream()" expects parameter 1 to be an iterable of AsyncResponse objects, "%s" given.', $class ?? static::class, get_debug_type($r)));
                }

                if (null !== $e = $r->info['error'] ?? null) {
                    yield $r => $chunk = new ErrorChunk($r->offset, new TransportException($e));
                    $chunk->didThrow() ?: $chunk->getContent();
                    continue;
                }

                if (null === $client) {
                    $client = $r->client;
                } elseif ($r->client !== $client) {
                    throw new TransportException('Cannot stream AsyncResponse objects with many clients.');
                }

                $asyncMap[$r->response] = $r;
                $wrappedResponses[] = $r->response;

                if ($r->stream) {
                    yield from self::passthruStream($response = $r->response, $r, new FirstChunk(), $asyncMap);

                    if (!isset($asyncMap[$response])) {
                        array_pop($wrappedResponses);
                    }

                    if ($r->response !== $response && !isset($asyncMap[$r->response])) {
                        $asyncMap[$r->response] = $r;
                        $wrappedResponses[] = $r->response;
                    }
                }
            }

            if (!$client || !$wrappedResponses) {
                return;
            }

            foreach ($client->stream($wrappedResponses, $timeout) as $response => $chunk) {
                $r = $asyncMap[$response];

                if (null === $chunk->getError()) {
                    if ($chunk->isFirst()) {
                        // Ensure no exception is thrown on destruct for the wrapped response
                        $r->response->getStatusCode();
                    } elseif (0 === $r->offset && null === $r->content && $chunk->isLast()) {
                        $r->content = fopen('php://memory', 'w+');
                    }
                }

                if (!$r->passthru) {
                    if (null !== $chunk->getError() || $chunk->isLast()) {
                        unset($asyncMap[$response]);
                    } elseif (null !== $r->content && '' !== ($content = $chunk->getContent()) && \strlen($content) !== fwrite($r->content, $content)) {
                        $chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content))));
                        $r->info['error'] = $chunk->getError();
                        $r->response->cancel();
                    }

                    yield $r => $chunk;
                    continue;
                }

                if (null !== $chunk->getError()) {
                    // no-op
                } elseif ($chunk->isFirst()) {
                    $r->yieldedState = self::FIRST_CHUNK_YIELDED;
                } elseif (self::FIRST_CHUNK_YIELDED !== $r->yieldedState && null === $chunk->getInformationalStatus()) {
                    throw new \LogicException(sprintf('Instance of "%s" is already consumed and cannot be managed by "%s". A decorated client should not call any of the response\'s methods in its "request()" method.', get_debug_type($response), $class ?? static::class));
                }

                foreach (self::passthru($r->client, $r, $chunk, $asyncMap) as $chunk) {
                    yield $r => $chunk;
                }

                if ($r->response !== $response && isset($asyncMap[$response])) {
                    break;
                }
            }

            if (null === $chunk->getError() && $chunk->isLast()) {
                $r->yieldedState = self::LAST_CHUNK_YIELDED;
            }
            if (null === $chunk->getError() && self::LAST_CHUNK_YIELDED !== $r->yieldedState && $r->response === $response && null !== $r->client) {
                throw new \LogicException('A chunk passthru must yield an "isLast()" chunk before ending a stream.');
            }

            $responses = [];
            foreach ($asyncMap as $response) {
                $r = $asyncMap[$response];

                if (null !== $r->client) {
                    $responses[] = $asyncMap[$response];
                }
            }
        }
    }

    /**
     * @param \SplObjectStorage<ResponseInterface, AsyncResponse>|null $asyncMap
     */
    private static function passthru(HttpClientInterface $client, self $r, ChunkInterface $chunk, \SplObjectStorage $asyncMap = null): \Generator
    {
        $r->stream = null;
        $response = $r->response;
        $context = new AsyncContext($r->passthru, $client, $r->response, $r->info, $r->content, $r->offset);
        if (null === $stream = ($r->passthru)($chunk, $context)) {
            if ($r->response === $response && (null !== $chunk->getError() || $chunk->isLast())) {
                throw new \LogicException('A chunk passthru cannot swallow the last chunk.');
            }

            return;
        }

        if (!$stream instanceof \Iterator) {
            throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
        }
        $r->stream = $stream;

        yield from self::passthruStream($response, $r, null, $asyncMap);
    }

    /**
     * @param \SplObjectStorage<ResponseInterface, AsyncResponse>|null $asyncMap
     */
    private static function passthruStream(ResponseInterface $response, self $r, ?ChunkInterface $chunk, ?\SplObjectStorage $asyncMap): \Generator
    {
        while (true) {
            try {
                if (null !== $chunk && $r->stream) {
                    $r->stream->next();
                }

                if (!$r->stream || !$r->stream->valid() || !$r->stream) {
                    $r->stream = null;
                    break;
                }
            } catch (\Throwable $e) {
                unset($asyncMap[$response]);
                $r->stream = null;
                $r->info['error'] = $e->getMessage();
                $r->response->cancel();

                yield $r => $chunk = new ErrorChunk($r->offset, $e);
                $chunk->didThrow() ?: $chunk->getContent();
                break;
            }

            $chunk = $r->stream->current();

            if (!$chunk instanceof ChunkInterface) {
                throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk)));
            }

            if (null !== $chunk->getError()) {
                // no-op
            } elseif ($chunk->isFirst()) {
                $e = $r->openBuffer();

                yield $r => $chunk;

                if ($r->initializer && null === $r->getInfo('error')) {
                    // Ensure the HTTP status code is always checked
                    $r->getHeaders(true);
                }

                if (null === $e) {
                    continue;
                }

                $r->response->cancel();
                $chunk = new ErrorChunk($r->offset, $e);
            } elseif ('' !== $content = $chunk->getContent()) {
                if (null !== $r->shouldBuffer) {
                    throw new \LogicException('A chunk passthru must yield an "isFirst()" chunk before any content chunk.');
                }

                if (null !== $r->content && \strlen($content) !== fwrite($r->content, $content)) {
                    $chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content))));
                    $r->info['error'] = $chunk->getError();
                    $r->response->cancel();
                }
            }

            if (null !== $chunk->getError() || $chunk->isLast()) {
                $stream = $r->stream;
                $r->stream = null;
                unset($asyncMap[$response]);
            }

            if (null === $chunk->getError()) {
                $r->offset += \strlen($content);

                yield $r => $chunk;

                if (!$chunk->isLast()) {
                    continue;
                }

                $stream->next();

                if ($stream->valid()) {
                    throw new \LogicException('A chunk passthru cannot yield after an "isLast()" chunk.');
                }

                $r->passthru = null;
            } else {
                if ($chunk instanceof ErrorChunk) {
                    $chunk->didThrow(false);
                } else {
                    try {
                        $chunk = new ErrorChunk($chunk->getOffset(), !$chunk->isTimeout() ?: $chunk->getError());
                    } catch (TransportExceptionInterface $e) {
                        $chunk = new ErrorChunk($chunk->getOffset(), $e);
                    }
                }

                yield $r => $chunk;
                $chunk->didThrow() ?: $chunk->getContent();
            }

            break;
        }
    }

    private function openBuffer(): ?\Throwable
    {
        if (null === $shouldBuffer = $this->shouldBuffer) {
            throw new \LogicException('A chunk passthru cannot yield more than one "isFirst()" chunk.');
        }

        $e = $this->shouldBuffer = null;

        if ($shouldBuffer instanceof \Closure) {
            try {
                $shouldBuffer = $shouldBuffer($this->getHeaders(false));

                if (null !== $e = $this->response->getInfo('error')) {
                    throw new TransportException($e);
                }
            } catch (\Throwable $e) {
                $this->info['error'] = $e->getMessage();
                $this->response->cancel();
            }
        }

        if (true === $shouldBuffer) {
            $this->content = fopen('php://temp', 'w+');
        } elseif (\is_resource($shouldBuffer)) {
            $this->content = $shouldBuffer;
        }

        return $e;
    }

    private function close(): void
    {
        $this->response->cancel();
    }
}