<?php 
namespace Aws\Multipart; 
 
use Aws\AwsClientInterface as Client; 
use GuzzleHttp\Psr7; 
use InvalidArgumentException as IAE; 
use Psr\Http\Message\StreamInterface as Stream; 
 
/**
 * Base class for multipart uploaders that read their data from a stream.
 *
 * The provided source is turned into a PSR-7 stream and walked one part at a
 * time; an upload command is yielded for every part that the upload state
 * does not already report as uploaded.
 */
abstract class AbstractUploader extends AbstractUploadManager
{
    /** @var Stream Source of the data to be uploaded. */
    protected $source;

    /**
     * @param Client $client Client used to create the upload commands.
     * @param mixed  $source A filename (string) or any value accepted by
     *                       `Psr7\stream_for()`.
     * @param array  $config Configuration passed through to the parent.
     *
     * @throws IAE If the resulting source stream is not readable.
     */
    public function __construct(Client $client, $source, array $config = [])
    {
        // The source is resolved before the parent constructor runs.
        $this->source = $this->determineSource($source);
        parent::__construct($client, $config);
    }

    /**
     * Create a stream for a part that starts at the current position and
     * has a length of the upload part size (or less with the final part).
     *
     * Note that the offset is taken from the uploader's own source
     * (`$this->source`), not from the `$stream` argument.
     *
     * @param Stream $stream Stream to wrap.
     *
     * @return Psr7\LimitStream
     */
    protected function limitPartStream(Stream $stream)
    {
        // Limit what is read from the stream to the part size.
        return new Psr7\LimitStream(
            $stream,
            $this->state->getPartSize(),
            $this->source->tell()
        );
    }

    /**
     * Generates the upload commands for the parts that still need uploading.
     *
     * Parts are numbered from 1. For every part number the upload state does
     * not report as uploaded, a part is created from the current source
     * offset and its command is yielded. Parts that were already uploaded
     * are skipped by advancing the source by one part size.
     *
     * @param callable $resultHandler Registered on every command's handler
     *                                list via `appendSign()` under the name
     *                                "mup".
     *
     * @return \Generator
     */
    protected function getUploadCommands(callable $resultHandler)
    {
        // Determine if the source can be seeked.
        $seekable = $this->source->isSeekable()
            && $this->source->getMetadata('wrapper_type') === 'plainfile';

        for ($partNumber = 1; $this->isEof($seekable); $partNumber++) {
            // If we haven't already uploaded this part, yield a new part.
            if (!$this->state->hasPartBeenUploaded($partNumber)) {
                $partStartPos = $this->source->tell();
                if (!($data = $this->createPart($seekable, $partNumber))) {
                    break;
                }
                $command = $this->client->getCommand(
                    $this->info['command']['upload'],
                    $data + $this->state->getId()
                );
                $command->getHandlerList()->appendSign($resultHandler, 'mup');
                yield $command;
                // Creating the part may already have moved the source
                // forward; in that case there is nothing left to advance.
                if ($this->source->tell() > $partStartPos) {
                    continue;
                }
            }

            // Advance the source's offset if not already advanced.
            if ($seekable) {
                $this->source->seek(min(
                    $this->source->tell() + $this->state->getPartSize(),
                    $this->source->getSize()
                ));
            } else {
                // A single read() may return fewer bytes than requested, so
                // keep reading until the whole part has been consumed.
                $this->skipBytes($this->state->getPartSize());
            }
        }
    }

    /**
     * Generates the parameters for an upload part by analyzing a range of the
     * source starting from the current offset up to the part size.
     *
     * @param bool $seekable
     * @param int  $number
     *
     * @return array|null
     */
    abstract protected function createPart($seekable, $number);

    /**
     * Checks whether the source still has data left to upload.
     *
     * NOTE: despite its name, this returns true while the source has NOT
     * reached its end, and false once it has. It is used as the loop
     * condition when generating upload commands.
     *
     * @param bool $seekable Whether the source is treated as seekable; if so
     *                       the offset is compared against the source size,
     *                       otherwise the stream's EOF flag is used.
     *
     * @return bool True if more data remains, false at the end of the source.
     */
    private function isEof($seekable)
    {
        return $seekable
            ? $this->source->tell() < $this->source->getSize()
            : !$this->source->eof();
    }

    /**
     * Reads and discards up to `$length` bytes from the source.
     *
     * Stops early when the source reports EOF or when a read returns no data,
     * so it cannot spin on a stream that makes no progress.
     *
     * @param int $length Number of bytes to discard.
     */
    private function skipBytes($length)
    {
        $remaining = $length;
        while ($remaining > 0 && !$this->source->eof()) {
            $chunk = $this->source->read($remaining);
            if ($chunk === '') {
                break;
            }
            $remaining -= strlen($chunk);
        }
    }

    /**
     * Turns the provided source into a readable stream and returns it.
     *
     * If a string is provided, it is assumed to be a filename, otherwise, it
     * passes the value directly to `Psr7\stream_for()`.
     *
     * @param mixed $source
     *
     * @return Stream
     *
     * @throws IAE If the resulting stream is not readable.
     */
    private function determineSource($source)
    {
        // Use the contents of a file as the data source.
        if (is_string($source)) {
            $source = Psr7\try_fopen($source, 'r');
        }

        // Create a source stream.
        $stream = Psr7\stream_for($source);
        if (!$stream->isReadable()) {
            throw new IAE('Source stream must be readable.');
        }

        return $stream;
    }
}
 
 |