题外话:学习一个技术的必经之路是不仅仅看到他解决的问题,还需要看到技术的缺陷,帮助我们全面的认识这个技术。

1.实现分布式事务的三种方案

1.XA:两阶段提交,维护一个事务处理器,协调多个资源管理器的事务。在事务开始前,预先询问各个资源的准备情况,多个资源均回复ack后就真正的应用事务,否则回滚。(有点像etcd的多个节点通信前commit事务,在一半节点应答后才apply)但是如果依赖多个资源管理器(数据库)那么肯定是不适合高并发的,效率太低。

2.TCC强一致性方案:Try,comfirm,cancel。Try表示预留或者锁定资源,comfirm表示执行事务,cancel表示任何一个服务失败后,事务失败后补偿部分,回滚事务中成功的部分,因此这个补偿代码调用者自己写过于复杂。我们希望他能像mysql一样自动回滚。

3.利用mq的最终一致性方案:在mq中有一个**半消息(消息需要发送和确认才能被消费)**很适合分布式事务。半消息通过消息队列的事务或可靠性机制实现可靠性和一致性,不依赖于消费者的确认。A服务向mq发送一个prepare消息,A服务执行事务成功后确认prepare消息,如此消费者才能消费消息。如果A服务超时或失败,这条半消息也会被丢弃,事务回滚。为了防止你事务执行成功但是消息没发出去,mq会轮询所有prepare消息回调我们的接口看服务是不是事务执行成功。那么如果B服务事务失败了怎么办?重试&想办法通知A服务回滚。

还有一种依赖消费者确认的方法:ACK消息,生产者事务执行后发送消息,等待消费者ack,消费者执行后需要向mq发送ack.生产者可以选择重新发送消息,以确保消息的可靠性。

“Kafka事务处理”,可以用来实现分布式事务:参考
开启Kafka事务:在生产者端,可以通过调用initTransactions()方法来开启Kafka事务。这将为当前线程关联一个事务ID,并将生产者置于事务模式。
开始事务:生产者可以调用 beginTransaction()方法来开始一个新的事务。在这之后,所有发送到Kafka主题的消息都将与该事务相关联。
发送消息:在事务中,通过调用send()方法发送消息到Kafka主题。这些消息不会立即提交到主题,而是在事务提交之前缓存在生产者端。
执行其他操作:在事务中,你可以执行其他的操作,比如查询数据库、调用其他服务等。
提交事务:一旦所有的消息都发送完成,你可以调用commitTransaction()方法来提交事务。这将把所有缓存在生产者端的消息一起提交到Kafka主题。
处理事务回滚:如果在事务过程中发生了错误或者需要回滚事务,你可以调用abortTransaction()方法来中止当前事务。这将清除生产者端缓存的消息,使其不会提交到Kafka主题。

就是说开启事务会与当前线程关联一个事务id中间各个服务事务执行成功可以发送消息给mq这时消息会缓存到生产者的缓存中,如果所有事务都成功执行提交事务,真正的应用,持久化到kafka的topic分区。任何一个事务处理失败,利用abortTransaction()中止事务。

2.分布式锁

redis实现:setns,ex和reddsion。

  1. 获得当前时间
  2. 挨个向各个节点获得锁
  3. 如果有一半以上节点获得成功,且锁的有效时间大于获取锁的过程花费的时间,则认为成功。
  4. 否则,向所有节点释放锁,因为失败。

数据库的乐观锁:资源修改后:update(操作失败就重试)

  1. 获得资源的版本号(设置version字段)
  2. 直接修改资源
  3. 这时检查version与原先是不是一样,如果一样就认为只有当前线程修改了资源
  4. 如果不一样就回滚刚才的修改,再从第一步重试。

数据库的悲观锁:资源修改前:select …for update(其他线程阻塞等待)

  1. 开启事务,预取资源(select …for update)
  2. 修改资源
  3. 提交事务

zookeeper来实现:

  1. 在某节点下创建一个临时有序节点,获得序号
  2. 检查当前序号是不是该目录下最小的,如果是就算获得到了锁
  3. 否则监听(监听事件)前一个节点,等他释放就获得。
  4. 因为监听到的是前一个结点所以我认为不会有惊群效应。

etcd来实现

  1. 开启事务,申请一个租约和id
  2. 比较reversion是否为0,存储kv对,
  3. revision最小的获得锁
  4. 监听前一个revision的key.因为监听到的是前一个结点所以我认为不会有惊群效应。

Lease 机制」:即租约机制(TTL),Etcd 可以为存储的 kv 对设置租约,当租约到期,kv 将失效删除;当然也支持 refresh 续约。
Revision 机制」:存储的每个 key 带有一个 Revision 属性值,Etcd 每进行一次事务操作,对应的全局 Revision 值都会加一,因此每个 key 对应的 Revision 属性值都是全局唯一的。通过比较 Revision 的大小就能知道写操作的顺序。
公平锁机制」:多个程序同时抢锁时,会根据 Revision 值大小依次获得锁,可以有效避免 “惊群效应”,公平获取。
Watch 机制」:即监听机制,Watch 机制支持 Watch 某个固定的 key,也支持 Watch 一个目录(前缀机制),当被 Watch 的 key 或目录发生变化,客户端将收到通知。

func (m *Mutex) Lock(ctx context.Context) error {
 resp, err := m.tryAcquire(ctx)
 ...
}
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
 s := m.s
 client := m.s.Client()

 m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
 cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
 // put self in lock waiters via myKey; oldest waiter holds lock
 put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
 // reuse key in case this session already holds the lock
 get := v3.OpGet(m.myKey)
 // fetch current holder to complete uncontended path with only one RPC
 getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
 resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
 if err != nil {
  return nil, err
 }
 m.myRev = resp.Header.Revision
 if !resp.Succeeded {
  m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
 }
 return resp, nil
}

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部