快速实现Docker到Redis的连接_sannerlittle的博客-CSDN博客_docker redis 连接

mikel阅读(762)

来源: 快速实现Docker到Redis的连接_sannerlittle的博客-CSDN博客_docker redis 连接

OS:Xenial

$ docker pull redis 

运行上面的命令下载镜像,Docker daemon会自动输出该Redis镜像的来源信息、下载状态,下载完成之后系统也会显示最终状态信息。

镜像拉取完成之后,大家可以用下面的命令启动Redis容器,记得要带上“-d”参数:

$ docker run --name myredis-itsmine -d redis 

“-d”参数的作用是让Redis在后台运行,因为本例中采用这种后台运行的方式较为合适,所以这里我们写上了这个参数。如果不带 “-d”参数的话处理起来就要麻烦一些,这种情况下我们需要先停止终端的运行或者退出容器,然后才能通过宿主机来访问Redis。

下面我们要进行最重要的一步操作,连接Redis。由于我们并没有实际的需要连接到Redis的应用,所以这里我们用了redis-cli工具。大家可以在宿主机上安装redis-cli,不过我建议大家新建一个容器,将redis-cli运行在里面,然后用下面的命令把这两个容器连接起来,这样我们就可以看到详细的应用信息:

$docker run --rm -it --link myredis-itsmine:redis redis /bin/bash 

运行该命令之后我们就可以在bash命令行下面看到容器的提示信息了:

root@f75bacab2715:/data#
$ docker run --rm -it --link myredis:redis redis /bin/bash
$ root@af47015c4a76:/data# redis-cli -h redis -p 6379
$ redis:6379> ping
$ PONG
$ redis:6379> set "Abc" 123
$ OK
$ redis:6379> get "Abc"
$ "123"
$ redis:6379> exit
root@af47015c4a76:/data# exit
$ exit

在上面的命令中,docker run命令后面跟的“–link myredis-itsmine:redis” 参数用于创建连接,Docker收到该指令后,就会尝试将我们新建的容器连接到当前的“myredis-itsmine” 容器,同时会将新容器中的redis-cli命名为“redis”。Docker会在容器中的/etc/hosts路径下为“redis”创建一个入口,并指向“myredis-itsmine”容器的IP地址。这样我们就可以在redis-cli中直接使用“redis”作为主机名,这种做法是很方便的,我们不必再去找其他办法来“发现”Redis的IP地址,然后加以引用了。

接下来我们就可以通过set和put命令来执行Redis的存取操作了,这里我们可以用一些示例数据来做个试验。当然,在开始存取操作之前,大家还要再运行一下Redis的ping命令,验证一下Redis服务器是否已经连接上了。

Recommend: http://www.hawu.me/operation/802

DOCKER简明教程 : 通过容器连接REDIS数据库

mikel阅读(696)

本文来源:Ghostcloud翻译(http://www.ghostcloud.cn/

序言
本文重点讲解了如何通过Redis的官方镜像和Docker容器来搭建redis-cli,并将其连接到Redis镜像。首先要跟大家简单介绍一下Redis,这是一个键值存储系统,除了对场景进行缓存之外,Redis还提供了很多强大的功能,因此也目前是非常受欢迎的一个数据库。

Docker镜像仓库简介
大家可以在Docker Hub里搜索到目前所有的主流应用和服务的镜像,像Python语言、MySQL数据库等等镜像在Docker Hub里面都有。而且Docker Hub里面的镜像数量非常多,不管我们搜什么关键词,都能搜出来一大堆结果。之所以会这样,是因为Docker官方镜像仓库的宗旨就是将已知来源、满足一定质量标准的Docker镜像组织在一起提供给用户。一般情况下,我建议大家都尽量使用Docker Hub提供的官方镜像,大家可以在查询结果列表中看到当前分类下有哪些官方镜像,一般官方镜像都会在列表中置顶显示,而且会有标有“官方”字样。

从官方镜像仓库pull镜像的时候,用户名的部分可以为空,也可以设置为library,比如说拉取 Casandra镜像的时候就可以设置成从Apache Cassandra项目获取。大家也可以在自己的终端上运行下面的命令,在Docker Hub中查找Cassandra镜像:

$docker search Cassandra

通过这种方式对Docker Hub进行查询的时候,系统会返回一条消息,提示用户“你所拉取的镜像已通过验证”,看到这条信息就表示镜像的校验码通过了Docker daemon的验证,来源是可靠的。

快速实现Docker到Redis的连接
闲话少说,我们下面就进入实战教程。首先运行下面命令,从Docker Hub拉取Redis镜像:

$ docker pull redis

运行上面的命令下载镜像,Docker daemon会自动输出该Redis镜像的来源信息、下载状态,下载完成之后系统也会显示最终状态信息。

镜像拉取完成之后,大家可以用下面的命令启动Redis容器,记得要带上“-d”参数:

$ docker run –name myredis-itsmine -d redis

“-d”参数的作用是让Redis在后台运行,因为本例中采用这种后台运行的方式较为合适,所以这里我们写上了这个参数。如果不带 “-d”参数的话处理起来就要麻烦一些,这种情况下我们需要先停止终端的运行或者退出容器,然后才能通过宿主机来访问Redis。

下面我们要进行最重要的一步操作,连接Redis。由于我们并没有实际的需要连接到Redis的应用,所以这里我们用了redis-cli工具。大家可以在宿主机上安装redis-cli,不过我建议大家新建一个容器,将redis-cli运行在里面,然后用下面的命令把这两个容器连接起来,这样我们就可以看到详细的应用信息:

$docker run –rm -it –link myredis-itsmine:redis redis /bin/bash

运行该命令之后我们就可以在bash命令行下面看到容器的提示信息了:

root@f75bacab2715:/data#
$ docker run –rm -it –link myredis:redis redis /bin/bash
$ root@af47015c4a76:/data# redis-cli -h redis -p 6379
$ redis:6379> ping
$ PONG
$ redis:6379> set “Abc” 123
$ OK
$ redis:6379> get “Abc”
$ “123”
$ redis:6379> exit
root@af47015c4a76:/data# exit
$ exit

在上面的命令中,docker run命令后面跟的“–link myredis-itsmine:redis” 参数用于创建连接,Docker收到该指令后,就会尝试将我们新建的容器连接到当前的“myredis-itsmine” 容器,同时会将新容器中的redis-cli命名为“redis”。Docker会在容器中的/etc/hosts路径下为“redis”创建一个入口,并指向“myredis-itsmine”容器的IP地址。这样我们就可以在redis-cli中直接使用“redis”作为主机名,这种做法是很方便的,我们不必再去找其他办法来“发现”Redis的IP地址,然后加以引用了。

接下来我们就可以通过set和put命令来执行Redis的存取操作了,这里我们可以用一些示例数据来做个试验。当然,在开始存取操作之前,大家还要再运行一下Redis的ping命令,验证一下Redis服务器是否已经连接上了。

本文讲述了如何通过容器来实现Redis数据库的连接,看到这里,大家是否已经对容器的网络连接有个初步的概念了?新版的Docker在网络功能方面也做出了一定的改进,相信在不久的将来,所有用户都可以很方便地通过Docker容器实现应用和服务的互连。

Redis安装完后redis-cli无法使用(redis-cli: command not found)_坤哥的博客-CSDN博客

mikel阅读(715)

来源: Redis安装完后redis-cli无法使用(redis-cli: command not found)_坤哥的博客-CSDN博客

之前安装redis后客户端无法使用,即redis-cli执行后报找不到的错误。这主要是安装redis的时候没有把客户端装上,在StackOverFlow上找到了一种只安装redis cli的方法,这里跟大家分享下。

wget http://download.redis.io/redis-stable.tar.gz(下载redis-cli的压缩包)
tar xvzf redis-stable.tar.gz(解压)
cd redis-stable(进入redis-stable目录)
make(安装)
sudo cp src/redis-cli /usr/local/bin/(将redis-cli拷贝到bin下,让redis-cli指令可以在任意目录下直接使用)

按照上面的指令执行之后redis-cli就可以正常执行了,注意上面的几条指令必须都执行,make是单独的一条。
————————————————
版权声明:本文为CSDN博主「月未明」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_35981283/article/details/71631540

.NETcore使用CSRedisCore操作Redis - 心冰之海 - 博客园

mikel阅读(1490)

来源: .NETcore使用CSRedisCore操作Redis – 心冰之海 – 博客园

因为Servicestack.Redies免费每天只能调用6000次,所以找了个免费的能在.NETcore使用的插件CSRedisCore,封装了一下。

redies订阅模式的调用:RedisCoreHelper.Sub(“quote”, action);

1
2
3
4
5
6
7
8
9
10
11
12
13
public void action(string message)
{
    if (!message.IsNullOrEmpty() && !"null".Equals(message))
    {
        
//dosomething
               
    }
    else
    {
        //Thread.Sleep(200);
    }
}       

 

封装如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
internal class RedisConfigManager
{
    /// <summary>
    /// 获取配置
    /// </summary>
    /// <returns></returns>
    public static RedisConfig GetConfig()
    {
        var path = WorkPath.CurrentDirectory + "\\redis.config.json";
        Log.Info("path:"+ path);
        var json = FileManager.GetTextFromPath(path);
        if (json.IsNullOrEmpty())
            return new RedisConfig();
        var config = JsonConvert.Deserialize<RedisConfig>(json);
        return config;
    }
}

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
namespace LT.Cache
{
    public class RedisCoreHelper
    {
        static CSRedisClient redisManger = null;
        static CSRedisClient GetClient()
        {
            return redisManger;
        }
        static RedisCoreHelper()
        {
            var redisconfig = RedisConfigManager.GetConfig();
            redisManger = new CSRedisClient(redisconfig.CoreRedisServer);      //Redis的连接字符串
        }
        /// <summary>
        /// TradeManageMessage 和 TradeManageMessage:MQ队列
        /// </summary>
        /// <returns></returns>
        public static bool EnQeenTradeManageMessage(string value)
        {
            try
            {
                Log.Info("yinzhou--EnQeenTradeManageMessage:" + value);
                //从头部插入
                GetClient().LPush("TradeManageMessage", value);
                GetClient().LPush("TradeManageMessage:MQ", value);
                return true;
            }
            catch (Exception e)
            {
                Log.Error($"EnQeenTradeManageMessage:key=TradeManageMessage:MQ,value={value}", e);
                return false;
            }
        }
        /// <summary>
        /// TradeManageMessage 和 TradeManageMessage:MQ队列
        /// </summary>
        /// <returns></returns>
        public static bool EnQeenTradeManageMessage<T>(T value)
        {
            try
            {
                //从头部插入
                GetClient().LPush("TradeManageMessage", value);
                GetClient().LPush("TradeManageMessage:MQ", value);
                return true;
            }
            catch (Exception e)
            {
                Log.Error($"EnQeenTradeManageMessage:key=TradeManageMessage:MQ,value={value}", e);
                return false;
            }
        }
        public static bool EnQueen(string key, string value)
        {
            try
            {
                //从头部插入
                GetClient().LPush(key, value);
                return true;
            }
            catch (Exception e)
            {
                Log.Error($"EnQueen:key={key},value={value}", e);
                return false;
            }
        }
        public static string DeQueen(string key)
        {
            string result = "";
            try
            {
                //从尾部取值
                result = GetClient().RPop(key);
                return result;
            }
            catch (Exception e)
            {
                Log.Error($"DeQueen:key={key}", e);
                return result;
            }
        }
    //redis订阅模式
        public  static void Sub(string key,Action<string> action)
        {
            GetClient().Subscribe((key, msg => action(msg.Body)));
        }
        public static string[] DeQueenAll(string key)
        {
            string[] result = { };
            try
            {
                long len = GetClient().LLen(key);
            
                //取出指定数量数据
                result = GetClient().LRange(key,0, len-1);
                //删除指定数据
                bool res=GetClient().LTrim(key, len, -1);
  
                return result;
            }
            catch (Exception e)
            {
                Log.Error($"DeQueen:key={key}", e);
                return result;
            }
        }
        public static bool EnQueen<T>(string key, T value)
        {
            try
            {
                //从头部插入
                long len= GetClient().LPush(key, value);
                if(len>0)
                    return true;
                else
                    return false;
            }
            catch (Exception e)
            {
                Log.Error($"EnQueenObj:key={key},value={value}", e);
                return false;
            }
        }
        public static T DeQueen<T>(string key)
        {
            T result=default(T);
            try
            {
                //从尾部取值
                result = GetClient().RPop<T>(key);
                return result;
            }
            catch (Exception e)
            {
                Log.Error($"DeQueen:key={key}", e);
                return result;
            }
        }
        /// <summary>
        /// 设置hash值
        /// </summary>
        /// <param name="key"></param>
        /// <param name="field"></param>
        /// <param name="value"></param>
        /// <returns></returns>
        public static bool SetHash(string key, string field,string value)
        {
            try
            {
                GetClient().HSet(key, field, value);
                return true;
            }
            catch (Exception e)
            {
                Log.Error($"SetHash:key={key},value={value}", e);
                return false;
            }
        }
        /// <summary>
        /// 根据表名,键名,获取hash值
        /// </summary>
        /// <param name="key">表名</param>
        /// <param name="field">键名</param>
        /// <returns></returns>
        public static string GetHash(string key,string field)
        {
            string result = "";
            try
            {
          
                result = GetClient().HGet(key,field);
                return result;
            }
            catch (Exception e)
            {
                Log.Error($"GetHash:key={key}", e);
                return result;
            }
        }
        /// <summary>
        /// 获取指定key中所有字段
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public static Dictionary<string,string> GetHashAll(string key)
        {
            try
            {
                var result = GetClient().HGetAll(key);
                return result;
            }
            catch (Exception e)
            {
                Log.Error($"GetHash:key={key}", e);
                return new Dictionary<stringstring>();
            }
        }
        /// <summary>
        /// 根据表名,键名,删除hash值
        /// </summary>
        /// <param name="key">表名</param>
        /// <param name="field">键名</param>
        /// <returns></returns>
        public static long DeleteHash(string key, string field)
        {
            long result = 0;
            try
            {
                result = GetClient().HDel(key, field);
                return result;
            }
            catch (Exception e)
            {
                Log.Error($"GetHash:key={key}", e);
                return result;
            }
        }
        //private object deleteCache( Method method, Object[] args)
        //{
        //    object flag = false;
        //    String fieldkey = parseKey(method, args);
        //    try
        //    {
        //        if (fieldkey.equals(""))
        //        {
        //            cacheClient.del(cache.key());
        //        }
        //        else
        //        {
        //            cacheClient.hdel(cache.key(), fieldkey);
        //        }
        //        flag = true;
        //    }
        //    catch (Exception e)
        //    {
        //        //System.out.println(e.getMessage());
        //        flag = false;
        //    }
        //    return flag;
        //}
        /**
         * 获取值field
         * @param key
         * @param method
         * @param args
         * @return
         * @throws Exception
         */
        //        public string parseKey(Method method, Object[] args)
        //        {
        //            string fieldKey = "";
        //            for (int i = 0; i < method.getParameters().length; i++)
        //            {
        //                Parameter p = method.getParameters()[i];
        //                FieldAnnotation f = p.getAnnotation(FieldAnnotation.class);
        //          if(f!=null) {
        //              fieldKey+=args[i].toString()+":";
        //          }else {
        //              FieldOnlyKeyAnnotation fo = p.getAnnotation(FieldOnlyKeyAnnotation.class);
        //              if(fo != null) {
        //                  fieldKey+=args[i].toString();
        //}
        //          }
        //      }
        //      return fieldKey;
        //    }
    }
}

使用 .NET CLI 构建项目脚手架 - SpringLeee - 博客园

mikel阅读(929)

来源: 使用 .NET CLI 构建项目脚手架 – SpringLeee – 博客园

前言

在微服务场景中,开发人员分配到不同的小组,系统会拆分为很多个微服务,有一点是,每个项目都需要单元测试,接口文档,WebAPI接口等,创建新项目这些都是重复的工作,而且还要保证各个项目结构的大体一致,这时就需要一个适用于企业内部的框架模板,类似于前端的脚手架,可以做到开箱即用,注重业务功能开发,提升工作效率。

简介

NET 命令行接口 (CLI) 工具是用于开发、生成、运行和发布 .NET 应用程序的跨平台工具链。

本次主要介绍的是 dotnet new 命令,可以通过这个命令创建我们的自定义模板,我们安装完.NET SDK后,本身自带了一些项目模板,可以通过 dotnet new --list 查看已经安装的模板。

接下来,我会介绍如何构建自定义的项目模板。

准备工作

首先,我们需要准备一个简单的项目模板,我们希望以后可以通过脚手架,自动为我们生成这些项目和文件,这里面可能包含了单元测试项目,WebAPI项目等。

你也可以在这里找到项目源代码,https://github.com/SpringLeee/Dy.Template

在本地创建 Dy.Template 文件夹,并在文件夹内创建 templates 文件夹(后边所有的模板文件都在这里), 这里我创建了一个解决方案,里面包含了3个项目,WebAPI,Test 和 Task,项目结构如下:

构建模板

在 templates 文件夹内,创建一个名为 “.template.config” 的文件夹(可以通过命令 mkdir .template.config 创建, 然后进入该文件夹,再创建一个名为 “template.json” 的新文件, 文件夹结构应如下所示:

然后修改配置文件如下:

{
  "$schema": "http://json.schemastore.org/template",
  "author": "SpringLee",
  "classifications": [ "Template" ],
  "name": "Dy.Template",
  "identity": "Dy.Template", 
  "shortName": "dy-template",
  "tags": {
    "language": "C#" 
  },
  "sourceName": "Template" 
}

上面是一些基本的描述信息,需要注意的是 “sourceName” 属性,它相当于一个变量,我们通过这个属性,可以创建 Dy.Order.WebAPI, Dy.User.WebAPI 这样的项目,后边我会进行详细介绍。

打包模板

基础工作已经准备完成,我们还需要把项目打包,发布到Nuget.org 或者是公司的内部 Nuget Server,这样其他人才可以下载和安装这个模板。

你可能很熟悉在.NET 中对单个项目进行打包,比如类库,可以在VS中直接对项目使用右键打包,也可以使用dotnet pack命令,不一样的是,我们需要打包的是整个项目结构,而不是单个项目。

我们在 Dy.Template 文件夹下,创建 template-pack.csproj 文件

修改内容如下:

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <PackageType>Template</PackageType>
    <PackageVersion>1.0.0</PackageVersion>
    <PackageId>Dy.Template</PackageId>
    <Title>Dy.Template</Title>
    <Authors>SpringLee</Authors>
    <Description>Dy.Template</Description>
    <PackageTags>dotnet-new;templates;Dy.Template</PackageTags>

    <TargetFramework>netstandard2.0</TargetFramework>

    <IncludeContentInPack>true</IncludeContentInPack>
    <IncludeBuildOutput>false</IncludeBuildOutput>
    <ContentTargetFolders>content</ContentTargetFolders>
    <NoWarn>$(NoWarn);NU5128</NoWarn>
  </PropertyGroup>

  <ItemGroup>
    <Content Include="templates\**\*" Exclude="templates\**\bin\**;templates\**\obj\**" />
    <Compile Remove="**\*" />
  </ItemGroup>

</Project>

我们指定了程序包的基础信息,版本ID, 描述信息,包含了 templates 文件夹下的所有文件,然后排除了 bin\ obj\ 文件夹的dll文件。

然后,运行 dotnet pack 命令进行打包, 你可以在 /bin/nuget/ 文件夹找到 生成的 nupkg 文件

在win10的应用商店(Microsoft Store)安装 Nuget Package Explore

我们把生成的 nupkg 文件 丢到 Nuget Package Explore 里查看,结构如下,包含了我们的 .config 配置文件,各个项目,解决方案。

最后,你可以把程序包推送到 nuget 服务器。

安装并使用

在终端中运行 dotnet new --install Dy.Template 命令安装,安装成功后,应该可以看到下边的输出,里边包含了我们的自定义模板

运行 dotnet new Dy.Template --name=Order,–name 指定了变量值,它会自动帮我们生成 Order 项目,这很棒!

欢迎扫码关注我们的公众号 【全球技术精选】,专注国外优秀博客的翻译和开源项目分享。

[Redis知识体系] 一文全面总结Redis知识体系 - pdai - 博客园

mikel阅读(950)

来源: [Redis知识体系] 一文全面总结Redis知识体系 – pdai – 博客园

作者:@pdai
本文为作者原创,转载请注明出处:https://www.cnblogs.com/pengdai/p/14509958.html


 


♥Redis教程 – Redis知识体系详解♥

本系列主要对Redis知识体系进行详解。@pdai

知识体系

知识体系

相关文章

首先,我们通过学习Redis的概念基础,了解它适用的场景。

  • Redis入门 – Redis概念和基础
    • Redis是一种支持key-value等多种数据结构的存储系统。可用于缓存,事件发布或订阅,高速队列等场景。支持网络,提供字符串,哈希,列表,队列,集合结构直接存取,基于内存,可持久化。

其次,这些适用场景都是基于Redis支持的数据类型的,所以我们需要学习它支持的数据类型;同时在redis优化中还需要对底层数据结构了解,所以也需要了解一些底层数据结构的设计和实现。

再者,需要学习Redis支持的核心功能,包括持久化,消息,事务,高可用;高可用方面包括,主从,哨兵等;高可拓展方面,比如 分片机制等。

  • Redis进阶 – 持久化:RDB和AOF机制详解
    • 为了防止数据丢失以及服务重启时能够恢复数据,Redis支持数据的持久化,主要分为两种方式,分别是RDB和AOF; 当然实际场景下还会使用这两种的混合模式。
  • Redis进阶 – 消息传递:发布订阅模式详解
    • Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
  • Redis进阶 – 事件:Redis事件机制详解
    • Redis 采用事件驱动机制来处理大量的网络IO。它并没有使用 libevent 或者 libev 这样的成熟开源方案,而是自己实现一个非常简洁的事件驱动库 ae_event。
  • Redis进阶 – 事务:Redis事务详解
    • Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。
  • Redis进阶 – 高可用:主从复制详解
    • 我们知道要避免单点故障,即保证高可用,便需要冗余(副本)方式提供集群服务。而Redis 提供了主从库模式,以保证数据副本的一致,主从库之间采用的是读写分离的方式。本文主要阐述Redis的主从复制。
  • Redis进阶 – 高可用:哨兵机制(Redis Sentinel)详解
    • 在上文主从复制的基础上,如果注节点出现故障该怎么办呢? 在 Redis 主从集群中,哨兵机制是实现主从库自动切换的关键机制,它有效地解决了主从复制模式下故障转移的问题。
  • Redis进阶 – 高可拓展:分片技术(Redis Cluster)详解
    • 前面两篇文章,主从复制和哨兵机制保障了高可用,就读写分离而言虽然slave节点来扩展主从的读并发能力,但是写能力和存储能力是无法进行扩展的,就只能是master节点能够承载的上限。如果面对海量数据那么必然需要构建master(主节点分片)之间的集群,同时必然需要吸收高可用(主从复制和哨兵机制)能力,即每个master分片节点还需要有slave节点,这是分布式系统中典型的纵向扩展(集群的分片技术)的体现;所以在Redis 3.0版本中对应的设计就是Redis Cluster。

最后,就是具体的实践以及实践中遇到的问题和解决方法了:在不同版本中有不同特性,所以还需要了解版本;以及性能优化,大厂实践等。

学习资料

除此之外,我还推荐你看下 极客时间 《Redis核心技术与实战》(作者:蒋德钧)的相关内容,它是我看到的为数不多的含有实战经验比较多的专栏,部分文章中图片也来源于这个系列。

更多文章请参考 [Java 全栈知识体系](https://pdai.tech)

基于redis分布式锁实现“秒杀”_ 刘劭的博客-CSDN博客

mikel阅读(646)

来源: 基于redis分布式锁实现“秒杀”_ 刘劭的博客-CSDN博客

最近在项目中遇到了类似“秒杀”的业务场景,在本篇博客中,我将用一个非常简单的demo,阐述实现所谓“秒杀”的基本思路。

业务场景
所谓秒杀,从业务角度看,是短时间内多个用户“争抢”资源,这里的资源在大部分秒杀场景里是商品;将业务抽象,技术角度看,秒杀就是多个线程对资源进行操作,所以实现秒杀,就必须控制线程对资源的争抢,既要保证高效并发,也要保证操作的正确。

一些可能的实现
刚才提到过,实现秒杀的关键点是控制线程对资源的争抢,根据基本的线程知识,可以不加思索的想到下面的一些方法:
1、秒杀在技术层面的抽象应该就是一个方法,在这个方法里可能的操作是将商品库存-1,将商品加入用户的购物车等等,在不考虑缓存的情况下应该是要操作数据库的。那么最简单直接的实现就是在这个方法上加上synchronized关键字,通俗的讲就是锁住整个方法;
2、锁住整个方法这个策略简单方便,但是似乎有点粗暴。可以稍微优化一下,只锁住秒杀的代码块,比如写数据库的部分;
3、既然有并发问题,那我就让他“不并发”,将所有的线程用一个队列管理起来,使之变成串行操作,自然不会有并发问题。

上面所述的方法都是有效的,但是都不好。为什么?第一和第二种方法本质上是“加锁”,但是锁粒度依然比较高。什么意思?试想一下,如果两个线程同时执行秒杀方法,这两个线程操作的是不同的商品,从业务上讲应该是可以同时进行的,但是如果采用第一二种方法,这两个线程也会去争抢同一个锁,这其实是不必要的。第三种方法也没有解决上面说的问题。

那么如何将锁控制在更细的粒度上呢?可以考虑为每个商品设置一个互斥锁,以和商品ID相关的字符串为唯一标识,这样就可以做到只有争抢同一件商品的线程互斥,不会导致所有的线程互斥。分布式锁恰好可以帮助我们解决这个问题。

何为分布式锁
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

我们来假设一个最简单的秒杀场景:数据库里有一张表,column分别是商品ID,和商品ID对应的库存量,秒杀成功就将此商品库存量-1。现在假设有1000个线程来秒杀两件商品,500个线程秒杀第一个商品,500个线程秒杀第二个商品。我们来根据这个简单的业务场景来解释一下分布式锁。
通常具有秒杀场景的业务系统都比较复杂,承载的业务量非常巨大,并发量也很高。这样的系统往往采用分布式的架构来均衡负载。那么这1000个并发就会是从不同的地方过来,商品库存就是共享的资源,也是这1000个并发争抢的资源,这个时候我们需要将并发互斥管理起来。这就是分布式锁的应用。
而key-value存储系统,如redis,因为其一些特性,是实现分布式锁的重要工具。

具体的实现
先来看看一些redis的基本命令:
SETNX key value
如果key不存在,就设置key对应字符串value。在这种情况下,该命令和SET一样。当key已经存在时,就不做任何操作。SETNX是”SET if Not eXists”。
expire KEY seconds
设置key的过期时间。如果key已过期,将会被自动删除。
del KEY
删除key
由于笔者的实现只用到这三个命令,就只介绍这三个命令,更多的命令以及redis的特性和使用,可以参考redis官网。

需要考虑的问题
1、用什么操作redis?幸亏redis已经提供了jedis客户端用于java应用程序,直接调用jedis API即可。
2、怎么实现加锁?“锁”其实是一个抽象的概念,将这个抽象概念变为具体的东西,就是一个存储在redis里的key-value对,key是于商品ID相关的字符串来唯一标识,value其实并不重要,因为只要这个唯一的key-value存在,就表示这个商品已经上锁。
3、如何释放锁?既然key-value对存在就表示上锁,那么释放锁就自然是在redis里删除key-value对。
4、阻塞还是非阻塞?笔者采用了阻塞式的实现,若线程发现已经上锁,会在特定时间内轮询锁。
5、如何处理异常情况?比如一个线程把一个商品上了锁,但是由于各种原因,没有完成操作(在上面的业务场景里就是没有将库存-1写入数据库),自然没有释放锁,这个情况笔者加入了锁超时机制,利用redis的expire命令为key设置超时时长,过了超时时间redis就会将这个key自动删除,即强制释放锁(可以认为超时释放锁是一个异步操作,由redis完成,应用程序只需要根据系统特点设置超时时间即可)。

talk is cheap,show me the code
在代码实现层面,注解有并发的方法和参数,通过动态代理获取注解的方法和参数,在代理中加锁,执行完被代理的方法后释放锁。

几个注解定义:
cachelock是方法级的注解,用于注解会产生并发问题的方法:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
String lockedPrefix() default “”;//redis 锁key的前缀
long timeOut() default 2000;//轮询锁的时间
int expireTime() default 1000;//key在redis里存在的时间,1000S
}
1
2
3
4
5
6
7
8
lockedObject是参数级的注解,用于注解商品ID等基本类型的参数:

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedObject {
//不需要值
}
1
2
3
4
5
6
LockedComplexObject也是参数级的注解,用于注解自定义类型的参数:

@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockedComplexObject {
String field() default “”;//含有成员变量的复杂对象中需要加锁的成员变量,如一个商品对象的商品ID

}
1
2
3
4
5
6
7
CacheLockInterceptor实现InvocationHandler接口,在invoke方法中获取注解的方法和参数,在执行注解的方法前加锁,执行被注解的方法后释放锁:

public class CacheLockInterceptor implements InvocationHandler{
public static int ERROR_COUNT = 0;
private Object proxied;

public CacheLockInterceptor(Object proxied) {
this.proxied = proxied;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

CacheLock cacheLock = method.getAnnotation(CacheLock.class);
//没有cacheLock注解,pass
if(null == cacheLock){
System.out.println(“no cacheLock annotation”);
return method.invoke(proxied, args);
}
//获得方法中参数的注解
Annotation[][] annotations = method.getParameterAnnotations();
//根据获取到的参数注解和参数列表获得加锁的参数
Object lockedObject = getLockedObject(annotations,args);
String objectValue = lockedObject.toString();
//新建一个锁
RedisLock lock = new RedisLock(cacheLock.lockedPrefix(), objectValue);
//加锁
boolean result = lock.lock(cacheLock.timeOut(), cacheLock.expireTime());
if(!result){//取锁失败
ERROR_COUNT += 1;
throw new CacheLockException(“get lock fail”);

}
try{
//加锁成功,执行方法
return method.invoke(proxied, args);
}finally{
lock.unlock();//释放锁
}

}
/**
*
* @param annotations
* @param args
* @return
* @throws CacheLockException
*/
private Object getLockedObject(Annotation[][] annotations,Object[] args) throws CacheLockException{
if(null == args || args.length == 0){
throw new CacheLockException(“方法参数为空,没有被锁定的对象”);
}

if(null == annotations || annotations.length == 0){
throw new CacheLockException(“没有被注解的参数”);
}
//不支持多个参数加锁,只支持第一个注解为lockedObject或者lockedComplexObject的参数
int index = -1;//标记参数的位置指针
for(int i = 0;i < annotations.length;i++){
for(int j = 0;j < annotations[i].length;j++){
if(annotations[i][j] instanceof LockedComplexObject){//注解为LockedComplexObject
index = i;
try {
return args[i].getClass().getField(((LockedComplexObject)annotations[i][j]).field());
} catch (NoSuchFieldException | SecurityException e) {
throw new CacheLockException(“注解对象中没有该属性” + ((LockedComplexObject)annotations[i][j]).field());
}
}

if(annotations[i][j] instanceof LockedObject){
index = i;
break;
}
}
//找到第一个后直接break,不支持多参数加锁
if(index != -1){
break;
}
}

if(index == -1){
throw new CacheLockException(“请指定被锁定参数”);
}

return args[index];
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
最关键的RedisLock类中的lock方法和unlock方法:

/**
* 加锁
* 使用方式为:
* lock();
* try{
* executeMethod();
* }finally{
* unlock();
* }
* @param timeout timeout的时间范围内轮询锁
* @param expire 设置锁超时时间
* @return 成功 or 失败
*/
public boolean lock(long timeout,int expire){
long nanoTime = System.nanoTime();
timeout *= MILLI_NANO_TIME;
try {
//在timeout的时间范围内不断轮询锁
while (System.nanoTime() – nanoTime < timeout) {
//锁不存在的话,设置锁并设置锁过期时间,即加锁
if (this.redisClient.setnx(this.key, LOCKED) == 1) {
this.redisClient.expire(key, expire);//设置锁过期时间是为了在没有释放
//锁的情况下锁过期后消失,不会造成永久阻塞
this.lock = true;
return this.lock;
}
System.out.println(“出现锁等待”);
//短暂休眠,避免可能的活锁
Thread.sleep(3, RANDOM.nextInt(30));
}
} catch (Exception e) {
throw new RuntimeException(“locking error”,e);
}
return false;
}

public void unlock() {
try {
if(this.lock){
redisClient.delKey(key);//直接删除
}
} catch (Throwable e) {

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
上述的代码是框架性的代码,现在来讲解如何使用上面的简单框架来写一个秒杀函数。
先定义一个接口,接口里定义了一个秒杀方法:

public interface SeckillInterface {
/**
*现在暂时只支持在接口方法上注解
*/
//cacheLock注解可能产生并发的方法
@CacheLock(lockedPrefix=”TEST_PREFIX”)
public void secKill(String userID,@LockedObject Long commidityID);//最简单的秒杀方法,参数是用户ID和商品ID。可能有多个线程争抢一个商品,所以商品ID加上LockedObject注解
}
1
2
3
4
5
6
7
8
上述SeckillInterface接口的实现类,即秒杀的具体实现:

public class SecKillImpl implements SeckillInterface{
static Map<Long, Long> inventory ;
static{
inventory = new HashMap<>();
inventory.put(10000001L, 10000l);
inventory.put(10000002L, 10000l);
}

@Override
public void secKill(String arg1, Long arg2) {
//最简单的秒杀,这里仅作为demo示例
reduceInventory(arg2);
}
//模拟秒杀操作,姑且认为一个秒杀就是将库存减一,实际情景要复杂的多
public Long reduceInventory(Long commodityId){
inventory.put(commodityId,inventory.get(commodityId) – 1);
return inventory.get(commodityId);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
模拟秒杀场景,1000个线程来争抢两个商品:

@Test
public void testSecKill(){
int threadCount = 1000;
int splitPoint = 500;
CountDownLatch endCount = new CountDownLatch(threadCount);
CountDownLatch beginCount = new CountDownLatch(1);
SecKillImpl testClass = new SecKillImpl();

Thread[] threads = new Thread[threadCount];
//起500个线程,秒杀第一个商品
for(int i= 0;i < splitPoint;i++){
threads[i] = new Thread(new Runnable() {
public void run() {
try {
//等待在一个信号量上,挂起
beginCount.await();
//用动态代理的方式调用secKill方法
SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
proxy.secKill(“test”, commidityId1);
endCount.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
threads[i].start();

}
//再起500个线程,秒杀第二件商品
for(int i= splitPoint;i < threadCount;i++){
threads[i] = new Thread(new Runnable() {
public void run() {
try {
//等待在一个信号量上,挂起
beginCount.await();
//用动态代理的方式调用secKill方法
SeckillInterface proxy = (SeckillInterface) Proxy.newProxyInstance(SeckillInterface.class.getClassLoader(),
new Class[]{SeckillInterface.class}, new CacheLockInterceptor(testClass));
proxy.secKill(“test”, commidityId2);
//testClass.testFunc(“test”, 10000001L);
endCount.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
threads[i].start();

}

long startTime = System.currentTimeMillis();
//主线程释放开始信号量,并等待结束信号量,这样做保证1000个线程做到完全同时执行,保证测试的正确性
beginCount.countDown();

try {
//主线程等待结束信号量
endCount.await();
//观察秒杀结果是否正确
System.out.println(SecKillImpl.inventory.get(commidityId1));
System.out.println(SecKillImpl.inventory.get(commidityId2));
System.out.println(“error count” + CacheLockInterceptor.ERROR_COUNT);
System.out.println(“total cost ” + (System.currentTimeMillis() – startTime));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
在正确的预想下,应该每个商品的库存都减少了500,在多次试验后,实际情况符合预想。如果不采用锁机制,会出现库存减少499,498的情况。
这里采用了动态代理的方法,利用注解和反射机制得到分布式锁ID,进行加锁和释放锁操作。当然也可以直接在方法进行这些操作,采用动态代理也是为了能够将锁操作代码集中在代理中,便于维护。
通常秒杀场景发生在web项目中,可以考虑利用spring的AOP特性将锁操作代码置于切面中,当然AOP本质上也是动态代理。

小结
这篇文章从业务场景出发,从抽象到实现阐述了如何利用redis实现分布式锁,完成简单的秒杀功能,也记录了笔者思考的过程,希望能给阅读到本篇文章的人一些启发。

源码仓库:https://github.com/lsfire/redisframework
————————————————
版权声明:本文为CSDN博主「lsfire」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u010359884/article/details/50310387

RabbitMQ系列(二)--基础组件 - Diamond-Shine - 博客园

mikel阅读(854)

来源: RabbitMQ系列(二)–基础组件 – Diamond-Shine – 博客园

声明:对于RabbitMQ的学习基于某课网相关视频和《RabbitMQ实战指南》一书,后续关于RabbitMQ的博客都是基于二者

一、什么是RabbitMQ

RabbitMQ是开源代理和队列服务器,通过普通协议在不同的应用之间共享数据,使用Erlang编写(Erlang进行数据交换的性能很好,

和原生socket一样好的延迟响应效果),基于AMQP协议

二、AMQP

AMQP高级消息队列协议:具有现代特征的二进制协议,和JMS有点像,模型如下:

AMQP核心概念

1、Server:Broker,接受client连接,实现AMQP实体服务

2、Connection:应用程序和Broker的网络连接

3、Channel:网络信道,读写都是在Channel中进行(NIO的概念),包括对MQ进行的一些操作(例如clear queue等)都是在Channel中进行,

客户端可建立多个Channel,每个Channel代表一个会话任务

4、Message:由properties(有消息优先级、延迟等特性)和Body(消息内容)组成

5、Virtual host:用于消息隔离(类似Redis 16个db这种概念),最上层的消息路由,一个包含若干Exchange和Queue,同一个里面Exchange

和Queue的名称不能存在相同的。

6、Exchange:Routing and Filter

7、Binding:把Exchange和Queue进行Binding

8、Routing key:路由规则

9、Queue:物理上存储消息

三、哪些大厂在使用RabbitMQ,为什么?

滴滴、美团、头条、去哪儿。。。。都再使用RabbitMQ

原因:

1、开源、性能优秀、能保证稳定性,提供可靠性消息投递模式confirm、返回模式return,和springAMQP完美整合、API丰富

2、集群模式丰富,表达式配置,HA模式,镜像队列模式

3、保证数据不丢失的前提下做到高可靠性、高可用性

四、RabbitMQ基础组件

1、Exchange:

如果不指定Exchange的话,RabbitMQ默认使用,(AMQP default)注意一下,需要将routing key等于queue name相同

2、name、type:

fanout(效率最好,不需要routing key,routing key如何设置都可以)、direct、topic(#一个或多个,*一个)、headers

3、Auto Delete:

当最后一个Binding到Exchange的Queue删除之后,自动删除该Exchange

4、Binding:

Exchange和Queue之间的连接关系,Exchange之间也可以Binding

5、Queue:

实际物理上存储消息的

6、Durability:

是否持久化,Durable:是,即使服务器重启,这个队列也不会消失,Transient:否

7、Exclusive:

这个queue只能由一个exchange监听restricted to this connection,使用场景:顺序消费

8、Message:

由properties(有消息优先级、延迟等特性)和Body(Payload消息内容)组成,还有content_type、content_encoding、priority

correlation_id、reply_to、expiration、message_id等属性

五、安装

本人很早之前在centos安装过了,具体步骤都忘了,大家可以百度一下,有一大堆呢,比较推荐Docker安装,很方便

需要注意:注意rabbitMQ和erlang版本的对应,而且服务器的host name不要瞎改,当初RabbitMQ一直启动报错,就是这个原因,搞了两天,真的

蛋疼,先要确定erlang安装成功,然后按照RabbitMQ

浏览器可视化工具:rabbitmq-plugins enable rabbitmq_management  用户名、密码:guest guest

 

RabbitMQ系列(三)--Java API - Diamond-Shine - 博客园

mikel阅读(654)

来源: RabbitMQ系列(三)–Java API – Diamond-Shine – 博客园

基于java使用RabbitMQ

框架:SpringBoot1.5.14.RELEASE

maven依赖:

复制代码
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码

本文只是操作原生RabbitMQ,并没有和SpringBoot进行整合,后面介绍整合,基于注解使用

一、quick start

1.1、Consumer

复制代码
public static void main(String[] args) throws Exception{
        //1 创建一个ConnectionFactory, 并进行配置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("139.196.75.238");
        factory.setPort(5672);
        //2 通过连接工厂创建连接
        Connection connection = factory.newConnection();
        //3 通过connection创建一个Channel
        Channel channel = connection.createChannel();

        //4 声明(创建)一个队列
        channel.queueDeclare("test002", true, false, false, null);

        //5 创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        //6 设置Channel
        channel.basicConsume("test002", true, queueingConsumer);

        while(true){
            //7 获取消息
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            log.info(msg);
        }
    }
}
复制代码

参数解释:

durable:是否持久化,Durable:是,即使服务器重启,这个队列也不会消失,Transient:否

exclusive:这个queue只能由一个exchange监听restricted to this connection,使用场景:顺序消费

autoDelete:当最后一个Binding到Exchange的Queue删除之后,自动删除该Exchange

arguments:参数

autoACK:是否自动签收,对应着手动签收

1.2、Producer

复制代码
public class Producer {

    public static void main(String[] args) throws Exception{
        //1 创建一个ConnectionFactory, 并进行配置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("139.196.75.238");
        factory.setPort(5672);
        //2 通过连接工厂创建连接
        Connection connection = factory.newConnection();
        //3 通过connection创建一个Channel
        Channel channel = connection.createChannel();
        //4 通过Channel发送数据
        for(int i=0; i < 5; i++){
            String msg = "Hello RabbitMQ!";
            //1 exchange   2 routingKey
            channel.basicPublish("", "test002", null, msg.getBytes());
        }
        //5 关闭相关的连接
        channel.close();
        factory.clone();
    }
}
复制代码

参数解释:

exchange name:

routingKey:路由规则

BasicProperties:

body:message中的body

结果:

17:43:49.351 [main] INFO com.it.quickstart.Consumer - Hello RabbitMQ!
17:43:49.351 [main] INFO com.it.quickstart.Consumer - Hello RabbitMQ!
17:43:49.351 [main] INFO com.it.quickstart.Consumer - Hello RabbitMQ!
17:43:49.351 [main] INFO com.it.quickstart.Consumer - Hello RabbitMQ!
17:43:49.351 [main] INFO com.it.quickstart.Consumer - Hello RabbitMQ!

我们使用RabbitMQ,需要首先在可视化界面确定queue,exchange是否创建,对应关系是否正常,这是一个大前提

1.3、自定义消费者

之前接收message,通过while(true),感觉太low了,RabbitMQ支持实现自定义消费者,只需要集成DefaultConsumer,重写handlerDelivery,

构造器

复制代码
public class MyConsumer extends DefaultConsumer {
    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}
复制代码

而consumer只需要修改

//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);            //去掉这一步
//6 设置Channel
channel.basicConsume("test002", true, new MyConsumer(channel));

结果:

-----------consume message----------
consumerTag: amq.ctag-YK7CnvWxTpm6hmuyUyqSkQ
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=test002)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: Hello RabbitMQ By MyConsumer!

二、Exchange

Exchange有四种方式Fanout、Direct、Topic、Headers,而我们上面的例子,并没有定义Exchange,RabbitMQ默认使用AMQP default,

要求routing key和queue name相同

本文只是介绍前三种,Headers几乎用不到

2.1、Fanout

fanout效率是最好的,不需要routing key,你可以随便设置都无所谓,只要consumer和producer的Exchange name相同

producer

复制代码
public class Producer {

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("139.196.75.238");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "exchange_fanout";
        channel.basicPublish(exchangeName, "", null, "send message by fanout".getBytes());
    }
}
复制代码

consumer

复制代码
public class Consumer {

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("139.196.75.238");
        factory.setPort(5672);

        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "exchange_fanout";
        String exchangeType = "fanout";
        String queueName = "queue_fanout";
        String routingKey = "";    //不设置路由键

        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
}
复制代码

结果:

-----------consume message----------
consumerTag: amq.ctag-rXw7SbaR5aWVMQxZY6SfEA
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=exchange_fanout, routingKey=)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: send message by fanout

我们声明queue和exchange之后,要先观察exchange和queue对应关系

点击exchange name,进去查看是否binding成功,如果成功如下图

注意点:

1、fanout模式下不是直接操作Queue,而是把消息发送给Exchange,由Exchange把消息分发给与之绑定的Queue,也就是广播模式

2、Queue必须和Exchange进行绑定

3、每个Consumer的Queue name不能相同,个人测试下,相同的Queue name,只能有一个Consumer收到消息

2.2、Direct

完全把代码贴出来太浪费篇幅了,而且毫无意思,后面只说一下区别,顺便说一下,对于queue和exchange的declare无论放到producer还是

Consumer都可以的,个人习惯Consumer,而且二者启动顺序没有要求,因为RabbitMQ的消息具有堆积功能

Producer:

 String exchangeName = "exchange_direct";
 String routingKey = "key.direct";

 channel.basicPublish(exchangeName, routingKey, null, "send message by direct".getBytes());

Consumer:

复制代码
String exchangeName = "exchange_direct";
String exchangeType = "direct";
String queueName = "queue_direct";
String routingKey = "key.direct";

channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

channel.basicConsume(queueName, true, new MyConsumer(channel));
复制代码

结果:

-----------consume message----------
consumerTag: amq.ctag-6UeNiGs1K-gMWbwvVn8F5A
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=exchange_direct, routingKey=key.direct)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: send message by direct

说明:

Direct要求routingKey完全一致

2.3、 topic

Producer:

复制代码
String exchangeName = "exchange_topic";
String routingKey1 = "user.save";
String routingKey2 = "user.update";
String routingKey3 = "user.delete.abc";
//5 发送

String msg = "Send Message By topic";
channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
复制代码

Consumer:

复制代码
String exchangeName = "exchange_topic";
String exchangeType = "topic";
String queueName = "queue_topic";
String routingKey = "user.*";

channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
复制代码

说明:

Topic支持通配符匹配,#匹配一个或多个单词,*匹配一个单词

RabbitMQ系列(四)--消息如何保证可靠性传输以及幂等性 - Diamond-Shine - 博客园

mikel阅读(797)

来源: RabbitMQ系列(四)–消息如何保证可靠性传输以及幂等性 – Diamond-Shine – 博客园

一、消息如何保证可靠性传输

1.1、可能出现消息丢失的情况

1、Producer在把Message发送Broker的过程中,因为网络问题等发生丢失,或者Message到了Broker,但是出了问题,没有保存下来

针对这个问题,Producer可以开启MQ的事务,如果这个过程出现异常,进行回滚,但是有个很大的问题,你提交一个事务就会阻塞在那,

非常影响性能,生产环境肯定不会开启事务,一般都是使用confirm机制

2、Broker接收到Message暂存到内存,Consumer还没来得及消费,Broker挂掉了

可以通过持久化设置去解决:

1).创建Queue的时候设置持久化,保证Broker持久化Queue的元数据,但是不会持久化Queue里面的消息

2).将Message的deliveryMode设置为2,可以将消息持久化到磁盘,这样只有Message支持化到磁盘之后才会发送通知Producer ack

这两步过后,即使Broker挂了,Producer肯定收不到ack的,就可以进行重发

3、Consumer有消费到Message,但是内部出现问题,Message还没处理,Broker以为Consumer处理完了,只会把后续的消息发送

这时候,就要关闭autoack,消息处理过后,进行手动ack

1.2、一般通过生产端保证可靠性投递

1、保证消息的成功发出

2、保证MQ节点的成功接收

3、发送端收到MQ节点(Broker)的确认应答

4、完善的消息补偿机制

1.3、解决方案

1、消息落库,对消息状态进行变更,对于高并发环境下数据库压力很大,因为需要写多次数据库

 

整体流程:

1、业务数据和消息都进行落库

2、生产端发送message给Broker

3、Broker给Confirm响应返回生产端

4、接收到confirm,对message状态更改

5、分布式定时任务获取消息的状态

6、如果消息不能成功投递,重新进行发送,记录重发次数

7、当重发3次之后,将状态修改,只能人工进行干预

 

2、消息的延迟投递,做二次确认,回调检查。适合高并发环境,减少写库的次数

 

整体流程:

1、上游服务首先将业务代码入库,发送message给Broker

2、发送第二个延迟确认消息

3、下游服务监听消息进行消费

4、发送确认消息,这里不是confirm机制,而是一条新的消息

5、通过回调服务监听这个confirm消息,然后把消息进行入库

6、回调服务检查到延迟确认消息,会在数据库查询是否有这条消息

7、如果没有查到这条消息,回调服务通过RPC给一个重新发送命令到上游系统

 

相比第一种方案,这里减少了一次message入库,confirm机制是消息可靠性投递的一个核心,在下篇文章会讲到

二、如何保证消息的幂等性

首先,无论是RabbitMQ、RocketMQ还是kafka,都有可能出现消息的重复发送,这个是MQ无法保障的,而幂等性是开发或者运维人员需要保证的

所谓消息的幂等性是指即使收到多次消息,也不会重复消费,这点很重要,例如用户付钱,点的太快了,付了多次,但是你也只能扣一次钱,

不然要骂人了

2.1、RabbitMQ可能导致出现非幂等性的情况

1、可靠性消息投递机制:consumer回复confirm出现网络闪断,producer没有收到ack,定时任务轮询可能就会重新发送消息,这样consumer就

会收到两条消息

2、MQ Broker与消费端传输消息的过程出现网络抖动

3、消费端故障或异常

2.2、kafka可能出现非幂等性的情况

在Consumer端offset没有提交的时候,Consumer重启了,这时候就会出现重复消费的情况

2.3、解决方案

1、唯一ID+指纹码

整体实现相对简单,需要进行数据库写入,利用数据库主键去重,使用ID进行分库分表算法路由,从单库的幂等性到多库的幂等性

1).这里唯一ID一般就是业务表的主键,比如商品ID

2).指纹码:每次操作都要生成指纹码,可以用时间戳+业务编号+…组成,目的是保证每次操作都是正常的

整体流程:

1、需要一个统一ID生成服务,为了保证可靠性,上游服务也要有个本地ID生成服务,然后发送消息给Broker

2、需要ID规则路由组件去监听消息,先入库,如果入库成功,证明没有重复,然后发给下游,如果发现库里面有了这条消息,就不发给下游

 

2、利用Redis的原子性实现

Redis的实现性能比较好,而且Redis一般使用集群,不用担心某台机器挂掉了,影响服务。

存在的问题:

是否要进行数据落库,如果落库的话,怎么保证缓存和storage的一致性、事务,如果不落库,如何设置定时同步策略