/
BulkCommand.php
134 lines (120 loc) · 4.4 KB
/
BulkCommand.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
<?php
/**
* @link https://www.yiiframework.com/
* @copyright Copyright (c) 2008 Yii Software LLC
* @license https://www.yiiframework.com/license/
*/
namespace yii\elasticsearch;
use yii\base\Component;
use yii\base\InvalidCallException;
use yii\helpers\Json;
/**
* The [[BulkCommand]] class implements the API for accessing the Elasticsearch bulk REST API.
*
* Further details on the bulk API are available in the
* [Elasticsearch guide](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html).
*
* @author Konstantin Sirotkin <beowulfenator@gmail.com>
* @since 2.0.5
*/
class BulkCommand extends Component
{
    /**
     * @var Connection Connection the bulk request is sent through. [[execute()]] reads its `dslVersion`
     * and calls its `post()` method, so it has to be set before executing.
     */
    public $db;
    /**
     * @var string|null Default index to execute the queries on. Defaults to null meaning that index needs
     * to be specified in every action.
     */
    public $index;
    /**
     * @var string|null Default type to execute the queries on. Defaults to null meaning that type needs
     * to be specified in every action. When the connection's DSL version is 7 or higher the type is left
     * out of the endpoint even if it is set here.
     */
    public $type;
    /**
     * @var array|string|null Actions to be executed in this bulk command, given as either an array of arrays
     * or as one newline-delimited string. All actions except delete span two lines.
     */
    public $actions;
    /**
     * @var array Options to be appended to the query URL.
     */
    public $options = [];


    /**
     * Executes the bulk command.
     *
     * The endpoint is derived from [[index]] and [[type]], the request body from [[actions]], and both are
     * POSTed through [[db]] together with [[options]].
     *
     * @return mixed the value returned by the connection's `post()` call.
     * @throws \yii\base\InvalidCallException if [[type]] is set while [[index]] is not.
     */
    public function execute()
    {
        // Resolve the endpoint first so that an invalid index/type combination is reported
        // before any encoding work is done.
        $endpoint = $this->buildEndpoint();
        $body = $this->buildBody();

        return $this->db->post($endpoint, $this->options, $body);
    }

    /**
     * Adds an action to the command. Will overwrite existing actions if they are specified as a string.
     * @param array $line1 First action expressed as an array (will be encoded to JSON automatically).
     * @param array|null $line2 Second action expressed as an array (will be encoded to JSON automatically).
     * @see https://www.elastic.co/guide/en/elasticsearch/reference/7.x/docs-bulk.html
     */
    public function addAction($line1, $line2 = null)
    {
        if (!is_array($this->actions)) {
            // Nothing can be appended to null or to a raw string body, so start a fresh list.
            $this->actions = [];
        }

        $this->actions[] = $line1;

        if ($line2 !== null) {
            $this->actions[] = $line2;
        }
    }

    /**
     * Adds a delete action to the command.
     * @param string $id Document ID
     * @param string|null $index Index that the document belongs to. Can be set to null if the command has
     * a default index ([[BulkCommand::$index]]) assigned.
     * @param string|null $type Type that the document belongs to. Can be set to null if the command has
     * a default type ([[BulkCommand::$type]]) assigned.
     */
    public function addDeleteAction($id, $index = null, $type = null)
    {
        $actionData = ['_id' => $id];

        // empty() also rejects the string '0', which is a non-empty name; keep it explicitly
        // so that such an index/type is not silently replaced by the command default.
        if (!empty($index) || $index === '0') {
            $actionData['_index'] = $index;
        }

        if (!empty($type) || $type === '0') {
            $actionData['_type'] = $type;
        }

        $this->addAction(['delete' => $actionData]);
    }

    /**
     * Builds the path segments of the bulk endpoint.
     *
     * Valid endpoints are `/_bulk`, `/{index}/_bulk` and `/{index}/{type}/_bulk`.
     *
     * @return array endpoint segments, always ending in `_bulk`.
     * @throws InvalidCallException if [[type]] is set while [[index]] is not.
     */
    private function buildEndpoint()
    {
        if ($this->index === null) {
            if ($this->type !== null) {
                throw new InvalidCallException('Invalid endpoint: if type is defined, index must be defined too.');
            }

            return ['_bulk'];
        }

        // The connection is only consulted when a type is actually set; for DSL version 7+
        // the type is omitted from the endpoint.
        if ($this->type === null || $this->db->dslVersion >= 7) {
            return [$this->index, '_bulk'];
        }

        return [$this->index, $this->type, '_bulk'];
    }

    /**
     * Builds the request body from [[actions]].
     *
     * Empty actions yield `{}`, a string is passed through untouched, and an array is encoded
     * as one JSON document per line.
     *
     * @return string|mixed the request body; non-array, non-empty [[actions]] are returned as they are.
     */
    private function buildBody()
    {
        if (empty($this->actions)) {
            return '{}';
        }

        if (!is_array($this->actions)) {
            return $this->actions;
        }

        // Json::$prettyPrint is static state shared with the rest of the application. It is switched
        // off while encoding because the bulk API uses new lines as delimiters, and must be put back
        // afterwards no matter how encoding ends. The property is probed first because it may not
        // exist on the Json helper in use.
        $prettyPrintSupported = property_exists('yii\\helpers\\Json', 'prettyPrint');
        $originalPrettyPrint = null;
        if ($prettyPrintSupported) {
            $originalPrettyPrint = Json::$prettyPrint;
            Json::$prettyPrint = false;
        }

        // Catch-and-rethrow stands in for `finally` so that this block needs no newer PHP syntax
        // than the rest of the file. \Throwable is listed separately to also cover engine errors
        // on PHP 7+; naming a class that does not exist in a catch clause is harmless on older PHP.
        $failure = null;
        $body = '';
        try {
            foreach ($this->actions as $action) {
                $body .= Json::encode($action) . "\n";
            }
        } catch (\Exception $e) {
            $failure = $e;
        } catch (\Throwable $e) {
            $failure = $e;
        }

        if ($prettyPrintSupported) {
            Json::$prettyPrint = $originalPrettyPrint;
        }

        if ($failure !== null) {
            throw $failure;
        }

        return $body;
    }
}