new files

This commit is contained in:
panchengyong
2026-03-07 22:29:07 +08:00
parent cd7e80b502
commit 7acbf45ff7
12516 changed files with 1808447 additions and 194 deletions

View File

@@ -0,0 +1,4 @@
vendor
.idea
composer.lock
*.sw[a-z]

View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2019 Swoole Micro-Framework
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,223 @@
# Connection pool
A common connection pool based on Swoole is usually used as the database connection pool.
[![Latest Version](https://img.shields.io/github/release/open-smf/connection-pool.svg)](https://github.com/open-smf/connection-pool/releases)
[![PHP Version](https://img.shields.io/packagist/php-v/open-smf/connection-pool.svg?color=green)](https://secure.php.net)
[![Total Downloads](https://poser.pugx.org/open-smf/connection-pool/downloads)](https://packagist.org/packages/open-smf/connection-pool)
[![License](https://poser.pugx.org/open-smf/connection-pool/license)](LICENSE)
## Requirements
| Dependency | Requirement |
| -------- | -------- |
| [PHP](https://secure.php.net/manual/en/install.php) | `>=7.0.0` |
| [Swoole](https://github.com/swoole/swoole-src) | `>=4.2.9` `Recommend 4.2.13+` |
## Install
> Install package via [Composer](https://getcomposer.org/).
```shell
composer require "open-smf/connection-pool:~1.0"
```
## Usage
> See more [examples](examples).
- Available connectors
| Connector | Connection description |
| -------- | -------- |
| CoroutineMySQLConnector | Instance of `Swoole\Coroutine\MySQL` |
| CoroutinePostgreSQLConnector | Instance of `Swoole\Coroutine\PostgreSQL`, require configuring `Swoole` with `--enable-coroutine-postgresql`|
| CoroutineRedisConnector | Instance of `Swoole\Coroutine\Redis` |
| PhpRedisConnector | Instance of `Redis`, require [redis](https://pecl.php.net/package/redis) |
| PDOConnector | Instance of `PDO`, require [PDO](https://www.php.net/manual/en/book.pdo.php) |
| YourConnector | `YourConnector` must implement interface `ConnectorInterface`, any object can be used as a connection instance |
- Basic usage
```php
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\CoroutineMySQLConnector;
use Swoole\Coroutine\MySQL;
go(function () {
// All MySQL connections: [10, 30]
$pool = new ConnectionPool(
[
'minActive' => 10,
'maxActive' => 30,
'maxWaitTime' => 5,
'maxIdleTime' => 20,
'idleCheckInterval' => 10,
],
new CoroutineMySQLConnector,
[
'host' => '127.0.0.1',
'port' => '3306',
'user' => 'root',
'password' => 'xy123456',
'database' => 'mysql',
'timeout' => 10,
'charset' => 'utf8mb4',
'strict_type' => true,
'fetch_mode' => true,
]
);
echo "Initializing connection pool\n";
$pool->init();
defer(function () use ($pool) {
echo "Closing connection pool\n";
$pool->close();
});
echo "Borrowing the connection from pool\n";
/**@var MySQL $connection */
$connection = $pool->borrow();
$status = $connection->query('SHOW STATUS LIKE "Threads_connected"');
echo "Return the connection to pool as soon as possible\n";
$pool->return($connection);
var_dump($status);
});
```
- Usage in Swoole Server
```php
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\ConnectionPoolTrait;
use Smf\ConnectionPool\Connectors\CoroutineMySQLConnector;
use Smf\ConnectionPool\Connectors\PhpRedisConnector;
use Swoole\Coroutine\MySQL;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\Http\Server;
class HttpServer
{
use ConnectionPoolTrait;
protected $swoole;
public function __construct(string $host, int $port)
{
$this->swoole = new Server($host, $port);
$this->setDefault();
$this->bindWorkerEvents();
$this->bindHttpEvent();
}
protected function setDefault()
{
$this->swoole->set([
'daemonize' => false,
'dispatch_mode' => 1,
'max_request' => 8000,
'open_tcp_nodelay' => true,
'reload_async' => true,
'max_wait_time' => 60,
'enable_reuse_port' => true,
'enable_coroutine' => true,
'http_compression' => false,
'enable_static_handler' => false,
'buffer_output_size' => 4 * 1024 * 1024,
'worker_num' => 4, // Each worker holds a connection pool
]);
}
protected function bindHttpEvent()
{
$this->swoole->on('Request', function (Request $request, Response $response) {
$pool1 = $this->getConnectionPool('mysql');
/**@var MySQL $mysql */
$mysql = $pool1->borrow();
$status = $mysql->query('SHOW STATUS LIKE "Threads_connected"');
// Return the connection to pool as soon as possible
$pool1->return($mysql);
$pool2 = $this->getConnectionPool('redis');
/**@var \Redis $redis */
$redis = $pool2->borrow();
$clients = $redis->info('Clients');
// Return the connection to pool as soon as possible
$pool2->return($redis);
$json = [
'status' => $status,
'clients' => $clients,
];
// Other logic
// ...
$response->header('Content-Type', 'application/json');
$response->end(json_encode($json));
});
}
protected function bindWorkerEvents()
{
$createPools = function () {
// All MySQL connections: [4 workers * 2 = 8, 4 workers * 10 = 40]
$pool1 = new ConnectionPool(
[
'minActive' => 2,
'maxActive' => 10,
],
new CoroutineMySQLConnector,
[
'host' => '127.0.0.1',
'port' => '3306',
'user' => 'root',
'password' => 'xy123456',
'database' => 'mysql',
'timeout' => 10,
'charset' => 'utf8mb4',
'strict_type' => true,
'fetch_mode' => true,
]);
$pool1->init();
$this->addConnectionPool('mysql', $pool1);
// All Redis connections: [4 workers * 5 = 20, 4 workers * 20 = 80]
$pool2 = new ConnectionPool(
[
'minActive' => 5,
'maxActive' => 20,
],
new PhpRedisConnector,
[
'host' => '127.0.0.1',
'port' => '6379',
'database' => 0,
'password' => null,
]);
$pool2->init();
$this->addConnectionPool('redis', $pool2);
};
$closePools = function () {
$this->closeConnectionPools();
};
$this->swoole->on('WorkerStart', $createPools);
$this->swoole->on('WorkerStop', $closePools);
$this->swoole->on('WorkerError', $closePools);
}
public function start()
{
$this->swoole->start();
}
}
// Enable coroutine for PhpRedis
Swoole\Runtime::enableCoroutine();
$server = new HttpServer('0.0.0.0', 5200);
$server->start();
```
## License
[MIT](LICENSE)

View File

@@ -0,0 +1,40 @@
{
"name": "open-smf/connection-pool",
"type": "library",
"license": "MIT",
"support": {
"issues": "https://github.com/open-smf/connection-pool/issues",
"source": "https://github.com/open-smf/connection-pool"
},
"description": "A common connection pool based on Swoole is usually used as the database connection pool.",
"keywords": [
"swoole",
"connection-pool",
"database-connection-pool"
],
"homepage": "https://github.com/open-smf/connection-pool",
"authors": [
{
"name": "Xie Biao",
"email": "hhxsv5@sina.com"
}
],
"require": {
"php": ">=7.0.0",
"ext-json": "*",
"ext-swoole": ">=4.2.9"
},
"suggest": {
"ext-redis": "A PHP extension for Redis."
},
"autoload": {
"psr-4": {
"Smf\\ConnectionPool\\": "src"
}
},
"prefer-stable": true,
"minimum-stability": "dev",
"require-dev": {
"swoole/ide-helper": "@dev"
}
}

View File

@@ -0,0 +1,48 @@
<?php
include '../vendor/autoload.php';
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\CoroutineMySQLConnector;
use Swoole\Coroutine\MySQL;
go(function () {
// All MySQL connections: [10, 30]
$pool = new ConnectionPool(
[
'minActive' => 10,
'maxActive' => 30,
'maxWaitTime' => 5,
'maxIdleTime' => 20,
'idleCheckInterval' => 10,
],
new CoroutineMySQLConnector,
[
'host' => '127.0.0.1',
'port' => '3306',
'user' => 'root',
'password' => 'xy123456',
'database' => 'mysql',
'timeout' => 10,
'charset' => 'utf8mb4',
'strict_type' => true,
'fetch_mode' => true,
]
);
echo "Initializing connection pool\n";
$pool->init();
defer(function () use ($pool) {
echo "Closing connection pool\n";
$pool->close();
});
echo "Borrowing the connection from pool\n";
/**@var MySQL $connection */
$connection = $pool->borrow();
$status = $connection->query('SHOW STATUS LIKE "Threads_connected"');
echo "Return the connection to pool as soon as possible\n";
$pool->return($connection);
var_dump($status);
});

View File

@@ -0,0 +1,41 @@
<?php
include '../vendor/autoload.php';
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\CoroutinePostgreSQLConnector;
use Swoole\Coroutine\PostgreSQL;
go(function () {
// All PostgreSQL connections: [10, 30]
$pool = new ConnectionPool(
[
'minActive' => 10,
'maxActive' => 30,
'maxWaitTime' => 5,
'maxIdleTime' => 20,
'idleCheckInterval' => 10,
],
new CoroutinePostgreSQLConnector,
[
'connection_strings' => 'host=127.0.0.1 port=5432 dbname=postgres user=postgres password=xy123456',
]
);
echo "Initializing connection pool\n";
$pool->init();
defer(function () use ($pool) {
echo "Closing connection pool\n";
$pool->close();
});
echo "Borrowing the connection from pool\n";
/**@var PostgreSQL $connection */
$connection = $pool->borrow();
$result = $connection->query("SELECT * FROM pg_stat_database where datname='postgres';");
$stat = $connection->fetchAssoc($result);
echo "Return the connection to pool as soon as possible\n";
$pool->return($connection);
var_dump($stat);
});

View File

@@ -0,0 +1,48 @@
<?php
include '../vendor/autoload.php';
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\CoroutineRedisConnector;
use Swoole\Coroutine\Redis;
go(function () {
// All Redis connections: [10, 30]
$pool = new ConnectionPool(
[
'minActive' => 10,
'maxActive' => 30,
'maxWaitTime' => 5,
'maxIdleTime' => 20,
'idleCheckInterval' => 10,
],
new CoroutineRedisConnector,
[
'host' => '127.0.0.1',
'port' => '6379',
'database' => 0,
'password' => null,
'options' => [
'connect_timeout' => 1,
'timeout' => 5,
],
]
);
echo "Initializing connection pool\n";
$pool->init();
defer(function () use ($pool) {
echo "Close connection pool\n";
$pool->close();
});
echo "Borrowing the connection from pool\n";
/**@var Redis $connection */
$connection = $pool->borrow();
$connection->set('test', uniqid());
$test = $connection->get('test');
echo "Return the connection to pool as soon as possible\n";
$pool->return($connection);
var_dump($test);
});

View File

@@ -0,0 +1,49 @@
<?php
include '../vendor/autoload.php';
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\PDOConnector;
// Enable coroutine for PDO
Swoole\Runtime::enableCoroutine();
go(function () {
// All PDO connections: [10, 30]
$pool = new ConnectionPool(
[
'minActive' => 10,
'maxActive' => 30,
'maxWaitTime' => 5,
'maxIdleTime' => 20,
'idleCheckInterval' => 10,
],
new PDOConnector,
[
'dsn' => 'mysql:host=127.0.0.1;port=3306;dbname=mysql;charset=utf8mb4',
'username' => 'root',
'password' => 'xy123456',
'options' => [
\PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION,
\PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC,
\PDO::ATTR_TIMEOUT => 30,
],
]
);
echo "Initializing connection pool\n";
$pool->init();
defer(function () use ($pool) {
echo "Close connection pool\n";
$pool->close();
});
echo "Borrowing the connection from pool\n";
/**@var \PDO $connection */
$connection = $pool->borrow();
$statement = $connection->query('SHOW STATUS LIKE "Threads_connected"');
echo "Return the connection to pool as soon as possible\n";
$pool->return($connection);
var_dump($statement->fetch());
});

View File

@@ -0,0 +1,47 @@
<?php
include '../vendor/autoload.php';
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\PhpRedisConnector;
// Enable coroutine for PhpRedis
Swoole\Runtime::enableCoroutine();
go(function () {
// All Redis connections: [10, 30]
$pool = new ConnectionPool(
[
'minActive' => 10,
'maxActive' => 30,
'maxWaitTime' => 5,
'maxIdleTime' => 20,
'idleCheckInterval' => 10,
],
new PhpRedisConnector,
[
'host' => '127.0.0.1',
'port' => '6379',
'database' => 0,
'password' => null,
'timeout' => 5,
]
);
echo "Initializing connection pool\n";
$pool->init();
defer(function () use ($pool) {
echo "Close connection pool\n";
$pool->close();
});
echo "Borrowing the connection from pool\n";
/**@var Redis $connection */
$connection = $pool->borrow();
$connection->set('test', uniqid());
$test = $connection->get('test');
echo "Return the connection to pool as soon as possible\n";
$pool->return($connection);
var_dump($test);
});

View File

@@ -0,0 +1,65 @@
<?php
include '../vendor/autoload.php';
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\Connectors\PDOConnector;
use Swoole\Coroutine;
Swoole\Runtime::enableCoroutine();
go(function () {
// All MySQL connections: [10, 30]
$pool = new ConnectionPool(
[
'minActive' => 10,
'maxActive' => 30,
'maxWaitTime' => 5,
'maxIdleTime' => 20,
'idleCheckInterval' => 10,
],
new PDOConnector,
[
'dsn' => 'mysql:host=127.0.0.1;port=3306;dbname=mysql;charset=utf8mb4',
'username' => 'root',
'password' => 'xy123456',
'options' => [
\PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION,
\PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC,
\PDO::ATTR_TIMEOUT => 30,
],
]
);
$pool->init();
// For debug
$peakCount = 0;
swoole_timer_tick(1000, function () use ($pool, &$peakCount) {
$count = $pool->getConnectionCount();
$idleCount = $pool->getIdleCount();
if ($peakCount < $count) {
$peakCount = $count;
}
echo "Pool connection count: $count, peak count: $peakCount, idle count: $idleCount\n";
});
while (true) {
$count = mt_rand(1, 45);
echo "Query count: $count\n";
for ($i = 0; $i < $count; $i++) {
go(function () use ($pool) {
/**@var \PDO $pdo */
$pdo = $pool->borrow();
defer(function () use ($pool, $pdo) {
$pool->return($pdo);
});
$statement = $pdo->query('show status like \'Threads_connected\'');
$ret = $statement->fetch();
if (!isset($ret['Variable_name'])) {
echo "Invalid query result: \n", print_r($ret, true);
}
echo $ret['Variable_name'] . ': ' . $ret['Value'] . "\n";
});
}
Coroutine::sleep(mt_rand(1, 15));
}
});

View File

@@ -0,0 +1,132 @@
<?php
include '../vendor/autoload.php';
use Smf\ConnectionPool\ConnectionPool;
use Smf\ConnectionPool\ConnectionPoolTrait;
use Smf\ConnectionPool\Connectors\CoroutineMySQLConnector;
use Smf\ConnectionPool\Connectors\PhpRedisConnector;
use Swoole\Coroutine\MySQL;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\Http\Server;
class HttpServer
{
use ConnectionPoolTrait;
protected $swoole;
public function __construct(string $host, int $port)
{
$this->swoole = new Server($host, $port);
$this->setDefault();
$this->bindWorkerEvents();
$this->bindHttpEvent();
}
protected function setDefault()
{
$this->swoole->set([
'daemonize' => false,
'dispatch_mode' => 1,
'max_request' => 8000,
'open_tcp_nodelay' => true,
'reload_async' => true,
'max_wait_time' => 60,
'enable_reuse_port' => true,
'enable_coroutine' => true,
'http_compression' => false,
'enable_static_handler' => false,
'buffer_output_size' => 4 * 1024 * 1024,
'worker_num' => 4, // Each worker holds a connection pool
]);
}
protected function bindHttpEvent()
{
$this->swoole->on('Request', function (Request $request, Response $response) {
$pool1 = $this->getConnectionPool('mysql');
/**@var MySQL $mysql */
$mysql = $pool1->borrow();
$status = $mysql->query('SHOW STATUS LIKE "Threads_connected"');
// Return the connection to pool as soon as possible
$pool1->return($mysql);
$pool2 = $this->getConnectionPool('redis');
/**@var \Redis $redis */
$redis = $pool2->borrow();
$clients = $redis->info('Clients');
// Return the connection to pool as soon as possible
$pool2->return($redis);
$json = [
'status' => $status,
'clients' => $clients,
];
// Other logic
// ...
$response->header('Content-Type', 'application/json');
$response->end(json_encode($json));
});
}
protected function bindWorkerEvents()
{
$createPools = function () {
// All MySQL connections: [4 workers * 2 = 8, 4 workers * 10 = 40]
$pool1 = new ConnectionPool(
[
'minActive' => 2,
'maxActive' => 10,
],
new CoroutineMySQLConnector,
[
'host' => '127.0.0.1',
'port' => '3306',
'user' => 'root',
'password' => 'xy123456',
'database' => 'mysql',
'timeout' => 10,
'charset' => 'utf8mb4',
'strict_type' => true,
'fetch_mode' => true,
]);
$pool1->init();
$this->addConnectionPool('mysql', $pool1);
// All Redis connections: [4 workers * 5 = 20, 4 workers * 20 = 80]
$pool2 = new ConnectionPool(
[
'minActive' => 5,
'maxActive' => 20,
],
new PhpRedisConnector,
[
'host' => '127.0.0.1',
'port' => '6379',
'database' => 0,
'password' => null,
]);
$pool2->init();
$this->addConnectionPool('redis', $pool2);
};
$closePools = function () {
$this->closeConnectionPools();
};
$this->swoole->on('WorkerStart', $createPools);
$this->swoole->on('WorkerStop', $closePools);
$this->swoole->on('WorkerError', $closePools);
}
public function start()
{
$this->swoole->start();
}
}
// Enable coroutine for PhpRedis
Swoole\Runtime::enableCoroutine();
$server = new HttpServer('0.0.0.0', 5200);
$server->start();

View File

@@ -0,0 +1,19 @@
<?php
namespace Smf\ConnectionPool;
class BorrowConnectionTimeoutException extends \Exception
{
protected $timeout;
public function getTimeout(): float
{
return $this->timeout;
}
public function setTimeout(float $timeout): self
{
$this->timeout = $timeout;
return $this;
}
}

View File

@@ -0,0 +1,279 @@
<?php
namespace Smf\ConnectionPool;
use Smf\ConnectionPool\Connectors\ConnectorInterface;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine;
class ConnectionPool implements ConnectionPoolInterface
{
/**@var float The timeout of the operation channel */
const CHANNEL_TIMEOUT = 0.001;
/**@var float The minimum interval to check the idle connections */
const MIN_CHECK_IDLE_INTERVAL = 10;
/**@var string The key about the last active time of connection */
const KEY_LAST_ACTIVE_TIME = '__lat';
/**@var bool Whether the connection pool is initialized */
protected $initialized;
/**@var bool Whether the connection pool is closed */
protected $closed;
/**@var Channel The connection pool */
protected $pool;
/**@var ConnectorInterface The connector */
protected $connector;
/**@var array The config of connection */
protected $connectionConfig;
/**@var int Current all connection count */
protected $connectionCount = 0;
/**@var int The minimum number of active connections */
protected $minActive = 1;
/**@var int The maximum number of active connections */
protected $maxActive = 1;
/**@var float The maximum waiting time for connection, when reached, an exception will be thrown */
protected $maxWaitTime = 5;
/**@var float The maximum idle time for the connection, when reached, the connection will be removed from pool, and keep the least $minActive connections in the pool */
protected $maxIdleTime = 5;
/**@var float The interval to check idle connection */
protected $idleCheckInterval = 5;
/**@var int The timer id of balancer */
protected $balancerTimerId;
/**
* ConnectionPool constructor.
* @param array $poolConfig The minimum number of active connections, the detail keys:
* int minActive The minimum number of active connections
* int maxActive The maximum number of active connections
* float maxWaitTime The maximum waiting time for connection, when reached, an exception will be thrown
* float maxIdleTime The maximum idle time for the connection, when reached, the connection will be removed from pool, and keep the least $minActive connections in the pool
* float idleCheckInterval The interval to check idle connection
* @param ConnectorInterface $connector The connector instance of ConnectorInterface
* @param array $connectionConfig The config of connection
*/
public function __construct(array $poolConfig, ConnectorInterface $connector, array $connectionConfig)
{
$this->initialized = false;
$this->closed = false;
$this->minActive = $poolConfig['minActive'] ?? 20;
$this->maxActive = $poolConfig['maxActive'] ?? 100;
$this->maxWaitTime = $poolConfig['maxWaitTime'] ?? 5;
$this->maxIdleTime = $poolConfig['maxIdleTime'] ?? 30;
$poolConfig['idleCheckInterval'] = $poolConfig['idleCheckInterval'] ?? 15;
$this->idleCheckInterval = $poolConfig['idleCheckInterval'] >= static::MIN_CHECK_IDLE_INTERVAL ? $poolConfig['idleCheckInterval'] : static::MIN_CHECK_IDLE_INTERVAL;
$this->connectionConfig = $connectionConfig;
$this->connector = $connector;
}
/**
* Initialize the connection pool
* @return bool
*/
public function init(): bool
{
if ($this->initialized) {
return false;
}
$this->initialized = true;
$this->pool = new Channel($this->maxActive);
$this->balancerTimerId = $this->startBalanceTimer($this->idleCheckInterval);
Coroutine::create(function () {
for ($i = 0; $i < $this->minActive; $i++) {
$connection = $this->createConnection();
$ret = $this->pool->push($connection, static::CHANNEL_TIMEOUT);
if ($ret === false) {
$this->removeConnection($connection);
}
}
});
return true;
}
/**
* Borrow a connection from the connection pool, throw an exception if timeout
* @return mixed The connection resource
* @throws BorrowConnectionTimeoutException
* @throws \RuntimeException
*/
public function borrow()
{
if (!$this->initialized) {
throw new \RuntimeException('Please initialize the connection pool first, call $pool->init().');
}
if ($this->pool->isEmpty()) {
// Create more connections
if ($this->connectionCount < $this->maxActive) {
return $this->createConnection();
}
}
$connection = $this->pool->pop($this->maxWaitTime);
if ($connection === false) {
$exception = new BorrowConnectionTimeoutException(sprintf(
'Borrow the connection timeout in %.2f(s), connections in pool: %d, all connections: %d',
$this->maxWaitTime,
$this->pool->length(),
$this->connectionCount
));
$exception->setTimeout($this->maxWaitTime);
throw $exception;
}
if ($this->connector->isConnected($connection)) {
// Reset the connection for the connected connection
$this->connector->reset($connection, $this->connectionConfig);
} else {
// Remove the disconnected connection, then create a new connection
$this->removeConnection($connection);
$connection = $this->createConnection();
}
return $connection;
}
/**
* Return a connection to the connection pool
* @param mixed $connection The connection resource
* @return bool
*/
public function return($connection): bool
{
if (!$this->connector->validate($connection)) {
throw new \RuntimeException('Connection of unexpected type returned.');
}
if (!$this->initialized) {
throw new \RuntimeException('Please initialize the connection pool first, call $pool->init().');
}
if ($this->pool->isFull()) {
// Discard the connection
$this->removeConnection($connection);
return false;
}
$connection->{static::KEY_LAST_ACTIVE_TIME} = time();
$ret = $this->pool->push($connection, static::CHANNEL_TIMEOUT);
if ($ret === false) {
$this->removeConnection($connection);
}
return true;
}
/**
* Get the number of created connections
* @return int
*/
public function getConnectionCount(): int
{
return $this->connectionCount;
}
/**
* Get the number of idle connections
* @return int
*/
public function getIdleCount(): int
{
return $this->pool->length();
}
/**
* Close the connection pool and disconnect all connections
* @return bool
*/
public function close(): bool
{
if (!$this->initialized) {
return false;
}
if ($this->closed) {
return false;
}
$this->closed = true;
swoole_timer_clear($this->balancerTimerId);
Coroutine::create(function () {
while (true) {
if ($this->pool->isEmpty()) {
break;
}
$connection = $this->pool->pop(static::CHANNEL_TIMEOUT);
if ($connection !== false) {
$this->connector->disconnect($connection);
}
}
$this->pool->close();
});
return true;
}
public function __destruct()
{
$this->close();
}
protected function startBalanceTimer(float $interval)
{
return swoole_timer_tick(round($interval) * 1000, function () {
$now = time();
$validConnections = [];
while (true) {
if ($this->closed) {
break;
}
if ($this->connectionCount <= $this->minActive) {
break;
}
if ($this->pool->isEmpty()) {
break;
}
$connection = $this->pool->pop(static::CHANNEL_TIMEOUT);
if ($connection === false) {
continue;
}
$lastActiveTime = $connection->{static::KEY_LAST_ACTIVE_TIME} ?? 0;
if ($now - $lastActiveTime < $this->maxIdleTime) {
$validConnections[] = $connection;
} else {
$this->removeConnection($connection);
}
}
foreach ($validConnections as $validConnection) {
$ret = $this->pool->push($validConnection, static::CHANNEL_TIMEOUT);
if ($ret === false) {
$this->removeConnection($validConnection);
}
}
});
}
protected function createConnection()
{
$this->connectionCount++;
$connection = $this->connector->connect($this->connectionConfig);
$connection->{static::KEY_LAST_ACTIVE_TIME} = time();
return $connection;
}
protected function removeConnection($connection)
{
$this->connectionCount--;
Coroutine::create(function () use ($connection) {
try {
$this->connector->disconnect($connection);
} catch (\Throwable $e) {
// Ignore this exception.
}
});
}
}

View File

@@ -0,0 +1,33 @@
<?php
namespace Smf\ConnectionPool;
interface ConnectionPoolInterface
{
/**
* Initialize the connection pool
* @return bool
*/
public function init(): bool;
/**
* Return a connection to the connection pool
* @param mixed $connection
* @return bool
*/
public function return($connection): bool;
/**
* Borrow a connection to the connection pool
* @return mixed
* @throws BorrowConnectionTimeoutException
*/
public function borrow();
/**
* Close the connection pool, release the resource of all connections
* @return bool
*/
public function close(): bool;
}

View File

@@ -0,0 +1,51 @@
<?php
namespace Smf\ConnectionPool;
trait ConnectionPoolTrait
{
/**
* @var ConnectionPool[] $pools
*/
protected $pools = [];
/**
* Add a connection pool
* @param string $key
* @param ConnectionPool $pool
*/
public function addConnectionPool(string $key, ConnectionPool $pool)
{
$this->pools[$key] = $pool;
}
/**
* Get a connection pool by key
* @param string $key
* @return ConnectionPool
*/
public function getConnectionPool(string $key): ConnectionPool
{
return $this->pools[$key];
}
/**
* Close the connection by key
* @param string $key
* @return bool
*/
public function closeConnectionPool(string $key)
{
return $this->pools[$key]->close();
}
/**
* Close all connection pools
*/
public function closeConnectionPools()
{
foreach ($this->pools as $pool) {
$pool->close();
}
}
}

View File

@@ -0,0 +1,43 @@
<?php
namespace Smf\ConnectionPool\Connectors;
interface ConnectorInterface
{
/**
* Connect to the specified Server and returns the connection resource
* @param array $config
* @return mixed
*/
public function connect(array $config);
/**
* Disconnect and free resources
* @param mixed $connection
* @return mixed
*/
public function disconnect($connection);
/**
* Whether the connection is established
* @param mixed $connection
* @return bool
*/
public function isConnected($connection): bool;
/**
* Reset the connection
* @param mixed $connection
* @param array $config
* @return mixed
*/
public function reset($connection, array $config);
/**
* Validate the connection
*
* @param mixed $connection
* @return bool
*/
public function validate($connection): bool;
}

View File

@@ -0,0 +1,39 @@
<?php
namespace Smf\ConnectionPool\Connectors;
use Swoole\Coroutine\MySQL;
class CoroutineMySQLConnector implements ConnectorInterface
{
public function connect(array $config)
{
$connection = new MySQL();
if ($connection->connect($config) === false) {
throw new \RuntimeException(sprintf('Failed to connect MySQL server: [%d] %s', $connection->connect_errno, $connection->connect_error));
}
return $connection;
}
public function disconnect($connection)
{
/**@var MySQL $connection */
$connection->close();
}
public function isConnected($connection): bool
{
/**@var MySQL $connection */
return $connection->connected;
}
public function reset($connection, array $config)
{
}
public function validate($connection): bool
{
return $connection instanceof MySQL;
}
}

View File

@@ -0,0 +1,42 @@
<?php
namespace Smf\ConnectionPool\Connectors;
use Swoole\Coroutine\PostgreSQL;
class CoroutinePostgreSQLConnector implements ConnectorInterface
{
public function connect(array $config)
{
if (!isset($config['connection_strings'])) {
throw new \InvalidArgumentException('The key "connection_string" is missing.');
}
$connection = new PostgreSQL();
$ret = $connection->connect($config['connection_strings']);
if ($ret === false) {
throw new \RuntimeException(sprintf('Failed to connect PostgreSQL server: %s', $connection->error));
}
return $connection;
}
public function disconnect($connection)
{
/**@var PostgreSQL $connection */
}
public function isConnected($connection): bool
{
/**@var PostgreSQL $connection */
return true;
}
public function reset($connection, array $config)
{
/**@var PostgreSQL $connection */
}
public function validate($connection): bool
{
return $connection instanceof PostgreSQL;
}
}

View File

@@ -0,0 +1,53 @@
<?php
namespace Smf\ConnectionPool\Connectors;
use Swoole\Coroutine\Redis;
class CoroutineRedisConnector implements ConnectorInterface
{
public function connect(array $config)
{
$connection = new Redis($config['options'] ?? []);
$ret = $connection->connect($config['host'], $config['port']);
if ($ret === false) {
throw new \RuntimeException(sprintf('Failed to connect Redis server: [%s] %s', $connection->errCode, $connection->errMsg));
}
if (isset($config['password'])) {
$config['password'] = (string)$config['password'];
if ($config['password'] !== '') {
$connection->auth($config['password']);
}
}
if (isset($config['database'])) {
$connection->select($config['database']);
}
return $connection;
}
public function disconnect($connection)
{
/**@var Redis $connection */
$connection->close();
}
public function isConnected($connection): bool
{
/**@var Redis $connection */
return $connection->connected;
}
public function reset($connection, array $config)
{
/**@var Redis $connection */
$connection->setDefer(false);
if (isset($config['database'])) {
$connection->select($config['database']);
}
}
public function validate($connection): bool
{
return $connection instanceof Redis;
}
}

View File

@@ -0,0 +1,42 @@
<?php
namespace Smf\ConnectionPool\Connectors;
class PDOConnector implements ConnectorInterface
{
public function connect(array $config)
{
try {
$connection = new \PDO($config['dsn'], $config['username'] ?? '', $config['password'] ?? '', $config['options'] ?? []);
} catch (\Throwable $e) {
throw new \RuntimeException(sprintf('Failed to connect the requested database: [%d] %s', $e->getCode(), $e->getMessage()));
}
return $connection;
}
public function disconnect($connection)
{
/**@var \PDO $connection */
$connection = null;
}
public function isConnected($connection): bool
{
/**@var \PDO $connection */
try {
return !!@$connection->getAttribute(\PDO::ATTR_SERVER_INFO);
} catch (\Throwable $e) {
return false;
}
}
public function reset($connection, array $config)
{
}
public function validate($connection): bool
{
return $connection instanceof \PDO;
}
}

View File

@@ -0,0 +1,53 @@
<?php
namespace Smf\ConnectionPool\Connectors;
class PhpRedisConnector implements ConnectorInterface
{
public function connect(array $config)
{
$connection = new \Redis();
$ret = $connection->connect($config['host'], $config['port'], $config['timeout'] ?? 10);
if ($ret === false) {
throw new \RuntimeException(sprintf('Failed to connect Redis server: %s', $connection->getLastError()));
}
if (isset($config['password'])) {
$config['password'] = (string)$config['password'];
if ($config['password'] !== '') {
$connection->auth($config['password']);
}
}
if (isset($config['database'])) {
$connection->select($config['database']);
}
foreach ($config['options'] ?? [] as $key => $value) {
$connection->setOption($key, $value);
}
return $connection;
}
public function disconnect($connection)
{
/**@var \Redis $connection */
$connection->close();
}
public function isConnected($connection): bool
{
/**@var \Redis $connection */
return $connection->isConnected();
}
public function reset($connection, array $config)
{
/**@var \Redis $connection */
if (isset($config['database'])) {
$connection->select($config['database']);
}
}
public function validate($connection): bool
{
return $connection instanceof \Redis;
}
}