Skip to content

mongodb加入事务支持,需要mongodb版本V4.0以上且开启复制集或分布式 #98

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 117 additions & 8 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ class Connection
protected $linkID;
protected $linkRead;
protected $linkWrite;

/**
* sessions key
* @var string
* @author klinson <[email protected]>
*/
protected $session_uuid;
/**
* @var array \MongoDB\Driver\Session
* @author klinson <[email protected]>
*/
protected $sessions = [];

// 返回或者影响记录数
protected $numRows = 0;
Expand Down Expand Up @@ -281,7 +293,14 @@ public function query($namespace, MongoQuery $query, ReadPreference $readPrefere
$this->queryStr = 'db' . strstr($namespace, '.') . '.' . $this->queryStr;
}
$this->debug(true);
$this->cursor = $this->mongo->executeQuery($namespace, $query, $readPreference);
if ($session = $this->getSession()) {
$this->cursor = $this->mongo->executeQuery($namespace, $query, [
'readPreference' => is_null($readPreference) ? new ReadPreference(ReadPreference::RP_PRIMARY) : $readPreference,
'session' => $session
]);
} else {
$this->cursor = $this->mongo->executeQuery($namespace, $query, $readPreference);
}
$this->debug(false);
return $this->getResult($class, $typeMap);
}
Expand Down Expand Up @@ -310,7 +329,14 @@ public function command(Command $command, $dbName = '', ReadPreference $readPref
if ($this->config['debug'] && !empty($this->queryStr)) {
$this->queryStr = 'db.' . $this->queryStr;
}
$this->cursor = $this->mongo->executeCommand($dbName, $command, $readPreference);
if ($session = $this->getSession()) {
$this->cursor = $this->mongo->executeCommand($dbName, $command, [
'readPreference' => is_null($readPreference) ? new ReadPreference(ReadPreference::RP_PRIMARY) : $readPreference,
'session' => $session
]);
} else {
$this->cursor = $this->mongo->executeCommand($dbName, $command, $readPreference);
}
$this->debug(false);
return $this->getResult($class, $typeMap);

Expand Down Expand Up @@ -388,7 +414,14 @@ public function execute($namespace, BulkWrite $bulk, WriteConcern $writeConcern
$this->queryStr = 'db' . strstr($namespace, '.') . '.' . $this->queryStr;
}
$this->debug(true);
$writeResult = $this->mongo->executeBulkWrite($namespace, $bulk, $writeConcern);
if ($session = $this->getSession()) {
$writeResult = $this->mongo->executeBulkWrite($namespace, $bulk, [
'session' => $session,
'writeConcern' => is_null($writeConcern) ? new WriteConcern(1) : $writeConcern
]);
} else {
$writeResult = $this->mongo->executeBulkWrite($namespace, $bulk, $writeConcern);
}
$this->debug(false);
$this->numRows = $writeResult->getMatchedCount();
return $writeResult;
Expand Down Expand Up @@ -640,33 +673,109 @@ private function buildUrl()
return rtrim($url, ",") . '/';
}

/**
* 执行数据库事务
* @access public
* @param callable $callback 数据操作方法回调
* @return mixed
* @throws \PDOException
* @throws \Exception
* @throws \Throwable
* @author klinson <[email protected]>
*/
public function transaction($callback)
{
$this->startTrans();
try {
$result = null;
if (is_callable($callback)) {
$result = call_user_func_array($callback, [$this]);
}
$this->commit();
return $result;
} catch (\Exception $e) {
$this->rollback();
throw $e;
} catch (\Throwable $e) {
$this->rollback();
throw $e;
}
}

/**
* 启动事务
* @access public
* @return void
* @throws \PDOException
* @throws \Exception
* @author klinson <[email protected]>
*/
public function startTrans()
{}
{
$this->initConnect(true);
$this->session_uuid = uniqid();
$this->sessions[$this->session_uuid] = $this->getMongo()->startSession();

$this->sessions[$this->session_uuid]->startTransaction([]);
}

/**
* 用于非自动提交状态下面的查询提交
* @access public
* @return void
* @throws PDOException
* @throws \PDOException
* @author klinson <[email protected]>
*/
public function commit()
{}
{
if ($session = $this->getSession()) {
$session->commitTransaction();
$this->setLastSession();
}
}

/**
* 事务回滚
* @access public
* @return void
* @throws PDOException
* @throws \PDOException
* @author klinson <[email protected]>
*/
public function rollback()
{}
{
if ($session = $this->getSession()) {
$session->abortTransaction();
$this->setLastSession();
}
}

/**
* 结束当前会话,设置上一个会话为当前会话
* @author klinson <[email protected]>
*/
protected function setLastSession()
{
if ($session = $this->getSession()) {
$session->endSession();
unset($this->sessions[$this->session_uuid]);
if (empty($this->sessions)) {
$this->session_uuid = null;
} else {
end($this->sessions);
$this->session_uuid = key($this->sessions);
}
}
}

/**
* 获取当前会话
* @return \MongoDB\Driver\Session|null
* @author klinson <[email protected]>
*/
public function getSession()
{
return isset($this->sessions[$this->session_uuid]) ? $this->sessions[$this->session_uuid] : null;
}

/**
* 析构方法
Expand Down