Skip to content

Commit

Permalink
Merge branch '2.0' of https://github.com/top-think/think-orm into 2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
liu21st committed Jan 2, 2020
2 parents e90ef8c + a343dbb commit b6f61fc
Showing 1 changed file with 92 additions and 7 deletions.
99 changes: 92 additions & 7 deletions src/db/connector/Mongo.php
Expand Up @@ -23,6 +23,7 @@
use MongoDB\Driver\Manager;
use MongoDB\Driver\Query as MongoQuery;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\WriteConcern;
use think\db\BaseQuery;
use think\db\builder\Mongo as Builder;
use think\db\Connection;
Expand All @@ -41,6 +42,8 @@ class Mongo extends Connection implements ConnectionInterface
protected $typeMap = 'array';
protected $mongo; // MongoDb Object
protected $cursor; // MongoCursor Object
protected $session_uuid; // sessions会话列表当前会话数组key 随机生成
protected $sessions = []; // 会话列表

// 数据库连接参数配置
protected $config = [
Expand Down Expand Up @@ -265,7 +268,14 @@ public function getCursor(BaseQuery $query, $mongoQuery, bool $master = false):
$readPreference = $options['readPreference'] ?? null;
$this->queryStartTime = microtime(true);

$this->cursor = $this->mongo->executeQuery($namespace, $mongoQuery, $readPreference);
if ($session = $this->getSession()) {
$this->cursor = $this->mongo->executeQuery($namespace, $query, [
'readPreference' => is_null($readPreference) ? new ReadPreference(ReadPreference::RP_PRIMARY) : $readPreference,
'session' => $session
]);
} else {
$this->cursor = $this->mongo->executeQuery($namespace, $mongoQuery, $readPreference);
}

// SQL监控
if (!empty($this->config['trigger_sql'])) {
Expand Down Expand Up @@ -351,7 +361,14 @@ public function execute(BaseQuery $query, BulkWrite $bulk)
$writeConcern = $options['writeConcern'] ?? null;
$this->queryStartTime = microtime(true);

$writeResult = $this->mongo->executeBulkWrite($namespace, $bulk, $writeConcern);
if ($session = $this->getSession()) {
$writeResult = $this->mongo->executeBulkWrite($namespace, $bulk, [
'session' => $session,
'writeConcern' => is_null($writeConcern) ? new WriteConcern(1) : $writeConcern
]);
} else {
$writeResult = $this->mongo->executeBulkWrite($namespace, $bulk, $writeConcern);
}

// SQL监控
if (!empty($this->config['trigger_sql'])) {
Expand Down Expand Up @@ -403,7 +420,14 @@ public function command(Command $command, string $dbName = '', ReadPreference $r
$this->queryStr = 'db.' . $this->queryStr;
}

$this->cursor = $this->mongo->executeCommand($dbName, $command, $readPreference);
if ($session = $this->getSession()) {
$this->cursor = $this->mongo->executeCommand($dbName, $command, [
'readPreference' => is_null($readPreference) ? new ReadPreference(ReadPreference::RP_PRIMARY) : $readPreference,
'session' => $session
]);
} else {
$this->cursor = $this->mongo->executeCommand($dbName, $command, $readPreference);
}

// SQL监控
if (!empty($this->config['trigger_sql'])) {
Expand Down Expand Up @@ -1033,7 +1057,23 @@ public function getTableFields($tableName): array
* @throws \Throwable
*/
public function transaction(callable $callback)
{}
{
$this->startTrans();
try {
$result = null;
if (is_callable($callback)) {
$result = call_user_func_array($callback, [$this]);
}
$this->commit();
return $result;
} catch (\Exception $e) {
$this->rollback();
throw $e;
} catch (\Throwable $e) {
$this->rollback();
throw $e;
}
}

/**
* 启动事务
Expand All @@ -1043,7 +1083,13 @@ public function transaction(callable $callback)
* @throws \Exception
*/
public function startTrans()
{}
{
$this->initConnect(true);
$this->session_uuid = uniqid();
$this->sessions[$this->session_uuid] = $this->getMongo()->startSession();

$this->sessions[$this->session_uuid]->startTransaction([]);
}

/**
* 用于非自动提交状态下面的查询提交
Expand All @@ -1052,7 +1098,12 @@ public function startTrans()
* @throws PDOException
*/
public function commit()
{}
{
if ($session = $this->getSession()) {
$session->commitTransaction();
$this->setLastSession();
}
}

/**
* 事务回滚
Expand All @@ -1061,6 +1112,40 @@ public function commit()
* @throws PDOException
*/
public function rollback()
{}
{
if ($session = $this->getSession()) {
$session->abortTransaction();
$this->setLastSession();
}
}

/**
* 结束当前会话,设置上一个会话为当前会话
* @author klinson <klinson@163.com>
*/
protected function setLastSession()
{
if ($session = $this->getSession()) {
$session->endSession();
unset($this->sessions[$this->session_uuid]);
if (empty($this->sessions)) {
$this->session_uuid = null;
} else {
end($this->sessions);
$this->session_uuid = key($this->sessions);
}
}
}

/**
* 获取当前会话
* @return \MongoDB\Driver\Session|null
* @author klinson <klinson@163.com>
*/
public function getSession()
{
return ($this->session_uuid && isset($this->sessions[$this->session_uuid]))
? $this->sessions[$this->session_uuid]
: null;
}
}

0 comments on commit b6f61fc

Please sign in to comment.