/ swool

Swoole 协程使用注意事项

近期公司的项目需要,用Swoole搭建TCP服务器,过程中使用了Coroutine\Redis,以及Coroutine\Mysql,在第一个版本中为了节省TCP及Redis的连接数,协程的理解的不够深入,多个协程使用了同一个Redis及Mysql客户端,导致不同的协程之间发生了数据错乱。
在此简要分析如下:

简单数据类型:
$number = 1;
go(function () use(&$number) {
  Co::sleep(1);
  $number++;
  echo "In First Coroutine Number is:".$number;
});

$number++;
echo "In Main ".$number."\n";

go(function () use(&$number) {
  $number++;
  echo "In Second Coroutine Number is:".$number;
});

本来想获得顺序为

In First Coroutine Number is:2
In Main 3
In Second Coroutine Number is:4

运行此代码获得如下结果:

In Main 2
In Second Coroutine Number is:3
In First Coroutine Number is:4

示例很简单,但是展示了协程的乱序可能会导致的程序错误。
在调试过程中因为使用了单一连接客户端导致过:

Uncaught Swoole\Error: Socket#8 has already been bound to another coroutine

因此在swoole中使用协程时,一般使用协程客户端连接池。
示例代码如下:


/**
 * Class RedisPool
 */
class RedisPool
{
  protected $mAvailable;
  protected $mPool;
  protected $mConfig;

  public function __construct($config)
  {
    $this->mAvailable = true;
    $this->mPool = new SplQueue();
    $this->mConfig = $config;
  }

  public function push($redisClient)
  {
    if ($this->mAvailable) {
      $this->mPool->enqueue($redisClient);
    }
  }

  public function pop()
  {
    $redisClient = null;
    if ($this->mAvailable) {
      try {
        $redisClient = $this->mPool->dequeue();
      } catch (\RuntimeException $exception) {
        //队列中为空
        $redisClient = new RedisClient($this->mConfig);
        if (empty($redisClient) || $redisClient->connect()) {
          $redisClient = null;
        }
      }
    }
    return $redisClient;
  }

  public function isAvailable()
  {
    return $this->mAvailable;
  }

  public function setAvailable($available)
  {
    $this->mAvailable = $available;
  }


  public function __destruct()
  {
    // 连接池销毁, 置不可用状态, 防止新的客户端进入常驻连接池, 导致服务器无法平滑退出
    $this->mAvailable = false;
    while (!$this->mPool->isEmpty()) {
      $this->mPool->dequeue();
    }
  }

  public function __call($name, $arguments)
  {
    // TODO: Implement __call() method.
    if (!is_callable(array($this->mPool, $name))) {
      throw new BadFunctionCallException("Function Name:" . $name . " Args:" . implode(",", $arguments));
    }
    return call_user_func_array([$this->mPool, $name], $arguments);
  }

在协程中,从连接池中获取连接客户端,保证每个协程使用不同的连接