redisCluster mode source code


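You can skip the core-logic source section. The predis source is convoluted and I have only pasted the code there; anyone interested can trace it further on their own.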


Overall execution flow

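  • Initialize the client (the source is convoluted; it is the Client constructor, for anyone who wants to trace it).
  • When the client runs a SET, predis computes the slot from the key and then picks a node for that slot:
    1. If the node configuration specifies slots, the node configured for that slot is used (e.g. slot 50 goes to port 7000 in the configuration below). This may not match the slot assignment Redis actually uses, for example if the slots were changed on the Redis side but the PHP configuration was not updated.
    2. If the configuration specifies no slots, or the configured ranges do not contain the slot, predis guesses a node (guessNode). Guessing assumes the slots are spread evenly. With 3 nodes, the first node in the configured list is assumed to own slots 0-5460, the second 5461-10921 and the third 10922-16383, and the slot is matched against these ranges.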

A configuration that specifies the slot-to-node mapping looks like this:

```php
<?php
require 'vendor/autoload.php';

use Predis\Client;

$redis_list = [
    'redis://192.168.124.10:7000?slots=1-100,500-1000',
    'redis://192.168.124.10:7001?slots=101-499',
    'redis://192.168.124.10:7002',
];
$redis = new Client($redis_list, ['cluster' => 'redis']);
```
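The sketch below is a rough illustration of the first rule. It turns per-node slots strings like the ones above into a slot-to-node table. It is not predis code. buildSlotMap is a hypothetical helper, nodes are keyed by a plain "host:port" string, and each slots value is assumed to be a comma-separated list of start-end ranges (a bare number is treated as a single slot).

```php
<?php
// Hypothetical helper (not part of predis): expands "start-end,start-end" slot
// strings into a slot => node map. Nodes with no slots string are skipped.
function buildSlotMap(array $nodes): array
{
    $map = [];
    foreach ($nodes as $nodeId => $slotsParam) {
        if ($slotsParam === null || $slotsParam === '') {
            continue; // no explicit slots configured for this node
        }
        foreach (explode(',', $slotsParam) as $range) {
            $bounds = explode('-', $range, 2);
            $start = (int) $bounds[0];
            $end = isset($bounds[1]) ? (int) $bounds[1] : $start; // "42" means just slot 42
            for ($slot = $start; $slot <= $end; ++$slot) {
                $map[$slot] = $nodeId;
            }
        }
    }
    return $map;
}

$map = buildSlotMap([
    '192.168.124.10:7000' => '1-100,500-1000',
    '192.168.124.10:7001' => '101-499',
    '192.168.124.10:7002' => null,
]);

echo $map[50], PHP_EOL;            // 192.168.124.10:7000
echo $map[200], PHP_EOL;           // 192.168.124.10:7001
var_dump(isset($map[2000]));       // bool(false)
```

With this configuration, slot 0 and every slot above 1000 have no explicit owner. A lookup for them would fall through to guessNode.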

The node-guessing algorithm (guessNode) is:

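```php
protected function guessNode($slot)
{
    // assume the 16384 slots are split evenly across the configured nodes
    $count = count($this->pool);
    $index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
    $nodes = array_keys($this->pool);

    return $nodes[$index];
}
```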
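You can run the same arithmetic on its own to see which range each node is assumed to own. guessNodeIndex is a hypothetical standalone version of the snippet above. It takes the node count as a parameter instead of reading $this->pool. For non-negative slots, intdiv gives the same result as the (int) casts.

```php
<?php
// Same arithmetic as guessNode, with the node count passed in explicitly.
function guessNodeIndex(int $slot, int $count): int
{
    $slotsPerNode = intdiv(16384, $count); // 5461 when there are 3 nodes
    return min(intdiv($slot, $slotsPerNode), $count - 1);
}

// Collect the slot range each of 3 nodes is assumed to own.
$count = 3;
$ranges = [];
for ($slot = 0; $slot < 16384; ++$slot) {
    $i = guessNodeIndex($slot, $count);
    if (!isset($ranges[$i])) {
        $ranges[$i] = [$slot, $slot];
    }
    $ranges[$i][1] = $slot;
}
foreach ($ranges as $i => [$start, $end]) {
    echo "node $i: $start-$end", PHP_EOL;
}
// node 0: 0-5460
// node 1: 5461-10921
// node 2: 10922-16383
```

Each node gets floor(16384 / count) slots and the last node absorbs the remainder. That is why the third range is one slot longer.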
  • Serialize the SET command. The call

```php
$redis->set("a", 1234);
```

becomes the following RESP request (each line ends with \r\n):

```
*3
$3
SET
$1
a
$4
1234
```
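The serialization rule fits in a few lines. It writes an array header *<count>, then every part (command name first) as a bulk string: $<byte length> followed by the bytes. Every header and payload ends with \r\n. encodeCommand is a hypothetical helper, not a predis API. predis's own version of this is writeRequest.

```php
<?php
// Hypothetical helper (not the predis API): serializes a command as a RESP array
// of bulk strings, the same layout predis's writeRequest produces.
function encodeCommand(string $command, array $arguments): string
{
    $parts = array_merge([$command], $arguments);
    $buffer = '*' . count($parts) . "\r\n";                       // array header: number of parts
    foreach ($parts as $part) {
        $part = (string) $part;                                   // 1234 is sent as the string "1234"
        $buffer .= '$' . strlen($part) . "\r\n" . $part . "\r\n"; // byte length, then bytes
    }
    return $buffer;
}

$request = encodeCommand('SET', ['a', 1234]);
echo json_encode($request), PHP_EOL; // "*3\r\n$3\r\nSET\r\n$1\r\na\r\n$4\r\n1234\r\n"
```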
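  • Open a connection to the node and send the password and database number (if configured).
    1. If the connection fails, the node is removed from the configured node list. predis then asks a random node from the list for CLUSTER SLOTS to get the real slot assignments and master/replica relationships of all nodes. The slot is matched against this real mapping, and predis reconnects to the real node.
    2. If that connection fails as well, the command simply fails, because the code hard-codes a single retry.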
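  • Send the serialized command to the Redis node.
    1. If sending fails, the same recovery runs. The node is removed from the configured list, and a random node from the list is asked for CLUSTER SLOTS to get the real slot assignments and master/replica relationships. The slot is matched against the real mapping and predis reconnects to the real node. The reselected node may turn out to be the one that just failed; if it fails again, an exception is thrown.
      • On this first failed send, the connection was to a master. If that master goes down, Redis promotes one of its replicas, and the CLUSTER SLOTS reply that PHP fetches then lists the newly promoted master. For example, suppose the master on port 7000 is configured and connected, and 7000 goes down while the command is being sent. predis fetches CLUSTER SLOTS, sees that 7000 is no longer the master and 7005 has been promoted, and connects to 7005.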
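  • Read the node's response.
    1. MOVED: the client's slot-to-node mapping differs from the real Redis server. predis handles the MOVED reply and takes the real slot and node from the reply itself.
      • It sends CLUSTER SLOTS to the node named in the MOVED reply to get the slot assignments and master/replica relationships of the whole cluster. It then resends the command to the node that really owns the slot.
      • If sending CLUSTER SLOTS to the MOVED node fails, that node is removed from the configured node list. A random node is picked to run CLUSTER SLOTS instead, and the command is resent. The node it is resent to may again be the one that just failed, in which case a second failure means the cluster is unavailable. It may also be a replica that has been promoted to master.
    2. Success: the node replies with OK.

A MOVED response:

```
-MOVED 15495 127.0.0.1:7002
```

A successful response:

```
+OK
```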
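Extracting the redirect target from a MOVED reply amounts to splitting the message on spaces. The sketch below is illustrative only. parseMoved is a hypothetical helper that expects the message without the leading '-' RESP prefix. It also splits the address into host and port, whereas predis keeps "host:port" as the connection ID.

```php
<?php
// Hypothetical helper: parses the message of a MOVED error reply (without the
// leading '-' RESP prefix) into slot, host and port.
function parseMoved(string $message): ?array
{
    $parts = explode(' ', $message, 3);
    if (count($parts) !== 3 || $parts[0] !== 'MOVED') {
        return null; // not a MOVED redirection (e.g. ASK or another error)
    }
    [, $slot, $address] = $parts;
    $colon = strrpos($address, ':'); // split at the last ':' of "host:port"
    if ($colon === false) {
        return null;
    }
    return [
        'slot' => (int) $slot,
        'host' => substr($address, 0, $colon),
        'port' => (int) substr($address, $colon + 1),
    ];
}

var_export(parseMoved('MOVED 15495 127.0.0.1:7002')); // slot 15495, host '127.0.0.1', port 7002
```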

CRC16 algorithm source code

The method that computes the slot from a key is getSlot, in predis\src\Cluster\ClusterStrategy.php:

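```php
public function getSlot(CommandInterface $command)
{
    // the slot may already be cached on the command object
    $slot = $command->getSlot();

    // otherwise, if this command type is known, its key-extractor callback finds the key
    if (!isset($slot) && isset($this->commands[$cmdID = $command->getId()])) {
        $key = call_user_func($this->commands[$cmdID], $command);

        if (isset($key)) {
            $slot = $this->getSlotByKey($key);
            $command->setSlot($slot); // cache the slot on the command
        }
    }

    // null for commands that cannot be routed by key
    return $slot;
}
```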

which calls getSlotByKey:

```php
public function getSlotByKey($key)
{
    $key = $this->extractKeyTag($key);                  // use the {hash tag} if the key has one
    $slot = $this->hashGenerator->hash($key) & 0x3FFF;  // CRC16, keep the low 14 bits (= mod 16384)

    return $slot;
}
```
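extractKeyTag implements Redis Cluster hash tags, which let related keys land in the same slot. The sketch below follows the rule as stated in the Redis Cluster specification: hash only the text between the first '{' and the first '}' after it, provided that text is non-empty. extractHashTag is a hypothetical name, and the code is not the predis source.

```php
<?php
// Hypothetical helper following the hash-tag rule of the Redis Cluster spec:
// hash only the text between the first '{' and the first '}' after it, if that
// text is non-empty; otherwise hash the whole key.
function extractHashTag(string $key): string
{
    $open = strpos($key, '{');
    if ($open === false) {
        return $key;                          // no '{': whole key
    }
    $close = strpos($key, '}', $open + 1);
    if ($close === false || $close === $open + 1) {
        return $key;                          // no matching '}' or empty "{}": whole key
    }
    return substr($key, $open + 1, $close - $open - 1);
}

echo extractHashTag('{user1000}.following'), PHP_EOL; // user1000
echo extractHashTag('{user1000}.followers'), PHP_EOL; // user1000, so both keys share a slot
echo extractHashTag('foo{}{bar}'), PHP_EOL;           // foo{}{bar}
echo extractHashTag('foo{bar}{zap}'), PHP_EOL;        // bar
```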

The core CRC16 algorithm lives in predis\src\Cluster\Hash\CRC16.php and can be reused directly:

```php
// Lookup-table CRC16 (CRC-CCITT, XMODEM variant: polynomial 0x1021, initial value 0),
// wrapped in a minimal class so that self::$CCITT_16 resolves.
class CRC16
{
    private static $CCITT_16 = array(
        0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
        0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
        0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
        0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
        0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
        0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
        0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
        0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
        0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
        0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
        0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
        0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
        0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
        0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
        0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
        0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
        0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
        0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
        0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
        0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
        0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
        0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
        0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
        0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
        0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
        0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
        0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
        0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
        0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
        0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
        0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
        0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0,
    );

    public function hash($value)
    {
        // CRC-CCITT-16 algorithm
        $crc = 0;
        $CCITT_16 = self::$CCITT_16;

        $value = (string) $value;
        $strlen = strlen($value);

        // one table lookup per input byte
        for ($i = 0; $i < $strlen; ++$i) {
            $crc = (($crc << 8) ^ $CCITT_16[($crc >> 8) ^ ord($value[$i])]) & 0xFFFF;
        }

        return $crc;
    }
}
```

This algorithm gives the same slot as Redis. For key a, PHP also computes slot 15495:

```
127.0.0.1:7001> set a 123
-> Redirected to slot [15495] located at 127.0.0.1:7002
OK
```
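The lookup table is only an optimization. A bit-by-bit version of the same CRC (polynomial 0x1021, initial value 0) makes the arithmetic visible and reproduces the 15495 above. crc16 and keySlot are hypothetical helper names, and the sketch ignores hash tags for brevity.

```php
<?php
// Bitwise (table-free) CRC16-CCITT/XMODEM: same result as the lookup-table loop, just slower.
function crc16(string $data): int
{
    $crc = 0;
    $len = strlen($data);
    for ($i = 0; $i < $len; ++$i) {
        $crc ^= ord($data[$i]) << 8;                  // feed the next byte into the high bits
        for ($bit = 0; $bit < 8; ++$bit) {
            $crc = ($crc & 0x8000)
                ? (($crc << 1) ^ 0x1021) & 0xFFFF     // top bit set: shift and apply the polynomial
                : ($crc << 1) & 0xFFFF;               // top bit clear: just shift
        }
    }
    return $crc;
}

function keySlot(string $key): int
{
    return crc16($key) & 0x3FFF; // same as crc16($key) % 16384
}

printf("%d\n", keySlot('a'));             // 15495, matching redis-cli above
printf("0x%04X\n", crc16('123456789'));   // 0x31C3, the standard CRC-16/XMODEM check value
```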

Core logic source code

In cluster mode, a command is executed by the executeCommand method of RedisCluster, in predis\src\Connection\Aggregate\RedisCluster.php:

```php
public function executeCommand(CommandInterface $command)
{
    $response = $this->retryCommandOnFailure($command, __FUNCTION__);

    if ($response instanceof ErrorResponseInterface) {   // error replies such as MOVED are handled here
        return $this->onErrorResponse($command, $response);
    }

    return $response;
}
```

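retryCommandOnFailure connects to the node, executes the command and catches connection exceptions. Surprisingly, it uses goto; this was the first time I had seen goto in a PHP program.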

```php
private function retryCommandOnFailure(CommandInterface $command, $method)
{
    $failure = false;
    RETRY_COMMAND: {
        try {
            $response = $this->getConnection($command)->$method($command);
        } catch (ConnectionException $exception) {
            $connection = $exception->getConnection();
            $connection->disconnect();

            $this->remove($connection);

            // $failure is set to true below, so if the first attempt throws and the
            // retry throws again, the exception is rethrown and the command fails
            if ($failure) {
                throw $exception;
            } elseif ($this->useClusterSlots) {
                $this->askSlotsMap();
            }

            $failure = true;

            goto RETRY_COMMAND;
        }
    }

    return $response;
}
```
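Without the predis specifics, the goto implements "retry once, refreshing the slot map in between". The loop below is a simplified model, not predis code. retryOnce, $send and $refreshSlots are hypothetical, and RuntimeException stands in for ConnectionException. The model leaves out the disconnect()/remove() bookkeeping and the useClusterSlots check.

```php
<?php
// Simplified model of retryCommandOnFailure: at most one retry, as a loop instead of goto.
// $send stands in for "run the command on the chosen node", $refreshSlots for askSlotsMap().
function retryOnce(callable $send, callable $refreshSlots)
{
    $failure = false;
    while (true) {
        try {
            return $send();
        } catch (RuntimeException $e) {
            if ($failure) {
                throw $e;        // second failure: give up
            }
            $refreshSlots();     // first failure: reload the slot map, then retry
            $failure = true;
        }
    }
}

// Usage: the first attempt fails (e.g. the master just went down), the retry succeeds.
$attempts = 0;
$refreshes = 0;
$result = retryOnce(
    function () use (&$attempts) {
        if (++$attempts === 1) {
            throw new RuntimeException('connection refused');
        }
        return 'OK';
    },
    function () use (&$refreshes) {
        ++$refreshes;
    }
);
echo "$result after $attempts attempts, $refreshes slot refresh", PHP_EOL; // OK after 2 attempts, 1 slot refresh
```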

If the PHP client does not yet know which Redis node serves the slot, predis has to guess one:

```php
public function getConnection(CommandInterface $command)
{
    // compute the slot
    $slot = $this->strategy->getSlot($command);
    if (!isset($slot)) {
        throw new NotSupportedException(
            "Cannot use '{$command->getId()}' with redis-cluster."
        );
    }

    if (isset($this->slots[$slot])) {
        // this slot already has a known node
        return $this->slots[$slot];
    } else {
        // guess a node from the slot
        return $this->getConnectionBySlot($slot);
    }
}

public function getConnectionBySlot($slot)
{
    // check that the slot is valid (0..16383)
    if ($slot < 0x0000 || $slot > 0x3FFF) {
        throw new \OutOfBoundsException("Invalid slot [$slot].");
    }

    if (isset($this->slots[$slot])) {
        return $this->slots[$slot];
    }
    // guess a node
    $connectionID = $this->guessNode($slot);
    if (!$connection = $this->getConnectionById($connectionID)) {
        $connection = $this->createConnection($connectionID);
        $this->pool[$connectionID] = $connection;
    }
    // remember the slot -> guessed node mapping for now
    return $this->slots[$slot] = $connection;
}
```

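Next comes connecting to the server, sending the command and handling the response.
The command is first serialized into the format Redis understands (the RESP protocol):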

```php
public function writeRequest(CommandInterface $command)
{
    $commandID = $command->getId();
    $arguments = $command->getArguments();

    $cmdlen = strlen($commandID);
    $reqlen = count($arguments) + 1;   // command name + arguments

    // array header, then the command name as a bulk string
    $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandID}\r\n";

    // each argument as "$<length>\r\n<data>\r\n"
    foreach ($arguments as $argument) {
        $arglen = strlen($argument);
        $buffer .= "\${$arglen}\r\n{$argument}\r\n";
    }
    $this->write($buffer);
}
```

Then the connection to Redis is opened:

```php
protected function tcpStreamInitializer(ParametersInterface $parameters)
{
    if (!filter_var($parameters->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) {
        $address = "tcp://$parameters->host:$parameters->port";
    } else {
        $address = "tcp://[$parameters->host]:$parameters->port";
    }

    $flags = STREAM_CLIENT_CONNECT;

    if (isset($parameters->async_connect) && $parameters->async_connect) {
        $flags |= STREAM_CLIENT_ASYNC_CONNECT;
    }
    if (isset($parameters->persistent)) {
        if (false !== $persistent = filter_var($parameters->persistent, FILTER_VALIDATE_BOOLEAN, FILTER_NULL_ON_FAILURE)) {
            $flags |= STREAM_CLIENT_PERSISTENT;

            if ($persistent === null) {
                $address = "{$address}/{$parameters->persistent}";
            }
        }
    }
    $resource = $this->createStreamSocket($parameters, $address, $flags);

    return $resource;
}

protected function createStreamSocket(ParametersInterface $parameters, $address, $flags)
{
    $timeout = (isset($parameters->timeout) ? (float) $parameters->timeout : 5.0);
    if (!$resource = @stream_socket_client($address, $errno, $errstr, $timeout, $flags)) {
        $this->onConnectionError(trim($errstr), $errno);
    }

    if (isset($parameters->read_write_timeout)) {
        $rwtimeout = (float) $parameters->read_write_timeout;
        $rwtimeout = $rwtimeout > 0 ? $rwtimeout : -1;
        $timeoutSeconds = floor($rwtimeout);
        $timeoutUSeconds = ($rwtimeout - $timeoutSeconds) * 1000000;
        stream_set_timeout($resource, $timeoutSeconds, $timeoutUSeconds);
    }

    if (isset($parameters->tcp_nodelay) && function_exists('socket_import_stream')) {
        $socket = socket_import_stream($resource);
        socket_set_option($socket, SOL_TCP, TCP_NODELAY, (int) $parameters->tcp_nodelay);
    }

    return $resource;
}
```
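The read_write_timeout handling converts one float into the seconds/microseconds pair that stream_set_timeout() takes. Below is a standalone sketch of that arithmetic. splitTimeout is hypothetical. Unlike the original, it casts to int and rounds the microseconds rather than truncating them, so binary floating-point error cannot drop a microsecond.

```php
<?php
// Hypothetical helper mirroring the read_write_timeout arithmetic: a float number
// of seconds becomes [seconds, microseconds]; zero or negative becomes -1.
function splitTimeout(float $rwtimeout): array
{
    $rwtimeout = $rwtimeout > 0 ? $rwtimeout : -1.0;
    $seconds = (int) floor($rwtimeout);
    // round here; a plain int conversion of the float would truncate
    $microseconds = (int) round(($rwtimeout - $seconds) * 1000000);
    return [$seconds, $microseconds];
}

print_r(splitTimeout(1.5)); // seconds 1, microseconds 500000
```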

Finally the command is sent and the response is read:

```php
public function read()
{
    $socket = $this->getResource();
    $chunk = fgets($socket);
    if ($chunk === false || $chunk === '') {
        $this->onConnectionError('Error while reading line from the server.');
    }
    $prefix = $chunk[0];
    $payload = substr($chunk, 1, -2);

    switch ($prefix) {
        case '+':
            return StatusResponse::get($payload);

        case '$':
            $size = (int) $payload;

            if ($size === -1) {
                return;
            }

            $bulkData = '';
            $bytesLeft = ($size += 2);

            do {
                $chunk = fread($socket, min($bytesLeft, 4096));

                if ($chunk === false || $chunk === '') {
                    $this->onConnectionError('Error while reading bytes from the server.');
                }

                $bulkData .= $chunk;
                $bytesLeft = $size - strlen($bulkData);
            } while ($bytesLeft > 0);

            return substr($bulkData, 0, -2);

        case '*':
            $count = (int) $payload;

            if ($count === -1) {
                return;
            }

            $multibulk = array();

            for ($i = 0; $i < $count; ++$i) {
                $multibulk[$i] = $this->read();
            }

            return $multibulk;

        case ':':
            $integer = (int) $payload;
            return $integer == $payload ? $integer : $payload;

        case '-':
            return new ErrorResponse($payload);

        default:
            $this->onProtocolError("Unknown response prefix: '$prefix'.");

            return;
    }
}
```
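You can exercise the same prefix dispatch without a socket. The sketch below parses replies from a string buffer with a moving offset. parseReply is a hypothetical name. It returns status replies as plain strings and error replies as ['error' => message], instead of predis's StatusResponse and ErrorResponse objects. It also assumes the whole reply is already in the buffer.

```php
<?php
// String-based sketch of read(): same RESP prefixes, but it consumes a buffer
// and advances $pos instead of reading from a socket.
function parseReply(string $buf, int &$pos)
{
    $end = strpos($buf, "\r\n", $pos);
    if ($end === false) {
        throw new UnexpectedValueException('Incomplete reply');
    }
    $prefix = $buf[$pos];
    $payload = substr($buf, $pos + 1, $end - $pos - 1);
    $pos = $end + 2;                       // skip past the CRLF

    switch ($prefix) {
        case '+':                          // status reply, e.g. +OK
            return $payload;
        case '-':                          // error reply, e.g. -MOVED 15495 127.0.0.1:7002
            return ['error' => $payload];
        case ':':                          // integer reply
            return (int) $payload;
        case '$':                          // bulk string: byte length, then data + CRLF
            $size = (int) $payload;
            if ($size === -1) {
                return null;               // nil bulk string
            }
            $data = substr($buf, $pos, $size);
            $pos += $size + 2;
            return $data;
        case '*':                          // array: count, then that many nested replies
            $count = (int) $payload;
            if ($count === -1) {
                return null;
            }
            $items = [];
            for ($i = 0; $i < $count; ++$i) {
                $items[] = parseReply($buf, $pos);
            }
            return $items;
        default:
            throw new UnexpectedValueException("Unknown response prefix: '$prefix'.");
    }
}

$pos = 0;
var_dump(parseReply("*2\r\n\$3\r\nfoo\r\n:42\r\n", $pos)); // an array holding "foo" and 42
```

Because bulk strings are read by length rather than up to the next CRLF, data containing \r\n survives intact. This mirrors the fread loop in read().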

If the reply is an error such as MOVED, the error is handled:

```php
protected function onMovedResponse(CommandInterface $command, $details)
{
    // $details is the part after "MOVED", e.g. "15495 127.0.0.1:7002"
    list($slot, $connectionID) = explode(' ', $details, 2);
    if (!$connection = $this->getConnectionById($connectionID)) {
        $connection = $this->createConnection($connectionID);
    }

    if ($this->useClusterSlots) {
        $this->askSlotsMap($connection);          // refresh the whole slot map from this node
    }
    $this->move($connection, $slot);              // bind the slot to the node named by MOVED
    $response = $this->executeCommand($command);  // resend the command

    return $response;
}
```

predis then sends a CLUSTER SLOTS command to the real node named by MOVED, to get the slot assignments and master/replica relationships of all nodes:

```php
public function askSlotsMap(NodeConnectionInterface $connection = null)
{
    if (!$connection && !$connection = $this->getRandomConnection()) {
        return array();
    }
    $this->resetSlotsMap();

    $response = $this->queryClusterNodeForSlotsMap($connection);
    foreach ($response as $slots) {
        // We only support master servers for now, so we ignore subsequent
        // elements in the $slots array identifying slaves.
        list($start, $end, $master) = $slots;
        if ($master[0] === '') {
            $this->setSlots($start, $end, (string) $connection);
        } else {
            $this->setSlots($start, $end, "{$master[0]}:{$master[1]}");
        }
    }
    return $this->slotsMap;
}
```
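Conceptually, askSlotsMap turns each [start, end, master, replicas...] entry of the CLUSTER SLOTS reply into slot assignments for the master. The sketch below mimics that with plain arrays. slotsMapFromClusterSlots is hypothetical, and node entries are reduced to [host, port] (real replies carry more fields). The sample reply, in which 7005 has taken over 7000's slots, is invented for illustration.

```php
<?php
// Hypothetical helper mimicking askSlotsMap: for every [start, end, master, replicas...]
// entry, assign slots start..end to the master; replicas are ignored. An empty master
// host means "the node that was queried".
function slotsMapFromClusterSlots(array $reply, string $queriedNode): array
{
    $map = [];
    foreach ($reply as $entry) {
        [$start, $end, $master] = $entry;     // elements after the master are replicas
        $node = $master[0] === '' ? $queriedNode : "{$master[0]}:{$master[1]}";
        for ($slot = $start; $slot <= $end; ++$slot) {
            $map[$slot] = $node;
        }
    }
    return $map;
}

// Invented sample reply: 7005 now serves the slots 7000 used to own.
$reply = [
    [0, 5460, ['127.0.0.1', 7005]],
    [5461, 10922, ['127.0.0.1', 7001], ['127.0.0.1', 7004]],
    [10923, 16383, ['127.0.0.1', 7002], ['127.0.0.1', 7003]],
];
$map = slotsMapFromClusterSlots($reply, '127.0.0.1:7001');
echo $map[50], ' ', $map[15495], PHP_EOL; // 127.0.0.1:7005 127.0.0.1:7002
```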

Finally, the SET command is processed again and goes back through executeCommand.