Phase1 后端核心:
- 新增 fsgx_v1.sql 迁移脚本(is_queue_goods/frozen_points/available_points/no_assess)
- SystemConfigServices 返佣设置扩展(周期人数/分档比例/范围/时机)
- StoreOrderCreateServices 周期循环佣金计算
- StoreOrderTakeServices 佣金发放后同步冻结积分
- StoreProductServices/StoreProduct 保存 is_queue_goods
Phase2 后端接口:
- GET /api/hjf/brokerage/progress 佣金周期进度
- GET /api/hjf/assets/overview 资产总览
- HjfPointsServices 每日 frozen_points 0.4‰ 释放定时任务
- PUT /adminapi/hjf/member/{uid}/no_assess 不考核接口
- GET /adminapi/hjf/points/release_log 积分日志接口
Phase3 前端清理:
- hjfCustom.js 路由精简(仅保留 points/log)
- hjfQueue.js/hjfMember.js API 清理/重定向至 CRMEB 原生接口
- pages.json 公排→推荐佣金/佣金记录/佣金规则
Phase4-5 前端改造:
- queue/status.vue 推荐佣金进度页整体重写
- 商品详情/订单确认/支付结果页文案与逻辑改造
- 个人中心/资产页/引导页/规则页文案改造
- HjfQueueProgress/HjfRefundNotice/HjfAssetCard 组件改造
- 推广中心嵌入佣金进度摘要
- hjfMockData.js 全量更新(公排字段→佣金字段)
Phase6 Admin 增强:
- 用户列表新增 frozen_points/available_points 列及不考核操作按钮
- hjfPoints.js USE_MOCK=false 对接真实积分日志接口
Phase7 配置文档:
- docs/fsgx-phase7-config-checklist.md 后台配置与全链路验收清单
Made-with: Cursor
280 lines
9.4 KiB
PHP
280 lines
9.4 KiB
PHP
<?php
|
|
|
|
namespace Smf\ConnectionPool;
|
|
|
|
use Smf\ConnectionPool\Connectors\ConnectorInterface;
|
|
use Swoole\Coroutine\Channel;
|
|
use Swoole\Coroutine;
|
|
|
|
class ConnectionPool implements ConnectionPoolInterface
|
|
{
|
|
/**@var float The timeout of the operation channel */
|
|
const CHANNEL_TIMEOUT = 0.001;
|
|
|
|
/**@var float The minimum interval to check the idle connections */
|
|
const MIN_CHECK_IDLE_INTERVAL = 10;
|
|
|
|
/**@var string The key about the last active time of connection */
|
|
const KEY_LAST_ACTIVE_TIME = '__lat';
|
|
|
|
/**@var bool Whether the connection pool is initialized */
|
|
protected $initialized;
|
|
|
|
/**@var bool Whether the connection pool is closed */
|
|
protected $closed;
|
|
|
|
/**@var Channel The connection pool */
|
|
protected $pool;
|
|
|
|
/**@var ConnectorInterface The connector */
|
|
protected $connector;
|
|
|
|
/**@var array The config of connection */
|
|
protected $connectionConfig;
|
|
|
|
/**@var int Current all connection count */
|
|
protected $connectionCount = 0;
|
|
|
|
/**@var int The minimum number of active connections */
|
|
protected $minActive = 1;
|
|
|
|
/**@var int The maximum number of active connections */
|
|
protected $maxActive = 1;
|
|
|
|
/**@var float The maximum waiting time for connection, when reached, an exception will be thrown */
|
|
protected $maxWaitTime = 5;
|
|
|
|
/**@var float The maximum idle time for the connection, when reached, the connection will be removed from pool, and keep the least $minActive connections in the pool */
|
|
protected $maxIdleTime = 5;
|
|
|
|
/**@var float The interval to check idle connection */
|
|
protected $idleCheckInterval = 5;
|
|
|
|
/**@var int The timer id of balancer */
|
|
protected $balancerTimerId;
|
|
|
|
/**
|
|
* ConnectionPool constructor.
|
|
* @param array $poolConfig The minimum number of active connections, the detail keys:
|
|
* int minActive The minimum number of active connections
|
|
* int maxActive The maximum number of active connections
|
|
* float maxWaitTime The maximum waiting time for connection, when reached, an exception will be thrown
|
|
* float maxIdleTime The maximum idle time for the connection, when reached, the connection will be removed from pool, and keep the least $minActive connections in the pool
|
|
* float idleCheckInterval The interval to check idle connection
|
|
* @param ConnectorInterface $connector The connector instance of ConnectorInterface
|
|
* @param array $connectionConfig The config of connection
|
|
*/
|
|
public function __construct(array $poolConfig, ConnectorInterface $connector, array $connectionConfig)
|
|
{
|
|
$this->initialized = false;
|
|
$this->closed = false;
|
|
$this->minActive = $poolConfig['minActive'] ?? 20;
|
|
$this->maxActive = $poolConfig['maxActive'] ?? 100;
|
|
$this->maxWaitTime = $poolConfig['maxWaitTime'] ?? 5;
|
|
$this->maxIdleTime = $poolConfig['maxIdleTime'] ?? 30;
|
|
$poolConfig['idleCheckInterval'] = $poolConfig['idleCheckInterval'] ?? 15;
|
|
$this->idleCheckInterval = $poolConfig['idleCheckInterval'] >= static::MIN_CHECK_IDLE_INTERVAL ? $poolConfig['idleCheckInterval'] : static::MIN_CHECK_IDLE_INTERVAL;
|
|
$this->connectionConfig = $connectionConfig;
|
|
$this->connector = $connector;
|
|
}
|
|
|
|
/**
|
|
* Initialize the connection pool
|
|
* @return bool
|
|
*/
|
|
public function init(): bool
|
|
{
|
|
if ($this->initialized) {
|
|
return false;
|
|
}
|
|
$this->initialized = true;
|
|
$this->pool = new Channel($this->maxActive);
|
|
$this->balancerTimerId = $this->startBalanceTimer($this->idleCheckInterval);
|
|
Coroutine::create(function () {
|
|
for ($i = 0; $i < $this->minActive; $i++) {
|
|
$connection = $this->createConnection();
|
|
$ret = $this->pool->push($connection, static::CHANNEL_TIMEOUT);
|
|
if ($ret === false) {
|
|
$this->removeConnection($connection);
|
|
}
|
|
}
|
|
});
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Borrow a connection from the connection pool, throw an exception if timeout
|
|
* @return mixed The connection resource
|
|
* @throws BorrowConnectionTimeoutException
|
|
* @throws \RuntimeException
|
|
*/
|
|
public function borrow()
|
|
{
|
|
if (!$this->initialized) {
|
|
throw new \RuntimeException('Please initialize the connection pool first, call $pool->init().');
|
|
}
|
|
if ($this->pool->isEmpty()) {
|
|
// Create more connections
|
|
if ($this->connectionCount < $this->maxActive) {
|
|
return $this->createConnection();
|
|
}
|
|
}
|
|
|
|
$connection = $this->pool->pop($this->maxWaitTime);
|
|
if ($connection === false) {
|
|
$exception = new BorrowConnectionTimeoutException(sprintf(
|
|
'Borrow the connection timeout in %.2f(s), connections in pool: %d, all connections: %d',
|
|
$this->maxWaitTime,
|
|
$this->pool->length(),
|
|
$this->connectionCount
|
|
));
|
|
$exception->setTimeout($this->maxWaitTime);
|
|
throw $exception;
|
|
}
|
|
if ($this->connector->isConnected($connection)) {
|
|
// Reset the connection for the connected connection
|
|
$this->connector->reset($connection, $this->connectionConfig);
|
|
} else {
|
|
// Remove the disconnected connection, then create a new connection
|
|
$this->removeConnection($connection);
|
|
$connection = $this->createConnection();
|
|
}
|
|
return $connection;
|
|
}
|
|
|
|
/**
|
|
* Return a connection to the connection pool
|
|
* @param mixed $connection The connection resource
|
|
* @return bool
|
|
*/
|
|
public function return($connection): bool
|
|
{
|
|
if (!$this->connector->validate($connection)) {
|
|
throw new \RuntimeException('Connection of unexpected type returned.');
|
|
}
|
|
|
|
if (!$this->initialized) {
|
|
throw new \RuntimeException('Please initialize the connection pool first, call $pool->init().');
|
|
}
|
|
if ($this->pool->isFull()) {
|
|
// Discard the connection
|
|
$this->removeConnection($connection);
|
|
return false;
|
|
}
|
|
$connection->{static::KEY_LAST_ACTIVE_TIME} = time();
|
|
$ret = $this->pool->push($connection, static::CHANNEL_TIMEOUT);
|
|
if ($ret === false) {
|
|
$this->removeConnection($connection);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Get the number of created connections
|
|
* @return int
|
|
*/
|
|
public function getConnectionCount(): int
|
|
{
|
|
return $this->connectionCount;
|
|
}
|
|
|
|
/**
|
|
* Get the number of idle connections
|
|
* @return int
|
|
*/
|
|
public function getIdleCount(): int
|
|
{
|
|
return $this->pool->length();
|
|
}
|
|
|
|
/**
|
|
* Close the connection pool and disconnect all connections
|
|
* @return bool
|
|
*/
|
|
public function close(): bool
|
|
{
|
|
if (!$this->initialized) {
|
|
return false;
|
|
}
|
|
if ($this->closed) {
|
|
return false;
|
|
}
|
|
$this->closed = true;
|
|
swoole_timer_clear($this->balancerTimerId);
|
|
Coroutine::create(function () {
|
|
while (true) {
|
|
if ($this->pool->isEmpty()) {
|
|
break;
|
|
}
|
|
$connection = $this->pool->pop(static::CHANNEL_TIMEOUT);
|
|
if ($connection !== false) {
|
|
$this->connector->disconnect($connection);
|
|
}
|
|
}
|
|
$this->pool->close();
|
|
});
|
|
return true;
|
|
}
|
|
|
|
public function __destruct()
|
|
{
|
|
$this->close();
|
|
}
|
|
|
|
protected function startBalanceTimer(float $interval)
|
|
{
|
|
return swoole_timer_tick(round($interval) * 1000, function () {
|
|
$now = time();
|
|
$validConnections = [];
|
|
while (true) {
|
|
if ($this->closed) {
|
|
break;
|
|
}
|
|
if ($this->connectionCount <= $this->minActive) {
|
|
break;
|
|
}
|
|
if ($this->pool->isEmpty()) {
|
|
break;
|
|
}
|
|
$connection = $this->pool->pop(static::CHANNEL_TIMEOUT);
|
|
if ($connection === false) {
|
|
continue;
|
|
}
|
|
$lastActiveTime = $connection->{static::KEY_LAST_ACTIVE_TIME} ?? 0;
|
|
if ($now - $lastActiveTime < $this->maxIdleTime) {
|
|
$validConnections[] = $connection;
|
|
} else {
|
|
$this->removeConnection($connection);
|
|
}
|
|
}
|
|
|
|
foreach ($validConnections as $validConnection) {
|
|
$ret = $this->pool->push($validConnection, static::CHANNEL_TIMEOUT);
|
|
if ($ret === false) {
|
|
$this->removeConnection($validConnection);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
protected function createConnection()
|
|
{
|
|
$this->connectionCount++;
|
|
$connection = $this->connector->connect($this->connectionConfig);
|
|
$connection->{static::KEY_LAST_ACTIVE_TIME} = time();
|
|
return $connection;
|
|
}
|
|
|
|
protected function removeConnection($connection)
|
|
{
|
|
$this->connectionCount--;
|
|
Coroutine::create(function () use ($connection) {
|
|
try {
|
|
$this->connector->disconnect($connection);
|
|
} catch (\Throwable $e) {
|
|
// Ignore this exception.
|
|
}
|
|
});
|
|
}
|
|
}
|