
Redis Sentinel Read/Write Splitting: a Slave Connection Pool

 
For a better reading experience, see: http://www.jack-yin.com/coding/spring-boot/2683.html

0. Background

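Besides running Redis as a cluster, a standalone (non-clustered) Redis can be made highly available with a master-slave architecture monitored by Sentinel,
and clients can then fail over automatically.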

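Much as JdbcTemplate is used for JDBC, Spring uses RedisTemplate to operate on Redis. In Spring Boot, adding the following Maven dependencies is enough
to have a RedisTemplate instance auto-configured.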
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

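RedisTemplate needs a RedisConnectionFactory to manage its Redis connections. If you define a RedisSentinelConfiguration in your project and pass it to the
RedisConnectionFactory, you get a Sentinel-based connection pool with automatic failover: when the master fails, Sentinel promotes one of the slaves
to master, so the master connection stays highly available.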

The following is a typical configuration of a Sentinel-based RedisConnectionFactory:

    @Value("${spring.redis.password}")
    private String redisPasswd;

    @Bean
    public RedisConnectionFactory jedisConnectionFactory() {
        RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
                .master("mymaster")
                .sentinel("192.168.0.1", 26479)
                .sentinel("192.168.0.2", 26479)
                .sentinel("192.168.0.3", 26479);
        sentinelConfig.setPassword(RedisPassword.of(redisPasswd));
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(sentinelConfig);
        System.out.println(jedisConnectionFactory.getClientConfiguration().getClientName());
        return jedisConnectionFactory;
    }

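Reading the source of org.springframework.data.redis.connection.jedis.JedisConnectionFactory shows that,
when a RedisSentinelConfiguration is supplied, the factory is backed by a JedisSentinelPool. Every connection in that pool
goes to the master. JedisSentinelPool also subscribes to the +switch-master channel on every Sentinel. A message on that channel
means that a master switch has happened and a new master exists, and the pool then re-initializes itself against the new master.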

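So far we know that a RedisConnectionFactory can be built on top of Sentinel and that it fails over automatically,
but it only ever creates connections to the master. If every connection goes to the master, the slaves serve purely as backups and their capacity is wasted.
A slave normally just replicates the master's data. To avoid inconsistency nothing should be written to it, and setting slave-read-only yes in the Redis configuration file makes a slave reject all write commands.
For a Sentinel-managed master-slave Redis deployment we can therefore treat the master as the read/write server and all slaves as read-only servers. This read/write splitting makes fuller use of the servers
and improves the performance of the Redis system as a whole.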

1. The problem

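All connections in JedisSentinelPool go to the master, so how do we get a pool of connections to the slaves? After going through spring-boot-starter-data-redis and jedis,
I found no ready-made slave connection pool, so I decided to write one.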

2. Analysis

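RedisSentinelConfiguration gives us the IP and port of each sentinel, so we can connect to a sentinel and run the sentinel slaves mymaster command to obtain the IP and port of every slave.
With those we can open connections to the slaves.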
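To make the lookup step concrete, here is a minimal sketch of turning a sentinel slaves reply into connectable addresses. It assumes the reply has already been fetched as one map per slave (the shape Jedis returns from sentinelSlaves) and relies on the ip, port and flags fields of that reply. The class name SlaveAddresses is hypothetical, and filtering on flags is an extra precaution in this sketch rather than something the factory in this article does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch: extract usable "ip:port" addresses from a SENTINEL SLAVES reply. */
final class SlaveAddresses {

    private SlaveAddresses() {
    }

    /**
     * @param slaves one map per slave, keyed by field name ("ip", "port", "flags", ...)
     * @return "ip:port" for every slave that Sentinel does not flag as down or disconnected
     */
    static List<String> usable(List<Map<String, String>> slaves) {
        List<String> result = new ArrayList<>();
        for (Map<String, String> slave : slaves) {
            String ip = slave.get("ip");
            String port = slave.get("port");
            String flags = slave.get("flags");
            if (ip == null || port == null) {
                continue; // incomplete entry, nothing to connect to
            }
            if (flags != null && (flags.contains("s_down") || flags.contains("o_down")
                    || flags.contains("disconnected"))) {
                continue; // Sentinel currently considers this slave unreachable
            }
            result.add(ip + ":" + port);
        }
        return result;
    }
}
```

Skipping flagged slaves up front saves connection attempts that are very likely to time out.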

Reading on into the JedisFactory source, we see that it implements PooledObjectFactory<Jedis>, an interface from org.apache.commons.pool2. In other words, the Jedis connection pool
is built on Apache commons-pool2.
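The division of labor between a pool and its object factory can be illustrated with a toy model. The sketch below is deliberately not the commons-pool2 API: ToyFactory and ToyPool are hypothetical names, and the real library has more callbacks (activate, passivate) and validates on borrow only when testOnBorrow is enabled. It only shows the basic idea that the pool decides when to reuse, create or discard an object, while the factory knows how.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy stand-in for the factory callbacks; NOT the commons-pool2 interface itself. */
interface ToyFactory<T> {
    T make();

    boolean validate(T obj);

    void destroy(T obj);
}

/** Toy pool that shows when each factory callback fires. */
final class ToyPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final ToyFactory<T> factory;

    ToyPool(ToyFactory<T> factory) {
        this.factory = factory;
    }

    /** Reuse an idle object if it still validates, otherwise create a new one. */
    T borrow() {
        while (!idle.isEmpty()) {
            T candidate = idle.pop();
            if (factory.validate(candidate)) {
                return candidate;
            }
            factory.destroy(candidate); // stale object: drop it and keep looking
        }
        return factory.make();
    }

    /** Put the object back so that a later borrow can reuse it. */
    void giveBack(T obj) {
        idle.push(obj);
    }
}
```

In the same way, a factory for slave connections only has to answer three questions: how to open a connection, how to tell whether it is still healthy, and how to close it.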

[-----------------UML-1---------------------------]


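As the diagram shows, JedisConnectionFactory creates a JedisSentinelPool, which creates a JedisFactory. JedisFactory implements the PooledObjectFactory interface
and produces new Redis connections in its makeObject() method. JedisSentinelPool also defines a MasterListener that subscribes to the +switch-master channel. As soon as a master switch occurs, it fails over automatically
and re-initializes the master connection pool.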
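For reference, the payload Sentinel publishes on the +switch-master channel has five space-separated fields: master name, old IP, old port, new IP, new port. The sketch below parses such a payload. SwitchMasterEvent is a hypothetical helper for illustration and is not part of Jedis.

```java
/** Sketch: parse the payload published on Sentinel's +switch-master channel. */
final class SwitchMasterEvent {
    final String masterName;
    final String oldHost;
    final int oldPort;
    final String newHost;
    final int newPort;

    private SwitchMasterEvent(String masterName, String oldHost, int oldPort,
                              String newHost, int newPort) {
        this.masterName = masterName;
        this.oldHost = oldHost;
        this.oldPort = oldPort;
        this.newHost = newHost;
        this.newPort = newPort;
    }

    /**
     * Expected format: "<master-name> <old-ip> <old-port> <new-ip> <new-port>".
     *
     * @return the parsed event, or null when the payload does not match that format
     */
    static SwitchMasterEvent parse(String payload) {
        String[] parts = payload.trim().split("\\s+");
        if (parts.length != 5) {
            return null;
        }
        try {
            return new SwitchMasterEvent(parts[0], parts[1], Integer.parseInt(parts[2]),
                    parts[3], Integer.parseInt(parts[4]));
        } catch (NumberFormatException e) {
            return null; // a port field was not numeric
        }
    }

    /** True when the event is about the master this client monitors. */
    boolean concerns(String monitoredMaster) {
        return masterName.equals(monitoredMaster);
    }
}
```

A listener would ignore events whose master name differs from its own, since one Sentinel can monitor several masters.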

3. Solution

Modeled on JedisConnectionFactory, JedisSentinelPool and JedisFactory, we create JedisSentinelSlaveConnectionFactory, JedisSentinelSlavePool and JedisSentinelSlaveFactory.
Their relationships are shown in diagram UML-2.

[-----------------UML-2---------------------------]

Of these, JedisSentinelSlaveConnectionFactory is the one that is handed to RedisTemplate. It extends JedisConnectionFactory
and overrides the createRedisSentinelPool method: in JedisConnectionFactory that method returns a JedisSentinelPool, whereas the override returns a JedisSentinelSlavePool.
Both JedisSentinelSlavePool and JedisSentinelPool extend Pool<Jedis>. JedisSentinelSlavePool creates a JedisSentinelSlaveFactory,
which implements PooledObjectFactory<Jedis>. In its public PooledObject<Jedis> makeObject() method it connects to a sentinel,
runs the sentinel slaves command to get the ip and port of the slaves that Sentinel reports, then picks one at random, opens a connection to it and returns it.

The createRedisSentinelPool method of JedisSentinelSlaveConnectionFactory:
    @Override
    protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config){
        GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
        return new JedisSentinelSlavePool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
            poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName());
    }

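1) The sentinel configuration and the master name are passed to JedisSentinelSlaveConnectionFactory through RedisSentinelConfiguration, and from there
on to JedisSentinelSlavePool and JedisSentinelSlaveFactory.
2) JedisSentinelSlavePool is created and starts its listeners, which subscribe to the "+switch-master" channel (the code below also subscribes to "+slave", "+sdown", "+odown" and "+reboot"). Whenever a new master appears, the connection pool is re-initialized.
3) The pool delegates connection creation to JedisSentinelSlaveFactory, which implements PooledObjectFactory<Jedis>.
makeObject() first connects to an available sentinel from the configured sentinel set, then runs sentinel slaves master_name to get the list of slaves
and picks one at random to connect to. If the connection fails it retries, up to 5 times. If it still cannot connect, it throws an exception.
4) As diagram UML-2 shows, JedisConnectionFactory implements InitializingBean, so once the bean has been initialized Spring calls the interface method void afterPropertiesSet() throws Exception;
and the connection pool is created in that method.
5) JedisConnectionFactory also implements DisposableBean, so when the Spring container shuts down, public void destroy() is called to destroy the connection pool.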
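The random selection with bounded retries described in step 3 can be sketched independently of Jedis. In the example below the connection attempt is replaced by a Predicate so that the logic can run on its own. RandomSlavePicker, canConnect and the "host:port" string format are assumptions made for illustration. Dropping each failed candidate keeps the loop from retrying the same dead slave and from drawing from an empty list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Predicate;

/** Sketch: pick a random slave, dropping candidates that fail, with a bounded number of tries. */
final class RandomSlavePicker {

    private RandomSlavePicker() {
    }

    /**
     * @param candidates  slave addresses; the caller's list is not modified
     * @param canConnect  stand-in for opening a real connection, true on success
     * @param maxAttempts upper bound on connection attempts
     * @param random      source of randomness
     * @return the first randomly chosen candidate that connects
     * @throws IllegalStateException when no candidate connects within the allowed attempts
     */
    static String pick(List<String> candidates, Predicate<String> canConnect,
                       int maxAttempts, Random random) {
        List<String> remaining = new ArrayList<>(candidates);
        for (int attempt = 0; attempt < maxAttempts && !remaining.isEmpty(); attempt++) {
            int index = random.nextInt(remaining.size());
            String candidate = remaining.get(index);
            if (canConnect.test(candidate)) {
                return candidate;
            }
            remaining.remove(index); // do not retry a slave that just failed
        }
        throw new IllegalStateException(
                "No reachable slave among " + candidates.size() + " candidates");
    }
}
```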

4. In practice
4.1  Structure of the redis-sentinel-slave-connection-factory project
1) The pom file
---------------------------pom.xml-------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.jack.yin</groupId>
  <artifactId>redis-sentinel-slave-connection-factory</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>spring-boot-starter-redis-readonly-connection-factory</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>


  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
      <version>2.0.1.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>


    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.7.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.20.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

2) JedisSentinelSlaveFactory.java
----------------------JedisSentinelSlaveFactory.java----------------------
package redis.clients.jedis;

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import redis.clients.jedis.exceptions.InvalidURIException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.JedisURIHelper;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;
import java.net.URI;
import java.security.SecureRandom;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class JedisSentinelSlaveFactory implements PooledObjectFactory<Jedis> {
    private final  String masterName;
    private final int retryTimeWhenRetrieveSlave = 5;

    private final AtomicReference<HostAndPort> hostAndPortOfASentinel = new AtomicReference<HostAndPort>();
    private final int connectionTimeout;
    private final int soTimeout;
    private final String password;
    private final int database;
    private final String clientName;
    private final boolean ssl;
    private final SSLSocketFactory sslSocketFactory;
    private SSLParameters sslParameters;
    private HostnameVerifier hostnameVerifier;

    public JedisSentinelSlaveFactory(final String host, final int port, final int connectionTimeout,
                        final int soTimeout, final String password, final int database, final String clientName,
                        final boolean ssl, final SSLSocketFactory sslSocketFactory, final SSLParameters sslParameters,
                        final HostnameVerifier hostnameVerifier,String masterName) {
        this.hostAndPortOfASentinel.set(new HostAndPort(host, port));
        this.connectionTimeout = connectionTimeout;
        this.soTimeout = soTimeout;
        this.password = password;
        this.database = database;
        this.clientName = clientName;
        this.ssl = ssl;
        this.sslSocketFactory = sslSocketFactory;
        this.sslParameters = sslParameters;
        this.hostnameVerifier = hostnameVerifier;
        this.masterName = masterName;
    }

    public JedisSentinelSlaveFactory(final URI uri, final int connectionTimeout, final int soTimeout,
                        final String clientName, final boolean ssl, final SSLSocketFactory sslSocketFactory,
                        final SSLParameters sslParameters, final HostnameVerifier hostnameVerifier,String masterName) {
        if (!JedisURIHelper.isValid(uri)) {
            throw new InvalidURIException(String.format(
                "Cannot open Redis connection due invalid URI. %s", uri.toString()));
        }

        this.hostAndPortOfASentinel.set(new HostAndPort(uri.getHost(), uri.getPort()));
        this.connectionTimeout = connectionTimeout;
        this.soTimeout = soTimeout;
        this.password = JedisURIHelper.getPassword(uri);
        this.database = JedisURIHelper.getDBIndex(uri);
        this.clientName = clientName;
        this.ssl = ssl;
        this.sslSocketFactory = sslSocketFactory;
        this.sslParameters = sslParameters;
        this.hostnameVerifier = hostnameVerifier;
        this.masterName = masterName;
    }

    public void setHostAndPortOfASentinel(final HostAndPort hostAndPortOfASentinel) {
        this.hostAndPortOfASentinel.set(hostAndPortOfASentinel);
    }

    @Override
    public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
        final BinaryJedis jedis = pooledJedis.getObject();
        if (jedis.getDB() != database) {
            jedis.select(database);
        }

    }

    @Override
    public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
        final BinaryJedis jedis = pooledJedis.getObject();
        if (jedis.isConnected()) {
            try {
                try {
                    jedis.quit();
                } catch (Exception e) {
                }
                jedis.disconnect();
            } catch (Exception e) {

            }
        }

    }

    @Override
    public PooledObject<Jedis> makeObject() throws Exception {
        final List<Map<String,String>> slaves;
        // The sentinel connection is only needed for this one query, so close it right away
        // instead of leaking one connection per pooled object.
        final Jedis jedisSentinel = getASentinel();
        try {
            slaves = jedisSentinel.sentinelSlaves(this.masterName);
        } finally {
            jedisSentinel.close();
        }

        if(slaves == null || slaves.isEmpty()) {
            throw new JedisException(String.format("No valid slave for master: %s",this.masterName));
        }

        DefaultPooledObject<Jedis> result = tryToGetSlave(slaves);

        if(null != result) {
            return result;
        } else {
            throw new JedisException(String.format("No valid slave for master: %s, after try %d times.",
                this.masterName,retryTimeWhenRetrieveSlave));
        }

    }

    private DefaultPooledObject<Jedis> tryToGetSlave(List<Map<String,String>> slaves) {
        // Work on a copy so that removing failed candidates never touches the reply list.
        List<Map<String,String>> candidates = new java.util.ArrayList<Map<String,String>>(slaves);
        SecureRandom sr = new SecureRandom();
        int retry = retryTimeWhenRetrieveSlave;
        // Stop after the configured number of attempts, or earlier once no candidate is left
        // (nextInt(0) would throw IllegalArgumentException).
        while(retry > 0 && !candidates.isEmpty()) {
            retry--;
            int randomIndex = sr.nextInt(candidates.size());
            String host = candidates.get(randomIndex).get("ip");
            String port = candidates.get(randomIndex).get("port");
            final Jedis jedisSlave = new Jedis(host,Integer.parseInt(port), connectionTimeout,soTimeout,
                ssl, sslSocketFactory,sslParameters, hostnameVerifier);
            try {
                jedisSlave.connect();
                if (null != this.password) {
                    jedisSlave.auth(this.password);
                }
                if (database != 0) {
                    jedisSlave.select(database);
                }
                if (clientName != null) {
                    jedisSlave.clientSetname(clientName);
                }
                return  new DefaultPooledObject<Jedis>(jedisSlave);

            } catch (Exception e) {
                // This slave is unreachable or rejected us: drop it and try another one.
                jedisSlave.close();
                candidates.remove(randomIndex);
            }
        }

        return null;
    }

    private Jedis getASentinel() {
        final HostAndPort hostAndPort = this.hostAndPortOfASentinel.get();
        final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
            soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);

        try {
            jedis.connect();
        } catch (JedisException je) {
            jedis.close();
            throw je;
        }
        return jedis;
    }

    @Override
    public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {
        // TODO maybe should select db 0? Not sure right now.
    }

    @Override
    public boolean validateObject(PooledObject<Jedis> pooledJedis) {
        final BinaryJedis jedis = pooledJedis.getObject();
        try {
            // Pooled connections point at slaves, while hostAndPortOfASentinel holds the sentinel
            // address, so comparing the two would reject every connection. A PING on a live
            // connection is the health check here.
            return jedis.isConnected() && jedis.ping().equals("PONG");
        } catch (final Exception e) {
            return false;
        }
    }
}

3) JedisSentinelSlavePool.java
-----------------------------------JedisSentinelSlavePool.java-------------------------------
package redis.clients.jedis;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.util.Pool;

import java.security.InvalidParameterException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

public class JedisSentinelSlavePool extends Pool<Jedis> {
    private final  String masterName;

    protected GenericObjectPoolConfig poolConfig;

    protected int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
    protected int soTimeout = Protocol.DEFAULT_TIMEOUT;

    protected String password;

    protected int database = Protocol.DEFAULT_DATABASE;

    protected String clientName;

    protected final Set<JedisSentinelSlavePool.MasterListener> masterListeners = new HashSet<JedisSentinelSlavePool.MasterListener>();

    protected Logger logger = LoggerFactory.getLogger(JedisSentinelSlavePool.class.getName());

    private volatile JedisSentinelSlaveFactory factory;
    private volatile HostAndPort currentSentinel;

    private Set<String> sentinels;

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig) {
        this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null,
            Protocol.DEFAULT_DATABASE);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels) {
        this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, null,
            Protocol.DEFAULT_DATABASE);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels, String password) {
        this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, password);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, int timeout, final String password) {
        this(masterName, sentinels, poolConfig, timeout, password, Protocol.DEFAULT_DATABASE);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, final int timeout) {
        this(masterName, sentinels, poolConfig, timeout, null, Protocol.DEFAULT_DATABASE);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, final String password) {
        this(masterName, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, password);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, int timeout, final String password,
                             final int database) {
        this(masterName, sentinels, poolConfig, timeout, timeout, password, database);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, int timeout, final String password,
                             final int database, final String clientName) {
        this(masterName, sentinels, poolConfig, timeout, timeout, password, database, clientName);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, final int timeout, final int soTimeout,
                             final String password, final int database) {
        this(masterName, sentinels, poolConfig, timeout, soTimeout, password, database, null);
    }

    public JedisSentinelSlavePool(String masterName, Set<String> sentinels,
                             final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout,
                             final String password, final int database, final String clientName) {
        this.poolConfig = poolConfig;
        this.connectionTimeout = connectionTimeout;
        this.soTimeout = soTimeout;
        this.password = password;
        this.database = database;
        this.clientName = clientName;
        this.masterName = masterName;
        this.sentinels = sentinels;

        HostAndPort aSentinel = initsentinels(this.sentinels, masterName);
        initPool(aSentinel);
    }

    public void destroy() {
        for (JedisSentinelSlavePool.MasterListener m : masterListeners) {
            m.shutdown();
        }

        super.destroy();
    }

    public HostAndPort getCurrentSentinel() {
        return currentSentinel;
    }

    private synchronized void initPool(HostAndPort sentinel) {
        if (factory == null) {
            currentSentinel = sentinel;
            factory = new JedisSentinelSlaveFactory(sentinel.getHost(), sentinel.getPort(), connectionTimeout,
                soTimeout, password, database, clientName, false, null, null, null,masterName);
            initPool(poolConfig, factory);
            logger.info("Created JedisPool to sentinel at " + sentinel);
            return;
        }

        if (!sentinel.equals(currentSentinel)) {
            currentSentinel = sentinel;
            factory.setHostAndPortOfASentinel(currentSentinel);
            logger.info("Switched JedisPool to sentinel at " + sentinel);
        }
        // The set of slaves may have changed even when the sentinel has not, so always
        // drop the idle connections. Although we clear the pool, we still have to check
        // the returned object in getResource: this call only clears idle instances,
        // not borrowed instances.
        internalPool.clear();
    }

    private HostAndPort initsentinels(Set<String> sentinels, final String masterName) {

        HostAndPort aSentinel = null;
        boolean sentinelAvailable = false;

        logger.info("Trying to find a valid sentinel from available Sentinels...");

        for (String sentinelStr : sentinels) {
            final HostAndPort hap = HostAndPort.parseString(sentinelStr);

            logger.info("Connecting to Sentinel " + hap);

            Jedis jedis = null;
            try {
                jedis = new Jedis(hap.getHost(), hap.getPort());

                List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

                // Only now do we know that the sentinel is reachable: new Jedis(...) connects lazily.
                sentinelAvailable = true;
                if (masterAddr == null || masterAddr.size() != 2) {
                    logger.warn("Can not get master addr from sentinel, master name: " + masterName
                        + ". Sentinel: " + hap + ".");
                    continue;
                }

                aSentinel = hap;
                logger.info("Found a Redis Sentinel at " + aSentinel);
                break;
            } catch (JedisException e) {
                logger.warn("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e
                    + ". Trying next one.");
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }

        if (aSentinel == null) {
            if (sentinelAvailable) {
                // can connect to sentinel, but master name seems to not monitored
                throw new JedisException("Can connect to sentinel, but " + masterName
                    + " seems to be not monitored...");
            } else {
                throw new JedisConnectionException("All sentinels down, cannot determine where is "
                    + masterName + " master is running...");
            }
        }

        logger.info("Found Redis sentinel running at " + aSentinel + ", starting Sentinel listeners...");

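        // initsentinels is also called from the listener callback, so start the listeners only
        // once. Starting them again on every event would leak one thread per sentinel each time.
        synchronized (masterListeners) {
            if (masterListeners.isEmpty()) {
                for (String sentinel : sentinels) {
                    final HostAndPort hap = HostAndPort.parseString(sentinel);
                    JedisSentinelSlavePool.MasterListener masterListener = new JedisSentinelSlavePool.MasterListener(masterName, hap.getHost(), hap.getPort());
                    // whether MasterListener threads are alive or not, process can be stopped
                    masterListener.setDaemon(true);
                    masterListeners.add(masterListener);
                    masterListener.start();
                }
            }
        }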

        return aSentinel;
    }


    /**
     * @deprecated starting from Jedis 3.0 this method will not be exposed. Resource cleanup should be
     *             done using @see {@link redis.clients.jedis.Jedis#close()}
     */
    @Override
    @Deprecated
    public void returnBrokenResource(final Jedis resource) {
        if (resource != null) {
            returnBrokenResourceObject(resource);
        }
    }

    /**
     * @deprecated starting from Jedis 3.0 this method will not be exposed. Resource cleanup should be
     *             done using @see {@link redis.clients.jedis.Jedis#close()}
     */
    @Override
    @Deprecated
    public void returnResource(final Jedis resource) {
        if (resource != null) {
            resource.resetState();
            returnResourceObject(resource);
        }
    }

    private HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
        String host = getMasterAddrByNameResult.get(0);
        int port = Integer.parseInt(getMasterAddrByNameResult.get(1));

        return new HostAndPort(host, port);
    }

    protected class MasterListener extends Thread {

        protected String masterName;
        protected String host;
        protected int port;
        protected long subscribeRetryWaitTimeMillis = 5000;
        protected volatile Jedis j;
        protected AtomicBoolean running = new AtomicBoolean(false);

        protected MasterListener() {
        }

        public MasterListener(String masterName, String host, int port) {
            super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
            this.masterName = masterName;
            this.host = host;
            this.port = port;
        }

        public MasterListener(String masterName, String host, int port,
                              long subscribeRetryWaitTimeMillis) {
            this(masterName, host, port);
            this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
        }

        @Override
        public void run() {

            running.set(true);

            while (running.get()) {

                j = new Jedis(host, port);

                try {
                    // double check that it is not being shutdown
                    if (!running.get()) {
                        break;
                    }

                    j.subscribe(new SentinelSlaveChangePubSub(), "+switch-master","+slave","+sdown","+odown","+reboot");

                } catch (JedisConnectionException e) {

                    if (running.get()) {
                        logger.error("Lost connection to Sentinel at " + host + ":" + port
                            + ". Sleeping 5000ms and retrying.", e);
                        try {
                            Thread.sleep(subscribeRetryWaitTimeMillis);
                        } catch (InterruptedException e1) {
                            logger.info( "Sleep interrupted: ", e1);
                        }
                    } else {
                        logger.info("Unsubscribing from Sentinel at " + host + ":" + port);
                    }
                } finally {
                    j.close();
                }
            }
        }

        public void shutdown() {
            try {
                logger.info("Shutting down listener on " + host + ":" + port);
                running.set(false);
                // This isn't good, the Jedis object is not thread safe
                if (j != null) {
                    j.disconnect();
                }
            } catch (Exception e) {
                logger.error("Caught exception while shutting down: ", e);
            }
        }

        private class SentinelSlaveChangePubSub extends JedisPubSub {
            @Override
            public void onMessage(String channel, String message) {
                if(masterName==null) {
                    logger.error("Master Name is null!");
                    throw new InvalidParameterException("Master Name is null!");
                }
                logger.info("Got message on channel: "  + channel + " published: " + message + "." +   " current sentinel " + host + ":" + port );

                String[] msg = message.split(" ");
                List<String> msgList = Arrays.asList(msg);
                if(msgList.isEmpty()) {return;}
                boolean needResetPool = false;
                if( masterName.equalsIgnoreCase(msgList.get(0))) { // message from channel +switch-master
                    // payload looks like [mymaster 192.168.0.2 6479 192.168.0.1 6479]
                    needResetPool = true;
                }
                int tmpIndex = msgList.indexOf("@") + 1;
                // payload from e.g. +reboot looks like [slave 192.168.0.3:6479 192.168.0.3 6479 @ mymaster 192.168.0.1 6479]
                // The size check guards against a payload that ends with "@".
                if(tmpIndex > 0 && tmpIndex < msgList.size() && masterName.equalsIgnoreCase(msgList.get(tmpIndex)) ) { // message from other channels
                    needResetPool = true;
                }
                if(needResetPool) {
                    HostAndPort aSentinel = initsentinels(sentinels, masterName);
                    initPool(aSentinel);
                } else {
                    logger.info("message is not for master " + masterName);
                }

            }
        }
    }
}
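
The relevance check performed in onMessage can also be written as a small pure function, which makes it easy to test without a running Sentinel. This is a sketch under the assumption that payloads follow the two shapes quoted in the comments above. SentinelEventFilter is a hypothetical name and not part of the project.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: decide whether a Sentinel pub/sub payload is about the monitored master. */
final class SentinelEventFilter {

    private SentinelEventFilter() {
    }

    /**
     * @param payload    message body as delivered to the subscriber (channel name not included)
     * @param masterName name of the monitored master, e.g. "mymaster"
     * @return true when the payload refers to that master
     */
    static boolean concernsMaster(String payload, String masterName) {
        if (payload == null || payload.trim().isEmpty()) {
            return false;
        }
        List<String> tokens = Arrays.asList(payload.trim().split("\\s+"));
        // +switch-master: "<master-name> <old-ip> <old-port> <new-ip> <new-port>"
        if (masterName.equalsIgnoreCase(tokens.get(0))) {
            return true;
        }
        // other events: "<type> <name> <ip> <port> @ <master-name> <master-ip> <master-port>"
        int at = tokens.indexOf("@");
        return at >= 0 && at + 1 < tokens.size()
                && masterName.equalsIgnoreCase(tokens.get(at + 1));
    }
}
```
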
4) JedisSentinelSlaveConnectionFactory.java
-------------------------------------JedisSentinelSlaveConnectionFactory.java------------------
package redis.clients.jedis;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
import redis.clients.util.Pool;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocketFactory;
import java.time.Duration;
import java.util.*;

public class JedisSentinelSlaveConnectionFactory extends JedisConnectionFactory {
    public JedisSentinelSlaveConnectionFactory(RedisSentinelConfiguration sentinelConfig) {
        super(sentinelConfig);
    }

    public JedisSentinelSlaveConnectionFactory(RedisSentinelConfiguration sentinelConfig, JedisClientConfiguration clientConfig){
        super(sentinelConfig,clientConfig);
    }

    public JedisSentinelSlaveConnectionFactory(RedisSentinelConfiguration sentinelConfig, JedisPoolConfig poolConfig) {
        super(sentinelConfig,poolConfig);
    }


    @Override
    protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config){
        GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
        return new JedisSentinelSlavePool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
            poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName());
    }

    private int getConnectTimeout() {
        return Math.toIntExact(getClientConfiguration().getConnectTimeout().toMillis());
    }

    private Set<String> convertToJedisSentinelSet(Collection<RedisNode> nodes) {

        if (CollectionUtils.isEmpty(nodes)) {
            return Collections.emptySet();
        }

        Set<String> convertedNodes = new LinkedHashSet<>(nodes.size());
        for (RedisNode node : nodes) {
            if (node != null) {
                convertedNodes.add(node.asString());
            }
        }
        return convertedNodes;
    }

    private int getReadTimeout() {
        return Math.toIntExact(getClientConfiguration().getReadTimeout().toMillis());
    }

    static class MutableJedisClientConfiguration implements JedisClientConfiguration {

        private boolean useSsl;
        private @Nullable
        SSLSocketFactory sslSocketFactory;
        private @Nullable
        SSLParameters sslParameters;
        private @Nullable
        HostnameVerifier hostnameVerifier;
        private boolean usePooling = true;
        private GenericObjectPoolConfig poolConfig = new JedisPoolConfig();
        private @Nullable
        String clientName;
        private Duration readTimeout = Duration.ofMillis(Protocol.DEFAULT_TIMEOUT);
        private Duration connectTimeout = Duration.ofMillis(Protocol.DEFAULT_TIMEOUT);

        public static JedisClientConfiguration create(JedisShardInfo shardInfo) {

            JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration configuration = new JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration();
            configuration.setShardInfo(shardInfo);
            return configuration;
        }

        public static JedisClientConfiguration create(GenericObjectPoolConfig jedisPoolConfig) {

            JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration configuration = new JedisSentinelSlaveConnectionFactory.MutableJedisClientConfiguration();
            configuration.setPoolConfig(jedisPoolConfig);
            return configuration;
        }

        /* (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#isUseSsl()
         */
        @Override
        public boolean isUseSsl() {
            return useSsl;
        }

        public void setUseSsl(boolean useSsl) {
            this.useSsl = useSsl;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getSslSocketFactory()
         */
        @Override
        public Optional<SSLSocketFactory> getSslSocketFactory() {
            return Optional.ofNullable(sslSocketFactory);
        }

        public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
            this.sslSocketFactory = sslSocketFactory;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getSslParameters()
         */
        @Override
        public Optional<SSLParameters> getSslParameters() {
            return Optional.ofNullable(sslParameters);
        }

        public void setSslParameters(SSLParameters sslParameters) {
            this.sslParameters = sslParameters;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getHostnameVerifier()
         */
        @Override
        public Optional<HostnameVerifier> getHostnameVerifier() {
            return Optional.ofNullable(hostnameVerifier);
        }

        public void setHostnameVerifier(HostnameVerifier hostnameVerifier) {
            this.hostnameVerifier = hostnameVerifier;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#isUsePooling()
         */
        @Override
        public boolean isUsePooling() {
            return usePooling;
        }

        public void setUsePooling(boolean usePooling) {
            this.usePooling = usePooling;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getPoolConfig()
         */
        @Override
        public Optional<GenericObjectPoolConfig> getPoolConfig() {
            return Optional.ofNullable(poolConfig);
        }

        public void setPoolConfig(GenericObjectPoolConfig poolConfig) {
            this.poolConfig = poolConfig;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getClientName()
         */
        @Override
        public Optional<String> getClientName() {
            return Optional.ofNullable(clientName);
        }

        public void setClientName(String clientName) {
            this.clientName = clientName;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getReadTimeout()
         */
        @Override
        public Duration getReadTimeout() {
            return readTimeout;
        }

        public void setReadTimeout(Duration readTimeout) {
            this.readTimeout = readTimeout;
        }

        /*
         * (non-Javadoc)
         * @see org.springframework.data.redis.connection.jedis.JedisClientConfiguration#getConnectTimeout()
         */
        @Override
        public Duration getConnectTimeout() {
            return connectTimeout;
        }

        public void setConnectTimeout(Duration connectTimeout) {
            this.connectTimeout = connectTimeout;
        }

        public void setShardInfo(JedisShardInfo shardInfo) {

            setSslSocketFactory(shardInfo.getSslSocketFactory());
            setSslParameters(shardInfo.getSslParameters());
            setHostnameVerifier(shardInfo.getHostnameVerifier());
            setUseSsl(shardInfo.getSsl());
            setConnectTimeout(Duration.ofMillis(shardInfo.getConnectionTimeout()));
            setReadTimeout(Duration.ofMillis(shardInfo.getSoTimeout()));
        }
    }
}
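
Before creating the pool, the factory above converts Spring Data's configuration types into the plain values it passes to the pool constructor: sentinel nodes become host:port strings, and Duration timeouts become int milliseconds. Below is a minimal stand-alone sketch of those two conversions using only the JDK. SentinelNode and SentinelArgsSketch are hypothetical names introduced for illustration; they are not part of Spring Data Redis or Jedis.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for Spring Data's RedisNode, used only in this sketch.
final class SentinelNode {
    final String host;
    final int port;

    SentinelNode(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Renders the node in the "host:port" form used for the sentinel set.
    String asString() {
        return host + ":" + port;
    }
}

final class SentinelArgsSketch {

    // Null-safe conversion that skips null entries and keeps the configured order.
    static Set<String> toSentinelSet(Collection<SentinelNode> nodes) {
        if (nodes == null || nodes.isEmpty()) {
            return Collections.emptySet();
        }
        Set<String> converted = new LinkedHashSet<>(nodes.size());
        for (SentinelNode node : nodes) {
            if (node != null) {
                converted.add(node.asString());
            }
        }
        return converted;
    }

    // Duration -> int milliseconds; Math.toIntExact throws ArithmeticException
    // on overflow instead of silently truncating.
    static int toMillis(Duration timeout) {
        return Math.toIntExact(timeout.toMillis());
    }

    public static void main(String[] args) {
        Set<String> sentinels = toSentinelSet(Arrays.asList(
                new SentinelNode("192.168.0.1", 26479),
                null,
                new SentinelNode("192.168.0.2", 26479)));
        System.out.println(sentinels);                       // [192.168.0.1:26479, 192.168.0.2:26479]
        System.out.println(toMillis(Duration.ofSeconds(2))); // 2000
    }
}
```

Using a LinkedHashSet keeps the sentinels in the order they were configured, which makes connection attempts and log output easier to follow.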


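4.2 Testing

In the application, you only need to configure the JedisSentinelSlaveConnectionFactory as shown below. Spring Boot then auto-configures a RedisTemplate and a StringRedisTemplate on top of it.
Both can be injected with @Autowired; the test further down injects them as
RedisTemplate<String,String> redisTemplate and StringRedisTemplate stringRedisTemplate.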

    @Bean
    public RedisConnectionFactory jedisConnectionFactory() {
        RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
                .master("mymaster")
                .sentinel("192.168.0.1", 26479)
                .sentinel("192.168.0.2", 26479)
                .sentinel("192.168.0.3", 26479);
        sentinelConfig.setPassword(RedisPassword.of(redisPasswd));
        JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder()
            .clientName("MyRedisClient")
            .build();
        JedisConnectionFactory jedisConnectionFactory = new JedisSentinelSlaveConnectionFactory(sentinelConfig,clientConfiguration);
        return jedisConnectionFactory;
    }

1) pom.xml
------------------pom.xml-------------------------
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <dependency>
            <groupId>com.jack.yin</groupId>
            <artifactId>redis-sentinel-slave-connection-factory</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2) RedisConfiguration.java
----------------------------------RedisConfiguration.java-------------------------------
package com.jack.yin.redis.configuration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisSentinelSlaveConnectionFactory;

@Configuration
public class RedisConfiguration {
    @Value("${spring.redis.password}")
    private String redisPasswd;

    @Bean
    public RedisConnectionFactory jedisConnectionFactory() {
        RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
                .master("mymaster")
                .sentinel("192.168.0.1", 26479)
                .sentinel("192.168.0.2", 26479)
                .sentinel("192.168.0.3", 26479);
        sentinelConfig.setPassword(RedisPassword.of(redisPasswd));
        JedisClientConfiguration clientConfiguration = JedisClientConfiguration.builder()
            .clientName("MyRedisClient")
            .build();
        JedisConnectionFactory jedisConnectionFactory = new JedisSentinelSlaveConnectionFactory(sentinelConfig,clientConfiguration);
        return jedisConnectionFactory;
    }

}

3) RedisDemoApplication.java
-----------------------------RedisDemoApplication.java-------------------------------
package com.jack.yin.redis.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication(scanBasePackages = "com.jack.yin.redis")
public class RedisDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisDemoApplication.class, args);
    }
}

4) DemoApplicationTests.java
-------------------------DemoApplicationTests.java-------------------------------
package com.jack.yin.redis.demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.logging.Logger;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RedisDemoApplication.class)
public class DemoApplicationTests {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    protected Logger log = Logger.getLogger(getClass().getName());

    @Test
    public void testGetAndSet() throws Exception {
        // Reads are served over a slave connection and succeed.
        System.out.println(redisTemplate.opsForValue().get("hello"));
        // Writes are rejected because every pooled connection points at a read-only slave.
        // The call below throws:
        // org.springframework.dao.InvalidDataAccessApiUsageException: READONLY You can't write against a read only slave.;
        redisTemplate.opsForValue().set("set-key", "not allowed to set");
        // Not reached while the write above throws.
        System.out.println(redisTemplate.opsForValue().get("sss"));
        System.out.println(redisTemplate.opsForValue().get("bbb"));
    }

}


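5. Summary
Advantages:
Each connection in the pool is established to a randomly chosen slave, so connections are spread across all slaves.
When a master failover is detected, the pool is reinitialized automatically, so no connection ends up pointing at the master.
Newly added slaves are discovered automatically.
A slave going offline is detected automatically and the pool is reinitialized, so no connection points at a slave that is already offline.
Disadvantages:
Each Redis slave must be configured with slave-read-only yes.
Slaves need time to replicate data from the master, so master and slave data can be inconsistent for a short period.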
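
The first and last advantages above (random distribution across slaves, and rebuilding when the set of slaves changes) can be illustrated with a small JDK-only sketch. It is not the JedisSentinelSlavePool code: SlavePickerSketch, pick, and refresh are hypothetical names, the slave addresses are made up, and how a topology change is detected is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical helper: keeps the current slave list and picks one at random.
final class SlavePickerSketch {

    // Current "host:port" entries; replaced as a whole on every refresh.
    private volatile List<String> slaves;
    private final Random random;

    SlavePickerSketch(List<String> initialSlaves, Random random) {
        this.slaves = new ArrayList<>(initialSlaves);
        this.random = random;
    }

    // Called for each new pooled connection: choose one slave uniformly at random.
    String pick() {
        List<String> snapshot = slaves; // read once so a concurrent refresh cannot interleave
        if (snapshot.isEmpty()) {
            throw new IllegalStateException("no slave available");
        }
        return snapshot.get(random.nextInt(snapshot.size()));
    }

    // Called when a topology change is observed (failover, slave added or removed):
    // swap in the new candidate list in a single assignment.
    void refresh(List<String> currentSlaves) {
        this.slaves = new ArrayList<>(currentSlaves);
    }

    public static void main(String[] args) {
        // Made-up slave addresses, for illustration only.
        SlavePickerSketch picker = new SlavePickerSketch(
                Arrays.asList("192.168.0.11:6379", "192.168.0.12:6379"), new Random());
        System.out.println(picker.pick()); // one of the two slaves

        // Simulate 192.168.0.11 going offline: only the remaining slave can be picked.
        picker.refresh(Arrays.asList("192.168.0.12:6379"));
        System.out.println(picker.pick()); // 192.168.0.12:6379
    }
}
```

Replacing the whole list in one assignment means a connection being created during a refresh sees either the old list or the new one, never a half-updated mix.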