干掉RedisHelper,请这样用分布式缓存 - 掘金

mikel阅读(646)

来源: 干掉RedisHelper,请这样用分布式缓存 – 掘金

前言

我们在项目中使用Redis时通常是写一个单例模式的RedisHelper静态类,暴露一些常用的GetSet等操作,在需要使用地方直接RedisHelper.StringGet(xx,xx)就可以了,这样虽然简单粗暴地满足我们对Redis的所有操作需要,但是这在ASP.NET Core的项目显得不是那么优雅了。首先你的RedisHelper静态类无法使用ASP.NET Core容器,又如何优雅的通过依赖注入获取IConfiguration中的配置项呢?既然我们使用Asp.Net Core这么优秀的框架,最佳实践当然就是遵循官方建议的开发规范优雅的编写代码。

IDistributedCache

若要使用 SQL Server 分布式缓存,请添加对 Microsoft.Extensions.Caching.SQLServer 包的包引用。

若要使用 Redis 分布式缓存,请添加对 Microsoft.Extensions.Caching.StackExchangeRedis 包的包引用。

若要使用 NCache 分布式缓存,请添加对 NCache.Microsoft.Extensions.Caching.OpenSource 包的包引用。

无论选择哪种实现,应用都将使用 IDistributedCache 接口与缓存进行交互。

来看下IDistributedCache这个接口的定义

namespace Microsoft.Extensions.Caching.Distributed;

/// <summary>
/// Represents a distributed cache of serialized values.
/// </summary>
public interface IDistributedCache
{
    /// <summary>
    /// Gets a value with the given key.
    /// </summary>
    byte[]? Get(string key);

    /// <summary>
    /// Gets a value with the given key.
    /// </summary>
    Task<byte[]?> GetAsync(string key, CancellationToken token = default(CancellationToken));

    void Set(string key, byte[] value, DistributedCacheEntryOptions options);

    /// <summary>
    /// Sets the value with the given key.
    /// </summary>
    Task SetAsync(string key, byte[] value, DistributedCacheEntryOptions options, CancellationToken token = default(CancellationToken));

    /// <summary>
    /// Refreshes a value in the cache based on its key, resetting its sliding expiration timeout (if any).
    /// </summary>
    void Refresh(string key);

    /// <summary>
    /// Refreshes a value in the cache based on its key, resetting its sliding expiration timeout (if any).
    /// </summary>
    Task RefreshAsync(string key, CancellationToken token = default(CancellationToken));

    /// <summary>
    /// Removes the value with the given key.
    /// </summary>
    void Remove(string key);

    /// <summary>
    /// Removes the value with the given key.
    /// </summary>
    Task RemoveAsync(string key, CancellationToken token = default(CancellationToken));
}
复制代码

IDistributedCache 接口提供以下方法来处理分布式缓存实现中的项:

  • GetGetAsync:如果在缓存中找到,则接受字符串键并以 byte[] 数组的形式检索缓存项。
  • SetSetAsync:使用字符串键将项(作为 byte[] 数组)添加到缓存。
  • RefreshRefreshAsync:根据键刷新缓存中的项,重置其可调到期超时(如果有)。
  • RemoveRemoveAsync:根据字符串键删除缓存项。

干掉RedisHelper

官方不仅提出了如何最佳实践分布式缓存的使用,还提供了基本的实现库给我们直接用,比如我们在项目中用Redis为我们提供缓存服务:

  1. 添加引用Microsoft.Extensions.Caching.StackExchangeRedis
  2. 注册容器AddStackExchangeRedisCache,并配置参数
 builder.Services.AddStackExchangeRedisCache(options =>
     {
         options.Configuration = builder.Configuration.GetConnectionString("MyRedisConStr");
         options.InstanceName = "SampleInstance";
     });
复制代码
  1. 在需要使用Redis的地方通过构造函数注入IDistributedCache实例调用即可

这样就可以优雅的使用Redis了,更加符合Asp.Net Core的设计风格,养成通过容器注入的方式来调用我们的各种服务,而不是全局使用RedisHelper静态类,通过IOC的方式,结合面向接口开发,能方便的替换我们的实现类,统一由容器提供对象的创建,这种控制反转带来的好处只可意会不可言传,这里就不赘述了。

AddStackExchangeRedisCache到底干了什么

上面已经知道如何优雅的使用我们的Redis了,但是不看下源码就不知道底层实现,总是心里不踏实的。

源码比较好理解的,因为这个Nuget包的源码也就四个类,而上面注册容器的逻辑也比较简单
AddStackExchangeRedisCache主要干的活

// 1.启用Options以使用IOptions
services.AddOptions();
// 2.注入配置自定义配置,可以通过IOptions<T>注入到需要使用该配置的地方
services.Configure(setupAction);
// 3.注入一个单例IDistributedCache的实现类RedisCache
services.Add(ServiceDescriptor.Singleton<IDistributedCache, RedisCache>());
复制代码

所以我们在需要用Redis的地方通过构造函数注入IDistributedCache,而它对应的实现就是RedisCache,那看下它的源码。

这里就不细看所有的实现了,重点只需要知道它继承了IDistributedCache就行了,通过AddStackExchangeRedisCache传入的ConnectionString,实现IDistributedCacheGetSetRefreshRemove四个核心的方法,我相信这难不倒你,而它也就是干了这么多事情,只不过它的实现有点巧妙。 通过LUA脚本和HSET数据结构实现,HashKey是我们传入的InstanceName+key,做了一层包装。

源码中还有需要注意的就是,我们要保证Redis连接对象IConnectionMultiplexer的单例,不能重复创建多个实例,这个想必在RedisHelper中也是要保证的,而且是通过lock来实现的。

然而微软不是那么用的,玩了个花样,注意下面的_connectionLock.Wait();

private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);

[MemberNotNull(nameof(_cache), nameof(_connection))]
private void Connect()
{
    CheckDisposed();
    if (_cache != null)
    {
        Debug.Assert(_connection != null);
        return;
    }

    _connectionLock.Wait();
    try
    {
        if (_cache == null)
        {
            if (_options.ConnectionMultiplexerFactory == null)
            {
                if (_options.ConfigurationOptions is not null)
                {
                    _connection = ConnectionMultiplexer.Connect(_options.ConfigurationOptions);
                }
                else
                {
                    _connection = ConnectionMultiplexer.Connect(_options.Configuration);
                }
            }
            else
            {
                _connection = _options.ConnectionMultiplexerFactory().GetAwaiter().GetResult();
            }

            PrepareConnection();
            _cache = _connection.GetDatabase();
        }
    }
    finally
    {
        _connectionLock.Release();
    }

    Debug.Assert(_connection != null);
}
复制代码

通过SemaphoreSlim限制同一时间只能有一个线程能访问_connectionLock.Wait();后面的代码。

学到装逼技巧+1

思考

IDistributedCache只有四个操作:GetSetRefreshRemove,我们表示很希望跟着官方走,但这个接口过于简单,不能满足我的其他需求咋办?
比如我们需要调用 StackExchange.Redis封装的LockTake,LockRelease来实现分布式锁的功能,那该怎么通过注入IDistributedCache调用? 我们可以理解官方上面是给我们做了示范,我们完全可以自己定义一个接口,比如:

public interface IDistributedCachePlus : IDistributedCache
{
    bool LockRelease(string key, byte[] value);

    bool LockTake(string key, byte[] value, TimeSpan expiry);
}
复制代码

继承IDistributedCache,对其接口进行增强,然后自己实现实现AddStackExchangeRedisCache的逻辑,我们不用官方给的实现,但是我们山寨官方的思路,实现任意标准的接口,满足我们业务。

services.Add(ServiceDescriptor.Singleton<IDistributedCachePlus, RedisCachePlus>());
复制代码

在需要使用缓存的地方通过构造函数注入IDistributedCachePlus

总结

官方提供的IDistributedCache标准及其实现类库,能方便的实现我们对缓存的简单的需求,通过遵循官方的建议,我们干掉了RedisHelper,优雅的实现了分布式Redis缓存的使用,你觉得这样做是不是很优雅呢?

作者:SpringHgui
链接:https://juejin.cn/post/7099476430001537060
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

Redis的五种数据类型的简单介绍和使用 - DingYu - 博客园

mikel阅读(514)

来源: Redis的五种数据类型的简单介绍和使用 – DingYu – 博客园

1.准备工作:

1.1在Linux下安装Redis 

https://www.cnblogs.com/dddyyy/p/9763098.html

  1.2启动Redis

先把root/redis的redis.conf放到 /usr/local/redis/bin/目录下

使用vi 修改redis.conf 把daemonize no 变成daemonize yes

启动 ./redis-server redis.conf

测试一下 ps -ef|grep redis

   1.3连接客户端

2.第一种类型:String Key-Value

set key value 设置一个key 值为 value 

get key 获得key值得value 

注意:redis中的Key和Value时区分大小写的,命令不区分大小写, redis是单线程 不适合存储大容量的数据

incr key      —对应的value 自增1,如果没有这个key值 自动给你创建创建 并赋值为1

decr key     —对应的value 自减1

注意:自增的value是可以转成数字的

3.第二种类型:Hash:key-filed-value

相当于1个key 对应一个map

hset key filed value 设置值

hget key filed  获取值

 

4.第三种类型:List

List 有顺序可重复

lpush list 1  2  3  4 从左添加元素

rpush list 1 2 3 4    从右添加元素

lrange list 0 -1 (从0 到-1 元素查看:也就表示查看所有)

lpop list (从左边取,删除)

rpop list  (从右边取,删除)

5.第四种类型 :Set

Set 无顺序,不能重复

sadd set1 a b c d d (向set1中添加元素) 元素不重复

smembers set1 (查询元素)

srem set1 a (删除元素)

6.第五种类型:SortedSet(zset)

有顺序,不能重复

适合做排行榜 排序需要一个分数属性

zadd zset1 9 a 8 c 10 d 1 e   (添加元素 zadd key score member )

(ZRANGE key start stop [WITHSCORES])(查看所有元素:zrange key  0  -1  withscores)

如果要查看分数,加上withscores.

zrange zset1 0 -1 (从小到大)

zrevrange zset1 0 -1 (从大到小)

zincrby zset2 score member (对元素member 增加 score)

复制代码
127.0.0.1:6379> zadd zset1 8 a 4 b 5 c 1 d
(integer) 4
127.0.0.1:6379> zrange zset1 0 -1 
1) "d"
2) "b"
3) "c"
4) "a"
127.0.0.1:6379> zadd zset1 9 a
(integer) 0
127.0.0.1:6379> zrange zset1 0 -1 
1) "d"
2) "b"
3) "c"
4) "a"
127.0.0.1:6379> zrange zset1 0 -1 withscores
1) "d"
2) "1"
3) "b"
4) "4"
5) "c"
6) "5"
7) "a"
8) "9"
127.0.0.1:6379> zrevrange zset1 0 -1
1) "a"
2) "c"
3) "b"
4) "d"
127.0.0.1:6379> zincrby zset1 1 a
"10"
127.0.0.1:6379> zrevrange zset1 0 -1 withscores
1) "a"
2) "10"
3) "c"
4) "5"
5) "b"
6) "4"
7) "d"
8) "1"
复制代码

7.Key命令

expire key second  (设置key的过期时间)

ttl key (查看剩余时间)(-2 表示不存在,-1 表示已被持久化,正数表示剩余的时间)

persist key (清除过期时间,也即是持久化 持久化成功体提示 1 不成功0)。

del key: 删除key

select 0 表示:选择0号数据库。默认是0号数据库

c#使用csredis操作redis - 小y - 博客园

mikel阅读(1014)

来源: c#使用csredis操作redis – 小y – 博客园

现在流行的redis连接客户端有StackExchange.Redis和ServiceStack.Redis,为什么选择csredis而不是这两个?

  • .net 最有名望的 ServiceStack.Redis 早已沦为商业用途,在 .NETCore 中使用只能充值;
  • 后来居上的 StackExchange.Redis 虽然能用,但线上各种 Timeout 错误把人坑到没脾气,两年多两年多两年多都不解决,最近发布的 2.0 版本不知道是否彻底解决了底层。
  • csredis支持.net40/.net45/.netstandard2.0,基本上满足了常见运行平台,而上面两个基本已经放弃.net40了。
  • csredis所有方法名与redis-cli保持一持,很容易上手!!!

环境:

csredis 源码地址: https://github.com/2881099/csredis

 

以windows服务安装Redis方法:

下载Redis服务安装包:https://github.com/tporadowski/redis/releases

下载完成后直接点击.exe下一步下一步OK。安装完后我们会在windows服务中找到Redis Service服务。注意启动服务后在进行相关测试。

1.在.net项目中引入CSRedisCore

包安装命令:

Install-Package CSRedisCore -Version 3.6.5

 

2.使用:

复制代码
//初始化RedisHelper对象
var csredis = new CSRedis.CSRedisClient("127.0.0.1:6379, defaultDatabase = 0, poolsize = 500, ssl = false, writeBuffer = 10240");
RedisHelper.Initialization(csredis);


//-------------字符串(string)----------------
// 添加字符串键-值对
csredis.Set("hello", "1");
csredis.Set("world", "2");
csredis.Set("hello", "3");

// 根据键获取对应的值
csredis.Get("hello");

// 移除元素
csredis.Del("world");

/*    数值操作    */
csredis.Set("num-key", "24");

// value += 5
csredis.IncrBy("num-key",5); 
// output -> 29

// value -= 10
csredis.IncrBy("num-key", -10); 
// output -> 19

/*    字节串操作    */
csredis.Set("string-key", "hello ");

// 在指定key的value末尾追加字符串
csredis.Append("string-key", "world"); 
// output -> "hello world"

// 获取从指定范围所有字符构成的子串(start:3,end:7)
csredis.GetRange("string-key",3,7)  
// output ->  "lo wo"
    
// 用新字符串从指定位置覆写原value(index:4)
csredis.SetRange("string-key", 4, "aa"); 
// output -> "hellaaword"





//-----------------列表(list)----------------
// 从右端推入元素
csredis.RPush("my-list", "item1", "item2", "item3", "item4"); 
// 从右端弹出元素
csredis.RPop("my-list");
// 从左端推入元素
csredis.LPush("my-list","LeftPushItem");
// 从左端弹出元素
csredis.LPop("my-list");

// 遍历链表元素(start:0,end:-1即可返回所有元素)
foreach (var item in csredis.LRange("my-list", 0, -1))
{
    Console.WriteLine(item);
}
// 按索引值获取元素(当索引值大于链表长度,返回空值,不会报错)
Console.WriteLine($"{csredis.LIndex("my-list", 1)}"); 

// 修剪指定范围内的元素(start:4,end:10)
csredis.LTrim("my-list", 4, 10);

// 将my-list最后一个元素弹出并压入another-list的头部
csredis.RPopLPush("my-list", "another-list");





//------------------集合(set)----------------
// 实际上只插入了两个元素("item1","item2")
csredis.SAdd("my-set", "item1", "item1", "item2"); 

// 集合的遍历
foreach (var member in csredis.SMembers("my-set"))
{
    Console.WriteLine($"集合成员:{member.ToString()}");
}

// 判断元素是否存在
string member = "item1";
Console.WriteLine($"{member}是否存在:{csredis.SIsMember("my-set", member)}"); 
// output -> True

// 移除元素
csredis.SRem("my-set", member);
Console.WriteLine($"{member}是否存在:{csredis.SIsMember("my-set", member)}"); 
// output ->  False

// 随机移除一个元素
csredis.SPop("my-set");

csredis.SAdd("set-a", "item1", "item2", "item3","item4","item5");
csredis.SAdd("set-b", "item2", "item5", "item6", "item7");

// 差集
csredis.SDiff("set-a", "set-b"); 
// output -> "item1", "item3","item4"

// 交集
csredis.SInter("set-a", "set-b"); 
// output -> "item2","item5"

// 并集
csredis.SUnion("set-a", "set-b");
// output -> "item1","item2","item3","item4","item5","item6","item7"







//------------------散列(hashmap)----------------
// 向散列添加元素
csredis.HSet("ArticleID:10001", "Title", "了解简单的Redis数据结构");
csredis.HSet("ArticleID:10001", "Author", "xscape");
csredis.HSet("ArticleID:10001", "PublishTime", "2019-01-01");

// 根据Key获取散列中的元素
csredis.HGet("ArticleID:10001", "Title");

// 获取散列中的所有元素
foreach (var item in csredis.HGetAll("ArticleID:10001"))
{
    Console.WriteLine(item.Value);
}

//HMGet和HMSet是他们的多参数版本,一次可以处理多个键值对
var keys = new string[] { "Title","Author","publishTime"};
csredis.HMGet("ID:10001", keys);

//和处理字符串一样,我们也可以对散列中的值进行自增、自减操作,原理同字符串是一样的
csredis.HSet("ArticleID:10001", "votes", "257");
csredis.HIncrBy("ID:10001", "votes", 40);
// output -> 297




//------------------有序集合----------------
// 向有序集合添加元素
csredis.ZAdd("Quiz", (79, "Math"));
csredis.ZAdd("Quiz", (98, "English"));
csredis.ZAdd("Quiz", (87, "Algorithm"));
csredis.ZAdd("Quiz", (84, "Database"));
csredis.ZAdd("Quiz", (59, "Operation System"));

//返回集合中的元素数量
csredis.ZCard("Quiz");

// 获取集合中指定范围(90~100)的元素集合
csredis.ZRangeByScore("Quiz",90,100);

// 获取集合所有元素并升序排序
csredis.ZRangeWithScores("Quiz", 0, -1);

// 移除集合中的元素
csredis.ZRem("Quiz", "Math");

//Key的过期
redis.Set("MyKey", "hello,world");
Console.WriteLine(redis.Get("MyKey")); 
// output -> "hello,world"

redis.Expire("MyKey", 5); // key在5秒后过期,也可以使用ExpireAt方法让它在指定时间自动过期

Thread.Sleep(6000); // 线程暂停6秒
Console.WriteLine(redis.Get("MyKey"));
// output -> ""
复制代码

https://www.cnblogs.com/xscape/p/10208638.html

3.高级玩法:发布订阅

复制代码
//普通订阅
rds.Subscribe(
  ("chan1", msg => Console.WriteLine(msg.Body)),
  ("chan2", msg => Console.WriteLine(msg.Body)));

//模式订阅(通配符)
rds.PSubscribe(new[] { "test*", "*test001", "test*002" }, msg => {
  Console.WriteLine($"PSUB   {msg.MessageId}:{msg.Body}    {msg.Pattern}: chan:{msg.Channel}");
});
//模式订阅已经解决的难题:
//1、分区的节点匹配规则,导致通配符最大可能匹配全部节点,所以全部节点都要订阅
//2、本组 "test*", "*test001", "test*002" 订阅全部节点时,需要解决同一条消息不可执行多次

//发布
rds.Publish("chan1", "123123123");
//无论是分区或普通模式,rds.Publish 都可以正常通信
复制代码

4.高级玩法:缓存壳

复制代码
//不加缓存的时候,要从数据库查询
var t1 = Test.Select.WhereId(1).ToOne();

//一般的缓存代码,如不封装还挺繁琐的
var cacheValue = rds.Get("test1");
if (!string.IsNullOrEmpty(cacheValue)) {
    try {
        return JsonConvert.DeserializeObject(cacheValue);
    } catch {
        //出错时删除key
        rds.Remove("test1");
        throw;
    }
}
var t1 = Test.Select.WhereId(1).ToOne();
rds.Set("test1", JsonConvert.SerializeObject(t1), 10); //缓存10秒

//使用缓存壳效果同上,以下示例使用 string 和 hash 缓存数据
var t1 = rds.CacheShell("test1", 10, () => Test.Select.WhereId(1).ToOne());
var t2 = rds.CacheShell("test", "1", 10, () => Test.Select.WhereId(1).ToOne());
var t3 = rds.CacheShell("test", new [] { "1", "2" }, 10, notCacheFields => new [] {
  ("1", Test.Select.WhereId(1).ToOne()),
  ("2", Test.Select.WhereId(2).ToOne())
});
复制代码

5.高级玩法:管道

使用管道模式,打包多条命令一起执行,从而提高性能。

var ret1 = rds.StartPipe().Set("a", "1").Get("a").EndPipe();
var ret2 = rds.StartPipe(p => p.Set("a", "1").Get("a"));

var ret3 = rds.StartPipe().Get("b").Get("a").Get("a").EndPipe();
//与 rds.MGet("b", "a", "a") 性能相比,经测试差之毫厘

6.高级玩法:多数据库

var connectionString = "127.0.0.1:6379,password=123,poolsize=10,ssl=false,writeBuffer=10240,prefix=key前辍";
var redis = new CSRedisClient[14]; //定义成单例
for (var a = 0; a< redis.Length; a++) redis[a] = new CSRedisClient(connectionString + "; defualtDatabase=" + a);

//访问数据库1的数据
redis[1].Get("test1");

 

7.性能比拼

 

 

Redis的五种数据类型的简单介绍和使用 https://www.cnblogs.com/dddyyy/p/9803828.html

.net 5.0 - 使用CSRedisCore操作redis - gygtech - 博客园

mikel阅读(783)

来源: .net 5.0 – 使用CSRedisCore操作redis – gygtech – 博客园

 为什么选择CSRedisCore

ServiceStack.Redis 是商业版,免费版有限制;

StackExchange.Redis 是免费版,但是内核在 .NETCore 运行有问题经常 Timeout,暂无法解决;

CSRedis于2016年开始支持.NETCore一直迭代至今,实现了低门槛、高性能,和分区高级玩法的.NETCore redis-cli SDK;

在v3.0版本更新中,CSRedis中的所有方法名称进行了调整,使其和redis-cli保持一致,如果你熟悉redis-cli的命令的话,CSRedis可以直接上手,这样学习成本就降低很多。

 如何集成:引用和配置
  •  引用包
1
CSRedisCore
  •  appsettings.json
1
2
3
4
5
6
7
8
9
10
{
  //Redis服务配置
  "Redis": {
    "Default": {
      "Connection": "192.168.1.101:6379",
      "InstanceName": "local",
      "DefaultDB": 0
    }
  }
}
 如何集成:redis 控制台 引用方式
  •  初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
using CSRedis;
namespace RedisCommon
{
    public class RedisInit
    {
        public static void RedisInitialization()
        {
            string connection = Appsettings.Instance.GetByKey("Redis:Default:Connection");
            string defaultDB = Appsettings.Instance.GetByKey("Redis:Default:DefaultDB");
            var csRedis = new CSRedisClient($"{connection},defaultDatabase={defaultDB},prefix=test");
            RedisHelper.Initialization(csRedis);
        }
    }
}

 

1
2
3
4
5
static void Main(string[] args)
{
    //初始化Redis
    RedisInit.RedisInitialization();   
}
 如何集成:redis webapi 引用方式
  •  StartUp类配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/// <summary>
/// 初始化Redis缓存
/// </summary>
private void InitRedis()
{
    //redis缓存
    var section = Configuration.GetSection("Redis:Default");
    //连接字符串
    string _connectionString = section.GetSection("Connection").Value;
    //默认数据库
    int _defaultDB = int.Parse(section.GetSection("DefaultDB").Value ?? "0");
    var csredis = new CSRedis.CSRedisClient($"{_connectionString},defaultDatabase={_defaultDB},idleTimeout=3000,poolsize=5,prefix=GYG-API:KEY_");
    RedisHelper.Initialization(csredis);
}

 链接字符串详解

127.0.0.1:6379,password=YourPassword,defaultDatabase=0,prefix=hr_

Parameter   Default Explain  说明 
 password  <Empty>  Redis server password  Redis服务器密码
 defaultDatabase  0  Redis server database  Redis服务器数据库
 asyncPipeline  false  The asynchronous method automatically uses pipeline, and the 10W concurrent time is 450ms (welcome to feedback)  异步方式自动使用管道,10W并发时间450ms(欢迎反馈)
 poolsize  50  Connection pool size  连接池大小
 idleTimeout  20000  idle time of elements in the connection pool(MS),suitable for connecting to remote redis server  连接池中元素的空闲时间(MS),适合连接到远程redis服务器
 connectTimeout  5000  Connection timeout(MS)  连接超时(毫秒)
 syncTimeout  10000  Send / receive timeout(MS)  发送/接收超时(毫秒)
 preheat  5  Preheat connections, receive values such as preheat = 5 preheat 5 connections  预热连接,接收值,例如Preheat=5 Preheat 5 connections
 autoDispose  true  Follow system exit event to release automatically  跟随系统退出事件自动释放
 ssl  false  Enable encrypted transmission  启用加密传输
 testcluster  true  是否尝试集群模式,阿里云、腾讯云集群需要设置此选项为false
 tryit  0  Execution error, retry  attempts  执行错误,重试次数
 name  <Empty>  Connection name, use client list command to view  连接名称,使用client list命令查看
 prefix  <Empty>  key前缀,所有方法都会附带此前缀,csredis.Set(prefix + “key”, 111);
 0、通用指令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//查找所有分区节点中符合给定模式(pattern)的 key
string[] keyAll = RedisHelper.Keys("*");
            
//以秒为单位,返回给定 key 的剩余生存时间
long ttl1 = RedisHelper.Ttl("keyString1");
//用于在 key 存在时删除 key
long del1 = RedisHelper.Del("keyString1");
//检查给定 key 是否存在
bool isExists1 = RedisHelper.Exists("keyString1");
//为给定 key 设置过期时间
bool isExpire1 = RedisHelper.Expire("keyString1", 100);
//为给定 key 设置过期时间
RedisHelper.ExpireAt("keyString1", new DateTime(2021, 6, 11, 16, 0, 0));
 1、string(字符串)
  • 简单操作
1
2
3
4
5
6
7
8
9
10
11
// 设置指定 key 值,默认不过期
bool set_string1 = RedisHelper.Set("keyString_String1", "测试值1");
// 设置指定 key 值,并设置过期时间(单位:秒)
bool set_string2 = RedisHelper.Set("keyString_String2", "测试值2", 1);
// 获取指定 key 的值,不存在的 key,值返回null
string get_string1 = RedisHelper.Get("keyString_String1");
// 获取指定 key 的值,不存在的 key,或者指定的 key 不是int型,则返回int类型的默认值0
int get_int1 = RedisHelper.Get<int>("keyString_String1");
  • 对整数类型进行自增,自减操作
1
2
3
4
5
6
7
bool set_int1 = RedisHelper.Set("keyString_Num1", "23");
// 将 key 所储存的值加上指定的增量值(increment)
long incrBy1 = RedisHelper.IncrBy("keyString_Num1", 2);// #25
// 将 key 所储存的值加上指定的增量值(increment),负数就是减量值
long incrBy2 = RedisHelper.IncrBy("keyString_Num1", -1);// #24
  • 在指定 key 的 value 末尾追加字符串
1
2
3
4
bool set_append1 = RedisHelper.Set("keyString_Append1", "qaz", 30);
// 将指定的 value 追加到该 key 原来值(value)的末尾
long append1 = RedisHelper.Append("keyString_Append1", "wsx");// #6 结果:key 中字符串的长度
 2、hash(哈希)
  • #HSet、HGet、HDel方法 [只能处理一个键值对]
1
2
3
4
5
6
7
8
9
10
11
12
13
// 将哈希表 key 中的字段 field 的值设为 value
bool set_hash_user1_uname = RedisHelper.HSet("User:10001", "uname", "gongyg"); // 冒号的作用相当于创建一个文件夹
bool set_hash_user1_upwd = RedisHelper.HSet("User:10001", "upassword", "123456");
bool set_hash_user1_uid = RedisHelper.HSet("User:10001", "uid", "12");
// 获取存储在哈希表中指定字段的值
string uName = RedisHelper.HGet("User:10001", "uname");
// 获取存储在哈希表中指定字段的值,并指定类型
int uId = RedisHelper.HGet<int>("User:10001", "uid");
// 删除一个或多个哈希表字段,不能删除key
long hDel1 = RedisHelper.HDel("User:10001", "uname");
  • #HGetAll、HKeys、HVals
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 获取在哈希表中指定 key 的所有字段和值
Dictionary<string, string> user10001 = RedisHelper.HGetAll("User:10001");
foreach (var item in user10001)
{
    string key = item.Key;
    string value = item.Value;
}
// 获取所有哈希表中的字段 [虽然使用HGetAll可以取出所有的value,但是有时候散列包含的值可能非常大,容易造成服务器的堵塞,为了避免这种情况,我们可以使用HKeys取到散列的所有键(HVals可以取出所有值),然后再使用HGet方法一个一个地取出键对应的值。]
string[] fields = RedisHelper.HKeys("User:10001");
foreach (string item in fields)
{
    string val = RedisHelper.HGet("User:10001", item);
}
// 获取哈希表中所有的值
string[] vals = RedisHelper.HVals("User:10001");
  • #HMSet、HMGet [HGet和HSet方法执行一次只能处理一个键值对,而HMGet和HMSet是他们的多参数版本,一次可以处理多个键值对。]
1
2
3
4
5
6
7
8
//var keyValues = dic.Select(a => new [] { a.Key, a.Value.ToString() }).SelectMany(a => a).ToArray();
string[] user2 = new string[] { "uname", "gmd", "upwd", "123" };
// 同时将多个field-value(域-值)对设置到哈希表 key 中
bool set_hash_user2 = RedisHelper.HMSet("User:10002", user2);
string[] user_get2 = new string[] { "uname", "upwd", "sj" };
// 获取存储在哈希表中多个字段的值
string[] user_val2 = RedisHelper.HMGet("User:10002", user_get2); // #gmd,123,
  • #对散列中的值进行自增、自减操作
1
2
3
4
bool set_hash_user1_usex = RedisHelper.HSet("User:10003", "uage", "23");
// 为哈希表 key 中的指定字段和整数值加上增量(increment)自增(正数),自减(负数)
long hIncrBy = RedisHelper.HIncrBy("User:10001", "uage", 2);
 3、list(列表)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 将一个或多个值插入到列表头部
string[] lpush1 = new string[] { "003", "004" };
long len1 = RedisHelper.LPush("list", "000");
long len2 = RedisHelper.LPush("list", "001", "002");
long len3 = RedisHelper.LPush("list", lpush1);
// 在列表中添加一个或多个值 [列表尾部]
long len4 = RedisHelper.RPush("list", "010");
// 移除并获取列表的第一个元素
string val1 = RedisHelper.LPop("list");
// 移除并获取列表的最后一个元素
string val2 = RedisHelper.RPop("list");
// 获取列表指定范围内的元素[key, start, stop]
string[] lrang1 = RedisHelper.LRange("list", 0, 2); // #左侧开始,获取前3个元素
string[] lrang2 = RedisHelper.LRange("list", 0, -1); // #左侧开始,获取全部元素
// 将 list 最后一个元素弹出并压入 list_another 的头部 [只有一个元素的改变,源列表会少一个元素,目标列表多出一个元素]
RedisHelper.RPopLPush("list", "list_another");
RedisHelper.Expire("list_another", 30);
 4、set(无序集合)
  • #对集合中的成员进行操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 向集合添加一个或多个成员 [返回添加成功个数]
long sadd1 = RedisHelper.SAdd("my_set", "qaz");
long sadd2 = RedisHelper.SAdd("my_set", "tgb", "yhn");
string[] set1 = new string[] { "wsx", "edc" , "rfv" };
long sadd3 = RedisHelper.SAdd("my_set", set1);
// 判断 member 元素是否是集合 key 的成员
bool isMember = RedisHelper.SIsMember("my_set", "qaz");
// 返回集合中的所有成员
string[] members = RedisHelper.SMembers("my_set");
// 返回集合中的一个随机成员
string member1 = RedisHelper.SRandMember("my_set");
// 移除集合中一个或多个成员
long sRem = RedisHelper.SRem("my_set", "qaz");
// 移除并返回集合中一个随机成员
string member2 = RedisHelper.SPop("my_set");
  • #对两个集合进行交、并、差操作
1
2
3
4
5
6
7
8
9
10
11
12
13
RedisHelper.SAdd("set-a", "item1", "item2", "item3", "item4", "item5");
RedisHelper.SAdd("set-b", "item2", "item5", "item6", "item7");
// 差集
RedisHelper.SDiff("set-a", "set-b"); // "item1", "item3","item4"
// 交集
RedisHelper.SInter("set-a", "set-b"); // "item2","item5"
// 并集
RedisHelper.SUnion("set-a", "set-b"); // "item1","item2","item3","item4","item5","item6","item7"
//#另外还可以用SDiffStore,SInterStore,SUnionStore将操作后的结果存储在新的集合中。
 5、zset(sorted set:有序集合)
  • 有序集合可以看作是可排序的散列,不过有序集合的val成为score分值,集合内的元素就是基于score进行排序的,score以双精度浮点数的格式存储。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 向有序集合添加一个或多个成员,或者更新已存在成员的分数
RedisHelper.ZAdd("sorted_set", (1, "beijing"));
RedisHelper.ZAdd("sorted_set", (2, "shanghai"), (3, "shenzhen"));
(decimal, object)[] set1 = new (decimal, object)[] { (4, "guangzhou"), (5, "tianjing"), (6, "chengdu") };
RedisHelper.ZAdd("sorted_set", set1);
// 有序集合中对指定成员的分数加上增量 increment
decimal incr = RedisHelper.ZIncrBy("sorted_set", "beijing", -2);
// 通过索引区间返回有序集合成指定区域内的成员,分数从低到高 [key, start, stop]
string[] zRange1 = RedisHelper.ZRange("sorted_set", 0, 2);
string[] zRange2 = RedisHelper.ZRange("sorted_set", 0, -1); // #stop=-1返回全部
// 返回有序集合中指定区域内的成员,通过索引,分数从高到底 [key, start, stop]
string[] zRevRange1 = RedisHelper.ZRevRange("sorted_set", 0 , 2);
string[] zRevRange2 = RedisHelper.ZRevRange("sorted_set", 0, -1); // #stop=-1返回全部
// 移除有序集合中一个或多个成员
RedisHelper.ZRem("sorted_set", "shenzhen");
// 获取有序集合的成员数量
long number = RedisHelper.ZCard("sorted_set");
// 通过分数返回有序集合指定区间内的成员
string[] ZRangByScore1 = RedisHelper.ZRangeByScore("sorted_set", 2, 4);
// 通过索引区间返回有序集合成指定区间内的成员和分数
(string member, decimal score)[] sets = RedisHelper.ZRangeWithScores("Quiz", 0, -1);
 6、Geo(经纬度)
1
2
3
4
5
6
//1. 添加地点经纬度 [存储到 sorted set 中]
RedisHelper.GeoAdd("myLocation", Convert.ToDecimal(116.20), Convert.ToDecimal(39.56), "北京");
RedisHelper.GeoAdd("myLocation", Convert.ToDecimal(120.51), Convert.ToDecimal(30.40), "上海");
//2. 求两点之间的距离
var d1 = RedisHelper.GeoDist("myLocation", "北京", "上海", GeoUnit.km);
 7、事务
1
2
3
4
5
6
7
8
// 开启事务
var pipe = RedisHelper.StartPipe();
//中间对redis进行操作
pipe.Set("pipe1", "wsx");
// 提交
pipe.EndPipe();

 

Redis ZADD 命令

mikel阅读(620)

来源: Redis ZADD 命令

Redis ZADD 命令用于将一个或多个 member 元素及其 score 值加入到有序集 key 当中。

如果某个 member 已经是有序集的成员,那么更新这个 member 的 score 值,并通过重新插入这个 member 元素,来保证该 member 在正确的位置上。

如果有序集合 key 不存在,则创建一个空的有序集并执行 ZADD 操作。

当 key 存在但不是有序集类型时,返回一个错误。

score 值可以是整数值或双精度浮点数,score 可为正也可以为负。

对有序集的更多介绍请参见 sorted set 。

注意: 在 Redis 2.4 版本以前, ZADD 每次只能添加一个元素。

语法

redis ZADD 命令基本语法如下:

redis 127.0.0.1:6379> ZADD key [NX|XX] [CH] [INCR] score member [score member …]

添加单个元素

redis> ZADD page_rank 10 google.com
(integer) 1

添加多个元素

redis> ZADD page_rank 9 baidu.com 8 redis.com.cn
(integer) 2

redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "redis.com.cn"
2) "8"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"

添加已存在元素,且 score 值不变

redis> ZADD page_rank 10 google.com
(integer) 0

redis> ZRANGE page_rank 0 -1 WITHSCORES  # 没有改变
1) "redis.com.cn"
2) "8"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"

添加已存在元素,但是改变 score 值

redis> ZADD page_rank 6 redis.com.cn
(integer) 0

redis> ZRANGE page_rank 0 -1 WITHSCORES  # redis.com.cn 元素的 score 值被改变
1) "redis.com.cn"
2) "6"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"

ZADD 参数

ZADD 支持参数,参数位于 key 名字和第一个 score 参数之间:

  • XX: 仅更新存在的成员,不添加新成员。
  • NX: 不更新存在的成员。只添加新成员。
  • LT: 更新新的分值比当前分值小的成员,不存在则新增。
  • GT: 更新新的分值比当前分值大的成员,不存在则新增。
  • CH: 返回变更成员的数量。变更的成员是指 新增成员 和 score值更新的成员,命令指明的和之前score值相同的成员不计在内。 注意: 在通常情况下,ZADD返回值只计算新添加成员的数量。
  • INCRZADD 使用该参数与 ZINCRBY 功能一样。一次只能操作一个score-element对。

注意: GTLT 和 NX 三者互斥不能同时使用。

scores 有效值范围

Redis 有序集合的分数使用双精度64位浮点数表示。在Redis所支持的平台上,称为IEEE 754 floating point number,它能包括的整数范围是-(2^53) 到 +(2^53)。或者说是-9007199254740992 到 9007199254740992。更大的整数在内部用指数形式表示,所以,如果为分数设置一个非常大的整数,你得到的是一个近似的十进制数。

Sorted sets 101

有序集合按照分数以递增的方式进行排序。相同的成员(member)只存在一次,有序集合不允许存在重复的成员。 分数可以通过ZADD命令进行更新或者也可以通过ZINCRBY命令递增来修改之前的值,相应的他们的排序位置也会随着分数变化而改变。

获取一个成员当前的分数可以使用 ZSCORE 命令,也可以用它来验证成员是否存在。

更多关于有序集合的信息请参考 sorted sets.

相同分数的成员

有序集合里面的成员是不能重复的都是唯一的,但是,不同成员间有可能有相同的分数。当多个成员有相同的分数时,他们将是按字典排序(ordered lexicographically)(仍由分数作为第一排序条件,然后,相同分数的成员按照字典序排序)。

字典顺序排序用的是二进制,它比较的是字符串的字节数组。

如果用户将所有元素设置相同分数(例如0),有序集合里面的所有元素将按照字典顺序进行排序,范围查询元素可以使用 ZRANGEBYLEX 命令(注:范围查询分数可以使用 ZRANGEBYSCORE 命令)。

返回值

整数:

  • 被成功添加的新成员的数量,不包括那些被更新分数的、已经存在的成员。

如果使用 INCR 选项,则返回 多行字符串:

  • 以字符串形式表示的 member 的 score 值(双精度浮点数)。(双精度浮点数) , 执行失败返回 nil (当使用 XX 或 NX 选项)。

历史

  • >= 2.4: 支持一次增加或更新多个成员。
  • >= 3.0.2: 增加 XXNXCH 和 INCR 选项。
  • >=6.2: 增加 GT 和 LT 选项。

例子

redis> ZADD myzset 1 “one”

(integer) 1

redis> ZADD myzset 1 “uno”

(integer) 1

redis> ZADD myzset 2 “two” 3 “three”

(integer) 2

redis> ZRANGE myzset 0 -1 WITHSCORES

1) "one"
2) "1"
3) "uno"
4) "1"
5) "two"
6) "2"
7) "three"
8) "3"
redis> 

MSMQ 的持久化_走错路的程序员的博客-CSDN博客

mikel阅读(517)

来源: MSMQ 的持久化_走错路的程序员的博客-CSDN博客

MSMQ的消息默认是放在内存里面的. 重启服务或者断电的时候消息就没了. 对于重要的消息来讲这样肯定是不行的.

百度了好久也没发现如何持久化. 后来实在不行上Google 才找到对应的答案.
废话少说. 直接上答案.

标准答案就一句话 msg1.Recoverable = true;

System.Messaging.Message msg1 = new System.Messaging.Message(person);
msg1.Recoverable = true;//为true 的时候进行持久化.每个信息的Recoverable 为true 那么持久保存
mq.Send(msg1, transaction);

//下面的是全部的测试代码.. 可以不看..
//主测试方法

public class TestMQ
{

static string path = “.\\private$\\TestMQ”;
// 接收消息的函数
public static Object[] Receive()
{
MessageQueue mq;
if (MessageQueue.Exists(path))
{
mq = new MessageQueue(path);
}
else
{
mq = MessageQueue.Create(path);
}

// 定义消息队列中所有的消息种类(种类之间的顺序可以互换)
Type[] msgTypes = new Type[] { typeof(Person), typeof(Order), typeof(String) };

// 规定消息以XML格式编码
mq.Formatter = new XmlMessageFormatter(msgTypes);

// 定义事务
MessageQueueTransaction transaction = new MessageQueueTransaction();

try
{
// 如果消息队列采用了事务,则开始事务
if (mq.Transactional)
transaction.Begin();

System.Messaging.Message msg1 = mq.Receive(transaction);
Person person = (Person)msg1.Body;

System.Messaging.Message msg2 = mq.Receive(transaction);
Order order = (Order)msg2.Body;

System.Messaging.Message msg3 = mq.Receive(transaction);
string description = (String)msg3.Body;

// 如果消息队列采用了事务,则停止事务
if (mq.Transactional)
transaction.Commit();

return new Object[] { person, order, description };
}
catch (Exception ex)
{
// 如果消息队列采用了事务并且出现了异常,则终止事务
if (mq.Transactional)
transaction.Abort();

return null;
}
}

// 发送消息的函数
public static bool Send(Person person, Order order, string description)
{
MessageQueue mq;
if (MessageQueue.Exists(path))
{
mq = new MessageQueue(path);
}
else
{
mq = MessageQueue.Create(path);
}
mq.DefaultPropertiesToSend.Recoverable = true; //为true 的时候进行持久化. 经测试好像没用.
//mq.DefaultPropertiesToSend.AcknowledgeType = AcknowledgeTypes.NegativeReceive; 消息是否需要反馈.
// 规定消息以XML格式编码
mq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Person), typeof(Order), typeof(String) });

// 定义事务
MessageQueueTransaction transaction = new MessageQueueTransaction();
System.Messaging.Message msg1 = new System.Messaging.Message(person);
System.Messaging.Message msg2 = new System.Messaging.Message(order);
System.Messaging.Message msg3 = new System.Messaging.Message(description);

try
{
// 如果消息队列采用了事务,则开始事务
if (mq.Transactional)
transaction.Begin();

msg1.Recoverable = true;//为true 的时候进行持久化.
mq.Send(msg1, transaction);

msg2.Recoverable = true;//为true 的时候进行持久化.
mq.Send(msg2, transaction);

msg3.Recoverable = false;//为true 的时候进行持久化.
mq.Send(msg3, transaction);

// 如果消息队列采用了事务,则停止事务
if (mq.Transactional) {
transaction.Commit();
}
return true;
}
catch (Exception ex)
{
// 如果消息队列采用了事务并且出现了异常,则终止事务
if (mq.Transactional)
transaction.Abort();

return false;
}
finally
{

mq.Close();
msg1 = null;
msg2 = null;
msg3 = null;
}
}

}

下面是两个model类

// 购物清单
public class Order
{
public Order()
{
Price = 0.0;
Number = 0;
Time = DateTime.Now;
}

public Order(Double price, UInt64 number, DateTime time)
{
Price = price;
Number = number;
Time = time;
}

// 物品单价
Double Price;

// 物品数量
UInt64 Number;

// 下单时间
DateTime Time;
}

// 人员类
public class Person
{
public Person()
{
m_Name = “”;
m_Age = 0;
}

public Person(string name, UInt16 age)
{
m_Name = name;
m_Age = age;
}

// 姓名
public string Name
{
get { return m_Name; }
set { m_Name = value; }
}

// 年龄
public UInt16 Age
{
get { return m_Age; }
set { m_Age = value; }
}

private string m_Name;
private UInt16 m_Age;
}

//发送按钮
private void 发送按钮_Click(object sender, EventArgs e)
{

for (int i = 0; i < 100000; i++)
{
Person person = new Person(“Jackie”, 30);
Order order = new Order(110.0, 10, DateTime.Now);
string description = “This is a new order.”;
TestMQ.Send(person, order, description);
System.GC.Collect();
}

}

private void 接收按钮_Click(object sender, EventArgs e)
{
// 接收消息。返回值为 null 时表示接收失败
Object[] message = TestMQ.Receive();

if (message != null)
{
// 当消息接收成功时,将消息依次取出并存入下面的对象中
Person person = (Person)message[0];
Order prder = (Order)message[1];
string description = (string)message[2];
if (description == “This is a new order.”)
{
MessageBox.Show(“消息接收成功 !”);
}
else
{
MessageBox.Show(“消息接收失败 !”);
}

}
else
{
MessageBox.Show(“消息接收失败 !”);
}
}
————————————————
版权声明:本文为CSDN博主「走错路的程序员」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/phker/article/details/71124256

Redis实现消息队列 - 简书

mikel阅读(554)

来源: Redis实现消息队列 – 简书

Redis实现轻量级的消息队列与消息中间件相比,没有高级特性也没有ACK保证,无法做到数据不重不漏,如果业务简单而且对消息的可靠性不是那么严格可以尝试使用。

Redis实现消息队列

列表类型

队列

Redis中列表List类型是按照插入顺序排序的字符串链表,和数据结构中的普通链表一样,可以在头部left和尾部right添加新的元素。插入时如果键不存在Redis将为该键创建一个新的链表。如果链表中所有元素均被删除,那么该键也会被删除。

Redis List

Redis的列表List可以包含的最大元素数量为4294967295,从元素插入和删除的效率来看,如果是在链表的两头插入或删除元素将是非常高效的操作。即使链表中已经存储了数百万条记录,该操作也能在常量时间内完成。然后需要说明的是,如果元素插入或删除操作是作用于链表中间,那将是非常低效的。

Redis中对列表List的操作命令中,L表示从左侧头部开始插入和弹出,R表示从右侧尾部开始插入和弹出。

Redis提供了两种方式来做消息队列,一种是生产消费模式,另一种是发布订阅模式。

生产消费模式

生产消费模式会让一个或多个客户端监听消息队列,一旦消息到达,消费者马上消费,谁先抢到算谁的。如果队列中没有消息,消费者会继续监听。

PUSH/POP

Redis数据结构的列表List提供了pushpup命令,遵循着先入先出FIFO的原则。使用push/pop方式的优点在于消息可以持久化,缺点是一条消息只能被一个消费者接收,消费者完全靠手速来获取,是一种比较简陋的消息队列。

Redis的队列list是有序的且可以重复的,作为消息队列使用时可使用rpush/lpush操作入队,使用lpop/rpop操作出队。当发布消息是执行lpush命令,将消息从列表左侧加入队列。消息接收方执行rpop命令从列表右侧弹出消息。

如果队列空了怎么办呢?

如果队列空了,消费者会陷入pop死循环,即使没有数据也不会停止。空轮询不但消耗消费者的CPU资源还会影响Redis的性能。傻瓜式的做法是让消费者的线程按照一定的时间间隔不停的循环和监控队列,虽然可行但显然会造成不必要的资源浪费,而且循环周期也很难确定。

对于消费者而言存在一个问题,需要不停的调用rpop查看列表中是否有待处理的消息。每调用一次都会发起一次连接,势必造成不必要的资源浪费。如果使用休眠的方式让消费者线程间隔一段时间再消费,但这样做也有两个问题:

  • 如果生产者速度大于消费者消费的速度,消息队列长度会一直增大,时间久了会占用大量内存空间。
  • 如果休眠时间过长,就无法处理一些时效性的消息。如果休眠时间过短也会在连接上造成比较大的开销。

LPOP返回一个元素给客户端时会从List中将该元素移除,这意味着该元素只存在于客户端的上下文中。如果客户端在处理这个返回元素的过程中崩溃了,这个元素就会永远的丢失掉。

LPUSH/BRPOP

LPUSH BRPOP

使用brpopblpop实现阻塞读取

由于需要一直调用rpop/lpop才可以实现不停的监听且消费消息,为解决这个问题,Redis提供了阻塞命令brpop/blpop。使用brpop会阻塞队列,而且每次只会弹出一个消息,如果没有消息则会阻塞。

Redis列表List支持带阻塞的命令,生产者从列表左侧lpush加入消息到队列,消费者使用brpop命令从列表右侧弹出消息并设置超时时间,如果列表中没有消息则一直阻塞直到超时。这样做的目的在于减小Redis的压力。

对于Redis来说提供了blpop/brpop阻塞读,阻塞读在队列没有数据时会立即进入休眠状态,一旦数据到来则立即被唤醒,消息的延迟几乎为零。需要注意的是如果线程一直阻塞在那里,连接就会被服务器主动断开来减少资源占用,这时blpop/brpop会抛出异常,所以编写消费段时需要注意异常的处理。

BRPOP key [key ...] timeout

BLPOP/BRPOP 列表阻塞式弹出

当给定列表内没有任何元素可供弹出时,连接将被BRPOP命令阻塞,直到等待超时或发现可弹出元素为止。当给定多个key参数时,按参数key的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。另外,BRPOP除了弹出元素的位置和BLPOP不同之处,其他表现一致。

列表的阻塞式弹出特点是如果列表中没有任务时,连接将会被阻塞。连接的阻塞存在一个超时时间,当超时时间设置为0时刻无限等待直到弹出消息。

Redis的PUSH/POP机制,利用Redis的列表list数据结构,生产者lpush消息,消费者brpop消息并设定超时时间以减少Redis压力。这种方案相对于发布订阅模式的好处是数据可靠性提高了,只有在Redis宕机且数据没有持久化的情况下会丢失数据。可以根据业务通过AOF和缩短持久化间隔来保证较高的可靠性,也可以通过多个客户端来提高消息速度。但相对于专业的消息队列中间件,发布订阅模式的状态过于简单(没有状态),而且没有ACK机制,消息取出后消费失败依赖于客户端记录日志或重新push到队列中。

Redis中实现生产者和消费者模型,可使用LPUSHRPOP来实现该功能。不过当列表为空时消费者就需要轮询来获取消息,这样会增加Redis的访问压力和消费者的CPU时间,另外很多访问也是无用的。为此Redis提供了阻塞式访问BRPOPBLPOP命令,消费者可以在获取数据时指定如果数据不存在阻塞的时间,如果在时限内获得数据则立即返回,如果超时还没有数据则返回NULL,可使用0表示一直阻塞。同时Redis会为所有阻塞的消费者以先后顺序排序。

使用Redis的列表来实现一个任务队列,开启两个程序,一个作为生产者使用LPUSH写队列,一个作为消费者使用RPOP读队列。由于消费者并不知道什么时候会有消息过来,所以消费者需要一直循环读取数据。两者的消息可以使用JSON进行封装协议传输。由于消费者在没有读到数据的情况下,会一直循环读取,对系统来说十分耗费资源,此时可利用Redis提供的阻塞读取命令BRPOP进行改进。使用BRPOP改进后,消费者不会一直循环读取,而是一直阻塞等到有消息过来时才读取。

发布订阅模式

发布订阅模式是一个或多个客户端订阅消息频道,只要发布者发布消息,所有订阅者都能收到消息,订阅者都是平等的。

PUB/SUB

Redis自带pub/sub机制即发布订阅模式,此模式中生产者producer和消费者consumer之间的关系是一对多的,也就是一条消息会被多个消费者所消费,当只有一个消费者时可视为一对一的消息队列。

发布订阅机制模型

首先发布者将消息发布到频道,客户端订阅频道后就能获得频道的消息。

发布订阅模式命令

  • psubscribe 订阅一个或多个符合给定模式的频道
  • publish 将消息发布到指定的频道
  • pubsub查看订阅与发布系统状态
  • pubsub channels pattern 列出当前的活跃频道
  • pubsub numsub channel-1 channel-n 获取给定频道的订阅者数量
  • pubsub numpat 获取订阅模式的数量
  • punsubscribe 指示客户端退订所有给定模式
  • subscribe 订阅给定的一个或多个频道的消息
  • unsubscribe 指示客户端退订给定的频道

实现

使用PHP+Redis实现消息队列

操作流程

  1. PHP接收请求和数据
  2. PHP将数据写入Redis队列(入队)
  3. Shell定时调用PHP读取队列数据并写入数据库(出队)

入队inqueue.php

$result = $redis->rpush("queue", json_encode($data));
if($result){
  echo "inqueue success";
}

出队 outqueue.php

#! /usr/bin/php
<?php
$result = $redis->lpop("queue");
if($result){
  $data = json_decode($result, true);
}

定时任务:process.sh

# 每分钟调用一次定时脚本
* * * * * /scripts/process.sh

定时脚本:process.sh

#! /bin/bash
#filename : process.sh
php /scripts/outqueue.php

出队采用死循环方式,感谢 行走平凡 的指正。

$ vim outqueue.php

while(true){
  $result = $redis->lpop("queue");
  if($result){
    $data = json_decode($result, true);
  }
}

作者:JunChow520
链接:https://www.jianshu.com/p/02a923fea175
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

C#实现消息队列_Shuai_Sir的博客-CSDN博客_c# 消息队列

mikel阅读(604)

来源: C#实现消息队列_Shuai_Sir的博客-CSDN博客_c# 消息队列

C#实现消息队列
1.认识消息队列
消息队列 (MSMQ Microsoft Message Queuing)是MS提供的服务,也就是Windows操作系统的功能,并不是.Net提供的。

2.什么是消息队列
消息队列一般简称为 MQ (Messges Queue),是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成,是在消息的传输过程中保存消息的容器。消息队列本质上是一个队列,而队列中存放的是一个个消息。

队列是一个数据结构,具有先进先出的特点。而消息队列就是将消息放到队列里,用队列做存储消息的介质。消息的发送放称为生产者,消息的接收方称为消费者。

消息队列由 Broker(消息服务器,核心部分)、Producer(消息生产者)、Consumer(消息消费者)、Topic(主题)、Queue(队列)和Message(消息体)组成。

3. 消息队列特点
消息队列有三个作用,分别是削峰、解耦和异步。

流量削峰:主要用于在高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。

假设系统只能处理1000个请求,但这时突然来了3000个请求,如果不加以限制就会造成系统瘫痪。使用消息队列做缓冲,将多余的请求存放在消息队列中,等系统根据自己处理请求的能力去消息队列去。

应用解耦:主要用于当一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理时,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。

假设某个服务 A 需要调用服务 B,但是服务 B 突然出现问题,这样会导致服务 A 也会出现问题。如果使用消息队列,当服务 A 执行完成之后,发送一条消息到队列中,服务 B 读取到这条消息,那么它立刻开始进行业务的执行。

异步通信:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。

假设有一个业务,要先执行服务 A ,然后服务 A 去调用服务 B ,当服务 B 完成之后,服务 A 调用服务 C,这个业务需要一步步走下去。当使用了消息队列之后,服务 A 完成之后,可以同时执行服务 B 和 服务 C ,这样就减低业务的响应时间,提高用户体验。

消息队列简单例子
一下是WIN10的操作步骤:
1.打开运行,输入”OptionalFeatures”,钩上Microsoft Message Queue(MSMQ)服务器。

2. 消息队列分为以下几种,每种队列的路径表示形式如下:

公用队列 MachineName\QueueName

专用队列 MachineName\Private$\QueueName

日记队列 MachineName\QueueName\Journal$

计算机日记队列 MachineName\Journal$

计算机死信队列 MachineName\Deadletter$

计算机事务性死信队列 MachineName\XactDeadletter$

这里的MachineName可以用 “.”代替,代表当前计算机

3.打开“此电脑”–>左上角点击“计算机”–>点击“管理”

注意:如果没看到“消息队列”这个选项,那就是这东西在这电脑上还没使用过,没关系,一会编程的时候创建就会显示出来了(我印象中一开始我也没有这个选项,写了代码后就有了)

知道这东西在哪里后,开始进入正题。C#中使用消息队列需要添加新的引用System.Messaging

4.添加引用后在代码里添加命名空间就可以开始了

这里我们新建一个控制台程序:

存入数据:

static void Main()
{
//存
string msgPath = “.\\Private$\\MyMsg”;
//指定路径一个名称为“MyMsg”的专用队列的路径字符串
string studentName = “hello word”;
// 要写入消息消息队列的信息
SendMsg(msgPath, studentName);
Console.WriteLine(“发送成功” + studentName);
Console.ReadLine();
}
/// 存
/// </summary>
/// <param name=”mQPath”></param>
/// <param name=”studentName”></param>
public static void SendMsg (string mQPath,string studentName)
{
//先判断这个消息队列是否存在,如果存在则直接实例化对象,如果不存在则创建该消息队列
MessageQueue mq = (MessageQueue.Exists(mQPath)) ? (new MessageQueue(mQPath)) : (MessageQueue.Create(mQPath));
mq.Send(studentName);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
读取数据:

static void Main()
{
//读取数据
string msgPath = “.\\Private$\\MyMsg”;
ReceiveMsg(msgPath);//异步读取
// ReceiveMsgT(msgPath); 同步读取
Console.ReadLine();

}
///异步读取
private static void ReceiveMsg(string msgPath) {
if (MessageQueue.Exists(msgPath))
{
MessageQueue mq=new MessageQueue(msgPath);
mq.ReceiveCompleted += new ReceiveCompletedEventHandler(ReceiveMethon);
mq.BeginReceive(MessageQueue.InfiniteTimeout);
Console.WriteLine(“异步已接收数据”);
}

}
static readonly XmlMessageFormatter f=new XmlMessageFormatter(new Type[] { typeof(string)}); //格式
private static void ReceiveMethon(object sender,ReceiveCompletedEventArgs e) {
Message m=e.Message;
m.Formatter = f;//转换格式
Console.WriteLine(m.Body.ToString());
}
//同步读取
private static void ReceiveMsgT(string msgPath) {
if (MessageQueue.Exists(msgPath))
{
MessageQueue mq= new MessageQueue(msgPath);
Message m = mq.Receive();
m.Formatter= f;//格式转换
Console.WriteLine(m.Body.ToString());
Console.WriteLine(“同步已接受数据”);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
这里简单说一下两种方式读取的区别,同步读取数据的话,会堵塞当前线程,直到读到数据为止才继续运行之后的程序,异步读取的话就避免了这个问题。如果读取的数据量小、速度快,为了编写代码方便可选择同步读取,如果读取的数据量大、速度慢,则建议使用异步读取。

消息队列有点像我们常见的数据库,能进行数据的存取,但是与数据库其中一点不同的就是消息队里中的消息被读取后就会被销毁,不需要我们手动删除。
————————————————
版权声明:本文为CSDN博主「Shuai_Sir」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Shuai_Sir/article/details/127899893

c#之Redis队列 - wolfy - 博客园

mikel阅读(488)

来源: c#之Redis队列 – wolfy – 博客园

摘要

这两天一直在考虑redis队列:一个生产者,多个消费者的情况,这里弄了一个demo进行测试。

一个例子

关于如何引用Redisclient 可以参考之前的这篇文章:c#之Redis实践list,hashtable

生产者一个线程,然后开启多个线程用来消费数据。

代码如下:

复制代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NServiceKit.Redis;
using NServiceKit.ServiceClient;
using System.Threading;
namespace RedisDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            Thread thread = new Thread(run);
            thread.Start();

            Thread[] threads = new Thread[10];
            for (int i = 0; i < threads.Length; i++)
            {
                threads[i] = new Thread(Pull);
                threads[i].Start();
            }

            Console.Read();
        }
        private static void Pull()
        {
            IRedisClientFactory factory = RedisClientFactory.Instance;
            using (IRedisClient client = factory.CreateRedisClient("192.168.1.37", 6379))
            {
                client.Password = "wolfy";
                while (true)
                {
                    if (client.GetListCount("Myqueue") > 0)
                    {
                        string result = client.DequeueItemFromList("Myqueue");
                        //如果获取的内容为空,将当前线程挂起1s
                        if (string.IsNullOrEmpty(result))
                        {
                            Thread.SpinWait(1000);
                        }
                        else
                        {
                            Console.WriteLine("Threadid:" + Thread.CurrentThread.ManagedThreadId.ToString() + "\t" + result);
                        }
                    }
                    else
                    {
                        //如果当前队列为空,挂起1s
                        Thread.SpinWait(1000);
                    }
                }
            }

        }
        private static void run()
        {
            IRedisClientFactory factory = RedisClientFactory.Instance;
            using (IRedisClient client = factory.CreateRedisClient("192.168.1.37", 6379))
            {
                client.Password = "wolfy";
                while (true)
                {
                    client.EnqueueItemOnList("Myqueue", DateTime.Now.ToString());
                }
            }


        }
    }
}
复制代码

测试

总结

关于队列有考虑过rabbitmq,msmq等,考虑到公司有现成的redis服务器,所以就考虑使用redis队列。既然实现了一生产者,多个消费者,那么接下来,想实现一种多队列,然后设置队列的容量,通过容量,生产者在入队的时候,根据队列是否满,然后对数据进行分发的情况。

Windows电脑反编译微信小程序含分包详细操作 - 掘金

mikel阅读(580)

来源: Windows电脑反编译微信小程序含分包详细操作 – 掘金

现在网上也有很多关于小程序反编译的教程,随时间的流逝或许随着微信的更新,有出现编译不成功的现象。

本篇文章总结一下最新的编译过程,已成功获得小程序源码(有分包的小程序)

环境准备

1、 node 环境准备

下载链接:nodejs.org/en/

image-20210901154759757

安装后将nodejs设置为环境变量。
打开cmd,测试是否安装成功:在命令行输入node -v 出现版本号说明已经安装成功。

2、反编译工具

项目地址来自于: github.com/xuedingmiao…

通过下面链接下载:

链接:pan.baidu.com/s/1p-wnX-mX…
提取码:z06a

下载下来解压到某个位置就可以了,一定要通过网盘下载,里面有解密包的工具和安装后的npm环境,直接使用即可

具体操作

1、微信PC获取小程序

在通过微信PC打开小程序前,我们最好先找到缓存到本地的小程序包路径,一般都是在 微信PC安装目录\WeChat Files\WeChat Files\Applet

比如我的就是安装到 D盘根目录的,所以路径为: D:\WeChat\WeChat Files\WeChat Files\Applet

image-20210901160649022

上图中每个文件夹代表一个小程序,一般最新打开的小程序都是在第一个,如果不确定可以排序一下修改日期

找到路径了我们就可以用微信PC打开小程序了,打开后就会发现当前目录新增了一个文件夹,里面存放的就是加密后的小程序包

image-20210901161153138

2、解密包

刚获取到的包我们还不能进行反编译,必须要通过 解密软件 修改一下才能反编译

image-20210901161556010

本篇就演示一个主包和一个分包反编译的过程就可以了,先通过解密软件修改一下主包

image-20210901161936026

解密的主包自动到 wxpack 这个包里面来了,同样的步骤解密一个分包,下图是我解密好的两个,并且修改了一下名称,好区分

image-20210901162219406

3、反编译

进入 wxpack 的同级目录 wxappUnpacker-master,在路径栏输入 cmd 自动打开当前目录的 命令窗口了

image-20210901162538413

先反编译一下主包,把反编译后的文件夹放到 wxpack 同级目录中

node wuWxapkg.js ..\wxpack\master-app.wxapkg

复制代码

image-20210901163011475

再反编译分包,把反编译后的文件夹放到 wxpack 同级目录中

node wuWxapkg.js -s=..\ ..\wxpack\_pages_app.wxapkg

复制代码
  • -s 表示分包
  • 第一个..\ 表示输出位置
  • ..\wxpack\_pages_app.wxapkg 需要反编译的分包位置

image-20210901163627239

好了剩下的就是自己组合一下包的架构目录了~~~~

如果本篇文章给予了您一点帮助,还请点个赞收藏一下~~

谢谢您的支持!!!

作者:LexKun
链接:https://juejin.cn/post/7002889906582192158
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。