sql server 锁与事务拨云见日(下) - 花阴偷移 - 博客园

mikel阅读(627)

来源: sql server 锁与事务拨云见日(下) – 花阴偷移 – 博客园

在锁与事务系列里已经写完了上篇中篇,这次写完下篇。这个系列俺自认为是有条不紊的进行,但感觉锁与事务还是有多很细节没有讲到,温故而知新可以为师矣,也算是一次自我提高总结吧,也谢谢大伙的支持。在上一篇的末尾写了事务隔离级别的不同表现,还没写完,只写到了重复读的不同隔离表现,这篇继续写完序列化,快照的不同隔离表现,事务隔离级别的总结。最后讲下事务的死锁,事务的分布式,事务的并发检查。

一. 事务隔离不同表现

设置序列化

SET TRANSACTION ISOLATION LEVEL SERIALIZABLE

设置行版本控制已提交读

ALTER DATABASE  Test  SET  READ_COMMITTED_SNAPSHOT on; 
SET TRANSACTION ISOLATION LEVEL READ COMMITTED

设置快照隔离

ALTER DATABASE Test
SET ALLOW_SNAPSHOT_ISOLATION ON;
SET TRANSACTION ISOLATION LEVEL SNAPSHOT

1.1 已重复读和序列化与其它事务并发,的区别如下表格:

可重复读 序列化 其它事务
SET TRANSACTION ISOLATION

LEVEL REPEATABLE READ

SET TRANSACTION ISOLATION

LEVEL SERIALIZABLE

begin tran

select count(*) from product

where memberID=9708

这里显示500条数据,事务还没有结束

begin tran

select count(*) from product

where memberID=9708

这里显示500条数据,事务还没有结束

begin tran

insert into product

values(‘test2’,9708)

其它事务里,想增加一条数据。

如果并发的事务是可重复读,

这条数据可以插入成功。

如果并发的事务是序列化,

这条数据插入是阻塞的。

select count(*) from product

where memberID=9708

在事务里再次查询时,发现显示501条数据

 select count(*) from productwhere memberID=9708

在事务再次查询时,还是显示500条数据

 commit tran

在一个事务里,对批数据多次读取,符合条件

的行数会不一样。

 commit tran

事务结束

 如果并发是可序列化并且commit,

其它事务新增阻塞消失,插入开始执行。

1.2 已提交读、行版本控制已提交读、快照隔离,与其它事务并发,的区别如下表格:

已提交读 行版本控制已提交读 快照隔离 其它事务
SET TRANSACTION ISOLATION

LEVEL READ COMMITTED

ALTER DATABASE Test SET
READ_COMMITTED_SNAPSHOT
ON;

SET TRANSACTION ISOLATION
LEVEL READ COMMITTED

ALTER DATABASE TEST SET
ALLOW_SNAPSHOT_ISOLATION
ON;

SET TRANSACTION ISOLATION
LEVEL SNAPSHOT

begin tran

select model from product
where sid=9708

得到值为test

begin tran

select model from product
where sid=9708

得到值为test

begin tran

select model from product
where sid=9708

得到值为test

begin tran
update product set
model=’test1′
where sid=1
select model from product
where sid=9708

事务里再次查询 阻塞

select model from product
where sid=9708

事务里再次查询值为test, 读到行版本

select model from product
where sid=9708
事务里再次查询值为test,读到行版本
 阻塞解除,再次查询返回 test1 再次查询 test1
其它事务提交后,这里读到的是新
(修改后的)数据
再次查询 test

其它事务提交后,这里读取还是旧数据
(行版本数据)

 commit tran
 事务里updaate修改 修改成功  事务里updaate修改 修改成功  事务里updaate修改, 修改失败报错  

二. 事务总结

2.1   事务不同隔离级别的优缺点,以及使用场景 如下表格:

隔离级别          优点 缺点 使用场景
未提交读  读数据的时候,不申请共享锁,所以不会被阻塞 读到的数据,可能会脏读,不一致。 如做年度,月度统计报表,数据不一定要非常精确
已提交读   比较折中,而且是推荐的默认设置 有可能会阻塞,在一个事务里,多次读取相同的数据行,得到的结果可能不同。 一般业务都是使用此场景
可重复读 在一个事务里,多次读取相同的数据行,得到的结果可保证一致、 更严重的阻塞,在一个事务里,读取符合某查询的行数,会有变化(这是因为事务里允许新增)  如当我们在事务里需要,多次统计查询范围条件行数, 做精确逻辑运算时,需要考虑逻辑是否会前后不一致.
可序列化 最严重格的数据保护,读取符合某查询的行数,不会有变化(不允许新增)。 其它事务的增,删,改,查 范围内都会阻塞  如当我们在写事务时,不用考虑新增数据带来的逻辑错误。
行版本控制已提交读 阻塞大大减少(读与读不阻塞,读与写不阻塞)

阻塞减少,能读到新数据
大多情况下行版本控制的已提交读比快照隔离更受欢迎:
1、RCSI比SI占用更少的tempdb空间 。
2、RCSI支持分布式事务,而SI不支持 。
3、RCSI不会产生更新冲突 。
4、RCSI无需再应用程序端作任何修改。唯一要更改的只是一个数据库选项。

写与写还是会阻塞,行版本是存放在tempdb里,数据修改的越多,需要

存储的信息越多,维护行版本就

需要越多的的开销

如果默认方式阻塞比较严重,推荐用行版本控制已提交读,改善性能
快照隔离 阻塞大大减少(读与读不阻塞,读与写不阻塞)

阻塞减少,有可能读到旧数据
1、不太可能由于更新冲突而导致事务必须回滚得情况
2、需要基于运行时间长、能保证时间点一致性的多语句来生成报表的情况

维护行版本需要额外开销,且可能读到旧的数据 允许读取稍微比较旧版本信息的情况下

2.2 锁的隔离级别(补充)

了解了事务的隔离级别,锁也是有隔离级别的,只是它针对是单独的SQL查询。下面包括显示如下

     select  COUNT(1) from dbo.product(HOLDLOCK)
HOLDLOCK 在该表上保持共享锁,直到整个事务结束,而不是在语句执行完立即释放所添加的锁。

与SERIALIZABLE一样

NOLOCK 不添加共享锁和排它锁,仅应用于SELECT语句

与READ UNCOMMITTED一样

PAGLOCK 指定添加页锁(否则通常可能添加表锁)。
READPAST 跳过已经加锁的数据行, 仅应用于READ COMMITTED隔离性级别下事务操作中的SELECT语句操作
ROWLOCK 使用行级锁,而不使用粒度更粗的页级锁和表级锁

建议中用在UPDATE和DELETE语句中。

TABLOCKX 表上使用排它锁, 这个锁可以阻止其他事务读或更新这个表的数据
UPDLOCK 指定在读表中数据时设置更新锁(update lock)而不是设置共享锁,作用是允许用户先读取数据(而且不阻塞其他用户读数据),并且保证在后来再更新数据时,这一段时间内这些数据没有被其他用户修改

五.分布式事务

分布式事务是跨越两个或多个称为资源管理器的服务器。 称为事务管理器的服务器组件必须在资源管理器之间协调事务管理。在 .NET Framework 中,分布式事务通过 System.Transactions 命名空间中的 API 进行管理。 如果涉及多个永久资源管理器,System.Transactions API 会将分布式事务处理委托给事务监视器,例如 Microsoft 分布式事务协调程序 (MS DTC),在Windows服务里该服务叫Distributed Transaction Coordinator 默认未启动。

SQL server里 分布式是通过BEGIN DISTRIBUTED TRANSACTION 的T-SQL来实现,是分布式事务处理协调器 (MS DTC) 管理的 Microsoft 分布式事务的起点。执行 BEGIN DISTRIBUTED TRANSACTION 语句的 SQL Server 数据库引擎的实例是事务创建者。并控制事务的完成。 当为会话发出后续 COMMIT TRANSACTION 或 ROLLBACK TRANSACTION 语句时,控制事务实例请求 MS DTC 在所涉及的所有实例间管理分布式事务的完成(事务级别的快照隔离不支持分布式事务)。

在执行T-sql里 查询多个数据库主要是通过引用链接服务器的分布式查询,下面添加了RemoteServer链接服务器

复制代码
USE AdventureWorks2012;  
GO  
BEGIN DISTRIBUTED TRANSACTION;  
-- Delete candidate from local instance.  
DELETE AdventureWorks2012.HumanResources.JobCandidate  
    WHERE JobCandidateID = 13;  
-- Delete candidate from remote instance.  
DELETE RemoteServer.AdventureWorks2012.HumanResources.JobCandidate  
    WHERE JobCandidateID = 13;  
COMMIT TRANSACTION;  
GO
复制代码

六.事务死锁

6.1 在关系型数据库里都有死锁的概念,在并发访问量高时,事务里或者T-sql大批量操作(特别是修改删除结果集),都有可能导致死锁。死锁是由两个互相阻塞的线程组成也称为抱死。sql server死锁监视器进程会定期检查死锁,默认间隔为5秒,会自动判断将回滚开销影响最少的事务作为死锁牺牲者,并收到1025 错误,消息模板来自master.dbo.sysmessages表的where error=1205。当发生死锁时要了解两方进程的sessionid各是多少, 各会话的查询语句,冲突资源是什么。请查看死锁的分析排查

会产生死锁的资源主要是:锁 (就是上篇讲的数据行,页,表等资源),其它的死锁包括如:1. 工作者线程调度程序或CLR同步对象。2.两个线程需要更多内存,但获得授权前一个必须等待另一个。3.同一个查询的并行线程。4.多动态结果集(MARS)资源线程内部冲突。这四种很少出现死锁,重点只要关注锁资源带来的死锁。

6.2 下面事务锁资源产生死锁的原理:

1. 事务T1和事务T2 分别占用共享锁RID第1行和共享锁RID第2行。

2. 事务T1更新RID2试图获取X阻塞,事务T2更新RID2试图获取X阻塞。

3.  事务各自占有共享锁未释放,而要申请对方X锁会排斥一切锁

6.3 死锁与阻塞的区别

阻塞是指:当一个事务请求一个资源尝试获取锁时,被其它事务锁定,请求的事务会一直等待,直到其它事务把该锁释放,这就发生了阻塞,默认情况SQLServer会一直等下去。所以阻塞往往能持续很长时间,这对程序的并发性能影响很大。

死锁是两个或多个进程之间的相互等待,一般在5秒就会检测出来,消除死锁。并发性能不像阻塞那么严重。

阻塞是单向的,互相阻塞就变成了死锁。

6.3 尽量避免死锁的方法

按同一顺序访问对象

避免事务中的用户交互

保持事务简短

合理使用隔离级别

调整语句的执行计划,减少锁的申请数目。

七.事务并发检查

在检查并发方面,有很多种方式像原来的如sp_who,sp_who2等系统存储过程,perfmon计数器,sql Trace/profiler工具等,检测和分析并发问题,还包括sql server 2005以及以上的:

DMV  特别是sys.dm_os_wait_stats和sys.dm_os_waiting_tasks ,这里简单讲下并发检查

例如:查询用户会话的相关信息

SELECT  blocking_session_id FROM sys.dm_os_waiting_tasks WHERE session_id>50

blocking_session_id 阻塞会话值有时为负数:

-2 :被阻塞资源属于孤立分布式事务。

-3: 被阻塞资源属于递延恢复事务。

-4: 对于锁存器等待,内锁存器状态转换阻止了session的识别。

例如:下面查询阻塞超5秒的等待

SELECT blocking_session_id FROM sys.dm_os_waiting_tasks WHERE wait_duration_ms>5000

例如:只关注锁的阻塞,可以查看sys.dm_tran_locks
SELECT * FROM sys.dm_tran_locks WHERE request_status=’wait’

通过sys.dm_exec_requests查看用户请求

通过sqlDiag.exe收集运行系统的信息

通过errorlog里打开跟踪标识1222 来分析死锁

通过sys.sysprocess 检测阻塞。

sql server 锁与事务拨云见日(中) - 花阴偷移 - 博客园

mikel阅读(581)

来源: sql server 锁与事务拨云见日(中) – 花阴偷移 – 博客园

一.事务的概述

上一章节里,重点讲到了锁,以及锁与事务的关系。离上篇发布时间好几天了,每天利用一点空闲时间还真是要坚持。听《明朝那些事儿》中讲到”人与人最小的差距是聪明,人与人最大的差距是坚持”很经典的一句话一直记得。这篇重点围绕事务来开展。涉及的知识点包括:事务的概述,事务并发控制模型,并发产生的负面影响,事务隔离级别以及不同的表现。本章多以文字描述为主,没有多少代码量,重点是阐述不同隔离级别的不同表现,在以后的业务中,涉及到事务时,本文可以用来做个参考。

1.1 事务ACID

事务作为一个逻辑工作单元执行一系列的操作,它包括四个属性:原子性、一致性、隔离性和持久性 (ACID) 属性, 只有这样才能成为一个事务。

原子性:当一个事务被当作一个单独的工作单元时,不管事务内有什么,都是一个整体。对于其数据修改,要么全都执行,要么全都不执行。

一致性:事务在完成时,必须使所有的数据都保持一个逻辑一致状态。

隔离性:并发事务所做的修改必须与其他并发事务所做的修改隔离。 事务能识别数据所处的状态,要么是另一并发事务修改它之前的状态,要么是并发事务修改它之后的状态。

持久性:一但事务完全,它的效果是永久存于系统的。该修改即使出现系统故障也将一直保持。 SQL Server 2014和更高版本启用延迟的持久事务。

1.2 事务的操作模式有几下几种:

自动提交事务:每条单独的语句都是一个事务。

显式事务:每个事务均以 BEGIN TRANSACTION 语句显式开始,以 COMMIT 或 ROLLBACK 语句显式结束。

隐式事务:在前一个事务完成时新事务隐式启动,但每个事务仍以 COMMIT 或 ROLLBACK 语句显式完成。

批处理级事务:只能应用于多个活动结果集 (MARS),在 MARS 会话中启动的 Transact-SQL 显式或隐式事务变为批处理级事务。在SQL server 2000 必须对每个 SqlCommand 对象使用独立的 SqlConnection 对象。但是 SQL Server 2005 启用了 MARS,可以共用一个SqlConnection 对象。

本章重点讲到显式事务的隔离级别

二. 事务并发模型

2.1 并发访问是指:多用户同时访问一种资源被视为并发访问资源。 并发数据访问需要某些机制,以防止多个用户试图修改其他用户正在使用的资源时产生负面影响,机制就是下面讲的事务隔离级别。处于活动状态而不互相干涉的并发用户数据越多,并发性就越好。当一个正在修改数据的用户阻止了其他用户读取数据,或者当一个正在读取数据的用户阻止了其它用户修改数据时,并发性就降低了。

2.2 并发类型

SQLServer里数据库系统可以采用两种方式来管理并发数据访问:乐观并发控制和悲观并发控制,在sql server 2000以前只有悲观并发。乐观并发控制是一种称为行版本控制(row versioning)的技术支持。这二种技术并发控制的区别在于:是在冲突发生前进行防止,还是在发生后采用某种方法来处理冲突。

悲观并发控制

在悲观并发中,sql server是获取锁来阻塞对于其它用户正在使用数据的访问。  用户操作的读与写之间是会互相阻塞的。

乐观并发控制

乐观并发控制默认采用行版本控制使其它用户能够看到修改操作发生以前的数据状态,旧版本数据行会保存下来。因些读取数据不会受到其它用户正对该数据进行修改操作的影响,换言之修改数据不会受到其它用户正对该数据进行读取影响。 因为读取用户访问的数据行是一个被保存过的版本。  用户读与写之间不会互相阻塞,但写与写还是会发生阻塞。

2.3  事务并发带来的负面影响

修改数据的用户会影响同时读取或修改相同数据的其他用户。 即这些用户可以并发访问数据。 如果数据存储系统没有并发控制,则用户可能会看到以下负面影响:

并发影响 定义
丢失更新        当两个或多个事务选择同一行,然后基于最初选定的值更新该行时,会发生丢失更新问题。 每个事务都不知道其他事务的存在。   最后的更新 将覆盖由其他事务所做的更新,这将导致数据丢失。
脏读

 当一个用户修改了数据但尚未提交修改,而另一个正在读取的用户会读到这个修改从而导致不一致的状态发生。

不可重复读

一个用户在同一个事务中分别以两个读操作间隔读取相同资源时可能会得到不同的值。

虚拟读取(幻影)

一个事务里执行两个相同的查询,但第二个查询返回的行集合是不同的,此时就会发生虚拟读取。这种情况发生在where 查询中,比如 where count(1)<10。  同一个事务中多次使用相同的条件查询,select操作返回不同数据的结果集。

三.事务隔离级别

在sql server 2005及以上 支持五种隔离级别来控制“读”操作的行为,其中有三个是悲观并发模式,一个是乐观并发模式,剩下一个存在两种模式。 下面介绍隔离级别从允许的并发负作用(例如脏读或虚拟读取)的角度进行描述。

隔离级别  定义
未提交读
READUNCOMMITTED
 隔离事务的最低级别,未提交读不会发出共享锁,允许脏读,一个事务可能看见其他事务所做的尚未提交的更改。未提交读不会发出共享锁. 该项的作用与与SELECT表上加NOLOCK相同。
 

已提交读
READ COMMITTED

 一个事务不能读取其它事务修改但未提交的数据,避免了脏读。事务内语句运行完后便会释放共享锁,而不是等到事务提交的时候。 这是数据库引擎默认级别。
可重复读
REPEATABLE READ
 事务内查询语句运行完后不会释放共享锁,而是等到事务提交后.其它事务不能修改,删除,但可以插入新数据。
因为不是范围锁,可能发生虚拟读取
 可序列化SERIALIZABLE  隔离事务的最高级别,事务之间完全隔离。 阻止其它事务删除或插入任何行。 相当于SELECT上加HOLDLOCK相同, SELECT 操作使用 WHERE 子句时获取范围锁,主要为了避免虚拟读取
已提交读 快照隔离
READ COMMITTED SNAPSHOT ISOLATION level (RCSI)
当 READ_COMMITTED_SNAPSHOT 数据库选项设置为 ON 时,已提交读隔离使用行版本控制提供语句级读取一致性。 读取操作只需要 SCH-S 表级别的锁,不需要页锁或行锁。 使用行版本控制为每个语句提供一个在事务上一致的数据快照,因为该数据在语句开始时就存在。
快照隔离
SNAPSHOT ISOLATION level
(SI)
 快照隔离级别使用行版本控制来提供事务级别的读取一致性。 读取操作不获取页锁或行锁,只获取 SCH-S 表锁。 读取其他事务修改的行时,读取操作将检索启动事务时存在的行的版本。 当 ALLOW_SNAPSHOT_ISOLATION 数据库选项设置为 ON 时,只能对数据库使用快照隔离。 默认情况下,用户数据库的此选项设置为 OFF。

sql server主要是通过共享锁申请和释放机制的不同处理,来实现不同的事务隔离级别。不同隔离级别允许的并发副作用如下:

隔离级别 脏读 不可重复读 幻影读 并发控制模型
 未提交读 悲观
 已提交读 悲观
 已提交读快照 乐观
 可重复读 悲观
 快照 乐观
可串行化 悲观

不同隔离级别对共享锁的不同处理方式如下:

隔离级别 是否申请共享锁 何时释放 有无范围锁
未提交读
已提交读 当前语句做完时
可重复读 事务提交时
可序列化 事务提交时

四.事务隔离不同表现

   设置未提交读

SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED

设置提交读

SET TRANSACTION ISOLATION LEVEL READ COMMITTED

设置可重复读

SET TRANSACTION ISOLATION LEVEL REPEATABLE READ

4.1 未提交读和提交读与其它事务并发,的区别如下表格:

未提交读 提交读 其它事务
SELECT Model FROM Product

WHERE SID=10905

显示model 值为test

SELECT Model FROM Product

WHERE SID=10905

显示model 值为test

begin  tran

update  product set model=’test1′

where SID=10905

SET TRANSACTION ISOLATION
LEVEL READ UNCOMMITTED
SET TRANSACTION ISOLATION

LEVEL READ COMMITTED

 这个事务将model值改为test1.

此时修改的X锁未释放

SELECT Model FROM Product

WHERE SID=10905

显示model值为test1,但这并不正确,

因为其它事务还没有提交。没有获取共享锁

SELECT Model FROM Product

WHERE SID=10905

查询被阻塞

申请获取共享锁时失败,因为X锁未释放

 
阻塞消失,得到的值还是test  rollback tran

这里事务回滚了x锁释放,值还是test

4.2  已提交读和可重复读与其它事务并发,的区别如下表格:

已提交读 可重复读 其它事务
SET TRANSACTION ISOLATION
LEVEL READ UNCOMMITTED
begin tran
SELECT Model FROM
ProductWHERE SID=10905
第一次查询显示model值为 test
SET TRANSACTION ISOLATION
LEVEL REPEATABLE READ
begin tran
SELECT Model FROM Product
WHERE SID=10905
第一次查询显示model值为 test
 
begin tran
update product set model=’test1′
where SID=10905
将model值改为 test1

另一事务是已提交读时,这里事务修改成功
提交读共享锁查询后就释放。

另一事务是可重复读时,这里事务修改阻塞
可重复读共享锁一直保留到事务提交

SELECT Model FROM Product
WHERE SID=10905
第二次查询值显示为 test1
SELECT Model FROM Product
WHERE SID=10905
第二次查询显示值显示为 test
commit tran

这里就是一个事务里多次读取同一值
结果可能不一致

  commit tran

未完…sql server 锁与事务拨云见日(下)

sql server 锁与事务拨云见日(上) - 花阴偷移 - 博客园

mikel阅读(520)

来源: sql server 锁与事务拨云见日(上) – 花阴偷移 – 博客园

一.概述

讲到SQL server锁管理时,感觉它是一个大话题,因为它不但重要而且涉及的知识点很多,重点在于要掌握高并发要先要掌握锁与事务,涉及的知识点多它包括各式各样的锁,锁的组合,锁的排斥,锁延伸出来的事务隔离级别, 锁住资源带来的阻塞,锁之间的争用造成的死锁,索引数据与锁等。这次介绍锁和事务,我想分上中下篇,上篇详细介绍锁,中篇介绍事务,下篇总结, 针对锁与事务我想把我掌握的以及参考多方面资料,整合出来尽量说详细。 最后说下,对于高级开发人员或DBA,锁与事务应该是重点关注的,它就像是数据库里的一个大boss,如完全掌握了它,数据库就会像就像庖丁解牛一样游刃有余  哈哈 。

二.锁的产生背景

在关系型数据库里锁是无处不再的。当我们在执行增删改查的SQL语句时,锁也就产生了。锁对应的就的是事务,不去显示加tran就是常说的隐式事务。当我们写个存储过程希望数据一致性时, 要么同时回滚,要么同时提交,这时我们用begin tran 来做显示事务。锁的范围就是事务。在sql server里事务默认是提交读(Read Committed) 。
锁是对目标资源(行、页、区、表..)获取所有权的锁定,是一个逻辑概念,用来保存事务的ACID. 当多用户并发同时操作数据时,为了避免出现不一致的数据,锁定是必须的机制。 但同时如果锁的数量太多,持续时间太长,对系统的并发和性能都没有好处。

三.锁的全面认识

3.1 锁住的资源

我们知道sql server的存储数据单元包括文件组,页,区,行。锁住资源范围从低到高依次对应的是:行(RID/KEY)锁,页(PAGE)锁, 表(OBJECT)锁。可通过sp_lock查看,比如: 当我们操作一条数据时应该是行锁, 大批量操作时是页锁或表锁, 这是大批量操作会使锁的数量越多,锁就会自动升级 将大量行锁合成多个页锁或表锁,来避免资源耗尽。SQL SERVER要锁定资源时,默认是从最底级开始锁起(行) 。锁住的常见资源如下:

名称 资源

说明

数据行 RID 锁住堆中(表没有建聚集索引)的单个行。格式为File:Page:SlotID  如 1:8787:4
索引键 KEY 锁住T-tree(索引)中单个行,是一个哈值值。如:(fb00a499286b)
PAGE 锁住数据页(一页8kb,除了页头和页尾,页内容存储数据)可在sys.dm_os_buffer_descriptors找到。格式FileID :Page Number 如1:187541
范围 extent 锁住区(一组连续的8个页 64kb)FileID:N页 。如:1:78427
数据表 object 通常是锁整个表。 如:2858747171
文件 File 一般是数据库文件增加或移除时。如:1
数据库 database 锁住整个数据库,比如设置修改库为只读模式时。 database ID如:7

下图是通过sp_lock的查看的,显示了锁住的资源类型以及资源

3.2 锁的类型及锁说明

锁类型 锁说明
共享锁 (S锁) 用于不更改或不更新数据的读取操作,如 SELECT 语句。
更新锁 (U锁) 它是S与X锁的混合,更新实际操作是先查出所需的数据,为了保护这数据不会被其它事务修改,加上U锁,在真正开始更新时,转成X锁。U锁和S锁兼容, 但X锁和U锁不兼容。
独占锁(排它锁)(X锁) 用于数据修改操作,例如 INSERT、UPDATE 或 DELETE。 确保不会同时对同一资源进行多重更新
意向锁(I锁) (I)锁也不是单独的锁模式,用于建立锁的层次结构。 意向锁包含三种类型:意向共享 (IS)、意向排他 (IX) 和意向排他共享 (SIX)。意识锁是用来标识一个资源是否已经被锁定,比如一个事务尝试锁住一个表,首先会检查是否已有锁在该表的行或者页上。
架构锁(Sch-M,Sch-S) 在执行依赖于表架构操作时使用,例如:添加列或删除列 这个时候使用的架构修改锁(Sch-M),用来防止其它用户对这个表格进行操作。别一种是数据库引擎在编译和执行查询时使用架构性  (Sch-S),它不会阻止其它事务访问表格里的数据,但会阻止对表格做修改性的ddl操作和dml操作。
大容量更新 (BU) 是指数据大容量复制到表中时使用BU锁,它允许多个线程将数据并发地大容量加载到同一表,同时防止其它不进行大容量加载数据的进程访问该表。
键范围 当使用可序列化事务隔离级别时(SERIALIZABLE)保护查询读取的行的范围。 确保再次运行查询时其他事务无法插入符合可序列化事务的查询的行。下章介绍的事务时再详细说

四 锁的互斥(兼容性)

在sql server里有个表,来维护锁与锁之间的兼容性,这是SQLServer预先定义好的,没有任务参数或配置能够去修改它们。如何提高兼容性呢?那就是在设计数据库结构和处理sql语句时应该考虑,尽量保持锁粒度小,这样产生阻塞的概率就会比较小,如果一个连接经常申请页面级,表级,甚至是数据库级的锁资源,程序产生的阻塞的可能性就越大。假设:事务1要申请锁时,该资源已被事务2锁住,并且事务1要申请的锁与事务2的锁不兼容。事务1申请锁就会出现wait状态,直到事务2的锁释放才能申请到。 可通过sp_lock查看wait等待(也就是常说的阻塞)

下面是最常见的锁模式的兼容性

五. 锁与事务关系

如今系统并发现象,引起的资源急用,出现的阻塞死锁一直是技术人员比较关心的。这就涉及到了事务, 事务分五种隔离级别,每个隔离级别有一个特定的并发模式,不同的隔离级别中,事务里锁的作用域,锁持续的时间都不同,后面再详细介绍事务。这里看下客户端并发下的锁与事务的关系, 可以理解事务是对锁的封装,事务就是在并发与锁之间的中间层。如下图:

六. 锁的持续时间

下面是锁在不同事务隔离级别里,所持续占用的时间:

6.1  SELECT动作要申请的锁

我们知道select 会申请到共享锁,下面来演示下共享锁在Repeatable 重复读的级别下,共享锁保留到事件提交时才释放。

具体是1.事务A设置隔离级别为Repeatable重复读,开启事务运行且不提交事务。

2.再打开一个会话窗口,使用sys.dm_tran_locks来分析查看事务的持有锁。

--开启一个事务A, 设置可重复读, 不提交
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ 
BEGIN TRAN 
SELECT  * FROM dbo.Product WHERE SID=204144
--上面执行完后,打开另一会话查询锁状态
SELECT  k.request_session_id,k.resource_type,k.request_status,k.request_mode,k.resource_description,
 OBJECT_NAME( p.object_id) as objectName,p.index_id FROM SYS.dm_tran_locks k LEFT JOIN SYS.PARTITIONS p
ON k.resource_associated_entity_id=p.hobt_id
ORDER BY request_session_id,resource_type

先看看查询单条语句的执行计划,再看看锁住的资源

通过DMV查询,我们看到:

(1)首先是锁住DATABASE资源,是数据库级别的共享锁,以防止别人将数据库删除。

(2)锁住OBJECT表资源,在Product表上加了意向共享锁IS,以防止别人修改表的定义。

(3)锁住了二个PAGE页加了意向共享锁IS,通过上面执行计划可以看出来,查询出来的数据是通过索引查询50%,RID堆查询50%。这条数据分布在二个页上,通过where SID来查找没有完全走索引查找。

(4)通过第3点可以看出,数据1个页是对应RID行,另一页对应KEY行 二个共享锁,堆位置1:112205:25  ,KEY的哈希值(70009fe3578a) 。

总结下:通过Repeatable 重复读,直要事务不提交,共享锁一直会存在。针对想减少被别人阻塞或者阻塞别人的概率,能考虑事情有:1. 尽量减少返回的记录,返回的记录越多,需要的锁也就越多,在Repeatable隔离级别及以上,更是容易造成阻塞。2.返回的数据如果是一小部份,尽量使用索引查找,避免全表扫描。3.可以的话,根据业务设计好最合适的几个索引,避免通过多个索引找到结果。

4.2  UPDATE动作要申请的锁

对于UPDATE需要先查询,再修改。具体是查询加S锁,找到将要修改的记录后先加U锁,真正修改时升级成X锁。还是通过上面的product表来演示具体:选用Repeatable级别,运行一个update语句(先kill 掉之前的会放52)

--开启一个事务, 设置可重复读, 不提交
BEGIN TRAN 
UPDATE    dbo.Product SET model='test'
 WHERE SID IN(10905,119921,204144)

通过 dmv查看,吓一跳没想到锁住了这么多资源,纠结 那下面试着来分析下为什么锁住这么多资源:使用sys.indexes查看index_id 的0,2,4各使用了什么索引

  SELECT  * FROM sys.indexes WHERE object_id= OBJECT_id('product')

(1)这个product表并没有建聚集索引,是在堆结构上建立的非索聚索引,index_id=0 是堆, index_id=2和4 又是分别二个非索聚索引

(2)同样在DATABASE和OBJECT资源 上都加了共享锁。

(3)意向排它锁IX,锁住的Page共9页 说明数据关联了9页,其中堆上3页,ix_1非索聚索引上3页,ixUpByMemberID非索聚索引上3页。

(4) 排它锁X锁住RID堆上3行,KEY索引上6行。大家可能会觉得奇怪明明只改三行的model值,为什么会涉及到9行呢?  我来解释下这个表是建了三个非聚集索引,其中ix_1索引里有包含列model,xUpByMemberID索引里也同样有包含列model,还有model数据是在堆,当堆上数据修改后,model关联的非聚集索引也要重新维护。如下图

(5) 这里还有架构锁Sch-s ,锁住了元数据。

总结:1.一定要给表做聚集索引,除了特殊情况使用堆结构。2.要修改的数据列越多,锁的数目就会越多,这里model就涉及到了9行维护。3. 描述的页面越多,意向锁就会越多,对扫描的记录也会加锁,哪怕没有修改。所以想减少阻塞要做到:1).尽量修改少的数据集,修改量越多,需要的锁也就越多。2) 尽量减少无谓的索引,索引的数目越多,需要的锁也可能越多。3.严格避免全局扫描,修改表格记录时,尽量使用索引查询来修改。

4.3  DELETE动作要申请的锁

BEGIN TRAN 
DELETE     dbo.Product WHERE SID =10905

(1) 删除了RID堆的数据,以及关联的非聚集索引三个key的值分别是(2,5,4)

(2) 在要删除的4个page上加了意向排它锁,同样对应一个RID和三个KEY。

(3)在OBJECT资源表上加了意向排它锁。

总结:在DELETE过程中是先找到符合条件的记录,然后再删除, 可以说是先SELECT后DELETE,如果有索引第一步查询申请的锁会比较 少。 对于DELETE不但删除数据本身,还会删除所有相关的索引键,一个表上的索引越多,锁的数目就会越多,也容易阻塞。为了防步阻塞我们不能不建索引,也不能随便就建索引,而是要根据业务建查询绝对有利的索引。

4.4  INSERT动作要申请的锁

BEGIN TRAN 
INSERT into    dbo.Product VALUES('modeltest','brandtest',GETDATE(),9708,'test')

对于以上三种动作,INSERT相对简单点,只需要对要插入数据本身加上X锁,对应的页加IX锁,同步更新了关联的索引三个key。

这里新增跟删除最终显示的锁一样,但在锁申请的过程中,新增不需要先查询到数据s锁,升级u锁,再升级成X锁。

七. 锁的升级

7.1 使用profiler窗口查看实时的锁升级

以单次批操作受影响的行数超过5000条时(锁数量最大值5000),升级为表锁。在SQLServer里可以选择完全关掉锁升级,虽然可以减少阻塞,但锁内存会增加,降低性能还可能造成更多死锁。

锁升级缺点:会给其它会话带来阻塞和死锁。锁升级优点:减少锁的内存开销。

检测方法:在profiler中查看lock:escalation事件类。通过查看Type列,可查看锁升级的范围,升级成表锁(object是表锁)

如下图:

如果减少批操作量,就没有看到升级表锁, 可自行通过 escalation事件查看,下图就是减少了受影响的行数。

总结:将批操作量受影响行数减少到5000以下,减少锁的升级后,发生了更频繁的死锁,原因是多个page页的争用。后有人指出你先把并行度降下来(删除500一下的数据可以不使用并行) 在语句中设置maxdop = 1 这样应该不会死锁了。具体原因还需具体分析。

7.2 使用dmv查看锁升级

sys.dm_db_index_operational_stats返回数据库中的当前较低级别 I/O、 锁定、 闩锁,和将表或索引的每个分区的访问方法活动。

index_lock_promotion_attempt_count:数据库引擎尝试升级锁的累积次数。

index_lock_promotion_count:数据库引擎升级锁的累积次数。

复制代码
SELECT  OBJECT_NAME(ddios.[object_id], ddios.database_id) AS [object_name] ,
        i.name AS index_name ,
        ddios.index_id ,
        ddios.partition_number ,
        ddios.index_lock_promotion_attempt_count ,
        ddios.index_lock_promotion_count ,
        ( ddios.index_lock_promotion_attempt_count
          / ddios.index_lock_promotion_count ) AS percent_success
FROM    sys.dm_db_index_operational_stats(DB_ID(), NULL, NULL, NULL) ddios
        INNER JOIN sys.indexes i ON ddios.object_id = i.object_id
                                    AND ddios.index_id = i.index_id
WHERE   ddios.index_lock_promotion_count > 0
ORDER BY index_lock_promotion_count DESC;
复制代码

7.3 使用dmv查看页级锁资源争用

page_lock_wait_count:数据库引擎等待页锁的累积次数。

  page_lock_wait_in_ms:数据库引擎等待页锁的总毫秒数。

  missing_index_identified:缺失索引的表。

复制代码
SELECT  OBJECT_NAME(ddios.object_id, ddios.database_id) AS object_name ,
        i.name AS index_name ,
        ddios.index_id ,
        ddios.partition_number ,
        ddios.page_lock_wait_count ,
        ddios.page_lock_wait_in_ms ,
        CASE WHEN DDMID.database_id IS NULL THEN 'N'
             ELSE 'Y'
        END AS missing_index_identified
FROM    sys.dm_db_index_operational_stats(DB_ID(), NULL, NULL, NULL) ddios
        INNER JOIN sys.indexes i ON ddios.object_id = i.object_id
                                    AND ddios.index_id = i.index_id
        LEFT OUTER JOIN ( SELECT DISTINCT
                                    database_id ,
                                    object_id
                          FROM      sys.dm_db_missing_index_details
                        ) AS DDMID ON DDMID.database_id = ddios.database_id
                                      AND DDMID.object_id = ddios.object_id
WHERE   ddios.page_lock_wait_in_ms > 0
ORDER BY ddios.page_lock_wait_count DESC;
复制代码

八. 锁的超时

在sql server 里锁默认是不会超时的,是无限的等待。多数客户端编程允许用户连接设置一个超时限制,因此在指定时间内没有反馈,客户端就会自动撤销查询, 但数据库里锁是没有释放的。

可以通 select @@lock_timeout  查看默认值是 ” -1″, 可以修改超时时间  例如5秒超时 set  lock_timeout  5000;

下面是查看锁的等待时间, wait_time是当前会话的等待资源的持续时间(毫秒)

select  session_id, blocking_session_id,command,sql_handle,database_id,wait_type
,wait_time,wait_resource
from sys.dm_exec_requests 
where blocking_session_id>50

从wait_type入手模拟SQL Server Lock - iSun - 博客园

mikel阅读(644)

来源: 从wait_type入手模拟SQL Server Lock – iSun – 博客园

一、LCK_M_S,等待获取共享锁

开始一SQL TRAN,其中执行对某数据的UPDATE。但并不COMMIT,也不ROLLBACK。

begin tran
update [dbo].[HR_Employee] set [Description]='ZZ'

这样,便使用排它锁锁定了该[Employee]表。

 

在另一会话中,执行对该表的SELECT操作。至此,死锁产生。

select * from [dbo].[HR_Employee]

 

使用下列script查询当前锁情况。

复制代码
 1 SELECT wt.blocking_session_id                    AS BlockingSessesionId
 2         ,sp.program_name                           AS ProgramName
 3         ,COALESCE(sp.LOGINAME, sp.nt_username)     AS HostName    
 4         ,ec1.client_net_address                    AS ClientIpAddress
 5         ,db.name                                   AS DatabaseName        
 6         ,wt.wait_type                              AS WaitType                    
 7         ,ec1.connect_time                          AS BlockingStartTime
 8         ,wt.WAIT_DURATION_MS/1000                  AS WaitDuration
 9         ,ec1.session_id                            AS BlockedSessionId
10         ,h1.TEXT                                   AS BlockedSQLText
11         ,h2.TEXT                                   AS BlockingSQLText
12   FROM sys.dm_tran_locks AS tl
13   INNER JOIN sys.databases db
14     ON db.database_id = tl.resource_database_id
15   INNER JOIN sys.dm_os_waiting_tasks AS wt
16     ON tl.lock_owner_address = wt.resource_address
17   INNER JOIN sys.dm_exec_connections ec1
18     ON ec1.session_id = tl.request_session_id
19   INNER JOIN sys.dm_exec_connections ec2
20     ON ec2.session_id = wt.blocking_session_id
21   LEFT OUTER JOIN master.dbo.sysprocesses sp
22     ON SP.spid = wt.blocking_session_id
23   CROSS APPLY sys.dm_exec_sql_text(ec1.most_recent_sql_handle) AS h1
24   CROSS APPLY sys.dm_exec_sql_text(ec2.most_recent_sql_handle) AS h2
复制代码

发现该LOCK的wait_type为LCK_M_S,意味着后一会话在等待着获取对该表的共享锁已完成查询工作。

 

二、LCK_M_U,等待获取更新锁。

发起一SQL会话,在其中使用更新锁(UPDLOCK)SELECT数据,而后WAIT一定的时间。

1 begin tran
2 select * from [dbo].[HR_Employee] WITH (UPDLOCK) where [Id]=7
3 waitfor delay '00:01:00' 
4 update [dbo].[HR_Employee] set [Description]='ZZ' where [Id]=7
5 commit tran

在wait的时间内,[Id]=7的行被更新锁锁住。

 

发起另一会话,使用更新锁(UPDLOCK)完成SELECT操作。

1 select * from [dbo].[HR_Employee] WITH (UPDLOCK)

 

发现后一会话被block。wait_type为LCK_M_U,表示其在等待该表的更新锁。

 

三、LCK_M_X,等待获取排它锁

将上一小节中第二个会话的操作改为UPDATE。

update [dbo].[HR_Employee] set [Description]='ZZy' where [Id]=7

后一会话同样被block,但这次的wait_type为LCK_M_X,表明其在等待用于UPDATE DATA的排它锁。

关闭”xx程序已停止工作”提示窗口_顺其自然~的博客-CSDN博客

mikel阅读(609)

来源: 关闭”xx程序已停止工作”提示窗口_顺其自然~的博客-CSDN博客

近日在工作中,接手一个项目,程序运行起来后偶发性间隔几个小时或几天就会出现如下(图1, 图2)的”xx程序已停止工作”的提示窗口,这时需要用户手动点击”关闭程序”按钮,进程才会退出。

图1

图2

当然最好的解决办法就是找出程序中导致”程序错误”的原因,但由于对接手的项目不是很熟悉,再加上时间紧迫,难以在短时间找到问题原因,于是给此程序添加一个”守护程序”(即: 检测到进程退出后就自动重启)。

但程序崩溃时,弹出的”xx程序已停止工作”导致程序进程无法退出,“守护程序”自然也起不到相应的作用。

在网上查找了一番,终于找到两种解决方法:

第一种方法

运行注册表编辑器(【开始】-【运行】中输入regedit并回车),依次定位到HKEY_CURRENT_USER\Software\Microsoft\Windows\WindowsError Reporting,在右侧窗口中找到并双击打开DontshowUI,然后在弹出的窗口中将默认值“0”修改为“1”。

那么,当程序崩溃时,就不会再出现”xx程序已停止工作”的提示框,崩溃程序进程会自动退出。

这种修改系统注册表的方法是最方便和直接的,但会对所有程序生效,如果特别注重系统的安全性,只想让指定的程序在崩溃时不出现”xx程序已停止工作”,请参考”第二种方法”。

图3

第二种方法

查看Windows任务管理器(图4)发现,程序崩溃时之所以出现”xx程序已停止”工作,是因为触发了”Windows的错误报告”机制,在我的系统(Windows 10 64位)任务管理器进程列表中会出现一个名称为”Windows问题报告”的进程,点击此进程左侧的”下拉箭头”,会出现一个窗口列表,此窗口列表就代表了当前所有弹出”xx程序已停止工作”的窗口(图5),而窗口标题就是我们崩溃程序的进程名。

图4

图5

看到此,不知道你是否已经有了启发。

解决思路如下:

在”守护程序”中定期检测Windows系统进程列表中是否出现”WerFault.exe”进程(“Windows问题报告”的进程名), 如果出现, 则查找”WerFault.exe”进程下的窗口名称是否存在”要守护程序的进程名”, 如果存在,则表示“要守护的程序崩溃并出现已停止工作”的提示框, 那么则向“WerFault.exe”进程下的“窗口”发送 WM_Close 消息,关闭此“提示窗口”,如此, “要守护的程序进程就会完全退出”, 守护程序就可以重新启动此程序了。

其实就是用程序模拟“用户手动关闭‘已停止工作’窗口。

  1. #include <windows.h>
  2. #include <tlhelp32.h> //声明快照函数文件
  3. #include “stdio.h”
  4. #include <cstring>
  5. // 根据进程ID, 返回指定进程下”第一个”窗口的窗口句柄
  6. // 注: 此程序还不够完善, 因为指定进程下可能有多个窗口
  7. HWND GetWindowHandleByPID(DWORD dwProcessID)
  8. {
  9.     HWND h = GetTopWindow(0);
  10.     while (h)
  11.     {
  12.         DWORD pid = 0;
  13.         DWORD dwTheardId = GetWindowThreadProcessId(h, &pid);
  14.         if (dwTheardId != 0)
  15.         {
  16.             if (pid == dwProcessID /*your process id*/)
  17.             {
  18.                 // here h is the handle to the window
  19.                 return h;
  20.             }
  21.         }
  22.         h = GetNextWindow(h, GW_HWNDNEXT);
  23.     }
  24.     return NULL;
  25. }
  26. int main(int argc, char *argv[])
  27. {
  28.     PROCESSENTRY32 pe32;
  29.     //在使用这个结构之前,先设置它的大小
  30.     pe32.dwSize = sizeof(pe32);
  31.     //给系统内的所有进程拍一个快照
  32.     HANDLE hProcessSnap = ::CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
  33.     //Includes the process list in the snapshot
  34.     if (hProcessSnap == INVALID_HANDLE_VALUE)
  35.     {
  36.         printf(“CreateToolhelp32Snapshot 调用失败! n”);
  37.         return -1;
  38.     }
  39.     //遍历进程快照,轮流显示每个进程信息
  40.     BOOL bMore = ::Process32First(hProcessSnap, &pe32);
  41.     while (bMore)
  42.     {
  43.         /*printf(” 进程名称为:%s\n”, pe32.szExeFile);
  44.         printf(” 进程ID为:%u \n\n”, pe32.th32ProcessID);*/
  45.         if (_stricmp(pe32.szExeFile, “werfault.exe”) == 0)
  46.         {
  47.             printf(” 进程名称为:%s\n”, pe32.szExeFile);
  48.             printf(” 进程ID为:%u \n\n”, pe32.th32ProcessID);
  49.             HWND hwnd = GetWindowHandleByPID(pe32.th32ProcessID);
  50.             if (hwnd)
  51.             {
  52.                 char szText[256] = { 0 };
  53.                 GetWindowText(hwnd, szText, 256);
  54.                 // 自己崩溃程序的”进程名”
  55.                 if (_stricmp(szText, “myProcessName.exe”) == 0)
  56.                 {
  57.                     printf(“Text: %s\n\n”, szText);
  58.                     // 关闭”xx程序已停止”提示窗口
  59.                     SendMessage(hwnd, WM_CLOSE, NULL, NULL);
  60.                 }
  61.             }
  62.         }
  63.         //遍历下一个
  64.         bMore = ::Process32Next(hProcessSnap, &pe32);
  65.     }
  66.     //清除snapshot对象
  67.     ::CloseHandle(hProcessSnap);
  68.     return 0;
  69. }

补充

此程序还不够完善,因为对于下面方法:

HWNDGetWindowHandleByPID(DWORD dwProcessID);

其根据进程ID,返回指定进程下”第一个”窗口的窗口句柄,但一个进程下可能会有多个窗口(如图6)。

图6

但对“WerFault.exe”进程,我在测试中发现(测试系统:Windows10 64位),当有多个程序出现”已停止工作“提示窗口时(图7),每个程序会各自对应一个”WerFault.exe”进程(图8)。即: 每个WerFault.exe进程下只会出现一个“已停止工作”窗口标题。

当然其他Windows系统我没有测试是否也是这样,以后有时间再进一步完善此程序。

图7

图8

参考文章:

修改DontshowUI默认值弹出窗口关闭小秘密

https://www.baidu.com/link?url=hTh2Zy1SF_KozYKGGdD_HRLEUm4jDB5Uv3jaN3GKP9L40M84F7pHl2upszMXA1lqeRgCpNAeusYrMLhr1rq_QlZ3G6qx5Ypc0CLoCZKQ7s3&wd=&eqid=e02a9f000000a25b0000000357ce11b6

VC++ 通过进程名或进程ID获取进程句柄

http://blog.csdn.net/luxiaoyu_sdc/article/details/6534783

VC 显示当前运行的所有进程

http://www.cnblogs.com/xianyunhe/archive/2011/06/09/2076878.html

分布式锁 redis实现分布式锁,分布式id生成方案,秒杀设计方案 - 战斗小人 - 博客园

mikel阅读(608)

来源: 分布式锁 redis实现分布式锁,分布式id生成方案,秒杀设计方案 – 战斗小人 – 博客园

分布式锁

作用:不同系统上的不同进程,去抢一把锁,谁抢到了,谁才能改数据

要求:高可用性,可冲入性(拿到锁的节点挂了,得有超时过期机制)

实现方式:

基于数据库实现分布式锁;

基于缓存(Redis等)实现分布式锁;

基于Zookeeper实现分布式锁;

 

redis实现分布式锁

复制代码
# 1 分布式锁: 锁住不同机器上的不同进程
# 2 redis实现:官方提供了    https://github.com/SPSCommerce/redlock-py

#Redlock    pip install redlock-py     安装,此为官方提供版

from redlock import Redlock

dlm = Redlock([{"host": "localhost", "port": 6379, "db": 0}, ])    # 创建锁管理器
# dlm = Redlock([{"host":"localhost",'port':6379,'db':0,'password':"admin123"},])

# 获取锁,my_resource_name是锁的唯一标识符。1000代表1000毫秒数,超过这个时间,锁自动释放(防机器宕机)
my_lock = dlm.lock("my_resource_name",1000)    
# 自己的逻辑---》悲观锁---你的代码
print("xxx")

#你写的业务
# 自己的逻辑---》悲观锁---你的代码
dlm.unlock(my_lock)    # 把锁释放了

原理:https://www.cnblogs.com/liuqingzheng/p/11080501.html  # 在4、分布式锁的简单实现代码中
# 也可以参考第三方库  https://github.com/glasslion/redlock
复制代码

 

分布式id生成方案

复制代码
# 分布式id:为了保证全局唯一
    -分表,默认自增--》两个库上--》可能id号重复       uuid:有没有重复(数据量极大的情况下可能会重复,概率较低)
# 设计思路:
  low版本的出来:
    -一个库 1,3,5,7,9
    -另一个库2,4,6,8,10
  分布式id生成方案
      -UUID:不是趋势自增,性能挺高(5台机器生成,一般不会重复)
    -mysql生成:性能低      机器都去mysql中要id
    -redis生成:很快,自增 ,必须还得有台redis服务器  incrby   16位: '当前时间戳+自增'  
    -雪花算法:python版雪花算法
    # 雪花算法是64位二进制数,第1位不用;41位是时间戳,可用69年;10位代表机器id(进程号),最大1024;12位表示4096个数字。雪花算法同一毫秒内最多产生4096个id
    # python实现雪花算法代码参考:https://www.cnblogs.com/oklizz/p/11865750.html
复制代码

秒杀设计方案

复制代码
# 思路一:    # 该方案适用于客户提前充好钱了,不适合连支付宝支付方案
    -某个时间段---》卖商品---》别卖超了---》mysql悲观锁实现---》缺陷,性能低
  -100商品---》预热---》100这个数,放到redis中----》incrby--》[来一个秒杀请求-1,在redis集合中把用户id放进去](加锁,使用分布式锁或者使用pipline做),最后100这个数变成了0,---》起个异步任务---》消费集合中的id,生成订单,扣减库存,扣减账户余额,提前充钱了
  -用户真去看订单的时候---》异步任务完成了

# 思路二:
  -用户发了秒杀请求---》前端看到--》您正在排队 
  -请求来了---》放到队列里---》(djnago中间件:请求放到队列中,直接返回,告诉用户,您正在排队)

## 有些公司潜规则,秒杀超了无所谓,优惠券而已
复制代码

 

如何将 Redis 用于微服务通信的事件存储 - 中间件小哥 - 博客园

mikel阅读(613)

来源: 如何将 Redis 用于微服务通信的事件存储 – 中间件小哥 – 博客园

以我的经验,将某些应用拆分成更小的、松耦合的、可协同工作的独立逻辑业务服务会更易于构建和维护。这些服务(也被称为微服务)各自管理自己的技术栈,因此很容易独立于其他服务进行开发和部署。前人已经总结了很多关于使用这种架构设计的好处,在此我就不再赘述了。关于这种设计,有一个方面我一直在重点关注,因为如果没有它,将会导致一些有趣的挑战。虽然构建松耦合的微服务是一个非常轻量级和快速的开发过程,但是这些服务之间共享状态、事件以及数据的通信模型却不那么简单。我使用过的最简单的通信模型就是服务间直接通信,但是这种模型被 Fernando Dogio 明确地证明一旦服务规模扩大就会失效,会导致服务崩溃、重载逻辑以及负载增加等问题,从而可能引起的巨大麻烦,因此应该尽量避免使用这种模型。还有一些其他通信模型,比如通用的发布/订阅模型、复杂的 kafka 事件流模型等,但是最近我在使用 Redis 构建微服务间的通信模型。

 

拯救者 Redis!

微服务通过网络边界发布状态,为了跟踪这种状态,事件通常需要被保存在事件存储中。由于事件通常是一种异步写入操作的不可变流的记录(又被称为事务日志),因此适用于以下场景:

1. 顺序很重要(时间序列数据)

2. 丢失一个事件会导致错误状态

3. 回放状态在任何给定时间点都是已知的

4. 写操作简单且快捷

5. 读操作需要更多的时间,以至于需要缓存

6. 需要高可扩展性,服务之间都是解耦的,没有关联

使用 Redis,我始终可以轻松实现发布-订阅模式。但现在,Redis 5.0 提供了新的Streams 数据类型,我们可以以一种更加抽象的方式对日志数据结构进行建模-使之成为时间序列数据的理想用例(例如最多一次或最少一次传递语义的事务日志)。基于双主功能,轻松简单的部署以及内存中的超快速处理能力,Redis 流成为一种管理大规模微服务通信的必备工具。基本的模型被称为命令查询职责分离(CQRS),它将命令和查询分开执行,命令使用 HTTP 协议,而查询采用 RESP(Redis 序列化协议)。让我们使用一个例子来说明如何使用 Redis 作为事件存储。

OrderShop简单应用概述

我创建了一个简单但是通用的电子商务应用作为例子。当创建/删除客户、库存物品或订单时,使用 RESP 将事件异步传递到 CRM 服务,以管理 OrderShop 与当前和潜在客户的互动。像许多常见应用程序的需求一样,CRM 服务可以在运行时启动和停止,而不会影响其他微服务。这需要捕获在其停机期间发送给它的所有消息以进行后续处理。
下图展示了 9 个解耦的微服务的互连性,这些微服务使用由 Redis 流构建的事件存储进行服务间通信。他们通过侦听事件存储(即 Redis 实例)中特定事件流上的任何新创建的事件来执行此操作。

图1. OrderShop 架构

 

我们的 OrderShop 应用程序的域模型由以下 5 个实体组成:

  • 顾客
  • 产品
  • 库存
  • 订单
  • 账单

通过侦听域事件并保持实体缓存为最新状态,事件存储的聚合功能仅需调用一次或在响应时调用。

 

 

图2. OrderShop 域模型

安装并运行OrderShop

按照如下步骤安装并运行 OrderShop 应用:

1. 从这里下载代码仓库:

https://github.com/Redislabs-Solution-Architects/ordershop

2. 确保已经安装了 Docker Engine和Docker Compose

3. 安装 Python3:

https://python-docs.readthedocs.io/en/latest/starting/install3/osx.html

4. 使用 docker-compose up启动应用程序

5. 使用 pip3 install -r client / requirements.txt 安装需求

6. 然后使用 python3 -m unittest client / client.py 执行客户端

7. 使用 docker-compose stop crm-service 停止 CRM 服务

8. 重新执行客户端,您会看到该应用程序正常运行,没有任何错误

 

深入了解

以下是来自 client.py 的一些简单测试用例,以及相应的 Redis 数据类型和键。

 

 

 

我选择流数据类型来保存这些事件,因为它们背后的抽象数据类型是事务日志,非常适合我们连续事件流的用例。我选择了不同的键来分配分区,并决定为每个流生成自己的条目 ID,ID 包含秒“-”微秒的时间戳(为了保持 ID 的唯一,并保留了键/分区之间事件的顺序)。我选择集合来存储 ID(UUID),并选择列表和哈希来对数据建模,因为它反映了它们的结构,并且实体缓存只是域模型的简单投影。

 

结论

Redis 提供的各种数据结构-包括集合,有序集合,哈希,列表,字符串,位数组,HyperLogLogs,地理空间索引以及现在的流-可以轻松适应任何数据模型。流包含的元素不仅是单个字符串,而且是由字段和值组成的对象。范围查询速度很快,并且流中的每个条目都有一个 ID,这是一个逻辑偏移量。流提供了针对时间序列等应用的解决方案,并可为其他应用提供流消息,例如,替换需要更高可靠性的通用发布/ 订阅应用程序,以及其他全新的应用。
您可以通过分片(聚集多个实例)来扩展 Redis 实例并提供容灾恢复的持久性选项,所以 Redis 可以作为企业级应用的选择。

【高并发】Redis如何助力高并发秒杀系统,看完这篇我彻底懂了!! - 冰河团队 - 博客园

mikel阅读(611)

来源: 【高并发】Redis如何助力高并发秒杀系统,看完这篇我彻底懂了!! – 冰河团队 – 博客园

写在前面

之前,我们在《【高并发】高并发秒杀系统架构解密,不是所有的秒杀都是秒杀!》一文中,详细讲解了高并发秒杀系统的架构设计,其中,我们介绍了可以使用Redis存储秒杀商品的库存数量。很多小伙伴看完后,觉得一头雾水,看完是看完了,那如何实现呢?今天,我们就一起来看看Redis是如何助力高并发秒杀系统的!

有关高并发秒杀系统的架构设计,小伙伴们可以关注 冰河技术 公众号,查看《【高并发】高并发秒杀系统架构解密,不是所有的秒杀都是秒杀!》一文。

秒杀业务

在电商领域,存在着典型的秒杀业务场景,那何谓秒杀场景呢。简单的来说就是一件商品的购买人数远远大于这件商品的库存,而且这件商品在很短的时间内就会被抢购一空。比如每年的618、双11大促,小米新品促销等业务场景,就是典型的秒杀业务场景。

秒杀业务最大的特点就是瞬时并发流量高,在电商系统中,库存数量往往会远远小于并发流量,比如:天猫的秒杀活动,可能库存只有几百、几千件,而瞬间涌入的抢购并发流量可能会达到几十到几百万。

所以,我们可以将秒杀系统的业务特点总结如下。

(1)限时、限量、限价

在规定的时间内进行;秒杀活动中商品的数量有限;商品的价格会远远低于原来的价格,也就是说,在秒杀活动中,商品会以远远低于原来的价格出售。

例如,秒杀活动的时间仅限于某天上午10点到10点半,商品数量只有10万件,售完为止,而且商品的价格非常低,例如:1元购等业务场景。

限时、限量和限价可以单独存在,也可以组合存在。

(2)活动预热

需要提前配置活动;活动还未开始时,用户可以查看活动的相关信息;秒杀活动开始前,对活动进行大力宣传。

(3)持续时间短

购买的人数数量庞大;商品会迅速售完。

在系统流量呈现上,就会出现一个突刺现象,此时的并发访问量是非常高的,大部分秒杀场景下,商品会在极短的时间内售完。

秒杀三阶段

通常,从秒杀开始到结束,往往会经历三个阶段:

  • 准备阶段:这个阶段也叫作系统预热阶段,此时会提前预热秒杀系统的业务数据,往往这个时候,用户会不断刷新秒杀页面,来查看秒杀活动是否已经开始。在一定程度上,通过用户不断刷新页面的操作,可以将一些数据存储到Redis中进行预热。
  • 秒杀阶段:这个阶段主要是秒杀活动的过程,会产生瞬时的高并发流量,对系统资源会造成巨大的冲击,所以,在秒杀阶段一定要做好系统防护。
  • 结算阶段: 完成秒杀后的数据处理工作,比如数据的一致性问题处理,异常情况处理,商品的回仓处理等。

Redis助力秒杀系统

我们可以在Redis中设计一个Hash数据结构,来支持商品库存的扣减操作,如下所示。

seckill:goodsStock:${goodsId}{
	totalCount:200,
	initStatus:0,
	seckillCount:0
}

在我们设计的Hash数据结构中,有三个非常主要的属性。

  • totalCount:表示参与秒杀的商品的总数量,在秒杀活动开始前,我们就需要提前将此值加载到Redis缓存中。
  • initStatus:我们把这个值设计成一个布尔值。秒杀开始前,这个值为0,表示秒杀未开始。可以通过定时任务或者后台操作,将此值修改为1,则表示秒杀开始。
  • seckillCount:表示秒杀的商品数量,在秒杀过程中,此值的上限为totalCount,当此值达到totalCount时,表示商品已经秒杀完毕。

我们可以通过下面的代码片段在秒杀预热阶段,将要参与秒杀的商品数据加载的缓存。

/**
 * @author binghe
 * @description 秒杀前构建商品缓存代码示例
 */
public class SeckillCacheBuilder{
    private static final String GOODS_CACHE = "seckill:goodsStock:"; 
    private String getCacheKey(String id) { 
        return  GOODS_CACHE.concat(id);
    } 
    public void prepare(String id, int totalCount) { 
        String key = getCacheKey(id); 
        Map<String, Integer> goods = new HashMap<>(); 
        goods.put("totalCount", totalCount); 
        goods.put("initStatus", 0); 
        goods.put("seckillCount", 0); 
        redisTemplate.opsForHash().putAll(key, goods); 
     }
}

秒杀开始的时候,我们需要在代码中首先判断缓存中的seckillCount值是否小于totalCount值,如果seckillCount值确实小于totalCount值,我们才能够对库存进行锁定。在我们的程序中,这两步其实并不是原子性的。如果在分布式环境中,我们通过多台机器同时操作Redis缓存,就会发生同步问题,进而引起“超卖”的严重后果。

在电商领域,有一个专业名词叫作“超卖”。顾名思义:“超卖”就是说卖出的商品数量比商品的库存数量多,这在电商领域是一个非常严重的问题。那么,我们如何解决“超卖”问题呢?

Lua脚本完美解决超卖问题

我们如何解决多台机器同时操作Redis出现的同步问题呢?一个比较好的方案就是使用Lua脚本。我们可以使用Lua脚本将Redis中扣减库存的操作封装成一个原子操作,这样就能够保证操作的原子性,从而解决高并发环境下的同步问题。

例如,我们可以编写如下的Lua脚本代码,来执行Redis中的库存扣减操作。

local resultFlag = "0" 
local n = tonumber(ARGV[1]) 
local key = KEYS[1] 
local goodsInfo = redis.call("HMGET",key,"totalCount","seckillCount") 
local total = tonumber(goodsInfo[1]) 
local alloc = tonumber(goodsInfo[2]) 
if not total then 
    return resultFlag 
end 
if total >= alloc + n  then 
    local ret = redis.call("HINCRBY",key,"seckillCount",n) 
    return tostring(ret) 
end 
return resultFlag

我们可以使用如下的Java代码来调用上述Lua脚本。

public int secKill(String id, int number) { 
    String key = getCacheKey(id); 
    Object seckillCount =  redisTemplate.execute(script, Arrays.asList(key), String.valueOf(number)); 
    return Integer.valueOf(seckillCount.toString()); 
}

这样,我们在执行秒杀活动时,就能够保证操作的原子性,从而有效的避免数据的同步问题,进而有效的解决了“超卖”问题。

消息队列与快递柜之间的奇妙关系 - 字母哥博客 - 博客园

mikel阅读(555)

来源: 消息队列与快递柜之间的奇妙关系 – 字母哥博客 – 博客园

提到消息队列可能一些朋友经常听别人说起一些名词,比如:服务程序解耦,处理流量削峰,通过异步处理提升用户体验,缓冲批处理提高处理性能。笔者擅于白话解说,所以我就不用专业的术语去解释专业的问题了。我一直觉得消息队列的功能和快递柜的功能非常相似,怎么个相似法呢?让我来详细给你说说。

一、白话消息队列

我们来将快递柜与消息队列做一个对比

  • 消息队列比作快递柜:有很多厂家生产快递柜,如:丰巢(apache kafka),速递易(alibaba RocketMQ),近邻宝(ActiveMQ)等等,反正常用的就这几个。快递柜负责临时保存邮件,消息队列负责临时保存消息数据。
  • 快递员比作消息生产者:快递员负责向快递柜投递邮件,生产者负责向消息队列投递消息。异曲同工之妙啊!
  • 消费者比作消息消费者: 可能是这个例子太贴切了,以至于这句怎么看都是废话。废话也还是要说,生活中的消费者取邮件,程序中的消费者取消息数据。

二、快递柜(消息队列)带来的好处

我们先回顾一下在没有快递柜的日子里是怎么度过的:某天早上突然接到快递员电话:”兄弟,有你的快递啊”。心里想真糟糕:“你早不来晚不来,我马上就要上班了,你这个时候来。好吧,我等你一会”。结果有可能快递员很不靠谱,一会说堵车,一会说马上到,等来等去你上班迟到了。这种情况让你很崩溃!

突然有一天,小区里突然出现了一个叫做快递柜的东西,这东西好啊。”兄弟,有你快递啊”,心想谁是你兄弟:”啊,你放快递柜里面吧,我晚上下班回来取”。快递员愉快的把快递放入快递柜,开始打下一个电话,一早上10个邮件。如果每个都送上门快递员最少要半小时。现在好了,9个放快递柜,1个用户要求送上门,10分钟就搞定了。快递员觉得这个东东真的很好!

快递员高兴了,消费者用户其实也很满意,有的购物狂一天有可能收10来个邮件。没有快递柜的时候,快递员来一个电话就去取一次(等一次)快递。有了快递柜,下班的时候就一起全都取了。上面的例子,体现了消息队列(快递柜)的几个优越性,请读者仔细品评:

  • 异步解耦:有了快递柜,消费者不用等待快递员,用户体验增强。消费者与生产者(快递员)之间解耦,不会因为对方的操作行为,影响自己独立处事的程序。用户不用疲于等待与接收事件阻塞耗时。
  • 流量削峰:我们假定一种极端的情况,你通过各个渠道买了1000本书,突然某一小时集中的给你打电话。你肯定不具备一个小时收1000个邮件的能力,所以你让快递员将邮件放入快递柜。所以你就可以按照自己的处理能力,按照自己的时间安排去取邮件。同样我们的消费者程序在面临多用户、高并发的请求情况下,将数据放入消息队列保存可以将流量数据削峰,按照程序能够处理的能力和资源进行数据消费。
  • 缓冲批处理:生产者批量投递,爽!消费者一次性取多个邮件,爽!

三、引入快递柜带来的缺点

说了这么多的优点,那么快递柜有没有缺点呢?当然有

  1. 引入复杂度。毫无疑问,快递柜(消息队列)这东西是多出来的,在原来的收取过程中是不存在的。所以需要地方放它,还需要防火、防盗防潮,需要去维护它。消息队列中间件也是一样的,你需要服务器区安装它,还要对它进行维护。
  2. 会导致暂时的数据不一致。 如果没有快递柜,你收到了邮件件就是真的收到了。但是使用快递柜之后,你收到了”邮件放入快递柜的消息”,但是与你真的取到邮件这中间会有一定的延时。当然你最终还是会取到邮件,选择”消息队列”快递柜,就是要忍受暂时不一致,接受”最终一致性”。
  3. 当然极端情况下,快递柜坏了,你要不可避免地接受”邮件可能会丢失”的事实,对于安保系数高的小区这几乎不会发生。

redis 分布式锁的 5个坑,真是又大又深 - YoungDeng - 博客园

mikel阅读(556)

来源: redis 分布式锁的 5个坑,真是又大又深 – YoungDeng – 博客园

引言

最近项目上线的频率颇高,连着几天加班熬夜,身体有点吃不消精神也有些萎靡,无奈业务方催的紧,工期就在眼前只能硬着头皮上了。脑子浑浑噩噩的时候,写的就不能叫代码,可以直接叫做Bug。我就熬夜写了一个bug被骂惨了。

由于是做商城业务,要频繁的对商品库存进行扣减,应用是集群部署,为避免并发造成库存超买超卖等问题,采用 redis 分布式锁加以控制。本以为给扣库存的代码加上锁lock.tryLock就万事大吉了

    /**
     * @author xiaofu
     * @description 扣减库存
     * @date 2020/4/21 12:10
     */
   public String stockLock() {
        RLock lock = redissonClient.getLock("stockLock");
        try {
            /**
             * 获取锁
             */
            if (lock.tryLock(10, TimeUnit.SECONDS)) {
                /**
                 * 查询库存数
                 */
                Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stockCount"));
                /**
                 * 扣减库存
                 */
                if (stock > 0) {
                    stock = stock - 1;
                    stringRedisTemplate.opsForValue().set("stockCount", stock.toString());
                    LOGGER.info("库存扣减成功,剩余库存数量:{}", stock);
                } else {
                    LOGGER.info("库存不足~");
                }
            } else {
                LOGGER.info("未获取到锁业务结束..");
            }
        } catch (Exception e) {
            LOGGER.info("处理异常", e);
        } finally {
            lock.unlock();
        }
        return "ok";
  }

结果业务代码执行完以后我忘了释放锁lock.unlock(),导致redis线程池被打满,redis服务大面积故障,造成库存数据扣减混乱,被领导一顿臭骂,这个月绩效~ 哎·~。

随着 使用redis 锁的时间越长,我发现 redis 锁的坑远比想象中要多。就算在面试题当中redis分布式锁的出镜率也比较高,比如:“用锁遇到过哪些问题?” ,“又是如何解决的?” 基本都是一套连招问出来的。

今天就分享一下我用redis 分布式锁的踩坑日记,以及一些解决方案,和大家一起共勉。

一、锁未被释放

这种情况是一种低级错误,就是我上边犯的错,由于当前线程 获取到redis 锁,处理完业务后未及时释放锁,导致其它线程会一直尝试获取锁阻塞,例如:用Jedis客户端会报如下的错误信息

redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool

redis线程池已经没有空闲线程来处理客户端命令。

解决的方法也很简单,只要我们细心一点,拿到锁的线程处理完业务及时释放锁,如果是重入锁未拿到锁后,线程可以释放当前连接并且sleep一段时间。

  public void lock() {
      while (true) {
          boolean flag = this.getLock(key);
          if (flag) {
                TODO .........
          } else {
                // 释放当前redis连接
                redis.close();
                // 休眠1000毫秒
                sleep(1000);
          }
        }
    }

二、B的锁被A给释放了

我们知道Redis实现锁的原理在于 SETNX命令。当 key不存在时将 key的值设为 value ,返回值为 1;若给定的 key 已经存在,则 SETNX不做任何动作,返回值为 0 。

SETNX key value

我们来设想一下这个场景:AB两个线程来尝试给key myLock加锁,A线程先拿到锁(假如锁3秒后过期),B线程就在等待尝试获取锁,到这一点毛病没有。

那如果此时业务逻辑比较耗时,执行时间已经超过redis锁过期时间,这时A线程的锁自动释放(删除key),B线程检测到myLock这个key不存在,执行 SETNX命令也拿到了锁。

但是,此时A线程执行完业务逻辑之后,还是会去释放锁(删除key),这就导致B线程的锁被A线程给释放了。

为避免上边的情况,一般我们在每个线程加锁时要带上自己独有的value值来标识,只释放指定valuekey,否则就会出现释放锁混乱的场景。

三、数据库事务超时

emm~ 聊redis锁咋还扯到数据库事务上来了?别着急往下看,看下边这段代码:

   @Transaction
   public void lock() {
   
        while (true) {
            boolean flag = this.getLock(key);
            if (flag) {
                insert();
            }
        }
    }

给这个方法添加一个@Transaction注解开启事务,如代码中抛出异常进行回滚,要知道数据库事务可是有超时时间限制的,并不会无条件的一直等一个耗时的数据库操作。

比如:我们解析一个大文件,再将数据存入到数据库,如果执行时间太长,就会导致事务超时自动回滚。

一旦你的key长时间获取不到锁,获取锁等待的时间远超过数据库事务超时时间,程序就会报异常。

一般为解决这种问题,我们就需要将数据库事务改为手动提交、回滚事务。

    @Autowired
    DataSourceTransactionManager dataSourceTransactionManager;

    @Transaction
    public void lock() {
        //手动开启事务
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {
            while (true) {
                boolean flag = this.getLock(key);
                if (flag) {
                    insert();
                    //手动提交事务
                    dataSourceTransactionManager.commit(transactionStatus);
                }
            }
        } catch (Exception e) {
            //手动回滚事务
            dataSourceTransactionManager.rollback(transactionStatus);
        }
    }

四、锁过期了,业务还没执行完

这种情况和我们上边提到的第二种比较类似,但解决思路上略有不同。

同样是redis分布式锁过期,而业务逻辑没执行完的场景,不过,这里换一种思路想问题,redis锁的过期时间再弄长点不就解决了吗?

那还是有问题,我们可以在加锁的时候,手动调长redis锁的过期时间,可这个时间多长合适?业务逻辑的执行时间是不可控的,调的过长又会影响操作性能。

要是redis锁的过期时间能够自动续期就好了。

为了解决这个问题我们使用redis客户端redissonredisson很好的解决了redis在分布式环境下的一些棘手问题,它的宗旨就是让使用者减少对Redis的关注,将更多精力用在处理业务逻辑上。

redisson对分布式锁做了很好封装,只需调用API即可。

  RLock lock = redissonClient.getLock("stockLock");

redisson在加锁成功后,会注册一个定时任务监听这个锁,每隔10秒就去查看这个锁,如果还持有锁,就对过期时间进行续期。默认过期时间30秒。这个机制也被叫做:“看门狗”,这名字。。。

举例子:假如加锁的时间是30秒,过10秒检查一次,一旦加锁的业务没有执行完,就会进行一次续期,把锁的过期时间再次重置成30秒。

通过分析下边redisson的源码实现可以发现,不管是加锁解锁续约都是客户端把一些复杂的业务逻辑,通过封装在Lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性

 
@Slf4j
@Service
public class RedisDistributionLockPlus {
 
    /**
     * 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象
     */
    private static final long DEFAULT_LOCK_TIMEOUT = 30;
 
    private static final long TIME_SECONDS_FIVE = 5 ;
 
    /**
     * 每个key的过期时间 {@link LockContent}
     */
    private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
 
    /**
     * redis执行成功的返回
     */
    private static final Long EXEC_SUCCESS = 1L;
 
    /**
     * 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时时间
     */
    private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " +
            "if redis.call('exists', KEYS[1]) == 0 then " +
               "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
               "for k, v in pairs(t) do " +
                 "if v == 'OK' then return tonumber(ARGV[2]) end " +
               "end " +
            "return 0 end";
 
    /**
     * 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:业务耗时 arg3: 业务开始设置的timeout
     */
    private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "local ctime = tonumber(ARGV[2]) " +
            "local biz_timeout = tonumber(ARGV[3]) " +
            "if ctime > 0 then  " +
               "if redis.call('exists', KEYS[2]) == 1 then " +
                   "local avg_time = redis.call('get', KEYS[2]) " +
                   "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
                   "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " +
                   "else redis.call('del', KEYS[2]) end " +
               "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " +
            "end " +
            "return redis.call('del', KEYS[1]) " +
            "else return 0 end";
    /**
     * 续约lua脚本
     */
    private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
 
 
    private final StringRedisTemplate redisTemplate;
 
    public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        ScheduleTask task = new ScheduleTask(this, lockContentMap);
        // 启动定时任务
        ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
    }
 
    /**
     * 加锁
     * 取到锁加锁,取不到锁一直等待知道获得锁
     *
     * @param lockKey
     * @param requestId 全局唯一
     * @param expire   锁过期时间, 单位秒
     * @return
     */
    public boolean lock(String lockKey, String requestId, long expire) {
        log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId);
        for (; ; ) {
            // 判断是否已经有线程持有锁,减少redis的压力
            LockContent lockContentOld = lockContentMap.get(lockKey);
            boolean unLocked = null == lockContentOld;
            // 如果没有被锁,就获取锁
            if (unLocked) {
                long startTime = System.currentTimeMillis();
                // 计算超时时间
                long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
                String lockKeyRenew = lockKey + "_renew";
 
                RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
                List<String> keys = new ArrayList<>();
                keys.add(lockKey);
                keys.add(lockKeyRenew);
                Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
                if (null != lockExpire && lockExpire > 0) {
                    // 将锁放入map
                    LockContent lockContent = new LockContent();
                    lockContent.setStartTime(startTime);
                    lockContent.setLockExpire(lockExpire);
                    lockContent.setExpireTime(startTime + lockExpire * 1000);
                    lockContent.setRequestId(requestId);
                    lockContent.setThread(Thread.currentThread());
                    lockContent.setBizExpire(bizExpire);
                    lockContent.setLockCount(1);
                    lockContentMap.put(lockKey, lockContent);
                    log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId);
                    return true;
                }
            }
            // 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁
            if (Thread.currentThread() == lockContentOld.getThread()
                      && requestId.equals(lockContentOld.getRequestId())){
                // 计数 +1
                lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
                return true;
            }
 
            // 如果被锁或获取锁失败,则等待100毫秒
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                // 这里用lombok 有问题
                log.error("获取redis 锁失败, lockKey ={}, requestId={}", lockKey, requestId, e);
                return false;
            }
        }
    }
 
 
    /**
     * 解锁
     *
     * @param lockKey
     * @param lockValue
     */
    public boolean unlock(String lockKey, String lockValue) {
        String lockKeyRenew = lockKey + "_renew";
        LockContent lockContent = lockContentMap.get(lockKey);
 
        long consumeTime;
        if (null == lockContent) {
            consumeTime = 0L;
        } else if (lockValue.equals(lockContent.getRequestId())) {
            int lockCount = lockContent.getLockCount();
            // 每次释放锁, 计数 -1,减到0时删除redis上的key
            if (--lockCount > 0) {
                lockContent.setLockCount(lockCount);
                return false;
            }
            consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
        } else {
            log.info("释放锁失败,不是自己的锁。");
            return false;
        }
 
        // 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁
        lockContentMap.remove(lockKey);
 
        RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        keys.add(lockKeyRenew);
 
        Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
                Long.toString(lockContent.getBizExpire()));
        return EXEC_SUCCESS.equals(result);
 
    }
 
    /**
     * 续约
     *
     * @param lockKey
     * @param lockContent
     * @return true:续约成功,false:续约失败(1、续约期间执行完成,锁被释放 2、不是自己的锁,3、续约期间锁过期了(未解决))
     */
    public boolean renew(String lockKey, LockContent lockContent) {
 
        // 检测执行业务线程的状态
        Thread.State state = lockContent.getThread().getState();
        if (Thread.State.TERMINATED == state) {
            log.info("执行业务的线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockContent);
            return false;
        }
 
        String requestId = lockContent.getRequestId();
        long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;
 
        RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
 
        Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
        log.info("续约结果,True成功,False失败 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
        return EXEC_SUCCESS.equals(result);
    }
 
 
    static class ScheduleExecutor {
 
        public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
            long delay = unit.toMillis(initialDelay);
            long period_ = unit.toMillis(period);
            // 定时执行
            new Timer("Lock-Renew-Task").schedule(task, delay, period_);
        }
    }
 
    static class ScheduleTask extends TimerTask {
 
        private final RedisDistributionLockPlus redisDistributionLock;
        private final Map<String, LockContent> lockContentMap;
 
        public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
            this.redisDistributionLock = redisDistributionLock;
            this.lockContentMap = lockContentMap;
        }
 
        @Override
        public void run() {
            if (lockContentMap.isEmpty()) {
                return;
            }
            Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
            for (Map.Entry<String, LockContent> entry : entries) {
                String lockKey = entry.getKey();
                LockContent lockContent = entry.getValue();
                long expireTime = lockContent.getExpireTime();
                // 减少线程池中任务数量
                if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
                    //线程池异步续约
                    ThreadPool.submit(() -> {
                        boolean renew = redisDistributionLock.renew(lockKey, lockContent);
                        if (renew) {
                            long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
                            lockContent.setExpireTime(expireTimeNew);
                        } else {
                            // 续约失败,说明已经执行完 OR redis 出现问题
                            lockContentMap.remove(lockKey);
                        }
                    });
                }
            }
        }
    }
}

五、redis主从复制的坑

redis高可用最常见的方案就是主从复制(master-slave),这种模式也给redis分布式锁挖了一坑。

redis cluster集群环境下,假如现在A客户端想要加锁,它会根据路由规则选择一台master节点写入key mylock,在加锁成功后,master节点会把key异步复制给对应的slave节点。

如果此时redis master节点宕机,为保证集群可用性,会进行主备切换slave变为了redis masterB客户端在新的master节点上加锁成功,而A客户端也以为自己还是成功加了锁的。

此时就会导致同一时间内多个客户端对一个分布式锁完成了加锁,导致各种脏数据的产生。

至于解决办法嘛,目前看还没有什么根治的方法,只能尽量保证机器的稳定性,减少发生此事件的概率。

总结

上面就是我在使用Redis 分布式锁时遇到的一些坑,有点小感慨,经常用一个方法填上这个坑,没多久就发现另一个坑又出来了,其实根本没有什么十全十美的解决方案,哪有什么银弹,只不过是在权衡利弊后,选一个在接受范围内的折中方案而已。