[Minor] Class attributes being private prevents code from being easily adapted #39

Open
ThibautSF opened this issue Oct 7, 2020 · 5 comments


ThibautSF commented Oct 7, 2020

Hi,

I've started using Elastically in my project.
For some specific use cases I need to extend a few classes and override some of their methods.
For example, I'd like to override the IndexBuilder.createIndex($indexName) method to change how the index name is generated, but private attributes such as $this->configurationDirectory can't be accessed from a subclass.
Another example is adding a request parameter to the bulk request built in Indexer.getCurrentBulk(), e.g. setting a pipeline with $this->currentBulk->setRequestParam('pipeline', $pipelineID); (this part is quite similar to #31).

Note: another solution could be to add more getters and setters for the class attributes.
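
To illustrate the visibility problem, here is a minimal, self-contained sketch. The classes below are hypothetical stand-ins, not Elastically's actual code, and the `_mapping.yaml` file name is only illustrative. A subclass can read a protected attribute directly, while a private one is only reachable through a getter exposed by the parent:

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a library class that keeps its state private.
class PrivateBuilder
{
    private $configurationDirectory;

    public function __construct(string $configurationDirectory)
    {
        $this->configurationDirectory = $configurationDirectory;
    }

    // Without this getter, subclasses have no way to read the directory.
    public function getConfigurationDirectory(): string
    {
        return $this->configurationDirectory;
    }
}

// Hypothetical variant of the same class with a protected attribute.
class ProtectedBuilder
{
    protected $configurationDirectory;

    public function __construct(string $configurationDirectory)
    {
        $this->configurationDirectory = $configurationDirectory;
    }
}

// Subclass of the private variant: must go through the getter.
class GetterBasedBuilder extends PrivateBuilder
{
    public function mappingFile(string $indexName): string
    {
        return $this->getConfigurationDirectory() . '/' . $indexName . '_mapping.yaml';
    }
}

// Subclass of the protected variant: can read the attribute directly.
class DirectAccessBuilder extends ProtectedBuilder
{
    public function mappingFile(string $indexName): string
    {
        return $this->configurationDirectory . '/' . $indexName . '_mapping.yaml';
    }
}

$viaGetter = (new GetterBasedBuilder('/app/config'))->mappingFile('beers');
$viaProperty = (new DirectAccessBuilder('/app/config'))->mappingFile('beers');
```

Both approaches give the subclass what it needs; the getter keeps the parent in control of the attribute, while `protected` makes subclassing less verbose.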

@damienalexandre
Member

Hi!
Thanks for reaching out!

Change how index name is generated

What would you need for example?

We have a PREFIX option, but that's all at the moment. I'm completely ok adding an option to change the date format, but using a completely non date-related name is not desirable.

For example the method purgeOldIndices needs to sort indexes by date.

Was that your plan/need?

Change to createIndex

This ticket #22 is still in my mind, allowing to fetch the mapping/setting from another source than YAML.

Access $this->configurationDirectory

You can use $client->getConfig(Client::CONFIG_MAPPINGS_DIRECTORY) instead; it returns the same value 👍

Pipeline to the bulk

You are right, #31 should allow that too! Thanks!

Let me know if this is helpful and what we can do to help!

@ThibautSF
Author

ThibautSF commented Oct 16, 2020

> Change how index name is generated
>
> What would you need for example?
>
> We have a PREFIX option, but that's all at the moment. I'm completely ok adding an option to change the date format, but using a completely non-date-related name is not desirable.

Just after posting my message I discovered CONFIG_INDEX_PREFIX, which is indeed what I needed: I just wanted to add more information in front of the index name.
I have multiple groups and each group has its own indexes, so each index name now has the group ID as a prefix.

Let me explain again what I have done, since my implementation is now at a more advanced stage.
I wanted to add pipelines to bulk requests, and also a byte-size limit for bulk requests (I use the ingest-attachment pipeline and send requests containing file binaries, so the requests can get too heavy).

I extend two classes (both copied from the source code with their attributes changed to protected, and overridden through the Composer autoloader):

  • Client (mainly protected $indexer;) -> in CustomClient class
  • Indexer (mainly protected $currentBulk;) -> in CustomIndexer class

CustomClient

use JoliCode\Elastically\Client;
...

class CustomClient extends Client
{
    const CONFIG_BULK_BYTES_SIZE = 'elastically_bulk_bytes_size'; //New conf

    public function __construct($config = [], ?callable $callback = null, ?LoggerInterface $logger = null)
    {
        parent::__construct($config, $callback, $logger);
    }

    public function getIndexer(): Indexer //Use a custom indexer -> OOP
    {
        if (!$this->indexer) {
            $this->indexer = new CustomIndexer($this, $this->getSerializer(), $this->getConfigValue(self::CONFIG_BULK_SIZE, 100), $this->getConfigValue(self::CONFIG_BULK_BYTES_SIZE, (int) (ini_get('post_max_size')) * 1.0e+6));
        }

        return $this->indexer;
    }
}

As you see, I extend the class in order to add a new conf value and to override getIndexer(): Indexer in order to instantiate and return my CustomIndexer (a child of Indexer).

CustomIndexer

use JoliCode\Elastically\Indexer;
...

class CustomIndexer extends Indexer
{
    protected $bulkBytesMaxSize; //New attribute

    public function __construct(Client $client, SerializerInterface $serializer, int $bulkMaxSize = 100, int $bulkBytesMaxSize) //Extended constructor
    {
        parent::__construct($client, $serializer, $bulkMaxSize ?? 100, $serializer);
        $this->bulkBytesMaxSize = $bulkBytesMaxSize;
    }

    public function scheduleIndex($index, Document $document, string $pipelineID = null) //add string $pipelineID = null to all methods which might call flush method
    {
        $document->setIndex($index instanceof Index ? $index->getName() : $index);
        if (is_object($document->getData())) {
            $context = $this->client->getSerializerContext(get_class($document->getData()));
            $document->setData($this->serializer->serialize($document->getData(), 'json', $context));
        }

        $docsize = $this->getBytesSize($document) * 2; // Adding a document increases the $currentBulk size by twice the document size

        if ($docsize > $this->bulkBytesMaxSize) {
            //Document too heavy
            //TODO return exception
            return;
        }

        $queuesize = $this->getQueueBytesSize();
        $response = null;
        if ($this->getQueueSize() + 1 >= $this->bulkMaxSize || $queuesize + $docsize >= $this->bulkBytesMaxSize) {
            //Queue too long or heavy in case we add the document -> send the bulk
            try {
                $response = $this->flush($pipelineID); //store response
            } catch (ResponseException $e) {
                $response = $e->getResponseSet(); //Get the response even in case of an exception (e.g. in my case, a Tika exception with encrypted files)
            }
        }

        $this->getCurrentBulk($pipelineID)->addDocument($document, Bulk\Action::OP_TYPE_INDEX);

        return $response; //Return the bulk response or null
    }

    // Note: insert modifications of scheduleDelete, scheduleUpdate and scheduleCreate which are very similar (but I want to reduce this issue post size)

    public function getQueueBytesSize() //New method
    {
        if (null === $this->currentBulk) {
            return 0;
        }

        return $this->getBytesSize($this->currentBulk->getActions());
    }

    public function getBytesSize($arr): int //New method
    {
        //TODO might need improvements
        $tot = 0;

        if (is_array($arr)) {
            foreach ($arr as $a) {
                $tot += $this->getBytesSize($a);
            }
        }
        if (is_string($arr)) {
            $tot += strlen($arr);
        }
        if (is_int($arr)) {
            $tot += PHP_INT_SIZE;
        }
        if (is_object($arr)) {
            $tot += strlen(serialize($arr));
        }

        return $tot;
    }

    protected function getCurrentBulk(string $pipelineID = null): Bulk //Override to add pipeline
    {
        if (!($this->currentBulk instanceof Bulk)) {
            $this->currentBulk = new Bulk($this->client);

            if (isset($pipelineID)) {
                $this->currentBulk->setRequestParam('pipeline', $pipelineID);
            }
        }

        return $this->currentBulk;
    }
}

This implements bulk pipelines and a maximum byte size for bulk requests.
It also returns the Elastica ResponseSet of each intermediate flush; I use that to track document status and indexing errors.
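
The byte-size guard above can be tried in isolation. The following is a minimal sketch, not Elastically or Elastica code: `estimateBytes()` and `shouldFlush()` are hypothetical helpers that mirror `getBytesSize()` and the flush condition in `scheduleIndex()`, working on plain PHP values instead of a real `Bulk`. Counting floats as 8 bytes is an added assumption; the version above ignores them.

```php
<?php
declare(strict_types=1);

/**
 * Rough size estimate of a value in bytes (hypothetical helper).
 * Array keys are ignored, as in the getBytesSize() method above.
 */
function estimateBytes($value): int
{
    if (is_array($value)) {
        $total = 0;
        foreach ($value as $item) {
            $total += estimateBytes($item);
        }

        return $total;
    }
    if (is_string($value)) {
        return strlen($value);
    }
    if (is_int($value)) {
        return PHP_INT_SIZE;
    }
    if (is_float($value)) {
        return 8; // Assumption: floats counted as 8 bytes
    }
    if (is_object($value)) {
        return strlen(serialize($value));
    }

    return 0; // null, booleans and resources are not counted
}

/**
 * Decide whether the queue must be sent before adding one more document:
 * either the document count or the byte budget would be reached.
 */
function shouldFlush(int $queuedDocs, int $queuedBytes, int $docBytes, int $maxDocs, int $maxBytes): bool
{
    return $queuedDocs + 1 >= $maxDocs || $queuedBytes + $docBytes >= $maxBytes;
}
```

Like the original, this only approximates the size of the request that is actually sent, so the byte limit should leave some headroom.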

@ThibautSF
Author

Reading my code again, the getCurrentBulk override in CustomIndexer might be unnecessary, since I can also replace the line

$this->getCurrentBulk($pipelineID)->addDocument($document, Bulk\Action::OP_TYPE_INDEX);

with the lines

$bulk = $this->getCurrentBulk($pipelineID);
if (isset($pipelineID)) {
    $bulk->setRequestParam('pipeline', $pipelineID); //Note: It will reset the value at each schedule* call
}
$bulk->addDocument($document, Bulk\Action::OP_TYPE_INDEX);

@damienalexandre
Member

Thanks for sharing.

I just found this setter on the Elastica Document: $doc->setPipeline('coucou');. However, it does not work with Bulk; it's only for one-shot indexing.

One possible implementation would be to add the pipeline as an Indexer option, like you did for bulkBytesMaxSize (that way you create one indexer per pipeline you want to use). Pinging #31 for reference.

@ThibautSF
Author

ThibautSF commented Oct 16, 2020

> Just found this setter on the Elastica Document $doc->setPipeline('coucou'); but it does not work on Bulk, it's only for one-shot indexing.

Yeah, that was my first attempt at adding a pipeline (before adding it to CustomIndexer), and you reminded me that I still have this line:

/**
 * Index a FileSystem file.
 *
 * @param string  $attachId The file id in DB
 * @param string  $hash     A hash of the file (e.g. of its filename or content, depending on what should identify it as unique), used as the Elasticsearch document ID
 * @param string  $contents The file content (will be encoded in base64)
 * @param string  $filepath The direct path to the file
 * @param string  $dbtable  The DB table the file belongs to
 * @param Indexer $indexer  The Indexer instance
 * @param Index   $index    The Index instance
 *
 * @return null|ResponseSet
 */
function indexFSFile(string $attachId, string $hash, string $contents, string $filepath, string $dbtable, Indexer $indexer, Index $index)
{
    $b64 = base64_encode($contents);

    $path_parts = pathinfo($filepath);

    $dto = MyIndexedFile::createFSdoc($attachId, $b64, $dbtable, $path_parts['basename'], $path_parts['dirname'], $filepath);

    $doc = new Document($hash, $dto);
    $doc->setPipeline('fileattachment'); //TODO useless because it's a bulk request

    return $indexer->scheduleIndex($index, $doc, 'fileattachment');
}

> One possible implementation we could do is adding the pipeline as an Indexer option, like you did for bulkBytesMaxSize (that way you create one indexer per pipeline you want to use). ping #31 for reference.

This is how I create the pipeline (at index creation) using only Elastica code (apart from $client, which is an instance of CustomClient, which extends Elastically\Client, which in turn extends the Elastica client):

//Add pipeline for ingest-attachment
$pipeline = new Pipeline($client);
$pipeline->setId('fileattachment')->setDescription('Extract attachment information');

//Create the attachment processor -> set the field holding the file binaries and remove the indexed-characters limit (-1)
$attachproc = new Attachment('contentBinary');
$attachproc->setIndexedChars(-1);

//Create the remove processor -> drop the file binaries once contentBinary has been processed (reduces document weight)
$removeprc = new Remove('contentBinary');

//Add processors in the correct order to the pipeline
$pipeline->addProcessor($attachproc);
$pipeline->addProcessor($removeprc);
//Create the pipeline
$response = $pipeline->create();

This 'fileattachment' pipeline ID is the one passed to the bulk in the first code block of this post (last line). But indeed, an Indexer option could be useful 👍
It could also be a mapping based on the index name (for one client we might want index 'A' to use 'pipelineA' and index 'B' to use 'pipelineB'). Extending the idea further, the pipeline could then be configured in a way equivalent to the YAML mappings configuration (ping #31 again).
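
As a rough sketch of that per-index idea: `PipelineResolver` below is a hypothetical class, not part of Elastically or Elastica, and the index names are made up. It only shows how an index name could be mapped to a pipeline ID with an optional default:

```php
<?php
declare(strict_types=1);

// Hypothetical helper: maps an index name to the pipeline ID to use for it.
final class PipelineResolver
{
    /** @var array<string, string> */
    private $pipelinesByIndex;

    /** @var string|null */
    private $defaultPipeline;

    public function __construct(array $pipelinesByIndex, ?string $defaultPipeline = null)
    {
        $this->pipelinesByIndex = $pipelinesByIndex;
        $this->defaultPipeline = $defaultPipeline;
    }

    // Returns the pipeline configured for the index, the default, or null.
    public function resolve(string $indexName): ?string
    {
        return $this->pipelinesByIndex[$indexName] ?? $this->defaultPipeline;
    }
}

$resolver = new PipelineResolver(['files' => 'fileattachment']);
$pipelineForFiles = $resolver->resolve('files');
$pipelineForUsers = $resolver->resolve('users');
```

The resolved value could then be passed as the `$pipelineID` argument of the customised `scheduleIndex()` shown earlier. Real index names may carry a prefix and a date suffix, so an actual implementation would probably have to match on the logical index name instead.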
