主从架构介绍

主从服务架构是一种常见的分布式系统设计模式,常用于提高系统的性能、可用性和扩展性。在这种架构中,系统中的节点被分为两类:主节点(Master)和从节点(Slave)。
在这里插入图片描述

zookeeper

zookeeper有以下几个特点:
1.集群部署:一般是3~5台机器组成一个集群,每台机器都在内存保存了zk的全部数据,机器之间互相通信同步数据,客户端连接任何一台机器都可以。
2.顺序一致性:所有的写请求都是有序的;集群中只有leader机器可以写,所有机器都可以读,所有写请求都会分配一个zk集群全局的唯一递增编号:zxid,用来保证各种客户端发起的写请求都是有顺序的。
3.原子性:要么全部机器成功,要么全部机器都不成功。
4.数据一致性:无论客户端连接到哪台节点,读取到的数据都是一致的;leader收到了写请求之后都会同步给其他机器,保证数据的强一致,你连接到任何一台zk机器看到的数据都是一致的。
5.高可用:如果某台机器宕机,会保证数据不丢失。集群中挂掉不超过一半的机器,都能保证集群可用。比如3台机器可以挂1台,5台机器可以挂2台。
6.实时性:一旦数据发生变更,其他节点会实时感知到。
7.高性能:每台zk机器都在内存维护数据,所以zk集群绝对是高并发高性能的,如果将zk部署在高配置物理机上,一个3台机器的zk集群抗下每秒几万请求是没有问题的。
8.高并发:高性能决定的,主要是基于纯内存数据结构来处理,并发能力是很高的,只有一台机器进行写,但是高配置的物理机,比如16核32G,可以支撑几万的写入QPS。所有机器都可以读,选用3台高配机器的话,可以支撑十万+的QPS。

利用ZK实现主从架构

导入依赖

<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
		<zookeeper.version>3.4.14</zookeeper.version>
		<curator-framework.version>2.12.0</curator-framework.version>
		<curator-recipes.version>2.12.0</curator-recipes.version>
		<ssdb.version>9.4</ssdb.version>
		<jodatime.version>2.10</jodatime.version>
		<binlog.version>0.21.0</binlog.version>
		<disruptor.version>3.4.2</disruptor.version>
	</properties>

<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
		<exclusions>
			<!-- 去除旧log依赖 -->
			<exclusion>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-starter-logging</artifactId>
			</exclusion>
		</exclusions>
	</dependency>

	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-quartz</artifactId>
	</dependency>

	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>

	<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-log4j2 -->
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-log4j2</artifactId>
	</dependency>

	<!-- log4j2异步日志需要加载disruptor-3.0.0.jar或者更高的版本 -->
	<dependency>
		<groupId>com.lmax</groupId>
		<artifactId>disruptor</artifactId>
		<version>3.4.2</version>
	</dependency>

	<!-- zookeeper -->
	<dependency>
		<groupId>org.apache.zookeeper</groupId>
		<artifactId>zookeeper</artifactId>
		<version>${zookeeper.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.curator</groupId>
		<artifactId>curator-framework</artifactId>
		<version>${curator-framework.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.curator</groupId>
		<artifactId>curator-recipes</artifactId>
		<version>${curator-recipes.version}</version>
	</dependency>

zk客户端:

public class ZookeeperClient {

    /**
     * 客户端
     */
    private CuratorFramework client;

    /**
     * Leader选举
     */
    private LeaderLatch leader;

    public ZookeeperClient(LeaderLatch leader,CuratorFramework client){
        this.client = client;
        this.leader = leader;
    }

    /**
     * 启动客户端
     * @throws Exception
     */
    public void startZKClient() throws Exception {
        client.start();
        leader.start();
    }

    /**
     * 关闭客户端
     * @throws Exception
     */
    public void closeZKClient() throws Exception {
        leader.close();
        client.close();
    }

    /**
     * 判断是否变为领导者
     * @return
     */
    public boolean hasLeadership(){
        return leader.hasLeadership();
    }

    public CuratorFramework getClient() {
        return client;
    }

    public void setClient(CuratorFramework client) {
        this.client = client;
    }

    public LeaderLatch getLeader() {
        return leader;
    }

    public void setLeader(LeaderLatch leader) {
        this.leader = leader;
    }

public class ZookeeperClientInfo {

    /**
     * 是否是leader 默认为false
     */
    public static boolean isLeader = false;

    /**
     * 客户端ID
     */
    private String id;

    /**
     * 连接信息字符串
     */
    private String connectString;

    /**
     * 节点路径
     */
    private String path;

    /**
     * 连接超时时间
     */
    private Integer connectTimeOut;

    /**
     * 最大连接次数
     */
    private Integer maxRetries;

    /**
     * 重连休眠时间
     */
    private Integer retrySleepTime;

    public static boolean isLeader() {
        return isLeader;
    }

    public static void setIsLeader(boolean isLeader) {
        ZookeeperClientInfo.isLeader = isLeader;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getConnectString() {
        return connectString == null ? null : connectString.replaceAll("//s+", "");
    }

    public void setConnectString(String connectString) {
        this.connectString = connectString;
    }

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    public Integer getConnectTimeOut() {
        return connectTimeOut;
    }

    public void setConnectTimeOut(Integer connectTimeOut) {
        this.connectTimeOut = connectTimeOut;
    }

    public Integer getMaxRetries() {
        return maxRetries;
    }

    public void setMaxRetries(Integer maxRetries) {
        this.maxRetries = maxRetries;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("ZookeeperClientInfo{ ").append("id=").append(id)
                .append(",isLeader=").append(isLeader).append(", connectString=").append(connectString)
                .append(", path=").append(path).append(",connectTimeOut=").append(connectTimeOut)
                .append(", maxRetries=").append(maxRetries).append(", retrySleepTime=").append(retrySleepTime).append(" }");
        return sb.toString();
    }
}

监听器,来监听主节点变更,实现主要是继承LeaderLatchListener类

@Component
public class ZookeeperClientListener implements LeaderLatchListener {

    private final static Logger log = LoggerFactory.getLogger(ZookeeperClientListener.class);

    @Autowired
    private ChangeLeaderService changeLeaderService;

    /**
     *  将本服务达成jar包,部署到2台服务器上。启动两个服务。
     *  1、第一台服务(机器1)启动后抢到leader,会进入到该方法中。另外一台服务(机器2)会进入到notLeader()中。
     *  2、当机器1宕机后,连接断开后zookeeper会删除临时节点。机器2根据选举会成为leader,成为leader后会进入到isLeader()中
     *     然后在changeLeaderService.taskExecut() 再次将定时任务做补偿处理。
     */
    @Override
    public void isLeader() {
        log.error("{},当前服务已变为leader,将从事业务消费======>>>>", JodaDateUtil.date2String(new Date()));
        ZookeeperClientInfo.isLeader = true;

        // 切换机器后,继续执行上一个机器未完成的定时任务。
        changeLeaderService.taskExecut();
    }

    @Override
    public void notLeader() {
        log.error("{},当前服务已退出leader,不再从事消费业务=====>>>", JodaDateUtil.date2String(new Date()));
        ZookeeperClientInfo.isLeader = false;
    }
}

当服务主从切换时的补偿措施:

public interface ChangeLeaderService {

    /**
     * 主从服务切换时,手动触发分配到leader机器上的定时任务中的业务逻辑
     */
    void taskExecut();
}
@Service
public class ChangeLeaderServiceImpl implements ChangeLeaderService {

    private final static Logger log = LoggerFactory.getLogger(ChangeLeaderServiceImpl.class);
;
    @Autowired
    private SSDB ssdb;

    @Override
    public void taskExecut() {
        //TODO 从ssdb 中查询出定时任务的标识,是否正常执行完成,未完成的话,在这里再触发执行。
        log.info("===ChangeLeaderServiceImpl===taskExecut()===");
        /*Response response = ssdb.get(Constant.TEST_STATE);
        if (response.ok() && response.datas.size() > 0) {
            int tenantState = byteArrayToInt(response.datas.get(0));
            if (Constant.TASK_EXECUTING == tenantState) { //当前任务未完成,接着完成
                // service.do(); 伪代码
                log.info("service.do();");
                ssdb.set(Constant.TEST_STATE, Constant.TASK_END);
            }
        }*/
    }

    /**
     * byte数组转int
     * @param b
     * @return
     */
    private int byteArrayToInt(byte[] b){
        String str = byteArrayToString(b);
        return StringToInt(str);
    }

    /**
     * byte数组转string
     * @param b
     * @return
     */
    private String byteArrayToString(byte[] b) {
        if (null == b || b.length == 0) {
            return "";
        }
        return new String(b);
    }

    /**
     * string转int
     * @param str
     * @return
     */
    private int StringToInt(String str){
        if (StringUtils.isEmpty(str)){
            return 0;
        }
        return Integer.parseInt(str);
    }
}

zk的配置信息

@Component
public class ZookeeperConfig {

    /**
     * zk 地址
     */
    @Value("${spring.slaveof.zk.addr}")
    private String addr;

    /**
     * 重试策略----最大重试次数
     */
    @Value("${spring.slaveof.zk.max}")
    private int max;

    /**
     * 重试策略-----sleepTime
     */
    @Value("${spring.slaveof.zk.sleep}")
    private int sleepTime;

    /**
     * 连接超时时间
     */
    @Value("${spring.slaveof.zk.connection}")
    private int connectionTime;

    /**
     * 会话超时时间
     */
    @Value("${spring.slaveof.zk.session}")
    private int sessionTime;

    public String getAddr() {
        return addr;
    }

    public int getMax() {
        return max;
    }

    public int getSleepTime() {
        return sleepTime;
    }

    public int getConnectionTime() {
        return connectionTime;
    }

    public int getSessionTime() {
        return sessionTime;
    }
}

服务启动限制性的类,了解ApplicationRunner,SpringBoot项目启动时,若想在启动之后直接执行某一段代码,就可以用 ApplicationRunner这个接口,并实现接口里面的run(ApplicationArguments args)方法,方法中写上自己的代码逻辑。也就是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。

@Component
public class ZkDemoApplicationRunner implements ApplicationRunner {

    private final static Logger log = LoggerFactory.getLogger(ZkDemoApplicationRunner.class);

    @Autowired
    private ZookeeperClientListener zkClientListener;

    @Autowired
    private ZookeeperConfig zookeeperConfig;

    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception {
        log.info("====================>>>>>>>>启动执行zk>>>>>>>>==================");
        log.error("===>>>>>>>>zookeeper: addr:{}, sleepTime:{}, max:{}, connectionTime:{}=====", zookeeperConfig.getAddr(), zookeeperConfig.getSleepTime(), zookeeperConfig.getMax(), zookeeperConfig.getConnectionTime());
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(zookeeperConfig.getAddr())
                .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getSleepTime(), zookeeperConfig.getMax()))
                .connectionTimeoutMs(zookeeperConfig.getConnectionTime()).build();
        LeaderLatch leaderLatch = new LeaderLatch(client, "/diaoliwei", "client1", LeaderLatch.CloseMode.NOTIFY_LEADER);
        if (zkClientListener == null) {
            log.error("==================>>>>>>>>>>>>>>>>zkClientListener is null=====>>>>>>>>>>>");
        }
        leaderLatch.addListener(zkClientListener);
        ZookeeperClient zkClient = new ZookeeperClient(leaderLatch, client);
        try {
            zkClient.startZKClient();
        } catch (Exception e) {
            log.error("======>>>>>>zk客户端连接失败<<<<<=====error:{}===", e);
            return;
        }
        CuratorFrameworkState state = client.getState();
        if (CuratorFrameworkState.STOPPED == state) {
            log.error("zk客户端已关闭");
            return;
        }

        /*while (true) {  // 测试日志用
            try {
                if(!zkClient.hasLeadership()){
                    log.info("2当前服务不是leader");
                    Thread.sleep(2000);
                    log.error("error:::::::Test02 do it...>>>>>>> ");
                    continue;
                }  else {
                    log.info("2当前服务是leader");
                }
                log.info("Test02 do it... ");
                log.error("Test02 do it...>>>>>>> ");
            } catch (Exception e) {
                log.error("Exception=====>>>>>>>>>>>>eeee:", e);
            }
        }*/

        //log.info("======>>>>>zk客户端连接成功<<<<<<=======");
    }
}

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部