[转载]使用Redis实现高并发分布式序列号生成服务 - 浮云-Mignet - 博客园

来源: [转载]使用Redis实现高并发分布式序列号生成服务 – 浮云-Mignet – 博客园

序列号的构成

为建立良好的数据治理方案,作数据掌握、分析、统计、商业智能等用途,业务数据的编码制定通常都会遵循一定的规则,一般来讲,都会有自己的编码规则和自增序列构成。比如我们常见的身份证号、银行卡号、社保电脑号等等。

以某公司产品标识码(代表该产品的唯一编码)的构成为例:

规则定义:商品款号(8位)+颜色号(3位)+号型码(3位) (共14位)

其标识码为:62X19001 001 46A

业务含义为: 2009年男装秋冬季仿毛套西黑色170A版

简单来讲,业务编码是由规则和序列构成,规则是允许定义和编辑的,序列通常要求并发安全。整个序列号生成规则要求读写并发安全。

序列号生成方案

由于Redis的高性能,高并发和数据一致性的保证,以及断电数据不丢失,分布式扩展能力等优势。我们采用Redis存储并持久化序列和业务规则来配置和管理整个序列号的产生。

规则定义举例:前缀+时间(YYYYMMDD)+所使用的序列(指定长度),那么产生的序列号类似于SO20150520000124

具体规则可根据实际业务需求来设计。

实现要求:整个生成过程使用Jedis完成,保证原子事务。并通过压力测试。

序列号实现方案

1.         规则配置管理

在Redis的设计中,要想实现比如

select * from users where user.location=”shanghai”

这样的查询,是没办法通过value进行比较得出结果的。但是可以通过不同的数据结构类型来做到这一点。比如如下的数据定义

users:1 {name:Jack,age:28,location:shanghai}

users:2 {name:Frank,age:30,location:beijing}

users:location:shanghai [1]

其中users:1 users:2 分别定义了两个用户信息,通过Redis中的hash数据结构,而users:location:shanghai 记录了所有上海的用户id,通过集合数据结构实现。

这样通过两次简单的Redis命令调用就可以实现我们上面的查询。

Jedis jedis = jedisPool.getResource();

Set<String> shanghaiIDs = jedis.smembers(“users:location:shanghai”); //遍历该set

 //… //通过hgetall获取对应的user信息

jedis.hgetAll(“users:” + shanghaiIDs[0]);

通过诸如以上的设计,可以实现简单的条件查询。但是这样的问题也很多,首先需要多维护一个ID索引的集合,其次对于一些 复杂查询无能为力(当然也不能期望Redis实现像关系数据库那样的查询,Redis不是干这的)。针对本序列号生成方案,这种方式完全是够用的,可以直 接参考本节的代码示例。

如果想更进一步,Redis2.6集成了Lua(Redis是用ANSI C写的,可以想象支持Lua是一件很自然的事),可以通过eval命令,直接在RedisServer环境中执行Lua脚本,就是说可以让你用Lua脚 本,对Redis中存储的key value进行操作,这个意义就大了,甚至可以将系统所需的各种业务写成一个个lua脚本,提前加载进入Redis,然后对于请求的响应,只需要调用一个 个lua脚本就行。(当然这些操作也完全可以使用Jedis来完成,但显然lua效率更高)

比如,现在我们要实现一个‘所有年龄(age)大于28岁的用户(user)’这样一个查询,那么通过以下的Lua脚本就可以实现

public static final String SCRIPT = “local resultKeys={};”

 + “for k,v in ipairs(KEYS) do “

 + ”  local tmp = redis.call(‘hget’, v, ‘age’);”

+ ”  if tmp > ARGV[1] then “

+ ”   table.insert(resultKeys,v);”

 + ”  end;”

 + ” end;”

 + “return resultKeys;”;

执行脚本代码

Jedis jedis = jedisPool.getResource();

jedis.auth(auth);

List<String> keys = Arrays.asList(allUserKeys);

List<String> args = new ArrayList<>();

args.add(“28”);

List<String> resultKeys = (List<String>)jedis.evalsha(funcKey, keys, args);

return resultKeys;

注意,以上的代码中使用的是evalsha命令,该命令参数的不是直接用Lua脚本字符串,而是提前已经加载到 Redis中的函数的一个SHA索引,通过以下的代码将系统中所有需要执行的函数提前加载到Redis中,通常在自己的系统中维护一个函数哈希表 funcTable,后续需要实现什么功能,就从函数表中获取对应功能的SHA索引,通过evalsha调用就行。

String shaFuncKey = jedis.scriptLoad(SCRIPT);//加载脚本到Redis中,获取sha索引 funcTable.put(funcName_age, shaFuncKey);//添加到系统维护的函数表中

通过以上的方法,便可以使较为复杂的查询放到Redis中去执行,提高效率。

可见,想要将全部业务代码都使用lua脚本来实现的业务系统是可能的,lua脚本等同于关系型数据库中的存储过程或者函数。当然,全部使用lua的开发成本未必不大,毕竟不是关系型数据库,存储思维不同。

代码示例:

//配置生成规则(CRUD):

//假设销售单号生成规则:prefix+time+seq

//生成之后的结果类似于:SO20150520023014

//——模拟常规数据库操作——

//添加数据

shardedJedis.hset(“rules”, “somaster”,

 “name:销售单号,prefix:SO,time:YYYYMMDD,seq:seq_so,seq_len:6”);

shardedJedis.hset(“rules”, “pomaster”,

 “name:采购单号,prefix:PO,time:YYYYMMDD,seq:seq_po,seq_len:6”);

shardedJedis.hset(“rules”, “test”, “name:test,prefix:PO,time:YYYYMMDD,seq:seq_po,seq_len:6”);

//判断某个值是否存在

System.out.println(shardedJedis.hexists(“rules”, “test”));

// 删除指定的值

System.out.println(shardedJedis.hdel(“rules”, “test”));

// 获取指定的值

System.out.println(shardedJedis.hget(“rules”, “somaster”));

// 获取所有的keys

System.out.println(shardedJedis.hkeys(“rules”));

// 获取所有的values

System.out.println(shardedJedis.hvals(“rules”));

//更新 = 插入同名的key

System.out.println(“update before:”+shardedJedis.hvals(“rules”));

System.out.println(shardedJedis.hset(“rules”, “somaster”, “new test somaster”));

System.out.println(“update after:”+shardedJedis.hvals(“rules”));

我示例代码中使用的是hash而不是直接使用key-value来存储,是更优的方案。至此CRUD都能直接满足了,最后,你获取到所有values,需要自己处理分页。

也可以使用list和set组合的方式存储。这种方式是将list index和set key对应起来,根据序号进行分页是容易的,但在每次新增和删除时,都需要修改序号和key的对应关系。

两者相比,使用hash的成本显然更低,也不易出错。

2.         序列号的使用

Redis中对序列的生成早已考虑周到,使用单线程操作序列的方式以保证并发安全,同时,使用也极其简单。更多操作详见官网API

代码示例

//sequence

System.out.println(“seq:”+shardedJedis.incr(“seq”));

System.out.println(“seq:”+shardedJedis.incr(“seq”));

System.out.println(“another_seq:”+shardedJedis.incr(“another_seq”));

最后,生成序列服务只需要通过对应的规则名,获取规则表达式,解析之后结合序列号,最终生成即可。

并发测试

这里我们使用CyclicBarrier做并发测试,CyclicBarrier会开启指定数量的线程,等待这些线程就绪之后,同时执行测试内容,以达到真实并发的测试目的。

Loadrunner等压力测试工具也能完成测试任务。

<div class="container">
<div class="line number1 index0 alt2"><code class="java keyword">public</code> <code class="java keyword">class</code> <code class="java plain">CyclicBarrierTest {</code></div>
<div class="line number2 index1 alt1"><code class="java spaces">    </code><code class="java comments">//初始化</code></div>
<div class="line number3 index2 alt2"><code class="java spaces">    </code><code class="java plain">JedisPoolTest test = </code><code class="java keyword">new</code> <code class="java plain">JedisPoolTest();</code></div>
<div class="line number4 index3 alt1"><code class="java spaces">    </code><code class="java keyword">public</code> <code class="java keyword">static</code> <code class="java keyword">void</code> <code class="java plain">main(String[] args) {</code></div>
<div class="line number5 index4 alt2"><code class="java spaces">         </code><code class="java keyword">int</code> <code class="java plain">count = </code><code class="java value">1000</code><code class="java plain">;</code></div>
<div class="line number6 index5 alt1"><code class="java spaces">         </code><code class="java plain">CyclicBarrier cyclicBarrier = </code><code class="java keyword">new</code> <code class="java plain">CyclicBarrier(count);</code></div>
<div class="line number7 index6 alt2"><code class="java spaces">         </code><code class="java plain">ExecutorService executorService = Executors.newFixedThreadPool(count);</code></div>
<div class="line number8 index7 alt1"><code class="java spaces">         </code><code class="java keyword">for</code> <code class="java plain">(</code><code class="java keyword">int</code> <code class="java plain">i = </code><code class="java value">0</code><code class="java plain">; i &lt; count; i++)</code></div>
<div class="line number9 index8 alt2"><code class="java spaces">              </code><code class="java plain">executorService.execute(</code><code class="java keyword">new</code> <code class="java plain">CyclicBarrierTest().</code><code class="java keyword">new</code> <code class="java plain">Task(cyclicBarrier));</code></div>
<div class="line number10 index9 alt1"></div>
<div class="line number11 index10 alt2"><code class="java spaces">         </code><code class="java plain">executorService.shutdown();</code></div>
<div class="line number12 index11 alt1"><code class="java spaces">         </code><code class="java keyword">while</code> <code class="java plain">(!executorService.isTerminated()) {</code></div>
<div class="line number13 index12 alt2"><code class="java spaces">              </code><code class="java keyword">try</code> <code class="java plain">{</code></div>
<div class="line number14 index13 alt1"><code class="java spaces">                   </code><code class="java plain">Thread.sleep(</code><code class="java value">10</code><code class="java plain">);</code></div>
<div class="line number15 index14 alt2"><code class="java spaces">              </code><code class="java plain">} </code><code class="java keyword">catch</code> <code class="java plain">(InterruptedException e) {</code></div>
<div class="line number16 index15 alt1"><code class="java spaces">                   </code><code class="java plain">e.printStackTrace();</code></div>
<div class="line number17 index16 alt2"><code class="java spaces">              </code><code class="java plain">}</code></div>
<div class="line number18 index17 alt1"><code class="java spaces">         </code><code class="java plain">}</code></div>
<div class="line number19 index18 alt2"><code class="java spaces">    </code><code class="java plain">}</code></div>
<div class="line number20 index19 alt1"></div>
<div class="line number21 index20 alt2"><code class="java spaces">    </code><code class="java keyword">public</code> <code class="java keyword">class</code> <code class="java plain">Task </code><code class="java keyword">implements</code> <code class="java plain">Runnable {</code></div>
<div class="line number22 index21 alt1"><code class="java spaces">         </code><code class="java keyword">private</code> <code class="java plain">CyclicBarrier cyclicBarrier;</code></div>
<div class="line number23 index22 alt2"></div>
<div class="line number24 index23 alt1"><code class="java spaces">         </code><code class="java keyword">public</code> <code class="java plain">Task(CyclicBarrier cyclicBarrier) {</code></div>
<div class="line number25 index24 alt2"><code class="java spaces">              </code><code class="java keyword">this</code><code class="java plain">.cyclicBarrier = cyclicBarrier;</code></div>
<div class="line number26 index25 alt1"><code class="java spaces">         </code><code class="java plain">}</code></div>
<div class="line number27 index26 alt2"></div>
<div class="line number28 index27 alt1"><code class="java spaces">         </code><code class="java color1">@Override</code></div>
<div class="line number29 index28 alt2"><code class="java spaces">         </code><code class="java keyword">public</code> <code class="java keyword">void</code> <code class="java plain">run() {</code></div>
<div class="line number30 index29 alt1"><code class="java spaces">              </code><code class="java keyword">try</code> <code class="java plain">{</code></div>
<div class="line number31 index30 alt2"><code class="java spaces">                   </code><code class="java comments">// 等待所有任务准备就绪</code></div>
<div class="line number32 index31 alt1"><code class="java spaces">                   </code><code class="java plain">cyclicBarrier.await();</code></div>
<div class="line number33 index32 alt2"><code class="java spaces">                   </code><code class="java comments">// 测试内容</code></div>
<div class="line number34 index33 alt1"><code class="java spaces">                </code><code class="java comments">// 待测试的url</code></div>
<div class="line number35 index34 alt2"><code class="java spaces">                    </code><code class="java plain">String host = </code><code class="java string">"<a href="http://172.25.2.14/seqno?">http://172.25.2.14/seqno?</a>"</code><code class="java plain">;</code></div>
<div class="line number36 index35 alt1"><code class="java spaces">                    </code><code class="java plain">String para = </code><code class="java string">"sysTemNo=ERP&amp;seqName=WH-ZONE-ID&amp;iVar=00"</code><code class="java plain">;</code></div>
<div class="line number37 index36 alt2"><code class="java spaces">                    </code><code class="java plain">System.out.println(host + para);</code></div>
<div class="line number38 index37 alt1"><code class="java spaces">                    </code><code class="java plain">URL url = </code><code class="java keyword">new</code> <code class="java plain">URL(host);</code></div>
<div class="line number39 index38 alt2"><code class="java spaces">                    </code><code class="java plain">HttpURLConnection connection = (HttpURLConnection) url.openConnection();</code></div>
<div class="line number40 index39 alt1"><code class="java spaces">                    </code><code class="java comments">// connection.setRequestMethod("POST");</code></div>
<div class="line number41 index40 alt2"><code class="java spaces">                    </code><code class="java comments">// connection.setRequestProperty("Proxy-Connection", "Keep-Alive");</code></div>
<div class="line number42 index41 alt1"><code class="java spaces">                    </code><code class="java plain">connection.setDoOutput(</code><code class="java keyword">true</code><code class="java plain">);</code></div>
<div class="line number43 index42 alt2"><code class="java spaces">                    </code><code class="java plain">connection.setDoInput(</code><code class="java keyword">true</code><code class="java plain">);</code></div>
<div class="line number44 index43 alt1"><code class="java spaces">                    </code><code class="java plain">PrintWriter out = </code><code class="java keyword">new</code> <code class="java plain">PrintWriter(connection.getOutputStream());</code></div>
<div class="line number45 index44 alt2"><code class="java spaces">                    </code><code class="java plain">out.print(para);</code></div>
<div class="line number46 index45 alt1"><code class="java spaces">                    </code><code class="java plain">out.flush();</code></div>
<div class="line number47 index46 alt2"><code class="java spaces">                    </code><code class="java plain">out.close();</code></div>
<div class="line number48 index47 alt1"><code class="java spaces">                    </code><code class="java plain">BufferedReader in = </code><code class="java keyword">new</code> <code class="java plain">BufferedReader(</code><code class="java keyword">new</code> <code class="java plain">InputStreamReader(connection.getInputStream()));</code></div>
<div class="line number49 index48 alt2"><code class="java spaces">                    </code><code class="java plain">String line = </code><code class="java string">""</code><code class="java plain">;</code></div>
<div class="line number50 index49 alt1"><code class="java spaces">                    </code><code class="java plain">String result = </code><code class="java string">""</code><code class="java plain">;</code></div>
<div class="line number51 index50 alt2"><code class="java spaces">                    </code><code class="java keyword">while</code> <code class="java plain">((line = in.readLine()) != </code><code class="java keyword">null</code><code class="java plain">) {</code></div>
<div class="line number52 index51 alt1"><code class="java spaces">                        </code><code class="java plain">result += line;</code></div>
<div class="line number53 index52 alt2"><code class="java spaces">                    </code><code class="java plain">}</code></div>
<div class="line number54 index53 alt1"><code class="java spaces">                    </code><code class="java plain">System.out.println(result);</code></div>
<div class="line number55 index54 alt2"><code class="java comments">//                   System.out.println(test.getJedis().incr("seq"));</code></div>
<div class="line number56 index55 alt1"><code class="java comments">//                System.out.println(test.getShardedJedis().incr("seq"));</code></div>
<div class="line number57 index56 alt2"><code class="java spaces">              </code><code class="java plain">} </code><code class="java keyword">catch</code> <code class="java plain">(Exception e) {</code></div>
<div class="line number58 index57 alt1"><code class="java spaces">                   </code><code class="java plain">e.printStackTrace();</code></div>
<div class="line number59 index58 alt2"><code class="java spaces">              </code><code class="java plain">}</code></div>
<div class="line number60 index59 alt1"><code class="java spaces">         </code><code class="java plain">}</code></div>
<div class="line number61 index60 alt2"><code class="java spaces">    </code><code class="java plain">}</code></div>
<div class="line number62 index61 alt1"><code class="java plain">}</code></div>
<div class="line number62 index61 alt1">

测试结果:

单台Redis未经任何设置,500并发100% pass,到1000并发时只有67%pass率,此时存在连接超时和被拒的情形。但不存在任何重复号码或丢失号码。500并发数其实已经完全满足我当前 系统的要求。考虑到Redis本身可以集群扩展,完全能够应对将来更高的并发需求。

赞(0) 打赏
分享到: 更多 (0)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏